From deb6d4f1768600866390133ea083f912ae75642b Mon Sep 17 00:00:00 2001 From: Georg Lukas Date: Sat, 28 Mar 2020 20:45:39 +0100 Subject: [PATCH 1/2] XEP-0198: properly disable on disconnect, fix reconnect-loop When the connection is disconnected (but the session didn't "end", because 0198 resumption is enabled), poezio will reconnect and try to send an element because the 0198 plugin doesn't realize that SM wasn't re-enabled yet. --- slixmpp/plugins/xep_0198/stream_management.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index 759e82e1..a092736f 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -131,6 +131,7 @@ class XEP_0198(BasePlugin): self.xmpp.add_filter('in', self._handle_incoming) self.xmpp.add_filter('out_sync', self._handle_outgoing) + self.xmpp.add_event_handler('disconnected', self.disconnected) self.xmpp.add_event_handler('session_end', self.session_end) def plugin_end(self): @@ -139,6 +140,7 @@ class XEP_0198(BasePlugin): self.xmpp.unregister_feature('sm', self.order) self.xmpp.unregister_feature('sm', self.resume_order) + self.xmpp.del_event_handler('disconnected', self.disconnected) self.xmpp.del_event_handler('session_end', self.session_end) self.xmpp.del_filter('in', self._handle_incoming) self.xmpp.del_filter('out_sync', self._handle_outgoing) @@ -154,6 +156,10 @@ class XEP_0198(BasePlugin): self.xmpp.remove_stanza(stanza.Ack) self.xmpp.remove_stanza(stanza.RequestAck) + def disconnected(self, event): + """Reset enabled state until we can resume/reenable.""" + self.enabled = False + def session_end(self, event): """Reset stream management state.""" self.enabled = False From 85c9967b9ce55fd6ea1e2aa59cf065710568661f Mon Sep 17 00:00:00 2001 From: Georg Lukas Date: Sat, 28 Mar 2020 21:18:13 +0100 Subject: [PATCH 2/2] XEP-0198: fix race conditions on enable/resume This code splits out the `enabled` property into `enabled_in` and `enabled_out` to reflect that client and server enable 0198 asynchronously. This also moves the actual enabling code into the stanza processing logic, because apparently, `enable.send()` was just added into the end of the send queue, but `enable` got enabled immediately, so that poezio requested ACKs for whatever happened to be in the queue before. Async is hard, let's go get fishing. --- slixmpp/plugins/xep_0198/stream_management.py | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index a092736f..2b68faec 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -71,7 +71,8 @@ class XEP_0198(BasePlugin): self.window_counter = self.window - self.enabled = False + self.enabled_in = False + self.enabled_out = False self.unacked_queue = collections.deque() register_stanza_plugin(StreamFeatures, stanza.StreamManagement) @@ -158,11 +159,17 @@ class XEP_0198(BasePlugin): def disconnected(self, event): """Reset enabled state until we can resume/reenable.""" - self.enabled = False + log.debug("disconnected, disabling SM") + self.xmpp.event('sm_disabled', event) + self.enabled_in = False + self.enabled_out = False def session_end(self, event): """Reset stream management state.""" - self.enabled = False + log.debug("session_end, disabling SM") + self.xmpp.event('sm_disabled', event) + self.enabled_in = False + self.enabled_out = False self.unacked_queue.clear() self.sm_id = None self.handled = 0 @@ -177,6 +184,7 @@ class XEP_0198(BasePlugin): def request_ack(self, e=None): """Request an ack from the server.""" + log.debug("requesting ack") req = stanza.RequestAck(self.xmpp) self.xmpp.send_raw(str(req)) @@ -199,9 +207,7 @@ class XEP_0198(BasePlugin): enable = stanza.Enable(self.xmpp) enable['resume'] = self.allow_resume enable.send() - self.enabled = True - self.handled = 0 - self.unacked_queue.clear() + log.debug("enabling SM") waiter = Waiter('enabled_or_failed', MatchMany([ @@ -210,11 +216,11 @@ class XEP_0198(BasePlugin): self.xmpp.register_handler(waiter) result = await waiter.wait() elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features: - self.enabled = True resume = stanza.Resume(self.xmpp) resume['h'] = self.handled resume['previd'] = self.sm_id resume.send() + log.debug("resuming SM") # Wait for a response before allowing stream feature processing # to continue. The actual result processing will be done in the @@ -237,6 +243,8 @@ class XEP_0198(BasePlugin): self.xmpp.features.add('stream_management') if stanza['id']: self.sm_id = stanza['id'] + self.enabled_in = True + self.handled = 0 self.xmpp.event('sm_enabled', stanza) def _handle_resumed(self, stanza): @@ -245,6 +253,7 @@ class XEP_0198(BasePlugin): Raises a :term:`session_resumed` event. """ self.xmpp.features.add('stream_management') + self.enabled_in = True self._handle_ack(stanza) for id, stanza in self.unacked_queue: self.xmpp.send(stanza, use_filters=False) @@ -258,7 +267,8 @@ class XEP_0198(BasePlugin): Raises an :term:`sm_failed` event. """ - self.enabled = False + self.enabled_in = False + self.enabled_out = False self.unacked_queue.clear() self.xmpp.event('sm_failed', stanza) @@ -295,7 +305,7 @@ class XEP_0198(BasePlugin): def _handle_incoming(self, stanza): """Increment the handled counter for each inbound stanza.""" - if not self.enabled: + if not self.enabled_in: return stanza if isinstance(stanza, (Message, Presence, Iq)): @@ -305,7 +315,13 @@ class XEP_0198(BasePlugin): def _handle_outgoing(self, stanza): """Store outgoing stanzas in a queue to be acked.""" - if not self.enabled: + from slixmpp.plugins.xep_0198 import stanza as st + if isinstance(stanza, (st.Enable, st.Resume)): + self.enabled_out = True + self.unacked_queue.clear() + log.debug("enabling outgoing SM: %s" % stanza) + + if not self.enabled_out: return stanza if isinstance(stanza, (Message, Presence, Iq)):