XEP-0153: Add typing and switch to default args

(and refactor a bit)
This commit is contained in:
mathieui 2021-02-04 21:54:14 +01:00
parent ace5aeb80e
commit 69e04d7d2e

View file

@ -8,13 +8,19 @@
import hashlib import hashlib
import logging import logging
from asyncio import Future, ensure_future
from typing import (
Dict,
Optional,
)
from slixmpp import JID
from slixmpp.stanza import Presence from slixmpp.stanza import Presence
from slixmpp.exceptions import XMPPError, IqTimeout from slixmpp.exceptions import XMPPError, IqTimeout
from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream import register_stanza_plugin, ElementBase
from slixmpp.plugins.base import BasePlugin from slixmpp.plugins.base import BasePlugin
from slixmpp.plugins.xep_0153 import stanza, VCardTempUpdate from slixmpp.plugins.xep_0153 import stanza, VCardTempUpdate
from slixmpp import asyncio, future_wrapper from slixmpp import future_wrapper
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -35,7 +41,6 @@ class XEP_0153(BasePlugin):
self.xmpp.add_filter('out', self._update_presence) self.xmpp.add_filter('out', self._update_presence)
self.xmpp.add_event_handler('session_start', self._start) self.xmpp.add_event_handler('session_start', self._start)
self.xmpp.add_event_handler('session_end', self._end)
self.xmpp.add_event_handler('presence_available', self._recv_presence) self.xmpp.add_event_handler('presence_available', self._recv_presence)
self.xmpp.add_event_handler('presence_dnd', self._recv_presence) self.xmpp.add_event_handler('presence_dnd', self._recv_presence)
@ -58,45 +63,47 @@ class XEP_0153(BasePlugin):
self.xmpp.del_event_handler('presence_away', self._recv_presence) self.xmpp.del_event_handler('presence_away', self._recv_presence)
@future_wrapper @future_wrapper
def set_avatar(self, jid=None, avatar=None, mtype=None, timeout=None, def set_avatar(self, jid: Optional[JID] = None,
callback=None, timeout_callback=None): avatar: Optional[bytes] = None,
mtype: Optional[str] = None, **iqkwargs) -> Future:
"""Set a VCard avatar.
:param jid: The JID to set the avatar for.
:param avatar: Avatar content.
:param mtype: Avatar file type (e.g. image/jpeg).
"""
if jid is None: if jid is None:
jid = self.xmpp.boundjid.bare jid = self.xmpp.boundjid.bare
async def get_and_set_avatar():
future = asyncio.Future() timeout = iqkwargs.get('timeout', None)
timeout_cb = iqkwargs.get('timeout_callback', None)
def propagate_timeout_exception(fut):
try: try:
fut.done() result = await self.xmpp['xep_0054'].get_vcard(
except IqTimeout as e: jid,
future.set_exception(e) cached=False,
timeout=timeout
def custom_callback(result): )
except IqTimeout as exc:
if timeout_cb is not None:
timeout_cb(exc)
raise
vcard = result['vcard_temp'] vcard = result['vcard_temp']
vcard['PHOTO']['TYPE'] = mtype vcard['PHOTO']['TYPE'] = mtype
vcard['PHOTO']['BINVAL'] = avatar vcard['PHOTO']['BINVAL'] = avatar
new_future = self.xmpp['xep_0054'].publish_vcard(jid=jid, try:
vcard=vcard, result = await self.xmpp['xep_0054'].publish_vcard(
timeout=timeout, jid=jid,
callback=next_callback, vcard=vcard,
timeout_callback=timeout_callback) **iqkwargs
new_future.add_done_callback(propagate_timeout_exception) )
except IqTimeout as exc:
timeout_cb(exc)
raise
self.api['reset_hash'](jid)
self.xmpp.roster[jid].send_last_presence()
def next_callback(result): return ensure_future(get_and_set_avatar(), loop=self.xmpp.loop)
if result['type'] == 'error':
future.set_exception(result)
else:
self.api['reset_hash'](jid)
self.xmpp.roster[jid].send_last_presence()
future.set_result(result)
first_future = self.xmpp['xep_0054'].get_vcard(jid, cached=False, timeout=timeout,
callback=custom_callback,
timeout_callback=timeout_callback)
first_future.add_done_callback(propagate_timeout_exception)
return future
async def _start(self, event): async def _start(self, event):
try: try:
@ -110,10 +117,7 @@ class XEP_0153(BasePlugin):
except XMPPError: except XMPPError:
log.debug('Could not retrieve vCard for %s', self.xmpp.boundjid.bare) log.debug('Could not retrieve vCard for %s', self.xmpp.boundjid.bare)
def _end(self, event): def _update_presence(self, stanza: ElementBase) -> ElementBase:
pass
def _update_presence(self, stanza):
if not isinstance(stanza, Presence): if not isinstance(stanza, Presence):
return stanza return stanza
@ -124,7 +128,27 @@ class XEP_0153(BasePlugin):
stanza['vcard_temp_update']['photo'] = current_hash stanza['vcard_temp_update']['photo'] = current_hash
return stanza return stanza
def _reset_hash(self, jid, node, ifrom, args): def _recv_presence(self, pres: Presence):
try:
if pres.get_plugin('muc', check=True):
# Don't process vCard avatars for MUC occupants
# since they all share the same bare JID.
return
except:
pass
if not pres.match('presence/vcard_temp_update'):
self.api['set_hash'](pres['from'], args=None)
return
data = pres['vcard_temp_update']['photo']
if data is None:
return
self.xmpp.event('vcard_avatar_update', pres)
# =================================================================
def _reset_hash(self, jid: JID, node: str, ifrom: JID, args: Dict):
own_jid = (jid.bare == self.xmpp.boundjid.bare) own_jid = (jid.bare == self.xmpp.boundjid.bare)
if self.xmpp.is_component: if self.xmpp.is_component:
own_jid = (jid.domain == self.xmpp.boundjid.domain) own_jid = (jid.domain == self.xmpp.boundjid.domain)
@ -152,28 +176,8 @@ class XEP_0153(BasePlugin):
self.xmpp['xep_0054'].get_vcard(jid=jid.bare, ifrom=ifrom, self.xmpp['xep_0054'].get_vcard(jid=jid.bare, ifrom=ifrom,
callback=callback) callback=callback)
def _recv_presence(self, pres: Presence): def _get_hash(self, jid: JID, node: str, ifrom: JID, args: Dict):
try:
if pres.get_plugin('muc', check=True):
# Don't process vCard avatars for MUC occupants
# since they all share the same bare JID.
return
except:
pass
if not pres.match('presence/vcard_temp_update'):
self.api['set_hash'](pres['from'], args=None)
return
data = pres['vcard_temp_update']['photo']
if data is None:
return
self.xmpp.event('vcard_avatar_update', pres)
# =================================================================
def _get_hash(self, jid, node, ifrom, args):
return self._hashes.get(jid.bare, None) return self._hashes.get(jid.bare, None)
def _set_hash(self, jid, node, ifrom, args): def _set_hash(self, jid: JID, node: str, ifrom: JID, args: Dict):
self._hashes[jid.bare] = args self._hashes[jid.bare] = args