Allow IQ timeouts to be asynchronous, by passing a timeout_callback parameter to send(). An example modification of disco is included. If this approach is approved, I'll go through and update the other plugins.

This commit is contained in:
Joe Hildebrand 2012-10-29 10:03:32 -06:00
parent 06a690a259
commit 75a18b5ffe
3 changed files with 69 additions and 7 deletions

View file

@ -324,6 +324,8 @@ class XEP_0030(BasePlugin):
callback -- Optional callback to execute when a reply is callback -- Optional callback to execute when a reply is
received instead of blocking and waiting for received instead of blocking and waiting for
the reply. the reply.
timeout_callback -- Optional callback to execute when no result
has been received in timeout seconds.
""" """
if local is None: if local is None:
if jid is not None and not isinstance(jid, JID): if jid is not None and not isinstance(jid, JID):
@ -364,7 +366,8 @@ class XEP_0030(BasePlugin):
iq['disco_info']['node'] = node if node else '' iq['disco_info']['node'] = node if node else ''
return iq.send(timeout=kwargs.get('timeout', None), return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', True), block=kwargs.get('block', True),
callback=kwargs.get('callback', None)) callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_info(self, jid=None, node=None, info=None): def set_info(self, jid=None, node=None, info=None):
""" """
@ -405,6 +408,8 @@ class XEP_0030(BasePlugin):
iterator -- If True, return a result set iterator using iterator -- If True, return a result set iterator using
the XEP-0059 plugin, if the plugin is loaded. the XEP-0059 plugin, if the plugin is loaded.
Otherwise the parameter is ignored. Otherwise the parameter is ignored.
timeout_callback -- Optional callback to execute when no result
has been received in timeout seconds.
""" """
if local or local is None and jid is None: if local or local is None and jid is None:
items = self.api['get_items'](jid, node, items = self.api['get_items'](jid, node,
@ -423,7 +428,8 @@ class XEP_0030(BasePlugin):
else: else:
return iq.send(timeout=kwargs.get('timeout', None), return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', True), block=kwargs.get('block', True),
callback=kwargs.get('callback', None)) callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_items(self, jid=None, node=None, **kwargs): def set_items(self, jid=None, node=None, **kwargs):
""" """

View file

@ -154,7 +154,7 @@ class Iq(RootStanza):
StanzaBase.reply(self, clear) StanzaBase.reply(self, clear)
return self return self
def send(self, block=True, timeout=None, callback=None, now=False): def send(self, block=True, timeout=None, callback=None, now=False, timeout_callback=None):
""" """
Send an <iq> stanza over the XML stream. Send an <iq> stanza over the XML stream.
@ -181,15 +181,32 @@ class Iq(RootStanza):
now -- Indicates if the send queue should be skipped and send now -- Indicates if the send queue should be skipped and send
the stanza immediately. Used during stream the stanza immediately. Used during stream
initialization. Defaults to False. initialization. Defaults to False.
timeout_callback -- Optional reference to a stream handler function.
Will be executed when the timeout expires before a
response has been received with the originally-sent IQ
stanza. Only called if there is a callback parameter
(and therefore are in async mode).
""" """
if timeout is None: if timeout is None:
timeout = self.stream.response_timeout timeout = self.stream.response_timeout
if callback is not None and self['type'] in ('get', 'set'): if callback is not None and self['type'] in ('get', 'set'):
handler_name = 'IqCallback_%s' % self['id'] handler_name = 'IqCallback_%s' % self['id']
handler = Callback(handler_name, if timeout_callback:
MatcherId(self['id']), self.callback = callback
callback, self.timeout_callback = timeout_callback
once=True) self.stream.schedule('IqTimeout_%s' % self['id'],
timeout,
self._fire_timeout,
repeat=False)
handler = Callback(handler_name,
MatcherId(self['id']),
self._handle_result,
once=True)
else:
handler = Callback(handler_name,
MatcherId(self['id']),
callback,
once=True)
self.stream.register_handler(handler) self.stream.register_handler(handler)
StanzaBase.send(self, now=now) StanzaBase.send(self, now=now)
return handler_name return handler_name
@ -206,6 +223,16 @@ class Iq(RootStanza):
else: else:
return StanzaBase.send(self, now=now) return StanzaBase.send(self, now=now)
def _handle_result(self, iq):
# we got the IQ, so don't fire the timeout
self.stream.scheduler.remove('IqTimeout_%s' % self['id'])
self.callback(iq)
def _fire_timeout(self):
# don't fire the handler for the IQ, if it finally does come in
self.stream.remove_handler('IqCallback_%s' % self['id'])
self.timeout_callback(self)
def _set_stanza_values(self, values): def _set_stanza_values(self, values):
""" """
Set multiple stanza interface values using a dictionary. Set multiple stanza interface values using a dictionary.

View file

@ -153,6 +153,35 @@ class TestHandlers(SleekTest):
self.failUnless(events == ['foo'], self.failUnless(events == ['foo'],
"Iq callback was not executed: %s" % events) "Iq callback was not executed: %s" % events)
def testIqTimeoutCallback(self):
"""Test that iq.send(tcallback=handle_foo, timeout_callback=handle_timeout) works."""
events = []
def handle_foo(iq):
events.append('foo')
def handle_timeout(iq):
events.append('timeout')
iq = self.Iq()
iq['type'] = 'get'
iq['id'] = 'test-foo'
iq['to'] = 'user@localhost'
iq['query'] = 'foo'
iq.send(callback=handle_foo, timeout_callback=handle_timeout, timeout=0.05)
self.send("""
<iq type="get" id="test-foo" to="user@localhost">
<query xmlns="foo" />
</iq>
""")
# Give event queue time to process
time.sleep(0.1)
self.failUnless(events == ['timeout'],
"Iq timeout was not executed: %s" % events)
def testMultipleHandlersForStanza(self): def testMultipleHandlersForStanza(self):
""" """
Test that multiple handlers for a single stanza work Test that multiple handlers for a single stanza work