Merge branch 'updat-typing-and-generic-args' into 'master'

Update typing and generic args for plugins (step 1)

See merge request poezio/slixmpp!120
This commit is contained in:
mathieui 2021-02-05 20:14:41 +01:00
commit cff4588499
16 changed files with 577 additions and 209 deletions

32
itests/test_blocking.py Normal file
View file

@ -0,0 +1,32 @@
import unittest
from slixmpp import JID
from slixmpp.test.integration import SlixIntegration
class TestBlocking(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.register_plugins(['xep_0191'])
await self.connect_clients()
async def test_blocking(self):
"""Check we can block, unblock, and list blocked"""
await self.clients[0]['xep_0191'].block(
[JID('toto@example.com'), JID('titi@example.com')]
)
blocked = {JID('toto@example.com'), JID('titi@example.com')}
iq = await self.clients[0]['xep_0191'].get_blocked()
self.assertEqual(iq['blocklist']['items'], blocked)
info = await self.clients[0]['xep_0191'].unblock(
blocked,
)
iq = await self.clients[0]['xep_0191'].get_blocked()
self.assertEqual(len(iq['blocklist']['items']), 0)
suite = unittest.TestLoader().loadTestsFromTestCase(TestBlocking)

View file

@ -0,0 +1,33 @@
import unittest
from slixmpp.test.integration import SlixIntegration
class TestLastActivity(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
self.register_plugins(['xep_0012'])
await self.connect_clients()
async def test_activity(self):
"""Check we can set and get last activity"""
self.clients[0]['xep_0012'].set_last_activity(
status='coucou',
seconds=4242,
)
act = await self.clients[1]['xep_0012'].get_last_activity(
self.clients[0].boundjid.full
)
self.assertEqual(act['last_activity']['status'], 'coucou')
self.assertGreater(act['last_activity']['seconds'], 4241)
self.assertGreater(4250, act['last_activity']['seconds'])
suite = unittest.TestLoader().loadTestsFromTestCase(TestLastActivity)

View file

@ -0,0 +1,61 @@
import asyncio
import unittest
from slixmpp import JID
from slixmpp.test.integration import SlixIntegration
class TestUserAvatar(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.register_plugins(['xep_0084'])
self.data = b'coucou coucou'
await self.connect_clients()
async def _clear_avatar(self):
"""Utility for purging remote state"""
await self.clients[0]['xep_0084'].stop()
await self.clients[0]['xep_0084'].publish_avatar(b'')
async def test_set_avatar(self):
"""Check we can set and get a PEP avatar and metadata"""
await self._clear_avatar()
await self.clients[0]['xep_0084'].publish_avatar(
self.data
)
metadata = {
'id': self.clients[0]['xep_0084'].generate_id(self.data),
'bytes': 13,
'type': 'image/jpeg',
}
# Wait for metadata publish event
event = self.clients[0].wait_until('avatar_metadata_publish')
publish = self.clients[0]['xep_0084'].publish_avatar_metadata(
metadata,
)
res = await asyncio.gather(
event,
publish,
)
message = res[0]
recv_meta = message['pubsub_event']['items']['item']['avatar_metadata']
info = recv_meta['info']
self.assertEqual(info['bytes'], metadata['bytes'])
self.assertEqual(info['type'], metadata['type'])
self.assertEqual(info['id'], metadata['id'])
recv = await self.clients[0]['xep_0084'].retrieve_avatar(
JID(self.clients[0].boundjid.bare),
info['id']
)
avatar = recv['pubsub']['items']['item']['avatar_data']['value']
self.assertEqual(avatar, self.data)
await self._clear_avatar()
suite = unittest.TestLoader().loadTestsFromTestCase(TestUserAvatar)

49
itests/test_vcard.py Normal file
View file

@ -0,0 +1,49 @@
import unittest
from slixmpp.test.integration import SlixIntegration
class TestVcardTemp(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
self.register_plugins(['xep_0054'])
await self.connect_clients()
async def _clear_vcard(self):
# cleanup
await self.clients[0]['xep_0054'].publish_vcard(
self.clients[0]['xep_0054'].make_vcard()
)
async def test_vcard(self):
"""Check we can set and get a vcard"""
await self._clear_vcard()
# Check that vcard is empty
recv = await self.clients[1]['xep_0054'].get_vcard(
self.clients[0].boundjid.bare
)
self.assertEqual(recv['vcard_temp']['TITLE'], None)
vcard = self.clients[0]['xep_0054'].make_vcard()
vcard['TITLE'] = 'Coucou coucou'
await self.clients[0]['xep_0054'].publish_vcard(
vcard,
)
#
recv = await self.clients[1]['xep_0054'].get_vcard(
self.clients[0].boundjid.bare
)
self.assertEqual(recv['vcard_temp']['TITLE'], 'Coucou coucou')
await self._clear_vcard()
suite = unittest.TestLoader().loadTestsFromTestCase(TestVcardTemp)

View file

@ -0,0 +1,49 @@
import asyncio
import unittest
from slixmpp import JID
from slixmpp.test.integration import SlixIntegration
from hashlib import sha1
class TestVcardAvatar(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.register_plugins(['xep_0153'])
self.data = b'coucou coucou'
self.hashed_data = sha1(self.data).hexdigest()
await self.connect_clients()
async def _clear_avatar(self):
"""Utility for purging remote state"""
await self.clients[0]['xep_0153'].set_avatar(avatar=b'')
async def test_set_avatar(self):
"""Check we can set and get a PEP avatar and metadata"""
await self._clear_avatar()
event = self.clients[0].wait_until('vcard_avatar_update')
update = self.clients[0]['xep_0153'].set_avatar(
avatar=self.data
)
result = await asyncio.gather(
event,
update,
)
presence = result[0]
hash = presence['vcard_temp_update']['photo']
self.assertEqual(hash, self.hashed_data)
iq = await self.clients[0]['xep_0054'].get_vcard(
JID(self.clients[0].boundjid.bare)
)
photo = iq['vcard_temp']['PHOTO']['BINVAL']
self.assertEqual(photo, self.data)
await self._clear_avatar()
suite = unittest.TestLoader().loadTestsFromTestCase(TestVcardAvatar)

37
itests/test_version.py Normal file
View file

@ -0,0 +1,37 @@
import unittest
from slixmpp.test.integration import SlixIntegration
class TestVersion(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
self.register_plugins(
['xep_0092'],
configs=[{
'software_name': 'Slix Test',
'version': '1.2.3.4',
'os': 'I use arch btw',
}]
)
await self.connect_clients()
async def test_version(self):
"""Check we can set and query software version info"""
iq = await self.clients[1]['xep_0092'].get_version(
self.clients[0].boundjid.full
)
version = iq['software_version']
self.assertEqual(version['name'], 'Slix Test')
self.assertEqual(version['version'], '1.2.3.4')
self.assertEqual(version['os'], 'I use arch btw')
suite = unittest.TestLoader().loadTestsFromTestCase(TestVersion)

View file

@ -7,10 +7,16 @@
"""
import logging
from asyncio import Future
from datetime import datetime, timedelta
from typing import (
Dict,
Optional
)
from slixmpp.plugins import BasePlugin, register_plugin
from slixmpp import future_wrapper, Iq
from slixmpp import future_wrapper, JID
from slixmpp.stanza import Iq
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream import JID, register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
@ -59,7 +65,11 @@ class XEP_0012(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('jabber:iq:last')
def begin_idle(self, jid=None, status=None):
def begin_idle(self, jid: Optional[JID] = None, status: str = None):
"""Reset the last activity for the given JID.
:param status: Optional status.
"""
self.set_last_activity(jid, 0, status)
def end_idle(self, jid=None):
@ -77,8 +87,12 @@ class XEP_0012(BasePlugin):
self.api['del_last_activity'](jid)
@future_wrapper
def get_last_activity(self, jid, local=False, ifrom=None, timeout=None,
callback=None, timeout_callback=None):
def get_last_activity(self, jid: JID, local: bool = False,
ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Get last activity for a specific JID.
:param local: Fetch the value from the local cache.
"""
if jid is not None and not isinstance(jid, JID):
jid = JID(jid)
@ -94,15 +108,11 @@ class XEP_0012(BasePlugin):
log.debug("Looking up local last activity data for %s", jid)
return self.api['get_last_activity'](jid, None, ifrom, None)
iq = self.xmpp.Iq()
iq['from'] = ifrom
iq['to'] = jid
iq['type'] = 'get'
iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
iq.enable('last_activity')
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
def _handle_get_last_activity(self, iq):
def _handle_get_last_activity(self, iq: Iq):
log.debug("Received last activity query from " + \
"<%s> to <%s>.", iq['from'], iq['to'])
reply = self.api['get_last_activity'](iq['to'], None, iq['from'], iq)
@ -112,7 +122,7 @@ class XEP_0012(BasePlugin):
# Default in-memory implementations for storing last activity data.
# =================================================================
def _default_set_last_activity(self, jid, node, ifrom, data):
def _default_set_last_activity(self, jid: JID, node: str, ifrom: JID, data: Dict):
seconds = data.get('seconds', None)
if seconds is None:
seconds = 0
@ -125,11 +135,11 @@ class XEP_0012(BasePlugin):
'seconds': datetime.now() - timedelta(seconds=seconds),
'status': status}
def _default_del_last_activity(self, jid, node, ifrom, data):
def _default_del_last_activity(self, jid: JID, node: str, ifrom: JID, data: Dict):
if jid in self._last_activities:
del self._last_activities[jid]
def _default_get_last_activity(self, jid, node, ifrom, iq):
def _default_get_last_activity(self, jid: JID, node: str, ifrom: JID, iq: Iq) -> Iq:
if not isinstance(iq, Iq):
reply = self.xmpp.Iq()
else:

View file

@ -7,12 +7,19 @@
"""
import logging
from typing import (
List,
Optional,
Union,
)
from asyncio import Future
from slixmpp import Iq
from slixmpp import JID
from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream import register_stanza_plugin, ElementBase
from slixmpp.plugins.xep_0049 import stanza, PrivateXML
@ -32,26 +39,25 @@ class XEP_0049(BasePlugin):
def register(self, stanza):
register_stanza_plugin(PrivateXML, stanza, iterable=True)
def store(self, data, ifrom=None, timeout=None, callback=None,
timeout_callback=None):
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['from'] = ifrom
def store(self, data: Union[List[ElementBase], ElementBase], ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Store data in Private XML Storage.
:param data: An XML element or list of xml element to store.
"""
iq = self.xmpp.make_iq_set(ifrom=ifrom)
if not isinstance(data, list):
data = [data]
for elem in data:
iq['private'].append(elem)
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
def retrieve(self, name, ifrom=None, timeout=None, callback=None,
timeout_callback=None):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['from'] = ifrom
def retrieve(self, name: str, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Get previously stored data from Private XML Storage.
:param name: Name of the payload to retrieve (slixmpp plugin attribute)
"""
iq = self.xmpp.make_iq_get(ifrom=ifrom)
iq['private'].enable(name)
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)

View file

@ -7,8 +7,11 @@
"""
import logging
from asyncio import Future
from typing import Optional
from slixmpp import JID, Iq
from slixmpp import JID
from slixmpp.stanza import Iq
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
@ -57,12 +60,22 @@ class XEP_0054(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('vcard-temp')
def make_vcard(self):
def make_vcard(self) -> VCardTemp:
"""Return an empty vcard element."""
return VCardTemp()
@future_wrapper
def get_vcard(self, jid=None, ifrom=None, local=None, cached=False,
callback=None, timeout=None, timeout_callback=None):
def get_vcard(self, jid: Optional[JID] = None, *,
local: Optional[bool] = None, cached: bool = False,
ifrom: Optional[JID] = None,
**iqkwargs) -> Future:
"""Retrieve a VCard.
:param jid: JID of the entity to fetch the VCard from.
:param local: Only check internally for a vcard.
:param cached: Whether to check in the local cache before
sending a query.
"""
if local is None:
if jid is not None and not isinstance(jid, JID):
jid = JID(jid)
@ -95,31 +108,28 @@ class XEP_0054(BasePlugin):
return iq
return vcard
iq = self.xmpp.Iq()
iq['to'] = jid
iq['from'] = ifrom
iq['type'] = 'get'
iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
iq.enable('vcard_temp')
return iq.send(callback=callback, timeout=timeout,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
@future_wrapper
def publish_vcard(self, vcard=None, jid=None, ifrom=None,
callback=None, timeout=None, timeout_callback=None):
def publish_vcard(self, vcard: Optional[VCardTemp] = None,
jid: Optional[JID] = None,
ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Publish a vcard.
:param vcard: The VCard to publish.
:param jid: The JID to publish the VCard to.
"""
self.api['set_vcard'](jid, None, ifrom, vcard)
if self.xmpp.is_component:
return
iq = self.xmpp.Iq()
iq['to'] = jid
iq['from'] = ifrom
iq['type'] = 'set'
iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
iq.append(vcard)
return iq.send(callback=callback, timeout=timeout,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
def _handle_get_vcard(self, iq):
def _handle_get_vcard(self, iq: Iq):
if iq['type'] == 'result':
self.api['set_vcard'](jid=iq['from'], args=iq['vcard_temp'])
return

View file

@ -6,15 +6,49 @@
See the file LICENSE for copying permission.
"""
from __future__ import annotations
import hashlib
import logging
from slixmpp import Iq
from asyncio import Future
from typing import (
Dict,
Iterable,
List,
Optional,
Set,
Union,
TYPE_CHECKING,
)
from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin, JID
from slixmpp.plugins.xep_0084 import stanza, Data, MetaData
from slixmpp.plugins.xep_0084.stanza import Data, MetaData, Pointer
from slixmpp.plugins.xep_0084 import stanza
try:
from typing import TypedDict
except ImportError:
from typing_extensions import TypedDict
class AvatarMetadataItem(TypedDict, total=False):
bytes: int
id: str
type: str
height: int
width: int
url: str
MetadataItems = Union[
AvatarMetadataItem,
List[AvatarMetadataItem],
Set[AvatarMetadataItem]
]
log = logging.getLogger(__name__)
@ -41,32 +75,43 @@ class XEP_0084(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0163'].register_pep('avatar_metadata', MetaData)
def generate_id(self, data):
def generate_id(self, data) -> str:
return hashlib.sha1(data).hexdigest()
def retrieve_avatar(self, jid, id, url=None, ifrom=None,
callback=None, timeout=None, timeout_callback=None):
return self.xmpp['xep_0060'].get_item(jid, Data.namespace, id,
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
def retrieve_avatar(self, jid: JID, id: str, **pubsubkwargs) -> Future:
"""Retrieve an avatar.
def publish_avatar(self, data, ifrom=None, callback=None,
timeout=None, timeout_callback=None):
:param jid: JID of the entity to get the avatar from.
:param id: Identifier of the item containing the avatar.
"""
return self.xmpp['xep_0060'].get_item(
jid,
Data.namespace,
id,
**pubsubkwargs
)
def publish_avatar(self, data: bytes, **pubsubkwargs) -> Future:
"""Publish an avatar.
:param data: The avatar, in bytes representation.
"""
payload = Data()
payload['value'] = data
return self.xmpp['xep_0163'].publish(payload,
id=self.generate_id(data),
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
payload,
id=self.generate_id(data),
**pubsubkwargs
)
def publish_avatar_metadata(self, items=None, pointers=None,
ifrom=None,
callback=None, timeout=None,
timeout_callback=None):
def publish_avatar_metadata(self, items: Optional[MetadataItems] = None,
pointers: Optional[Iterable[Pointer]] = None,
**pubsubkwargs) -> Future:
"""Publish avatar metadata.
:param items: Metadata items to store
:param pointers: Optional pointers
"""
metadata = MetaData()
if items is None:
items = []
@ -82,21 +127,19 @@ class XEP_0084(BasePlugin):
for pointer in pointers:
metadata.add_pointer(pointer)
return self.xmpp['xep_0163'].publish(metadata,
id=info['id'],
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
metadata,
id=info['id'],
**pubsubkwargs
)
def stop(self, ifrom=None, callback=None, timeout=None, timeout_callback=None):
def stop(self, **pubsubkwargs) -> Future:
"""
Clear existing avatar metadata information to stop notifications.
"""
metadata = MetaData()
return self.xmpp['xep_0163'].publish(metadata,
node=MetaData.namespace,
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
metadata,
node=MetaData.namespace,
**pubsubkwargs
)

View file

@ -65,6 +65,35 @@ class Info(ElementBase):
plugin_multi_attrib = 'items'
interfaces = {'bytes', 'height', 'id', 'type', 'url', 'width'}
def _get_int(self, name: str) -> int:
try:
return int(self._get_attr(name))
except ValueError:
return 0
def _set_int(self, name: str, value: int):
if value not in ('', None):
int(value)
self._set_attr(name, value)
def get_bytes(self) -> int:
return self._get_int('bytes')
def _set_bytes(self, value: int):
self._set_int('bytes', value)
def get_height(self) -> int:
self._get_int('height')
def set_height(self, value: int):
self._set_int('height', value)
def get_width(self) -> int:
self._get_int(self, 'width')
def set_width(self, value: int):
self._set_int('with', value)
class Pointer(ElementBase):
name = 'pointer'

View file

@ -8,8 +8,12 @@
import logging
from asyncio import Future
from typing import Optional
import slixmpp
from slixmpp import Iq
from slixmpp import JID
from slixmpp.stanza import Iq
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@ -57,12 +61,11 @@ class XEP_0092(BasePlugin):
def session_bind(self, jid):
self.xmpp.plugin['xep_0030'].add_feature('jabber:iq:version')
def _handle_version(self, iq):
def _handle_version(self, iq: Iq):
"""
Respond to a software version query.
Arguments:
iq -- The Iq stanza containing the software version query.
:param iq: The Iq stanza containing the software version query.
"""
iq = iq.reply()
if self.software_name:
@ -75,18 +78,12 @@ class XEP_0092(BasePlugin):
iq['error']['condition'] = 'service-unavailable'
iq.send()
def get_version(self, jid, ifrom=None, timeout=None, callback=None,
timeout_callback=None):
def get_version(self, jid: JID, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""
Retrieve the software version of a remote agent.
Arguments:
jid -- The JID of the entity to query.
:param jid: The JID of the entity to query.
"""
iq = self.xmpp.Iq()
iq['to'] = jid
iq['from'] = ifrom
iq['type'] = 'get'
iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
iq['query'] = Version.namespace
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)

View file

@ -8,6 +8,8 @@
import logging
from asyncio import Future
from slixmpp import JID
from typing import Dict, List, Optional, Callable
from slixmpp.plugins.base import BasePlugin
@ -37,17 +39,12 @@ class XEP_0152(BasePlugin):
self.xmpp['xep_0163'].register_pep('reachability', Reachability)
def publish_reachability(self, addresses: List[Dict[str, str]],
options: Optional[Form] = None,
ifrom: Optional[JID] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None,
timeout_callback: Optional[Callable] = None):
**pubsubkwargs) -> Future:
"""
Publish alternative addresses where the user can be reached.
:param addresses: A list of dictionaries containing the URI and
optional description for each address.
:param options: Optional form of publish options.
"""
if not isinstance(addresses, (list, tuple)):
addresses = [addresses]
@ -60,25 +57,19 @@ class XEP_0152(BasePlugin):
for key, val in address.items():
addr[key] = val
reach.append(addr)
return self.xmpp['xep_0163'].publish(reach,
node=Reachability.namespace,
options=options,
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
reach,
node=Reachability.namespace,
**pubsubkwargs
)
def stop(self, ifrom: Optional[JID] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None,
timeout_callback: Optional[Callable] = None):
def stop(self, **pubsubkwargs) -> Future:
"""
Clear existing user activity information to stop notifications.
"""
reach = Reachability()
return self.xmpp['xep_0163'].publish(reach,
node=Reachability.namespace,
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
reach,
node=Reachability.namespace,
**pubsubkwargs
)

View file

@ -8,13 +8,19 @@
import hashlib
import logging
from asyncio import Future, ensure_future
from typing import (
Dict,
Optional,
)
from slixmpp import JID
from slixmpp.stanza import Presence
from slixmpp.exceptions import XMPPError, IqTimeout
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream import register_stanza_plugin, ElementBase
from slixmpp.plugins.base import BasePlugin
from slixmpp.plugins.xep_0153 import stanza, VCardTempUpdate
from slixmpp import asyncio, future_wrapper
from slixmpp import future_wrapper
log = logging.getLogger(__name__)
@ -35,7 +41,6 @@ class XEP_0153(BasePlugin):
self.xmpp.add_filter('out', self._update_presence)
self.xmpp.add_event_handler('session_start', self._start)
self.xmpp.add_event_handler('session_end', self._end)
self.xmpp.add_event_handler('presence_available', self._recv_presence)
self.xmpp.add_event_handler('presence_dnd', self._recv_presence)
@ -58,45 +63,47 @@ class XEP_0153(BasePlugin):
self.xmpp.del_event_handler('presence_away', self._recv_presence)
@future_wrapper
def set_avatar(self, jid=None, avatar=None, mtype=None, timeout=None,
callback=None, timeout_callback=None):
def set_avatar(self, jid: Optional[JID] = None,
avatar: Optional[bytes] = None,
mtype: Optional[str] = None, **iqkwargs) -> Future:
"""Set a VCard avatar.
:param jid: The JID to set the avatar for.
:param avatar: Avatar content.
:param mtype: Avatar file type (e.g. image/jpeg).
"""
if jid is None:
jid = self.xmpp.boundjid.bare
future = asyncio.Future()
def propagate_timeout_exception(fut):
async def get_and_set_avatar():
timeout = iqkwargs.get('timeout', None)
timeout_cb = iqkwargs.get('timeout_callback', None)
try:
fut.done()
except IqTimeout as e:
future.set_exception(e)
def custom_callback(result):
result = await self.xmpp['xep_0054'].get_vcard(
jid,
cached=False,
timeout=timeout
)
except IqTimeout as exc:
if timeout_cb is not None:
timeout_cb(exc)
raise
vcard = result['vcard_temp']
vcard['PHOTO']['TYPE'] = mtype
vcard['PHOTO']['BINVAL'] = avatar
new_future = self.xmpp['xep_0054'].publish_vcard(jid=jid,
vcard=vcard,
timeout=timeout,
callback=next_callback,
timeout_callback=timeout_callback)
new_future.add_done_callback(propagate_timeout_exception)
try:
result = await self.xmpp['xep_0054'].publish_vcard(
jid=jid,
vcard=vcard,
**iqkwargs
)
except IqTimeout as exc:
timeout_cb(exc)
raise
self.api['reset_hash'](jid)
self.xmpp.roster[jid].send_last_presence()
def next_callback(result):
if result['type'] == 'error':
future.set_exception(result)
else:
self.api['reset_hash'](jid)
self.xmpp.roster[jid].send_last_presence()
future.set_result(result)
first_future = self.xmpp['xep_0054'].get_vcard(jid, cached=False, timeout=timeout,
callback=custom_callback,
timeout_callback=timeout_callback)
first_future.add_done_callback(propagate_timeout_exception)
return future
return ensure_future(get_and_set_avatar(), loop=self.xmpp.loop)
async def _start(self, event):
try:
@ -110,10 +117,7 @@ class XEP_0153(BasePlugin):
except XMPPError:
log.debug('Could not retrieve vCard for %s', self.xmpp.boundjid.bare)
def _end(self, event):
pass
def _update_presence(self, stanza):
def _update_presence(self, stanza: ElementBase) -> ElementBase:
if not isinstance(stanza, Presence):
return stanza
@ -124,7 +128,27 @@ class XEP_0153(BasePlugin):
stanza['vcard_temp_update']['photo'] = current_hash
return stanza
def _reset_hash(self, jid, node, ifrom, args):
def _recv_presence(self, pres: Presence):
try:
if pres.get_plugin('muc', check=True):
# Don't process vCard avatars for MUC occupants
# since they all share the same bare JID.
return
except:
pass
if not pres.match('presence/vcard_temp_update'):
self.api['set_hash'](pres['from'], args=None)
return
data = pres['vcard_temp_update']['photo']
if data is None:
return
self.xmpp.event('vcard_avatar_update', pres)
# =================================================================
def _reset_hash(self, jid: JID, node: str, ifrom: JID, args: Dict):
own_jid = (jid.bare == self.xmpp.boundjid.bare)
if self.xmpp.is_component:
own_jid = (jid.domain == self.xmpp.boundjid.domain)
@ -152,27 +176,8 @@ class XEP_0153(BasePlugin):
self.xmpp['xep_0054'].get_vcard(jid=jid.bare, ifrom=ifrom,
callback=callback)
def _recv_presence(self, pres):
try:
if pres['muc']['affiliation']:
# Don't process vCard avatars for MUC occupants
# since they all share the same bare JID.
return
except: pass
if not pres.match('presence/vcard_temp_update'):
self.api['set_hash'](pres['from'], args=None)
return
data = pres['vcard_temp_update']['photo']
if data is None:
return
self.xmpp.event('vcard_avatar_update', pres)
# =================================================================
def _get_hash(self, jid, node, ifrom, args):
def _get_hash(self, jid: JID, node: str, ifrom: JID, args: Dict):
return self._hashes.get(jid.bare, None)
def _set_hash(self, jid, node, ifrom, args):
def _set_hash(self, jid: JID, node: str, ifrom: JID, args: Dict):
self._hashes[jid.bare] = args

View file

@ -8,7 +8,15 @@
import logging
from slixmpp import Iq
from asyncio import Future
from typing import (
List,
Optional,
Set,
Union,
)
from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@ -18,6 +26,12 @@ from slixmpp.plugins.xep_0191 import stanza, Block, Unblock, BlockList
log = logging.getLogger(__name__)
BlockedJIDs = Union[
JID,
Set[JID],
List[JID]
]
class XEP_0191(BasePlugin):
@ -45,42 +59,39 @@ class XEP_0191(BasePlugin):
self.xmpp.remove_handler('Blocked Contact')
self.xmpp.remove_handler('Unblocked Contact')
def get_blocked(self, ifrom=None, timeout=None, callback=None,
timeout_callback=None):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['from'] = ifrom
def get_blocked(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Get the list of blocked JIDs."""
iq = self.xmpp.make_iq_get(ifrom=ifrom)
iq.enable('blocklist')
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
def block(self, jids, ifrom=None, timeout=None, callback=None,
timeout_callback=None):
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['from'] = ifrom
def block(self, jids: BlockedJIDs,
ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Block a JID or a list of JIDs.
:param jids: JID(s) to block.
"""
iq = self.xmpp.make_iq_set(ifrom=ifrom)
if not isinstance(jids, (set, list)):
jids = [jids]
iq['block']['items'] = jids
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
def unblock(self, jids=None, ifrom=None, timeout=None, callback=None,
timeout_callback=None):
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['from'] = ifrom
def unblock(self, jids: BlockedJIDs, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Unblock a JID or a list of JIDs.
:param jids: JID(s) to unblock.
"""
if jids is None:
jids = []
raise ValueError("jids cannot be empty.")
iq = self.xmpp.make_iq_set(ifrom=ifrom)
if not isinstance(jids, (set, list)):
jids = [jids]
iq['unblock']['items'] = jids
return iq.send(timeout=timeout, callback=callback,
timeout_callback=timeout_callback)
return iq.send(**iqkwargs)
def _handle_blocked(self, iq):
self.xmpp.event('blocked', iq)

View file

@ -16,7 +16,9 @@ except ImportError:
# not usable.
from unittest import TestCase as IsolatedAsyncioTestCase
from typing import (
Dict,
List,
Optional,
)
from slixmpp import JID
@ -39,11 +41,14 @@ class SlixIntegration(IsolatedAsyncioTestCase):
"""get a str from an env var"""
return os.getenv(name)
def register_plugins(self, plugins: List[str]):
def register_plugins(self, plugins: List[str], configs: Optional[List[Dict]] = None):
"""Register plugins on all known clients"""
for plugin in plugins:
for index, plugin in enumerate(plugins):
for client in self.clients:
client.register_plugin(plugin)
if configs is not None:
client.register_plugin(plugin, pconfig=configs[index])
else:
client.register_plugin(plugin)
def add_client(self, jid: JID, password: str):
"""Register a new client"""