Add out_sync filter category.
Added option to XMLStream.send() to skip applying filters. Filters in the out_sync group are synced with placing stanza content either on the wire directly or into the send queue. Because of this, out_sync filters should not block.
This commit is contained in:
parent
33d01fb694
commit
fbdf2bed49
1 changed files with 23 additions and 8 deletions
|
@ -267,6 +267,7 @@ class XMLStream(object):
|
||||||
|
|
||||||
#: A queue of string data to be sent over the stream.
|
#: A queue of string data to be sent over the stream.
|
||||||
self.send_queue = queue.Queue()
|
self.send_queue = queue.Queue()
|
||||||
|
self.send_queue_lock = threading.Lock()
|
||||||
|
|
||||||
#: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for
|
#: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for
|
||||||
#: executing callbacks in the future based on time delays.
|
#: executing callbacks in the future based on time delays.
|
||||||
|
@ -281,7 +282,7 @@ class XMLStream(object):
|
||||||
self.__handlers = []
|
self.__handlers = []
|
||||||
self.__event_handlers = {}
|
self.__event_handlers = {}
|
||||||
self.__event_handlers_lock = threading.Lock()
|
self.__event_handlers_lock = threading.Lock()
|
||||||
self.__filters = {'in': [], 'out': []}
|
self.__filters = {'in': [], 'out': [], 'out_sync': []}
|
||||||
|
|
||||||
self._id = 0
|
self._id = 0
|
||||||
self._id_lock = threading.Lock()
|
self._id_lock = threading.Lock()
|
||||||
|
@ -1080,7 +1081,7 @@ class XMLStream(object):
|
||||||
"""
|
"""
|
||||||
return xml
|
return xml
|
||||||
|
|
||||||
def send(self, data, mask=None, timeout=None, now=False):
|
def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
|
||||||
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
|
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
|
||||||
|
|
||||||
May optionally block until an expected response is received.
|
May optionally block until an expected response is received.
|
||||||
|
@ -1098,6 +1099,10 @@ class XMLStream(object):
|
||||||
sending the stanza immediately. Useful mainly
|
sending the stanza immediately. Useful mainly
|
||||||
for stream initialization stanzas.
|
for stream initialization stanzas.
|
||||||
Defaults to ``False``.
|
Defaults to ``False``.
|
||||||
|
:param bool use_filters: Indicates if outgoing filters should be
|
||||||
|
applied to the given stanza data. Disabling
|
||||||
|
filters is useful when resending stanzas.
|
||||||
|
Defaults to ``True``.
|
||||||
"""
|
"""
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = self.response_timeout
|
timeout = self.response_timeout
|
||||||
|
@ -1105,19 +1110,29 @@ class XMLStream(object):
|
||||||
mask = mask.xml
|
mask = mask.xml
|
||||||
|
|
||||||
if isinstance(data, ElementBase):
|
if isinstance(data, ElementBase):
|
||||||
for filter in self.__filters['out']:
|
if use_filters:
|
||||||
if data is not None:
|
for filter in self.__filters['out']:
|
||||||
data = filter(data)
|
data = filter(data)
|
||||||
if data is None:
|
if data is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
data = str(data)
|
|
||||||
if mask is not None:
|
if mask is not None:
|
||||||
log.warning("Use of send mask waiters is deprecated.")
|
log.warning("Use of send mask waiters is deprecated.")
|
||||||
wait_for = Waiter("SendWait_%s" % self.new_id(),
|
wait_for = Waiter("SendWait_%s" % self.new_id(),
|
||||||
MatchXMLMask(mask))
|
MatchXMLMask(mask))
|
||||||
self.register_handler(wait_for)
|
self.register_handler(wait_for)
|
||||||
self.send_raw(data, now)
|
|
||||||
|
if isinstance(data, ElementBase):
|
||||||
|
with self.send_queue_lock:
|
||||||
|
if use_filters:
|
||||||
|
for filter in self.__filters['out_sync']:
|
||||||
|
data = filter(data)
|
||||||
|
if data is None:
|
||||||
|
return
|
||||||
|
str_data = str(data)
|
||||||
|
self.send_raw(str_data, now)
|
||||||
|
else:
|
||||||
|
self.send_raw(data, now)
|
||||||
if mask is not None:
|
if mask is not None:
|
||||||
return wait_for.wait(timeout)
|
return wait_for.wait(timeout)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue