From 67235c42145ef40d841e9cb7ffa4bbcac490568a Mon Sep 17 00:00:00 2001 From: Joe Hildebrand Date: Mon, 29 Oct 2012 10:03:32 -0600 Subject: [PATCH 1/3] Allow IQ timeouts to be asynchronous, by passing a timeout_callback parameter to send(). An example modification of disco is included. If this approach is approved, I'll go through and update the other plugins. --- sleekxmpp/plugins/xep_0030/disco.py | 10 ++++++-- sleekxmpp/stanza/iq.py | 37 +++++++++++++++++++++++++---- tests/test_stream_handlers.py | 29 ++++++++++++++++++++++ 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/sleekxmpp/plugins/xep_0030/disco.py b/sleekxmpp/plugins/xep_0030/disco.py index 278b4a34..8a397923 100644 --- a/sleekxmpp/plugins/xep_0030/disco.py +++ b/sleekxmpp/plugins/xep_0030/disco.py @@ -324,6 +324,8 @@ class XEP_0030(BasePlugin): callback -- Optional callback to execute when a reply is received instead of blocking and waiting for the reply. + timeout_callback -- Optional callback to execute when no result + has been received in timeout seconds. """ if local is None: if jid is not None and not isinstance(jid, JID): @@ -364,7 +366,8 @@ class XEP_0030(BasePlugin): iq['disco_info']['node'] = node if node else '' return iq.send(timeout=kwargs.get('timeout', None), block=kwargs.get('block', True), - callback=kwargs.get('callback', None)) + callback=kwargs.get('callback', None), + timeout_callback=kwargs.get('timeout_callback', None)) def set_info(self, jid=None, node=None, info=None): """ @@ -405,6 +408,8 @@ class XEP_0030(BasePlugin): iterator -- If True, return a result set iterator using the XEP-0059 plugin, if the plugin is loaded. Otherwise the parameter is ignored. + timeout_callback -- Optional callback to execute when no result + has been received in timeout seconds. """ if local or local is None and jid is None: items = self.api['get_items'](jid, node, @@ -423,7 +428,8 @@ class XEP_0030(BasePlugin): else: return iq.send(timeout=kwargs.get('timeout', None), block=kwargs.get('block', True), - callback=kwargs.get('callback', None)) + callback=kwargs.get('callback', None), + timeout_callback=kwargs.get('timeout_callback', None)) def set_items(self, jid=None, node=None, **kwargs): """ diff --git a/sleekxmpp/stanza/iq.py b/sleekxmpp/stanza/iq.py index f45b3c67..71c0444d 100644 --- a/sleekxmpp/stanza/iq.py +++ b/sleekxmpp/stanza/iq.py @@ -154,7 +154,7 @@ class Iq(RootStanza): StanzaBase.reply(self, clear) return self - def send(self, block=True, timeout=None, callback=None, now=False): + def send(self, block=True, timeout=None, callback=None, now=False, timeout_callback=None): """ Send an stanza over the XML stream. @@ -181,15 +181,32 @@ class Iq(RootStanza): now -- Indicates if the send queue should be skipped and send the stanza immediately. Used during stream initialization. Defaults to False. + timeout_callback -- Optional reference to a stream handler function. + Will be executed when the timeout expires before a + response has been received with the originally-sent IQ + stanza. Only called if there is a callback parameter + (and therefore are in async mode). """ if timeout is None: timeout = self.stream.response_timeout if callback is not None and self['type'] in ('get', 'set'): handler_name = 'IqCallback_%s' % self['id'] - handler = Callback(handler_name, - MatcherId(self['id']), - callback, - once=True) + if timeout_callback: + self.callback = callback + self.timeout_callback = timeout_callback + self.stream.schedule('IqTimeout_%s' % self['id'], + timeout, + self._fire_timeout, + repeat=False) + handler = Callback(handler_name, + MatcherId(self['id']), + self._handle_result, + once=True) + else: + handler = Callback(handler_name, + MatcherId(self['id']), + callback, + once=True) self.stream.register_handler(handler) StanzaBase.send(self, now=now) return handler_name @@ -206,6 +223,16 @@ class Iq(RootStanza): else: return StanzaBase.send(self, now=now) + def _handle_result(self, iq): + # we got the IQ, so don't fire the timeout + self.stream.scheduler.remove('IqTimeout_%s' % self['id']) + self.callback(iq) + + def _fire_timeout(self): + # don't fire the handler for the IQ, if it finally does come in + self.stream.remove_handler('IqCallback_%s' % self['id']) + self.timeout_callback(self) + def _set_stanza_values(self, values): """ Set multiple stanza interface values using a dictionary. diff --git a/tests/test_stream_handlers.py b/tests/test_stream_handlers.py index 7fd4e648..cdd128bb 100644 --- a/tests/test_stream_handlers.py +++ b/tests/test_stream_handlers.py @@ -153,6 +153,35 @@ class TestHandlers(SleekTest): self.failUnless(events == ['foo'], "Iq callback was not executed: %s" % events) + def testIqTimeoutCallback(self): + """Test that iq.send(tcallback=handle_foo, timeout_callback=handle_timeout) works.""" + events = [] + + def handle_foo(iq): + events.append('foo') + + def handle_timeout(iq): + events.append('timeout') + + iq = self.Iq() + iq['type'] = 'get' + iq['id'] = 'test-foo' + iq['to'] = 'user@localhost' + iq['query'] = 'foo' + iq.send(callback=handle_foo, timeout_callback=handle_timeout, timeout=0.05) + + self.send(""" + + + + """) + + # Give event queue time to process + time.sleep(0.1) + + self.failUnless(events == ['timeout'], + "Iq timeout was not executed: %s" % events) + def testMultipleHandlersForStanza(self): """ Test that multiple handlers for a single stanza work From 61aff9f49ad0c2352553280a5dc60d2877bd245d Mon Sep 17 00:00:00 2001 From: Joe Hildebrand Date: Mon, 29 Oct 2012 14:15:07 -0600 Subject: [PATCH 2/3] update JID_CACHE logic again. --- sleekxmpp/jid.py | 67 +++++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/sleekxmpp/jid.py b/sleekxmpp/jid.py index feab4082..8f1a81d4 100644 --- a/sleekxmpp/jid.py +++ b/sleekxmpp/jid.py @@ -69,6 +69,20 @@ JID_CACHE = OrderedDict() JID_CACHE_LOCK = threading.Lock() JID_CACHE_MAX_SIZE = 1024 +def _cache(key, parts, locked): + JID_CACHE[key] = (parts, locked) + if len(JID_CACHE) > JID_CACHE_MAX_SIZE: + with JID_CACHE_LOCK: + while len(JID_CACHE) > JID_CACHE_MAX_SIZE: + found = None + for key, item in JID_CACHE.iteritems(): + if not item[1]: # if not locked + found = key + break + if not found: # more than MAX_SIZE locked + # warn? + break + del JID_CACHE[found] # pylint: disable=c0103 #: The nodeprep profile of stringprep used to validate the local, @@ -418,19 +432,29 @@ class JID(object): # pylint: disable=W0212 def __init__(self, jid=None, **kwargs): - jid_data = (jid, kwargs.get('local', None), - kwargs.get('domain', None), - kwargs.get('resource', None)) - locked = kwargs.get('cache_lock', False) + in_local = kwargs.get('local', None) + in_domain = kwargs.get('domain', None) + in_resource = kwargs.get('resource', None) + parts = None + if in_local or in_domain or in_resource: + parts = (in_local, in_domain, in_resource) - if jid_data in JID_CACHE: - parsed_jid, locked = JID_CACHE[jid_data] - self._jid = parsed_jid - else: - if jid is None: - jid = '' - + # only check cache if there is a jid string, or parts, not if there + # are both + self._jid = None + key = None + if (jid is not None) and (parts is None): + if isinstance(jid, JID): + # it's already good to go, and there are no additions + self._jid = jid._jid + return + key = jid + self._jid, locked = JID_CACHE.get(jid, (None, locked)) + elif jid is None and parts is not None: + key = parts + self._jid, locked = JID_CACHE.get(parts, (None, locked)) + if not self._jid: if not jid: parsed_jid = (None, None, None) elif not isinstance(jid, JID): @@ -440,27 +464,16 @@ class JID(object): local, domain, resource = parsed_jid - local = kwargs.get('local', local) - domain = kwargs.get('domain', domain) - resource = kwargs.get('resource', resource) - if 'local' in kwargs: - local = _escape_node(local) + local = _escape_node(in_local) if 'domain' in kwargs: - domain = _validate_domain(domain) + domain = _validate_domain(in_domain) if 'resource' in kwargs: - resource = _validate_resource(resource) + resource = _validate_resource(in_resource) self._jid = (local, domain, resource) - - JID_CACHE[jid_data] = (self._jid, locked) - if len(JID_CACHE) > JID_CACHE_MAX_SIZE: - with JID_CACHE_LOCK: - key, item = JID_CACHE.popitem(False) - if item[1]: - # Need to reinsert locked JIDs - JID_CACHE[key] = item - + if key: + _cache(key, self._jid, locked) def unescape(self): """Return an unescaped JID object. From 2229ad8d8e51b73a7900b118dfab5d7b449f17ce Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 31 Oct 2012 13:41:55 -0700 Subject: [PATCH 3/3] Relax timing issues in Iq timeout callback test. --- tests/test_stream_handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_stream_handlers.py b/tests/test_stream_handlers.py index cdd128bb..d3850a94 100644 --- a/tests/test_stream_handlers.py +++ b/tests/test_stream_handlers.py @@ -177,7 +177,7 @@ class TestHandlers(SleekTest): """) # Give event queue time to process - time.sleep(0.1) + time.sleep(1) self.failUnless(events == ['timeout'], "Iq timeout was not executed: %s" % events)