Add initial support for XEP-0198 for stream management.

This commit is contained in:
Lance Stout 2012-03-18 01:02:19 -07:00
parent fbdf2bed49
commit 73cabcb6ae
5 changed files with 439 additions and 0 deletions

View file

@ -78,6 +78,7 @@ packages = [ 'sleekxmpp',
'sleekxmpp/plugins/xep_0128',
'sleekxmpp/plugins/xep_0172',
'sleekxmpp/plugins/xep_0184',
'sleekxmpp/plugins/xep_0198',
'sleekxmpp/plugins/xep_0199',
'sleekxmpp/plugins/xep_0202',
'sleekxmpp/plugins/xep_0203',

View file

@ -41,6 +41,7 @@ __all__ = [
'xep_0163', # Personal Eventing Protocol
'xep_0172', # User Nickname
'xep_0184', # Message Receipts
'xep_0198', # Stream Management
'xep_0199', # Ping
'xep_0202', # Entity Time
'xep_0203', # Delayed Delivery

View file

@ -0,0 +1,20 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.base import register_plugin
from sleekxmpp.plugins.xep_0198.stanza import Enable, Enabled
from sleekxmpp.plugins.xep_0198.stanza import Resume, Resumed
from sleekxmpp.plugins.xep_0198.stanza import Failed
from sleekxmpp.plugins.xep_0198.stanza import StreamManagement
from sleekxmpp.plugins.xep_0198.stanza import Ack, RequestAck
from sleekxmpp.plugins.xep_0198.stream_management import XEP_0198
register_plugin(XEP_0198)

View file

@ -0,0 +1,151 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.stanza import Error
from sleekxmpp.xmlstream import ElementBase, StanzaBase
class Enable(StanzaBase):
name = 'enable'
namespace = 'urn:xmpp:sm:3'
interfaces = set(['max', 'resume'])
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
def get_resume(self):
return self._get_attr('resume', 'false').lower() in ('true', '1')
def set_resume(self, val):
self._del_attr('resume')
self._set_attr('resume', 'true' if val else 'false')
class Enabled(StanzaBase):
name = 'enabled'
namespace = 'urn:xmpp:sm:3'
interfaces = set(['id', 'location', 'max', 'resume'])
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
def get_resume(self):
return self._get_attr('resume', 'false').lower() in ('true', '1')
def set_resume(self, val):
self._del_attr('resume')
self._set_attr('resume', 'true' if val else 'false')
class Resume(StanzaBase):
name = 'resume'
namespace = 'urn:xmpp:sm:3'
interfaces = set(['h', 'previd'])
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
def get_h(self):
h = self._get_attr('h', None)
if h:
return int(h)
return None
def set_h(self, val):
self._set_attr('h', str(val))
class Resumed(StanzaBase):
name = 'resumed'
namespace = 'urn:xmpp:sm:3'
interfaces = set(['h', 'previd'])
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
def get_h(self):
h = self._get_attr('h', None)
if h:
return int(h)
return None
def set_h(self, val):
self._set_attr('h', str(val))
class Failed(StanzaBase, Error):
name = 'failed'
namespace = 'urn:xmpp:sm:3'
interfaces = set()
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
class StreamManagement(ElementBase):
name = 'sm'
namespace = 'urn:xmpp:sm:3'
plugin_attrib = name
interfaces = set(['required', 'optional'])
def get_required(self):
return self.find('{%s}required' % self.namespace) is not None
def set_required(self, val):
self.del_required()
if val:
self._set_sub_text('required', '', keep=True)
def del_required(self):
self._del_sub('required')
def get_optional(self):
return self.find('{%s}optional' % self.namespace) is not None
def set_optional(self, val):
self.del_optional()
if val:
self._set_sub_text('optional', '', keep=True)
def del_optional(self):
self._del_sub('optional')
class RequestAck(StanzaBase):
name = 'r'
namespace = 'urn:xmpp:sm:3'
interfaces = set()
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
class Ack(StanzaBase):
name = 'a'
namespace = 'urn:xmpp:sm:3'
interfaces = set(['h'])
def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()
def get_h(self):
h = self._get_attr('h', None)
if h:
return int(h)
return None
def set_h(self, val):
self._set_attr('h', str(val))

View file

@ -0,0 +1,266 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
import threading
import collections
from sleekxmpp.stanza import Message, Presence, Iq, StreamFeatures
from sleekxmpp.xmlstream import register_stanza_plugin
from sleekxmpp.xmlstream.handler import Callback, Waiter
from sleekxmpp.xmlstream.matcher import MatchXPath, MatchMany
from sleekxmpp.plugins.base import BasePlugin
from sleekxmpp.plugins.xep_0198 import stanza
log = logging.getLogger(__name__)
MAX_SEQ = 2**32
class XEP_0198(BasePlugin):
"""
XEP-0198: Stream Management
"""
name = 'xep_0198'
description = 'XEP-0198: Stream Management'
dependencies = set()
stanza = stanza
def plugin_init(self):
"""Start the XEP-0198 plugin."""
# Only enable stream management for non-components,
# since components do not yet perform feature negotiation.
if self.xmpp.is_component:
return
#: The stream management ID for the stream. Knowing this value is
#: required in order to do stream resumption.
self.sm_id = self.config.get('sm_id', None)
#: A counter of handled incoming stanzas, mod 2^32.
self.handled = self.config.get('handled', 0)
#: A counter of unacked outgoing stanzas, mod 2^32.
self.seq = self.config.get('seq', 0)
#: The last ack number received from the server.
self.last_ack = self.config.get('last_ack', 0)
#: The number of stanzas to wait between sending ack requests to
#: the server. Setting this to ``1`` will send an ack request after
#: every sent stanza. Defaults to ``5``.
self.window = self.config.get('window', 5)
#: Control whether or not the ability to resume the stream will be
#: requested when enabling stream management. Defaults to ``True``.
self.allow_resume = self.config.get('allow_resume', True)
self.enabled = threading.Event()
self.unacked_queue = collections.deque()
self.seq_lock = threading.Lock()
self.handled_lock = threading.Lock()
self.ack_lock = threading.Lock()
register_stanza_plugin(StreamFeatures, stanza.StreamManagement)
self.xmpp.register_stanza(stanza.Enable)
self.xmpp.register_stanza(stanza.Enabled)
self.xmpp.register_stanza(stanza.Resume)
self.xmpp.register_stanza(stanza.Resumed)
self.xmpp.register_stanza(stanza.Ack)
self.xmpp.register_stanza(stanza.RequestAck)
# Register the feature twice because it may be ordered two
# different ways: enabling after binding and resumption
# before binding.
self.xmpp.register_feature('sm',
self._handle_sm_feature,
restart=True,
order=self.config.get('order', 10100))
self.xmpp.register_feature('sm',
self._handle_sm_feature,
restart=True,
order=self.config.get('resume_order', 9000))
self.xmpp.register_handler(
Callback('Stream Management Enabled',
MatchXPath(stanza.Enabled.tag_name()),
self._handle_enabled,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Resumed',
MatchXPath(stanza.Resumed.tag_name()),
self._handle_resumed,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Failed',
MatchXPath(stanza.Failed.tag_name()),
self._handle_failed,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Ack',
MatchXPath(stanza.Ack.tag_name()),
self._handle_ack,
instream=True))
self.xmpp.register_handler(
Callback('Stream Management Request Ack',
MatchXPath(stanza.RequestAck.tag_name()),
self._handle_request_ack,
instream=True))
self.xmpp.add_filter('in', self._handle_incoming)
self.xmpp.add_filter('out_sync', self._handle_outgoing)
self.xmpp.add_event_handler('need_ack', self.request_ack)
def send_ack(self):
"""Send the current ack count to the server."""
ack = stanza.Ack(self.xmpp)
with self.handled_lock:
ack['h'] = self.handled
ack.send()
def request_ack(self, e=None):
"""Request an ack from the server."""
req = stanza.RequestAck(self.xmpp)
req.send()
def _handle_sm_feature(self, features):
"""
Enable or resume stream management.
If no SM-ID is stored, and resource binding has taken place,
stream management will be enabled.
If an SM-ID is known, and the server allows resumption, the
previous stream will be resumed.
"""
if 'stream_management' in self.xmpp.features:
# We've already negotiated stream management,
# so no need to do it again.
return False
if not self.sm_id:
if 'bind' in self.xmpp.features:
self.enabled.set()
enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume
enable.send()
self.handled = 0
elif self.sm_id and self.allow_resume:
self.enabled.set()
resume = stanza.Resume(self.xmpp)
resume['h'] = self.handled
resume['previd'] = self.sm_id
resume.send(now=True)
# Wait for a response before allowing stream feature processing
# to continue. The actual result processing will be done in the
# _handle_resumed() or _handle_failed() methods.
waiter = Waiter('resumed_or_failed',
MatchMany([
MatchXPath(stanza.Resumed.tag_name()),
MatchXPath(stanza.Failed.tag_name())]))
self.xmpp.register_handler(waiter)
result = waiter.wait()
if result is not None and result.name == 'resumed':
return True
return False
def _handle_enabled(self, stanza):
"""Save the SM-ID, if provided.
Raises an :term:`sm_enabled` event.
"""
self.xmpp.features.add('stream_management')
if stanza['id']:
self.sm_id = stanza['id']
self.xmpp.event('sm_enabled', stanza)
def _handle_resumed(self, stanza):
"""Finish resuming a stream by resending unacked stanzas.
Raises a :term:`session_resumed` event.
"""
self.xmpp.features.add('stream_management')
self._handle_ack(stanza)
for id, stanza in self.unacked_queue:
self.xmpp.send(stanza, now=True, use_filters=False)
self.xmpp.session_started_event.set()
self.xmpp.event('session_resumed', stanza)
def _handle_failed(self, stanza):
"""
Disable and reset any features used since stream management was
requested (tracked stanzas may have been sent during the interval
between the enable request and the enabled response).
Raises an :term:`sm_failed` event.
"""
self.enabled.clear()
self.unacked_queue.clear()
self.xmpp.event('sm_failed', stanza)
def _handle_ack(self, ack):
"""Process a server ack by freeing acked stanzas from the queue.
Raises a :term:`stanza_acked` event for each acked stanza.
"""
if ack['h'] == self.last_ack:
return
with self.ack_lock:
num_acked = (ack['h'] - self.last_ack) % MAX_SEQ
log.debug("Ack: %s, Last Ack: %s, Num Acked: %s, Unacked: %s",
ack['h'],
self.last_ack,
num_acked,
len(self.unacked_queue))
for x in range(num_acked):
seq, stanza = self.unacked_queue.popleft()
self.xmpp.event('stanza_acked', stanza)
self.last_ack = ack['h']
def _handle_request_ack(self, req):
"""Handle an ack request by sending an ack."""
self.send_ack()
def _handle_incoming(self, stanza):
"""Increment the handled counter for each inbound stanza."""
if not self.enabled.is_set():
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
with self.handled_lock:
# Sequence numbers are mod 2^32
self.handled = (self.handled + 1) % MAX_SEQ
return stanza
def _handle_outgoing(self, stanza):
"""Store outgoing stanzas in a queue to be acked."""
if not self.enabled.is_set():
return stanza
if isinstance(stanza, (Message, Presence, Iq)):
seq = None
with self.seq_lock:
# Sequence numbers are mod 2^32
self.seq = (self.seq + 1) % MAX_SEQ
seq = self.seq
self.unacked_queue.append((seq, stanza))
if len(self.unacked_queue) > self.window:
self.xmpp.event('need_ack')
return stanza