XEP-0152: Add type hints and switch to default args

This commit is contained in:
mathieui 2021-02-04 21:28:18 +01:00
parent 99c2e5cafd
commit 712ac671e1

View file

@ -8,6 +8,8 @@
import logging
from asyncio import Future
from slixmpp import JID
from typing import Dict, List, Optional, Callable
from slixmpp.plugins.base import BasePlugin
@ -37,17 +39,12 @@ class XEP_0152(BasePlugin):
self.xmpp['xep_0163'].register_pep('reachability', Reachability)
def publish_reachability(self, addresses: List[Dict[str, str]],
options: Optional[Form] = None,
ifrom: Optional[JID] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None,
timeout_callback: Optional[Callable] = None):
**pubsubkwargs) -> Future:
"""
Publish alternative addresses where the user can be reached.
:param addresses: A list of dictionaries containing the URI and
optional description for each address.
:param options: Optional form of publish options.
"""
if not isinstance(addresses, (list, tuple)):
addresses = [addresses]
@ -60,25 +57,19 @@ class XEP_0152(BasePlugin):
for key, val in address.items():
addr[key] = val
reach.append(addr)
return self.xmpp['xep_0163'].publish(reach,
node=Reachability.namespace,
options=options,
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
reach,
node=Reachability.namespace,
**pubsubkwargs
)
def stop(self, ifrom: Optional[JID] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None,
timeout_callback: Optional[Callable] = None):
def stop(self, **pubsubkwargs) -> Future:
"""
Clear existing user activity information to stop notifications.
"""
reach = Reachability()
return self.xmpp['xep_0163'].publish(reach,
node=Reachability.namespace,
ifrom=ifrom,
callback=callback,
timeout=timeout,
timeout_callback=timeout_callback)
return self.xmpp['xep_0163'].publish(
reach,
node=Reachability.namespace,
**pubsubkwargs
)