From 62e66e7d03048608fdf5fbedf25aad2887ce6315 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 24 Apr 2021 22:44:41 +0200 Subject: [PATCH] stanzabase: types --- slixmpp/xmlstream/stanzabase.py | 370 +++++++++++++++++++------------- 1 file changed, 219 insertions(+), 151 deletions(-) diff --git a/slixmpp/xmlstream/stanzabase.py b/slixmpp/xmlstream/stanzabase.py index 7679f73a..2f2faa8d 100644 --- a/slixmpp/xmlstream/stanzabase.py +++ b/slixmpp/xmlstream/stanzabase.py @@ -1,4 +1,3 @@ - # slixmpp.xmlstream.stanzabase # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # module implements a wrapper layer for XML objects @@ -11,13 +10,34 @@ from __future__ import annotations import copy import logging import weakref -from typing import Optional +from typing import ( + cast, + Any, + Callable, + ClassVar, + Coroutine, + Dict, + List, + Iterable, + Optional, + Set, + Tuple, + Type, + TYPE_CHECKING, + Union, +) +from weakref import ReferenceType from xml.etree import ElementTree as ET +from slixmpp.types import JidStr from slixmpp.xmlstream import JID from slixmpp.xmlstream.tostring import tostring +if TYPE_CHECKING: + from slixmpp.xmlstream import XMLStream + + log = logging.getLogger(__name__) @@ -28,7 +48,8 @@ XML_TYPE = type(ET.Element('xml')) XML_NS = 'http://www.w3.org/XML/1998/namespace' -def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False): +def register_stanza_plugin(stanza: Type[ElementBase], plugin: Type[ElementBase], + iterable: bool = False, overrides: bool = False) -> None: """ Associate a stanza object as a plugin for another stanza. @@ -85,15 +106,15 @@ def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False): stanza.plugin_overrides[interface] = plugin.plugin_attrib -def multifactory(stanza, plugin_attrib): +def multifactory(stanza: Type[ElementBase], plugin_attrib: str) -> Type[ElementBase]: """ Returns a ElementBase class for handling reoccuring child stanzas """ - def plugin_filter(self): + def plugin_filter(self: Multi) -> Callable[..., bool]: return lambda x: isinstance(x, self._multistanza) - def plugin_lang_filter(self, lang): + def plugin_lang_filter(self: Multi, lang: Optional[str]) -> Callable[..., bool]: return lambda x: isinstance(x, self._multistanza) and \ x['lang'] == lang @@ -101,31 +122,41 @@ def multifactory(stanza, plugin_attrib): """ Template class for multifactory """ - def setup(self, xml=None): - self.xml = ET.Element('') + _multistanza: Type[ElementBase] - def get_multi(self, lang=None): - parent = self.parent() + def setup(self, xml: Optional[ET.Element] = None) -> bool: + self.xml = ET.Element('') + return False + + def get_multi(self: Multi, lang: Optional[str] = None) -> List[ElementBase]: + parent = fail_without_parent(self) if not lang or lang == '*': res = filter(plugin_filter(self), parent) else: - res = filter(plugin_filter(self, lang), parent) + res = filter(plugin_lang_filter(self, lang), parent) return list(res) - def set_multi(self, val, lang=None): - parent = self.parent() + def set_multi(self: Multi, val: Iterable[ElementBase], lang: Optional[str] = None) -> None: + parent = fail_without_parent(self) del_multi = getattr(self, 'del_%s' % plugin_attrib) del_multi(lang) for sub in val: parent.append(sub) - def del_multi(self, lang=None): - parent = self.parent() + def fail_without_parent(self: Multi) -> ElementBase: + parent = None + if self.parent: + parent = self.parent() + if not parent: + raise ValueError('No stanza parent for multifactory') + return parent + + def del_multi(self: Multi, lang: Optional[str] = None) -> None: + parent = fail_without_parent(self) if not lang or lang == '*': - res = filter(plugin_filter(self), parent) + res = list(filter(plugin_filter(self), parent)) else: - res = filter(plugin_filter(self, lang), parent) - res = list(res) + res = list(filter(plugin_lang_filter(self, lang), parent)) if not res: del parent.plugins[(plugin_attrib, None)] parent.loaded_plugins.remove(plugin_attrib) @@ -149,7 +180,8 @@ def multifactory(stanza, plugin_attrib): return Multi -def fix_ns(xpath, split=False, propagate_ns=True, default_ns=''): +def fix_ns(xpath: str, split: bool = False, propagate_ns: bool = True, + default_ns: str = '') -> Union[str, List[str]]: """Apply the stanza's namespace to elements in an XPath expression. :param string xpath: The XPath expression to fix with namespaces. @@ -275,12 +307,12 @@ class ElementBase(object): #: The XML tag name of the element, not including any namespace #: prefixes. For example, an :class:`ElementBase` object for #: ```` would use ``name = 'message'``. - name = 'stanza' + name: ClassVar[str] = 'stanza' #: The XML namespace for the element. Given ````, #: then ``namespace = "bar"`` should be used. The default namespace #: is ``jabber:client`` since this is being used in an XMPP library. - namespace = 'jabber:client' + namespace: str = 'jabber:client' #: For :class:`ElementBase` subclasses which are intended to be used #: as plugins, the ``plugin_attrib`` value defines the plugin name. @@ -290,7 +322,7 @@ class ElementBase(object): #: register_stanza_plugin(Message, FooPlugin) #: msg = Message() #: msg['foo']['an_interface_from_the_foo_plugin'] - plugin_attrib = 'plugin' + plugin_attrib: ClassVar[str] = 'plugin' #: For :class:`ElementBase` subclasses that are intended to be an #: iterable group of items, the ``plugin_multi_attrib`` value defines @@ -300,29 +332,29 @@ class ElementBase(object): #: # Given stanza class Foo, with plugin_multi_attrib = 'foos' #: parent['foos'] #: filter(isinstance(item, Foo), parent['substanzas']) - plugin_multi_attrib = '' + plugin_multi_attrib: ClassVar[str] = '' #: The set of keys that the stanza provides for accessing and #: manipulating the underlying XML object. This set may be augmented #: with the :attr:`plugin_attrib` value of any registered #: stanza plugins. - interfaces = {'type', 'to', 'from', 'id', 'payload'} + interfaces: ClassVar[Set[str]] = {'type', 'to', 'from', 'id', 'payload'} #: A subset of :attr:`interfaces` which maps interfaces to direct #: subelements of the underlying XML object. Using this set, the text #: of these subelements may be set, retrieved, or removed without #: needing to define custom methods. - sub_interfaces = set() + sub_interfaces: ClassVar[Set[str]] = set() #: A subset of :attr:`interfaces` which maps the presence of #: subelements to boolean values. Using this set allows for quickly #: checking for the existence of empty subelements like ````. #: #: .. versionadded:: 1.1 - bool_interfaces = set() + bool_interfaces: ClassVar[Set[str]] = set() #: .. versionadded:: 1.1.2 - lang_interfaces = set() + lang_interfaces: ClassVar[Set[str]] = set() #: In some cases you may wish to override the behaviour of one of the #: parent stanza's interfaces. The ``overrides`` list specifies the @@ -336,7 +368,7 @@ class ElementBase(object): #: be affected. #: #: .. versionadded:: 1.0-Beta5 - overrides = [] + overrides: ClassVar[List[str]] = [] #: If you need to add a new interface to an existing stanza, you #: can create a plugin and set ``is_extension = True``. Be sure @@ -346,7 +378,7 @@ class ElementBase(object): #: parent stanza will be passed to the plugin directly. #: #: .. versionadded:: 1.0-Beta5 - is_extension = False + is_extension: ClassVar[bool] = False #: A map of interface operations to the overriding functions. #: For example, after overriding the ``set`` operation for @@ -355,15 +387,15 @@ class ElementBase(object): #: {'set_body': } #: #: .. versionadded: 1.0-Beta5 - plugin_overrides = {} + plugin_overrides: ClassVar[Dict[str, str]] = {} #: A mapping of the :attr:`plugin_attrib` values of registered #: plugins to their respective classes. - plugin_attrib_map = {} + plugin_attrib_map: ClassVar[Dict[str, Type[ElementBase]]] = {} #: A mapping of root element tag names (in ``'{namespace}elementname'`` #: format) to the plugin classes responsible for them. - plugin_tag_map = {} + plugin_tag_map: ClassVar[Dict[str, Type[ElementBase]]] = {} #: The set of stanza classes that can be iterated over using #: the 'substanzas' interface. Classes are added to this set @@ -372,17 +404,26 @@ class ElementBase(object): #: register_stanza_plugin(DiscoInfo, DiscoItem, iterable=True) #: #: .. versionadded:: 1.0-Beta5 - plugin_iterables = set() + plugin_iterables: ClassVar[Set[Type[ElementBase]]] = set() #: The default XML namespace: ``http://www.w3.org/XML/1998/namespace``. - xml_ns = XML_NS + xml_ns: ClassVar[str] = XML_NS - def __init__(self, xml=None, parent=None): + plugins: Dict[Tuple[str, Optional[str]], ElementBase] + #: The underlying XML object for the stanza. It is a standard + #: :class:`xml.etree.ElementTree` object. + xml: ET.Element + _index: int + loaded_plugins: Set[str] + iterables: List[ElementBase] + tag: str + parent: Optional[ReferenceType[ElementBase]] + + def __init__(self, xml: Optional[ET.Element] = None, parent: Union[Optional[ElementBase], ReferenceType[ElementBase]] = None): self._index = 0 - #: The underlying XML object for the stanza. It is a standard - #: :class:`xml.etree.ElementTree` object. - self.xml = xml + if xml is not None: + self.xml = xml #: An ordered dictionary of plugin stanzas, mapped by their #: :attr:`plugin_attrib` value. @@ -419,7 +460,7 @@ class ElementBase(object): existing_xml=child, reuse=False) - def setup(self, xml=None): + def setup(self, xml: Optional[ET.Element] = None) -> bool: """Initialize the stanza's XML contents. Will return ``True`` if XML was generated according to the stanza's @@ -429,29 +470,31 @@ class ElementBase(object): :param xml: An existing XML object to use for the stanza's content instead of generating new XML. """ - if self.xml is None: + if hasattr(self, 'xml'): + return False + if not hasattr(self, 'xml') and xml is not None: self.xml = xml - - last_xml = self.xml - if self.xml is None: - # Generate XML from the stanza definition - for ename in self.name.split('/'): - new = ET.Element("{%s}%s" % (self.namespace, ename)) - if self.xml is None: - self.xml = new - else: - last_xml.append(new) - last_xml = new - if self.parent is not None: - self.parent().xml.append(self.xml) - - # We had to generate XML - return True - else: - # We did not generate XML return False - def enable(self, attrib, lang=None): + + # Generate XML from the stanza definition + last_xml = ET.Element('') + for ename in self.name.split('/'): + new = ET.Element("{%s}%s" % (self.namespace, ename)) + if not hasattr(self, 'xml'): + self.xml = new + else: + last_xml.append(new) + last_xml = new + if self.parent is not None: + parent = self.parent() + if parent: + parent.xml.append(self.xml) + + # We had to generate XML + return True + + def enable(self, attrib: str, lang: Optional[str] = None) -> ElementBase: """Enable and initialize a stanza plugin. Alias for :meth:`init_plugin`. @@ -487,7 +530,10 @@ class ElementBase(object): else: return None if check else self.init_plugin(name, lang) - def init_plugin(self, attrib, lang=None, existing_xml=None, element=None, reuse=True): + def init_plugin(self, attrib: str, lang: Optional[str] = None, + existing_xml: Optional[ET.Element] = None, + reuse: bool = True, + element: Optional[ElementBase] = None) -> ElementBase: """Enable and initialize a stanza plugin. :param string attrib: The :attr:`plugin_attrib` value of the @@ -525,7 +571,7 @@ class ElementBase(object): return plugin - def _get_stanza_values(self): + def _get_stanza_values(self) -> Dict[str, Any]: """Return A JSON/dictionary version of the XML content exposed through the stanza's interfaces:: @@ -567,7 +613,7 @@ class ElementBase(object): values['substanzas'] = iterables return values - def _set_stanza_values(self, values): + def _set_stanza_values(self, values: Dict[str, Any]) -> ElementBase: """Set multiple stanza interface values using a dictionary. Stanza plugin values may be set using nested dictionaries. @@ -623,7 +669,7 @@ class ElementBase(object): plugin.values = value return self - def __getitem__(self, full_attrib): + def __getitem__(self, full_attrib: str) -> Any: """Return the value of a stanza interface using dict-like syntax. Example:: @@ -688,7 +734,7 @@ class ElementBase(object): else: return '' - def __setitem__(self, attrib, value): + def __setitem__(self, attrib: str, value: Any) -> Any: """Set the value of a stanza interface using dictionary-like syntax. Example:: @@ -773,7 +819,7 @@ class ElementBase(object): plugin[full_attrib] = value return self - def __delitem__(self, attrib): + def __delitem__(self, attrib: str) -> Any: """Delete the value of a stanza interface using dict-like syntax. Example:: @@ -851,7 +897,7 @@ class ElementBase(object): pass return self - def _set_attr(self, name, value): + def _set_attr(self, name: str, value: Optional[JidStr]) -> None: """Set the value of a top level attribute of the XML object. If the new value is None or an empty string, then the attribute will @@ -868,7 +914,7 @@ class ElementBase(object): value = str(value) self.xml.attrib[name] = value - def _del_attr(self, name): + def _del_attr(self, name: str) -> None: """Remove a top level attribute of the XML object. :param name: The name of the attribute. @@ -876,7 +922,7 @@ class ElementBase(object): if name in self.xml.attrib: del self.xml.attrib[name] - def _get_attr(self, name, default=''): + def _get_attr(self, name: str, default: str = '') -> str: """Return the value of a top level attribute of the XML object. In case the attribute has not been set, a default value can be @@ -889,7 +935,8 @@ class ElementBase(object): """ return self.xml.attrib.get(name, default) - def _get_sub_text(self, name, default='', lang=None): + def _get_sub_text(self, name: str, default: str = '', + lang: Optional[str] = None) -> Union[str, Dict[str, str]]: """Return the text contents of a sub element. In case the element does not exist, or it has no textual content, @@ -900,7 +947,7 @@ class ElementBase(object): :param default: Optional default to return if the element does not exists. An empty string is returned otherwise. """ - name = self._fix_ns(name) + name = cast(str, self._fix_ns(name)) if lang == '*': return self._get_all_sub_text(name, default, None) @@ -924,8 +971,9 @@ class ElementBase(object): return result return default - def _get_all_sub_text(self, name, default='', lang=None): - name = self._fix_ns(name) + def _get_all_sub_text(self, name: str, default: str = '', + lang: Optional[str] = None) -> Dict[str, str]: + name = cast(str, self._fix_ns(name)) default_lang = self.get_lang() results = {} @@ -935,10 +983,16 @@ class ElementBase(object): stanza_lang = stanza.attrib.get('{%s}lang' % XML_NS, default_lang) if not lang or lang == '*' or stanza_lang == lang: - results[stanza_lang] = stanza.text + if stanza.text is None: + text = default + else: + text = stanza.text + results[stanza_lang] = text return results - def _set_sub_text(self, name, text=None, keep=False, lang=None): + def _set_sub_text(self, name: str, text: Optional[str] = None, + keep: bool = False, + lang: Optional[str] = None) -> Optional[ET.Element]: """Set the text contents of a sub element. In case the element does not exist, a element will be created, @@ -959,15 +1013,16 @@ class ElementBase(object): lang = default_lang if not text and not keep: - return self._del_sub(name, lang=lang) + self._del_sub(name, lang=lang) + return None - path = self._fix_ns(name, split=True) + path = cast(List[str], self._fix_ns(name, split=True)) name = path[-1] - parent = self.xml + parent: Optional[ET.Element] = self.xml # The first goal is to find the parent of the subelement, or, if # we can't find that, the closest grandparent element. - missing_path = [] + missing_path: List[str] = [] search_order = path[:-1] while search_order: parent = self.xml.find('/'.join(search_order)) @@ -1008,15 +1063,17 @@ class ElementBase(object): parent.append(element) return element - def _set_all_sub_text(self, name, values, keep=False, lang=None): - self._del_sub(name, lang) + def _set_all_sub_text(self, name: str, values: Dict[str, str], + keep: bool = False, + lang: Optional[str] = None) -> None: + self._del_sub(name, lang=lang) for value_lang, value in values.items(): if not lang or lang == '*' or value_lang == lang: self._set_sub_text(name, text=value, keep=keep, lang=value_lang) - def _del_sub(self, name, all=False, lang=None): + def _del_sub(self, name: str, all: bool = False, lang: Optional[str] = None) -> None: """Remove sub elements that match the given name or XPath. If the element is in a path, then any parent elements that become @@ -1034,11 +1091,11 @@ class ElementBase(object): if not lang: lang = default_lang - parent = self.xml + parent: Optional[ET.Element] = self.xml for level, _ in enumerate(path): # Generate the paths to the target elements and their parent. element_path = "/".join(path[:len(path) - level]) - parent_path = "/".join(path[:len(path) - level - 1]) + parent_path: Optional[str] = "/".join(path[:len(path) - level - 1]) elements = self.xml.findall(element_path) if parent_path == '': @@ -1061,7 +1118,7 @@ class ElementBase(object): # after deleting the first level of elements. return - def match(self, xpath): + def match(self, xpath: Union[str, List[str]]) -> bool: """Compare a stanza object with an XPath-like expression. If the XPath matches the contents of the stanza object, the match @@ -1127,7 +1184,7 @@ class ElementBase(object): # Everything matched. return True - def get(self, key, default=None): + def get(self, key: str, default: Optional[Any] = None) -> Any: """Return the value of a stanza interface. If the found value is None or an empty string, return the supplied @@ -1144,7 +1201,7 @@ class ElementBase(object): return default return value - def keys(self): + def keys(self) -> List[str]: """Return the names of all stanza interfaces provided by the stanza object. @@ -1158,7 +1215,7 @@ class ElementBase(object): out.append('substanzas') return out - def append(self, item): + def append(self, item: Union[ET.Element, ElementBase]) -> ElementBase: """Append either an XML object or a substanza to this stanza object. If a substanza object is appended, it will be added to the list @@ -1189,7 +1246,7 @@ class ElementBase(object): return self - def appendxml(self, xml): + def appendxml(self, xml: ET.Element) -> ElementBase: """Append an XML object to the stanza's XML. The added XML will not be included in the list of @@ -1200,7 +1257,7 @@ class ElementBase(object): self.xml.append(xml) return self - def pop(self, index=0): + def pop(self, index: int = 0) -> ElementBase: """Remove and return the last substanza in the list of iterable substanzas. @@ -1212,11 +1269,11 @@ class ElementBase(object): self.xml.remove(substanza.xml) return substanza - def next(self): + def next(self) -> ElementBase: """Return the next iterable substanza.""" return self.__next__() - def clear(self): + def clear(self) -> ElementBase: """Remove all XML element contents and plugins. Any attribute values will be preserved. @@ -1229,7 +1286,7 @@ class ElementBase(object): return self @classmethod - def tag_name(cls): + def tag_name(cls) -> str: """Return the namespaced name of the stanza's root element. The format for the tag name is:: @@ -1241,29 +1298,32 @@ class ElementBase(object): """ return "{%s}%s" % (cls.namespace, cls.name) - def get_lang(self, lang=None): + def get_lang(self, lang: Optional[str] = None) -> str: result = self.xml.attrib.get('{%s}lang' % XML_NS, '') - if not result and self.parent and self.parent(): - return self.parent()['lang'] + if not result and self.parent: + parent = self.parent() + if parent: + return cast(str, parent['lang']) return result - def set_lang(self, lang): + def set_lang(self, lang: Optional[str]) -> None: self.del_lang() attr = '{%s}lang' % XML_NS if lang: self.xml.attrib[attr] = lang - def del_lang(self): + def del_lang(self) -> None: attr = '{%s}lang' % XML_NS if attr in self.xml.attrib: del self.xml.attrib[attr] - def _fix_ns(self, xpath, split=False, propagate_ns=True): + def _fix_ns(self, xpath: str, split: bool = False, + propagate_ns: bool = True) -> Union[str, List[str]]: return fix_ns(xpath, split=split, propagate_ns=propagate_ns, default_ns=self.namespace) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: """Compare the stanza object with another to test for equality. Stanzas are equal if their interfaces return the same values, @@ -1290,7 +1350,7 @@ class ElementBase(object): # must be equal. return True - def __ne__(self, other): + def __ne__(self, other: Any) -> bool: """Compare the stanza object with another to test for inequality. Stanzas are not equal if their interfaces return different values, @@ -1300,16 +1360,16 @@ class ElementBase(object): """ return not self.__eq__(other) - def __bool__(self): + def __bool__(self) -> bool: """Stanza objects should be treated as True in boolean contexts. """ return True - def __len__(self): + def __len__(self) -> int: """Return the number of iterable substanzas in this stanza.""" return len(self.iterables) - def __iter__(self): + def __iter__(self) -> ElementBase: """Return an iterator object for the stanza's substanzas. The iterator is the stanza object itself. Attempting to use two @@ -1318,7 +1378,7 @@ class ElementBase(object): self._index = 0 return self - def __next__(self): + def __next__(self) -> ElementBase: """Return the next iterable substanza.""" self._index += 1 if self._index > len(self.iterables): @@ -1326,13 +1386,16 @@ class ElementBase(object): raise StopIteration return self.iterables[self._index - 1] - def __copy__(self): + def __copy__(self) -> ElementBase: """Return a copy of the stanza object that does not share the same underlying XML object. """ - return self.__class__(xml=copy.deepcopy(self.xml), parent=self.parent) + return self.__class__( + xml=copy.deepcopy(self.xml), + parent=self.parent, + ) - def __str__(self, top_level_ns=True): + def __str__(self, top_level_ns: bool = True) -> str: """Return a string serialization of the underlying XML object. .. seealso:: :ref:`tostring` @@ -1343,12 +1406,33 @@ class ElementBase(object): return tostring(self.xml, xmlns='', top_level=True) - def __repr__(self): + def __repr__(self) -> str: """Use the stanza's serialized XML as its representation.""" return self.__str__() # Compatibility. _get_plugin = get_plugin + get_stanza_values = _get_stanza_values + set_stanza_values = _set_stanza_values + + #: A JSON/dictionary version of the XML content exposed through + #: the stanza interfaces:: + #: + #: >>> msg = Message() + #: >>> msg.values + #: {'body': '', 'from': , 'mucnick': '', 'mucroom': '', + #: 'to': , 'type': 'normal', 'id': '', 'subject': ''} + #: + #: Likewise, assigning to the :attr:`values` will change the XML + #: content:: + #: + #: >>> msg = Message() + #: >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'} + #: >>> msg + #: 'Hi!' + #: + #: Child stanzas are exposed as nested dictionaries. + values = property(_get_stanza_values, _set_stanza_values) # type: ignore class StanzaBase(ElementBase): @@ -1386,9 +1470,14 @@ class StanzaBase(ElementBase): #: The default XMPP client namespace namespace = 'jabber:client' + types: ClassVar[Set[str]] = set() - def __init__(self, stream=None, xml=None, stype=None, - sto=None, sfrom=None, sid=None, parent=None, recv=False): + def __init__(self, stream: Optional[XMLStream] = None, + xml: Optional[ET.Element] = None, + stype: Optional[str] = None, + sto: Optional[JidStr] = None, sfrom: Optional[JidStr] = None, + sid: Optional[str] = None, + parent: Optional[ElementBase] = None, recv: bool = False): self.stream = stream if stream is not None: self.namespace = stream.default_ns @@ -1403,7 +1492,7 @@ class StanzaBase(ElementBase): self['id'] = sid self.tag = "{%s}%s" % (self.namespace, self.name) - def set_type(self, value): + def set_type(self, value: str) -> StanzaBase: """Set the stanza's ``'type'`` attribute. Only type values contained in :attr:`types` are accepted. @@ -1414,11 +1503,11 @@ class StanzaBase(ElementBase): self.xml.attrib['type'] = value return self - def get_to(self): + def get_to(self) -> JID: """Return the value of the stanza's ``'to'`` attribute.""" return JID(self._get_attr('to')) - def set_to(self, value): + def set_to(self, value: JidStr) -> None: """Set the ``'to'`` attribute of the stanza. :param value: A string or :class:`slixmpp.xmlstream.JID` object @@ -1426,11 +1515,11 @@ class StanzaBase(ElementBase): """ return self._set_attr('to', str(value)) - def get_from(self): + def get_from(self) -> JID: """Return the value of the stanza's ``'from'`` attribute.""" return JID(self._get_attr('from')) - def set_from(self, value): + def set_from(self, value: JidStr) -> None: """Set the 'from' attribute of the stanza. :param from: A string or JID object representing the sender's JID. @@ -1438,11 +1527,11 @@ class StanzaBase(ElementBase): """ return self._set_attr('from', str(value)) - def get_payload(self): + def get_payload(self) -> List[ET.Element]: """Return a list of XML objects contained in the stanza.""" return list(self.xml) - def set_payload(self, value): + def set_payload(self, value: Union[List[ElementBase], ElementBase]) -> StanzaBase: """Add XML content to the stanza. :param value: Either an XML or a stanza object, or a list @@ -1454,12 +1543,12 @@ class StanzaBase(ElementBase): self.append(val) return self - def del_payload(self): + def del_payload(self) -> StanzaBase: """Remove the XML contents of the stanza.""" self.clear() return self - def reply(self, clear=True): + def reply(self, clear: bool = True) -> StanzaBase: """Prepare the stanza for sending a reply. Swaps the ``'from'`` and ``'to'`` attributes. @@ -1475,7 +1564,7 @@ class StanzaBase(ElementBase): new_stanza = copy.copy(self) # if it's a component, use from if self.stream and hasattr(self.stream, "is_component") and \ - self.stream.is_component: + getattr(self.stream, 'is_component'): new_stanza['from'], new_stanza['to'] = self['to'], self['from'] else: new_stanza['to'] = self['from'] @@ -1484,19 +1573,19 @@ class StanzaBase(ElementBase): new_stanza.clear() return new_stanza - def error(self): + def error(self) -> StanzaBase: """Set the stanza's type to ``'error'``.""" self['type'] = 'error' return self - def unhandled(self): + def unhandled(self) -> None: """Called if no handlers have been registered to process this stanza. Meant to be overridden. """ pass - def exception(self, e): + def exception(self, e: Exception) -> None: """Handle exceptions raised during stanza processing. Meant to be overridden. @@ -1504,18 +1593,21 @@ class StanzaBase(ElementBase): log.exception('Error handling {%s}%s stanza', self.namespace, self.name) - def send(self): + def send(self) -> None: """Queue the stanza to be sent on the XML stream.""" - self.stream.send(self) + if self.stream is not None: + self.stream.send(self) + else: + log.error("Tried to send stanza without a stream: %s", self) - def __copy__(self): + def __copy__(self) -> StanzaBase: """Return a copy of the stanza object that does not share the same underlying XML object, but does share the same XML stream. """ return self.__class__(xml=copy.deepcopy(self.xml), stream=self.stream) - def __str__(self, top_level_ns=False): + def __str__(self, top_level_ns: bool = False) -> str: """Serialize the stanza's XML to a string. :param bool top_level_ns: Display the top-most namespace. @@ -1525,27 +1617,3 @@ class StanzaBase(ElementBase): return tostring(self.xml, xmlns=xmlns, stream=self.stream, top_level=(self.stream is None)) - - -#: A JSON/dictionary version of the XML content exposed through -#: the stanza interfaces:: -#: -#: >>> msg = Message() -#: >>> msg.values -#: {'body': '', 'from': , 'mucnick': '', 'mucroom': '', -#: 'to': , 'type': 'normal', 'id': '', 'subject': ''} -#: -#: Likewise, assigning to the :attr:`values` will change the XML -#: content:: -#: -#: >>> msg = Message() -#: >>> msg.values = {'body': 'Hi!', 'to': 'user@example.com'} -#: >>> msg -#: 'Hi!' -#: -#: Child stanzas are exposed as nested dictionaries. -ElementBase.values = property(ElementBase._get_stanza_values, - ElementBase._set_stanza_values) - -ElementBase.get_stanza_values = ElementBase._get_stanza_values -ElementBase.set_stanza_values = ElementBase._set_stanza_values