diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index f3e3faa8..22469039 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -267,6 +267,7 @@ class XMLStream(object): #: A queue of string data to be sent over the stream. self.send_queue = queue.Queue() + self.send_queue_lock = threading.Lock() #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for #: executing callbacks in the future based on time delays. @@ -281,7 +282,7 @@ class XMLStream(object): self.__handlers = [] self.__event_handlers = {} self.__event_handlers_lock = threading.Lock() - self.__filters = {'in': [], 'out': []} + self.__filters = {'in': [], 'out': [], 'out_sync': []} self._id = 0 self._id_lock = threading.Lock() @@ -1080,7 +1081,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None, now=False): + def send(self, data, mask=None, timeout=None, now=False, use_filters=True): """A wrapper for :meth:`send_raw()` for sending stanza objects. May optionally block until an expected response is received. @@ -1098,6 +1099,10 @@ class XMLStream(object): sending the stanza immediately. Useful mainly for stream initialization stanzas. Defaults to ``False``. + :param bool use_filters: Indicates if outgoing filters should be + applied to the given stanza data. Disabling + filters is useful when resending stanzas. + Defaults to ``True``. """ if timeout is None: timeout = self.response_timeout @@ -1105,19 +1110,29 @@ class XMLStream(object): mask = mask.xml if isinstance(data, ElementBase): - for filter in self.__filters['out']: - if data is not None: + if use_filters: + for filter in self.__filters['out']: data = filter(data) - if data is None: - return + if data is None: + return - data = str(data) if mask is not None: log.warning("Use of send mask waiters is deprecated.") wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data, now) + + if isinstance(data, ElementBase): + with self.send_queue_lock: + if use_filters: + for filter in self.__filters['out_sync']: + data = filter(data) + if data is None: + return + str_data = str(data) + self.send_raw(str_data, now) + else: + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout)