diff --git a/slixmpp/plugins/xep_0030/static.py b/slixmpp/plugins/xep_0030/static.py index 1ae34148..6863830f 100644 --- a/slixmpp/plugins/xep_0030/static.py +++ b/slixmpp/plugins/xep_0030/static.py @@ -1,20 +1,46 @@ - # Slixmpp: The Slick XMPP Library # Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout # This file is part of Slixmpp. # See the file LICENSE for copying permission. +from __future__ import annotations + import logging -from slixmpp import Iq +from typing import ( + Optional, + Any, + Dict, + Tuple, + TYPE_CHECKING, + Union, + Collection, +) + +from slixmpp import BaseXMPP, JID +from slixmpp.stanza import Iq +from slixmpp.types import TypedDict, OptJidStr, OptJid from slixmpp.exceptions import XMPPError, IqError, IqTimeout -from slixmpp.xmlstream import JID from slixmpp.plugins.xep_0030 import DiscoInfo, DiscoItems log = logging.getLogger(__name__) +if TYPE_CHECKING: + from slixmpp.plugins.xep_0030 import XEP_0030 -class StaticDisco(object): + +class NodeType(TypedDict): + info: DiscoInfo + items: DiscoItems + + +NodesType = Dict[ + Tuple[str, str, str], + NodeType +] + + +class StaticDisco: """ While components will likely require fully dynamic handling @@ -25,75 +51,87 @@ class StaticDisco(object): StaticDisco provides a set of node handlers that will store static sets of disco info and items in memory. - Attributes: - nodes -- A dictionary mapping (JID, node) tuples to a dict - containing a disco#info and a disco#items stanza. - xmpp -- The main Slixmpp object. + :var nodes: A dictionary mapping (JID, node) tuples to a dict + containing a disco#info and a disco#items stanza. + :var xmpp: The main Slixmpp object. + :var disco: The instance of the XEP-0030 plugin. """ - def __init__(self, xmpp, disco): + def __init__(self, xmpp: 'BaseXMPP', disco: 'XEP_0030'): """ Create a static disco interface. Sets of disco#info and disco#items are maintained for every given JID and node combination. These stanzas are used to store disco information in memory without any additional processing. - Arguments: - xmpp -- The main Slixmpp object. + :param xmpp: The main Slixmpp object. + :param disco: The XEP-0030 plugin. """ - self.nodes = {} - self.xmpp = xmpp - self.disco = disco + self.nodes: NodesType = {} + self.xmpp: BaseXMPP = xmpp + self.disco: 'XEP_0030' = disco - def add_node(self, jid=None, node=None, ifrom=None): - """ - Create a new set of stanzas for the provided - JID and node combination. - - Arguments: - jid -- The JID that will own the new stanzas. - node -- The node that will own the new stanzas. - """ + def add_node(self, jid: OptJidStr = None, node: Optional[str] = None, + ifrom: OptJidStr = None) -> NodeType: if jid is None: - jid = self.xmpp.boundjid.full + node_jid = self.xmpp.boundjid.full + elif isinstance(jid, JID): + node_jid = jid.full + if ifrom is None: + node_ifrom = '' + elif isinstance(ifrom, JID): + node_ifrom = ifrom.full + else: + node_ifrom = ifrom + if node is None: + node = '' + if (node_jid, node, node_ifrom) not in self.nodes: + node_dict: NodeType = { + 'info': DiscoInfo(), + 'items': DiscoItems(), + } + node_dict['info']['node'] = node + node_dict['items']['node'] = node + self.nodes[(node_jid, node, node_ifrom)] = node_dict + return self.nodes[(node_jid, node, node_ifrom)] + + def get_node(self, jid: OptJidStr = None, node: Optional[str] = None, + ifrom: OptJidStr = None) -> NodeType: + if jid is None: + node_jid = self.xmpp.boundjid.full + elif isinstance(jid, JID): + node_jid = jid.full + else: + node_jid = jid if node is None: node = '' if ifrom is None: - ifrom = '' - if isinstance(ifrom, JID): - ifrom = ifrom.full - if (jid, node, ifrom) not in self.nodes: - new_node = {'info': DiscoInfo(), 'items': DiscoItems()} - new_node['info']['node'] = node - new_node['items']['node'] = node - self.nodes[(jid, node, ifrom)] = new_node - return self.nodes[(jid, node, ifrom)] + node_ifrom = '' + elif isinstance(ifrom, JID): + node_ifrom = ifrom.full + else: + node_ifrom = ifrom + if (node_jid, node, node_ifrom) not in self.nodes: + self.add_node(node_jid, node, node_ifrom) + return self.nodes[(node_jid, node, node_ifrom)] - def get_node(self, jid=None, node=None, ifrom=None): + def node_exists(self, jid: OptJidStr = None, node: Optional[str] = None, + ifrom: OptJidStr = None) -> bool: if jid is None: - jid = self.xmpp.boundjid.full + node_jid = self.xmpp.boundjid.full + elif isinstance(jid, JID): + node_jid = jid.full + else: + node_jid = jid if node is None: node = '' if ifrom is None: - ifrom = '' - if isinstance(ifrom, JID): - ifrom = ifrom.full - if (jid, node, ifrom) not in self.nodes: - self.add_node(jid, node, ifrom) - return self.nodes[(jid, node, ifrom)] - - def node_exists(self, jid=None, node=None, ifrom=None): - if jid is None: - jid = self.xmpp.boundjid.full - if node is None: - node = '' - if ifrom is None: - ifrom = '' - if isinstance(ifrom, JID): - ifrom = ifrom.full - if (jid, node, ifrom) not in self.nodes: - return False - return True + node_ifrom = '' + elif isinstance(ifrom, JID): + node_ifrom = ifrom.full + else: + node_ifrom = ifrom + return (node_jid, node, node_ifrom) in self.nodes # ================================================================= # Node Handlers @@ -109,18 +147,20 @@ class StaticDisco(object): # the requester's JID, except for cached results. To do that, # register a custom node handler. - async def supports(self, jid, node, ifrom, data): + async def supports(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any) -> Optional[bool]: """ Check if a JID supports a given feature. The data parameter may provide: - feature -- The feature to check for support. - local -- If true, then the query is for a JID/node + + :param feature: The feature to check for support. + :param local: If true, then the query is for a JID/node combination handled by this Slixmpp instance and no stanzas need to be sent. Otherwise, a disco stanza must be sent to the remove JID to retrieve the info. - cached -- If true, then look for the disco info data from + :param cached: If true, then look for the disco info data from the local cache system. If no results are found, send the query as usual. The self.use_cache setting must be set to true for this option to @@ -147,26 +187,29 @@ class StaticDisco(object): except IqTimeout: return None - async def has_identity(self, jid, node, ifrom, data): + async def has_identity(self, jid: OptJid, node: Optional[str], + ifrom: OptJid, data: Dict[str, Any] + ) -> Optional[bool]: """ Check if a JID has a given identity. The data parameter may provide: - category -- The category of the identity to check. - itype -- The type of the identity to check. - lang -- The language of the identity to check. - local -- If true, then the query is for a JID/node - combination handled by this Slixmpp instance and - no stanzas need to be sent. - Otherwise, a disco stanza must be sent to the - remove JID to retrieve the info. - cached -- If true, then look for the disco info data from - the local cache system. If no results are found, - send the query as usual. The self.use_cache - setting must be set to true for this option to - be useful. If set to false, then the cache will - be skipped, even if a result has already been - cached. Defaults to false. + + :param category: The category of the identity to check. + :param itype: The type of the identity to check. + :param lang: The language of the identity to check. + :param local: If true, then the query is for a JID/node + combination handled by this Slixmpp instance and + no stanzas need to be sent. + Otherwise, a disco stanza must be sent to the + remove JID to retrieve the info. + :param cached: If true, then look for the disco info data from + the local cache system. If no results are found, + send the query as usual. The self.use_cache + setting must be set to true for this option to + be useful. If set to false, then the cache will + be skipped, even if a result has already been + cached. Defaults to false. """ identity = (data.get('category', None), data.get('itype', None), @@ -179,14 +222,17 @@ class StaticDisco(object): info = await self.disco.get_info(jid=jid, node=node, ifrom=ifrom, **data) info = self.disco._wrap(ifrom, jid, info, True) - trunc = lambda i: (i[0], i[1], i[2]) + + def trunc(i): + return (i[0], i[1], i[2]) return identity in map(trunc, info['disco_info']['identities']) except IqError: return False except IqTimeout: return None - def get_info(self, jid, node, ifrom, data): + def get_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any) -> Optional[DiscoInfo]: """ Return the stored info data for the requested JID/node combination. @@ -200,7 +246,8 @@ class StaticDisco(object): else: return self.get_node(jid, node)['info'] - def set_info(self, jid, node, ifrom, data): + def set_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: DiscoInfo): """ Set the entire info stanza for a JID/node at once. @@ -209,7 +256,8 @@ class StaticDisco(object): new_node = self.add_node(jid, node) new_node['info'] = data - def del_info(self, jid, node, ifrom, data): + def del_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any): """ Reset the info stanza for a given JID/node combination. @@ -218,7 +266,8 @@ class StaticDisco(object): if self.node_exists(jid, node): self.get_node(jid, node)['info'] = DiscoInfo() - def get_items(self, jid, node, ifrom, data): + def get_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any) -> Optional[DiscoItems]: """ Return the stored items data for the requested JID/node combination. @@ -232,7 +281,8 @@ class StaticDisco(object): else: return self.get_node(jid, node)['items'] - def set_items(self, jid, node, ifrom, data): + def set_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, Collection[Tuple]]): """ Replace the stored items data for a JID/node combination. @@ -243,7 +293,8 @@ class StaticDisco(object): new_node = self.add_node(jid, node) new_node['items']['items'] = items - def del_items(self, jid, node, ifrom, data): + def del_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any): """ Reset the items stanza for a given JID/node combination. @@ -252,15 +303,17 @@ class StaticDisco(object): if self.node_exists(jid, node): self.get_node(jid, node)['items'] = DiscoItems() - def add_identity(self, jid, node, ifrom, data): + def add_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, Optional[str]]): """ Add a new identity to the JID/node combination. The data parameter may provide: - category -- The general category to which the agent belongs. - itype -- A more specific designation with the category. - name -- Optional human readable name for this identity. - lang -- Optional standard xml:lang value. + + :param category: The general category to which the agent belongs. + :param itype: A more specific designation with the category. + :param name: Optional human readable name for this identity. + :param lang: Optional standard xml:lang value. """ new_node = self.add_node(jid, node) new_node['info'].add_identity( @@ -269,27 +322,31 @@ class StaticDisco(object): data.get('name', None), data.get('lang', None)) - def set_identities(self, jid, node, ifrom, data): + def set_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, Collection[str]]): """ Add or replace all identities for a JID/node combination. The data parameter should include: - identities -- A list of identities in tuple form: - (category, type, name, lang) + + :param identities: A list of identities in tuple form: + (category, type, name, lang) """ identities = data.get('identities', set()) new_node = self.add_node(jid, node) new_node['info']['identities'] = identities - def del_identity(self, jid, node, ifrom, data): + def del_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, Optional[str]]): """ Remove an identity from a JID/node combination. The data parameter may provide: - category -- The general category to which the agent belonged. - itype -- A more specific designation with the category. - name -- Optional human readable name for this identity. - lang -- Optional, standard xml:lang value. + + :param category: The general category to which the agent belonged. + :param itype: A more specific designation with the category. + :param name: Optional human readable name for this identity. + :param lang: Optional, standard xml:lang value. """ if self.node_exists(jid, node): self.get_node(jid, node)['info'].del_identity( @@ -298,7 +355,8 @@ class StaticDisco(object): data.get('name', None), data.get('lang', None)) - def del_identities(self, jid, node, ifrom, data): + def del_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any): """ Remove all identities from a JID/node combination. @@ -307,40 +365,47 @@ class StaticDisco(object): if self.node_exists(jid, node): del self.get_node(jid, node)['info']['identities'] - def add_feature(self, jid, node, ifrom, data): + def add_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, str]): """ Add a feature to a JID/node combination. The data parameter should include: - feature -- The namespace of the supported feature. + + :param feature: The namespace of the supported feature. """ new_node = self.add_node(jid, node) new_node['info'].add_feature( data.get('feature', '')) - def set_features(self, jid, node, ifrom, data): + def set_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, Collection[str]]): """ Add or replace all features for a JID/node combination. The data parameter should include: - features -- The new set of supported features. + + :param features: The new set of supported features. """ features = data.get('features', set()) new_node = self.add_node(jid, node) new_node['info']['features'] = features - def del_feature(self, jid, node, ifrom, data): + def del_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, str]): """ Remove a feature from a JID/node combination. The data parameter should include: - feature -- The namespace of the removed feature. + + :param feature: The namespace of the removed feature. """ if self.node_exists(jid, node): self.get_node(jid, node)['info'].del_feature( data.get('feature', '')) - def del_features(self, jid, node, ifrom, data): + def del_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any): """ Remove all features from a JID/node combination. @@ -350,15 +415,17 @@ class StaticDisco(object): return del self.get_node(jid, node)['info']['features'] - def add_item(self, jid, node, ifrom, data): + def add_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, str]): """ Add an item to a JID/node combination. The data parameter may include: - ijid -- The JID for the item. - inode -- Optional additional information to reference - non-addressable items. - name -- Optional human readable name for the item. + + :param ijid: The JID for the item. + :param inode: Optional additional information to reference + non-addressable items. + :param name: Optional human readable name for the item. """ new_node = self.add_node(jid, node) new_node['items'].add_item( @@ -366,20 +433,23 @@ class StaticDisco(object): node=data.get('inode', ''), name=data.get('name', '')) - def del_item(self, jid, node, ifrom, data): + def del_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Dict[str, str]): """ Remove an item from a JID/node combination. The data parameter may include: - ijid -- JID of the item to remove. - inode -- Optional extra identifying information. + + :param ijid: JID of the item to remove. + :param inode: Optional extra identifying information. """ if self.node_exists(jid, node): self.get_node(jid, node)['items'].del_item( data.get('ijid', ''), node=data.get('inode', None)) - def cache_info(self, jid, node, ifrom, data): + def cache_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Union[Iq, DiscoInfo]): """ Cache disco information for an external JID. @@ -388,12 +458,15 @@ class StaticDisco(object): the disco#info substanza itself. """ if isinstance(data, Iq): - data = data['disco_info'] + info = data['disco_info'] + else: + info = data new_node = self.add_node(jid, node, ifrom) - new_node['info'] = data + new_node['info'] = info - def get_cached_info(self, jid, node, ifrom, data): + def get_cached_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid, + data: Any) -> Optional[DiscoInfo]: """ Retrieve cached disco info data. @@ -401,5 +474,4 @@ class StaticDisco(object): """ if not self.node_exists(jid, node, ifrom): return None - else: - return self.get_node(jid, node, ifrom)['info'] + return self.get_node(jid, node, ifrom)['info']