XEP-0198: fix race conditions on enable/resume
This code splits out the `enabled` property into `enabled_in` and `enabled_out` to reflect that client and server enable 0198 asynchronously. This also moves the actual enabling code into the stanza processing logic, because apparently, `enable.send()` was just added into the end of the send queue, but `enable` got enabled immediately, so that poezio requested ACKs for whatever happened to be in the queue before. Async is hard, let's go get fishing.
This commit is contained in:
parent
deb6d4f176
commit
85c9967b9c
1 changed files with 26 additions and 10 deletions
|
@ -71,7 +71,8 @@ class XEP_0198(BasePlugin):
|
|||
|
||||
self.window_counter = self.window
|
||||
|
||||
self.enabled = False
|
||||
self.enabled_in = False
|
||||
self.enabled_out = False
|
||||
self.unacked_queue = collections.deque()
|
||||
|
||||
register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
|
||||
|
@ -158,11 +159,17 @@ class XEP_0198(BasePlugin):
|
|||
|
||||
def disconnected(self, event):
|
||||
"""Reset enabled state until we can resume/reenable."""
|
||||
self.enabled = False
|
||||
log.debug("disconnected, disabling SM")
|
||||
self.xmpp.event('sm_disabled', event)
|
||||
self.enabled_in = False
|
||||
self.enabled_out = False
|
||||
|
||||
def session_end(self, event):
|
||||
"""Reset stream management state."""
|
||||
self.enabled = False
|
||||
log.debug("session_end, disabling SM")
|
||||
self.xmpp.event('sm_disabled', event)
|
||||
self.enabled_in = False
|
||||
self.enabled_out = False
|
||||
self.unacked_queue.clear()
|
||||
self.sm_id = None
|
||||
self.handled = 0
|
||||
|
@ -177,6 +184,7 @@ class XEP_0198(BasePlugin):
|
|||
|
||||
def request_ack(self, e=None):
|
||||
"""Request an ack from the server."""
|
||||
log.debug("requesting ack")
|
||||
req = stanza.RequestAck(self.xmpp)
|
||||
self.xmpp.send_raw(str(req))
|
||||
|
||||
|
@ -199,9 +207,7 @@ class XEP_0198(BasePlugin):
|
|||
enable = stanza.Enable(self.xmpp)
|
||||
enable['resume'] = self.allow_resume
|
||||
enable.send()
|
||||
self.enabled = True
|
||||
self.handled = 0
|
||||
self.unacked_queue.clear()
|
||||
log.debug("enabling SM")
|
||||
|
||||
waiter = Waiter('enabled_or_failed',
|
||||
MatchMany([
|
||||
|
@ -210,11 +216,11 @@ class XEP_0198(BasePlugin):
|
|||
self.xmpp.register_handler(waiter)
|
||||
result = await waiter.wait()
|
||||
elif self.sm_id and self.allow_resume and 'bind' not in self.xmpp.features:
|
||||
self.enabled = True
|
||||
resume = stanza.Resume(self.xmpp)
|
||||
resume['h'] = self.handled
|
||||
resume['previd'] = self.sm_id
|
||||
resume.send()
|
||||
log.debug("resuming SM")
|
||||
|
||||
# Wait for a response before allowing stream feature processing
|
||||
# to continue. The actual result processing will be done in the
|
||||
|
@ -237,6 +243,8 @@ class XEP_0198(BasePlugin):
|
|||
self.xmpp.features.add('stream_management')
|
||||
if stanza['id']:
|
||||
self.sm_id = stanza['id']
|
||||
self.enabled_in = True
|
||||
self.handled = 0
|
||||
self.xmpp.event('sm_enabled', stanza)
|
||||
|
||||
def _handle_resumed(self, stanza):
|
||||
|
@ -245,6 +253,7 @@ class XEP_0198(BasePlugin):
|
|||
Raises a :term:`session_resumed` event.
|
||||
"""
|
||||
self.xmpp.features.add('stream_management')
|
||||
self.enabled_in = True
|
||||
self._handle_ack(stanza)
|
||||
for id, stanza in self.unacked_queue:
|
||||
self.xmpp.send(stanza, use_filters=False)
|
||||
|
@ -258,7 +267,8 @@ class XEP_0198(BasePlugin):
|
|||
|
||||
Raises an :term:`sm_failed` event.
|
||||
"""
|
||||
self.enabled = False
|
||||
self.enabled_in = False
|
||||
self.enabled_out = False
|
||||
self.unacked_queue.clear()
|
||||
self.xmpp.event('sm_failed', stanza)
|
||||
|
||||
|
@ -295,7 +305,7 @@ class XEP_0198(BasePlugin):
|
|||
|
||||
def _handle_incoming(self, stanza):
|
||||
"""Increment the handled counter for each inbound stanza."""
|
||||
if not self.enabled:
|
||||
if not self.enabled_in:
|
||||
return stanza
|
||||
|
||||
if isinstance(stanza, (Message, Presence, Iq)):
|
||||
|
@ -305,7 +315,13 @@ class XEP_0198(BasePlugin):
|
|||
|
||||
def _handle_outgoing(self, stanza):
|
||||
"""Store outgoing stanzas in a queue to be acked."""
|
||||
if not self.enabled:
|
||||
from slixmpp.plugins.xep_0198 import stanza as st
|
||||
if isinstance(stanza, (st.Enable, st.Resume)):
|
||||
self.enabled_out = True
|
||||
self.unacked_queue.clear()
|
||||
log.debug("enabling outgoing SM: %s" % stanza)
|
||||
|
||||
if not self.enabled_out:
|
||||
return stanza
|
||||
|
||||
if isinstance(stanza, (Message, Presence, Iq)):
|
||||
|
|
Loading…
Reference in a new issue