diff --git a/docs/architecture.rst b/docs/architecture.rst index 52bb8d9c..2d0170d2 100644 --- a/docs/architecture.rst +++ b/docs/architecture.rst @@ -108,10 +108,6 @@ when this bit of XML is received (with an assumed namespace of handlers `. Each stanza/handler pair is then put into the event queue. - .. note:: - It is possible to skip the event queue and process an event immediately - by using ``direct=True`` when raising the event. - The code for :meth:`BaseXMPP._handle_message` follows this pattern, and raises a ``'message'`` event:: diff --git a/slixmpp/componentxmpp.py b/slixmpp/componentxmpp.py index 632db189..52829dfa 100644 --- a/slixmpp/componentxmpp.py +++ b/slixmpp/componentxmpp.py @@ -152,7 +152,7 @@ class ComponentXMPP(BaseXMPP): """ self.session_bind_event.set() self.session_started_event.set() - self.event('session_bind', self.boundjid, direct=True) + self.event('session_bind', self.boundjid) self.event('session_start') def _handle_probe(self, pres): diff --git a/slixmpp/features/feature_bind/bind.py b/slixmpp/features/feature_bind/bind.py index e26c3ce6..d41bbc3f 100644 --- a/slixmpp/features/feature_bind/bind.py +++ b/slixmpp/features/feature_bind/bind.py @@ -54,7 +54,7 @@ class FeatureBind(BasePlugin): def _on_bind_response(self, response): self.xmpp.boundjid = JID(response['bind']['jid'], cache_lock=True) self.xmpp.bound = True - self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True) + self.xmpp.event('session_bind', self.xmpp.boundjid) self.xmpp.session_bind_event.set() self.xmpp.features.add('bind') diff --git a/slixmpp/features/feature_mechanisms/mechanisms.py b/slixmpp/features/feature_mechanisms/mechanisms.py index ee7db41e..fe38fbd5 100644 --- a/slixmpp/features/feature_mechanisms/mechanisms.py +++ b/slixmpp/features/feature_mechanisms/mechanisms.py @@ -172,8 +172,8 @@ class FeatureMechanisms(BasePlugin): min_mech=self.min_mech) except sasl.SASLNoAppropriateMechanism: log.error("No appropriate login method.") - self.xmpp.event("no_auth", direct=True) - self.xmpp.event("failed_auth", direct=True) + self.xmpp.event("no_auth") + self.xmpp.event("failed_auth") self.attempted_mechs = set() return self.xmpp.disconnect() except StringPrepError: @@ -232,7 +232,7 @@ class FeatureMechanisms(BasePlugin): self.attempted_mechs = set() self.xmpp.authenticated = True self.xmpp.features.add('mechanisms') - self.xmpp.event('auth_success', stanza, direct=True) + self.xmpp.event('auth_success', stanza) # Restart the stream self.xmpp.init_parser() self.xmpp.send_raw(self.xmpp.stream_header) @@ -241,6 +241,6 @@ class FeatureMechanisms(BasePlugin): """SASL authentication failed. Disconnect and shutdown.""" self.attempted_mechs.add(self.mech.name) log.info("Authentication failed: %s", stanza['condition']) - self.xmpp.event("failed_auth", stanza, direct=True) + self.xmpp.event("failed_auth", stanza) self._send_auth() return True diff --git a/slixmpp/plugins/xep_0077/register.py b/slixmpp/plugins/xep_0077/register.py index d83ff1a7..7c6d99a0 100644 --- a/slixmpp/plugins/xep_0077/register.py +++ b/slixmpp/plugins/xep_0077/register.py @@ -77,7 +77,7 @@ class XEP_0077(BasePlugin): if self.create_account and self.xmpp.event_handled('register'): form = self.get_registration() - self.xmpp.event('register', form, direct=True) + self.xmpp.event('register', form) return True return False diff --git a/slixmpp/plugins/xep_0078/legacyauth.py b/slixmpp/plugins/xep_0078/legacyauth.py index 8d2ea230..eac1b57e 100644 --- a/slixmpp/plugins/xep_0078/legacyauth.py +++ b/slixmpp/plugins/xep_0078/legacyauth.py @@ -82,12 +82,12 @@ class XEP_0078(BasePlugin): resp = iq.send(now=True) except IqError as err: log.info("Authentication failed: %s", err.iq['error']['condition']) - self.xmpp.event('failed_auth', direct=True) + self.xmpp.event('failed_auth') self.xmpp.disconnect() return True except IqTimeout: log.info("Authentication failed: %s", 'timeout') - self.xmpp.event('failed_auth', direct=True) + self.xmpp.event('failed_auth') self.xmpp.disconnect() return True @@ -123,11 +123,11 @@ class XEP_0078(BasePlugin): result = iq.send(now=True) except IqError as err: log.info("Authentication failed") - self.xmpp.event("failed_auth", direct=True) + self.xmpp.event("failed_auth") self.xmpp.disconnect() except IqTimeout: log.info("Authentication failed") - self.xmpp.event("failed_auth", direct=True) + self.xmpp.event("failed_auth") self.xmpp.disconnect() self.xmpp.features.add('auth') @@ -137,7 +137,7 @@ class XEP_0078(BasePlugin): self.xmpp.boundjid = JID(self.xmpp.requested_jid, resource=resource, cache_lock=True) - self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True) + self.xmpp.event('session_bind', self.xmpp.boundjid) log.debug("Established Session") self.xmpp.sessionstarted = True diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 838f3649..3df98862 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -612,16 +612,13 @@ class XMLStream(object): """ return len(self.__event_handlers.get(name, [])) - def event(self, name, data={}, direct=False): + def event(self, name, data={}): """Manually trigger a custom event. :param name: The name of the event to trigger. :param data: Data that will be passed to each event handler. Defaults to an empty dictionary, but is usually a stanza object. - :param direct: Runs the event directly if True, skipping the - event queue. All event handlers will run in the - same thread. """ log.debug("Event triggered: " + name) @@ -633,18 +630,13 @@ class XMLStream(object): out_data = copy.copy(data) if len(handlers) > 1 else data old_exception = getattr(data, 'exception', None) - if direct: - try: - handler_callback(out_data) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(handler_callback)) - if old_exception: - old_exception(e) - else: - self.exception(e) - else: - self.run_event(('event', handler, out_data)) + try: + handler_callback(out_data) + except Exception as e: + if old_exception: + old_exception(e) + else: + self.exception(e) if disposable: # If the handler is disposable, we will go ahead and # remove it now instead of waiting for it to be @@ -687,11 +679,19 @@ class XMLStream(object): except KeyError: log.debug("Tried to cancel unscheduled event: %s" % (name,)) + def _safe_cb_run(self, name, cb): + log.debug('Scheduled event: %s', name) + try: + cb() + except Exception as e: + log.exception('Error processing scheduled task') + self.exception(e) + def _execute_and_reschedule(self, name, cb, seconds): """Simple method that calls the given callback, and then schedule itself to be called after the given number of seconds. """ - cb() + self._safe_cb_run(name, cb) loop = asyncio.get_event_loop() handle = loop.call_later(seconds, self._execute_and_reschedule, name, cb, seconds) @@ -701,7 +701,7 @@ class XMLStream(object): """ Execute the callback and remove the handler for it. """ - cb() + self._safe_cb_run(name, cb) del self.scheduled_events[name] def incoming_filter(self, xml): @@ -817,7 +817,7 @@ class XMLStream(object): # Match the stanza against registered handlers. Handlers marked # to run "in stream" will be executed immediately; the rest will # be queued. - unhandled = True + handled = False matched_handlers = [h for h in self.__handlers if h.match(stanza)] for handler in matched_handlers: if len(matched_handlers) > 1: @@ -825,50 +825,20 @@ class XMLStream(object): else: stanza_copy = stanza handler.prerun(stanza_copy) - self.run_event(('stanza', handler, stanza_copy)) try: - if handler.check_delete(): - self.__handlers.remove(handler) - except: - pass # not thread safe - unhandled = False - - # Some stanzas require responses, such as Iq queries. A default - # handler will be executed immediately for this case. - if unhandled: - stanza.unhandled() - - def run_event(self, event): - etype, handler = event[0:2] - args = event[2:] - orig = copy.copy(args[0]) - - if etype == 'stanza': - try: - handler.run(args[0]) + handler.run(stanza_copy) except Exception as e: error_msg = 'Error processing stream handler: %s' log.exception(error_msg, handler.name) - orig.exception(e) - elif etype == 'schedule': - name = args[2] - try: - log.debug('Scheduled event: %s: %s', name, args[0]) - handler(*args[0], **args[1]) - except Exception as e: - log.exception('Error processing scheduled task') - self.exception(e) - elif etype == 'event': - func, disposable = handler - try: - func(*args) - except Exception as e: - error_msg = 'Error processing event handler: %s' - log.exception(error_msg, str(func)) - if hasattr(orig, 'exception'): - orig.exception(e) - else: - self.exception(e) + stanza_copy.exception(e) + if handler.check_delete(): + self.__handlers.remove(handler) + handled = True + + # Some stanzas require responses, such as Iq queries. A default + # handler will be executed immediately for this case. + if not handled: + stanza.unhandled() def exception(self, exception): """Process an unknown exception.