Change the API to make iq.send() always return a future
remove coroutine_wrapper, add a future_wrapper (which is only needed when the result stanza can be cached). Update the documentation as well.
This commit is contained in:
parent
c66a4d4097
commit
bf5d7c83af
5 changed files with 87 additions and 106 deletions
|
@ -32,12 +32,12 @@ Differences from SleekXMPP
|
|||
handlers, which will be also handled in the event loop.
|
||||
|
||||
The :class:`~.slixmpp.stanza.Iq` object’s :meth:`~.slixmpp.stanza.Iq.send`
|
||||
method now takes a *coroutine* parameter which, if set to ``True``,
|
||||
will return a coroutine which will (asyncio-)block until the reply
|
||||
is received.
|
||||
method now **always** return a :class:`~.asyncio.Future` which result will be set
|
||||
to the IQ reply when it is received, or to ``None`` if the IQ is not of
|
||||
type ``get`` or ``set``.
|
||||
|
||||
Many plugins (WIP) calls which retrieve information also accept this
|
||||
``coroutine`` parameter.
|
||||
Many plugins (WIP) calls which retrieve information also return the same
|
||||
future.
|
||||
|
||||
**Architectural differences**
|
||||
slixmpp does not have an event queue anymore, and instead processes
|
||||
|
|
|
@ -7,23 +7,46 @@ Using asyncio
|
|||
Block on IQ sending
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
:meth:`.Iq.send` now accepts a ``coroutine`` parameter which, if ``True``,
|
||||
will return a coroutine waiting for the IQ reply to be received.
|
||||
:meth:`.Iq.send` now returns a :class:`~.Future` so you can easily block with:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
result = yield from iq.send(coroutine=True)
|
||||
result = yield from iq.send()
|
||||
|
||||
.. warning::
|
||||
|
||||
If the reply is an IQ with an ``error`` type, this will raise an
|
||||
:class:`.IqError`, and if it timeouts, it will raise an
|
||||
:class:`.IqTimeout`. Don't forget to catch it.
|
||||
|
||||
You can still use callbacks instead.
|
||||
|
||||
XEP plugin integration
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Many XEP plugins have been modified to accept this ``coroutine`` parameter as
|
||||
well, so you can do things like:
|
||||
The same changes from the SleekXMPP API apply, so you can do:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
iq_info = yield from self.xmpp['xep_0030'].get_info(jid, coroutine=True)
|
||||
iq_info = yield from self.xmpp['xep_0030'].get_info(jid)
|
||||
|
||||
But the following will only return a Future:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
iq_info = self.xmpp['xep_0030'].get_info(jid)
|
||||
|
||||
|
||||
Callbacks, Event Handlers, and Stream Handlers
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
IQ callbacks and :term:`Event Handlers <event handler>` can be coroutine
|
||||
functions; in this case, they will be scheduled in the event loop using
|
||||
:meth:`.asyncio.async` and not ran immediately.
|
||||
|
||||
A :class:`.CoroutineCallback` class has been added as well for
|
||||
:term:`Stream Handlers <stream handler>`, which will use
|
||||
:meth:`.asyncio.async` to schedule the callback.
|
||||
|
||||
Running the event loop
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -52,7 +75,7 @@ callbacks while everything is not ready.
|
|||
|
||||
client = slixmpp.ClientXMPP('jid@example', 'password')
|
||||
client.connected_event = asyncio.Event()
|
||||
callback = lambda event: client.connected_event.set()
|
||||
callback = lambda _: client.connected_event.set()
|
||||
client.add_event_handler('session_start', callback)
|
||||
client.connect()
|
||||
loop.run_until_complete(event.wait())
|
||||
|
@ -112,8 +135,7 @@ JID indicating its findings.
|
|||
def on_message(self, event):
|
||||
# You should probably handle IqError and IqTimeout exceptions here
|
||||
# but this is an example.
|
||||
version = yield from self['xep_0092'].get_version(message['from'],
|
||||
coroutine=True)
|
||||
version = yield from self['xep_0092'].get_version(message['from'])
|
||||
text = "%s sent me a message, he runs %s" % (message['from'],
|
||||
version['software_version']['name'])
|
||||
self.send_message(mto='master@example.tld', mbody=text)
|
||||
|
|
|
@ -16,7 +16,7 @@ from slixmpp.xmlstream.stanzabase import ET, ElementBase, register_stanza_plugin
|
|||
from slixmpp.xmlstream.handler import *
|
||||
from slixmpp.xmlstream import XMLStream
|
||||
from slixmpp.xmlstream.matcher import *
|
||||
from slixmpp.xmlstream.asyncio import asyncio, coroutine_wrapper
|
||||
from slixmpp.xmlstream.asyncio import asyncio, future_wrapper
|
||||
from slixmpp.basexmpp import BaseXMPP
|
||||
from slixmpp.clientxmpp import ClientXMPP
|
||||
from slixmpp.componentxmpp import ComponentXMPP
|
||||
|
|
|
@ -152,59 +152,15 @@ class Iq(RootStanza):
|
|||
new_iq['type'] = 'result'
|
||||
return new_iq
|
||||
|
||||
@asyncio.coroutine
|
||||
def _send_coroutine(self, matcher=None, timeout=None):
|
||||
"""Send an <iq> stanza over the XML stream.
|
||||
|
||||
Blocks (with asyncio) until a the reply is received.
|
||||
Use with yield from iq.send(coroutine=True).
|
||||
|
||||
Overrides StanzaBase.send
|
||||
|
||||
:param int timeout: The length of time (in seconds) to wait for a
|
||||
response before an IqTimeout is raised
|
||||
"""
|
||||
|
||||
future = asyncio.Future()
|
||||
|
||||
def callback(result):
|
||||
future.set_result(result)
|
||||
|
||||
def callback_timeout():
|
||||
future.set_result(None)
|
||||
|
||||
handler_name = 'IqCallback_%s' % self['id']
|
||||
|
||||
if timeout:
|
||||
self.callback = callback
|
||||
self.stream.schedule('IqTimeout_%s' % self['id'],
|
||||
timeout,
|
||||
callback_timeout,
|
||||
repeat=False)
|
||||
handler = Callback(handler_name,
|
||||
matcher,
|
||||
self._handle_result,
|
||||
once=True)
|
||||
else:
|
||||
handler = Callback(handler_name,
|
||||
matcher,
|
||||
callback,
|
||||
once=True)
|
||||
self.stream.register_handler(handler)
|
||||
StanzaBase.send(self)
|
||||
result = yield from future
|
||||
if result is None:
|
||||
raise IqTimeout(self)
|
||||
if result['type'] == 'error':
|
||||
raise IqError(result)
|
||||
return result
|
||||
|
||||
def send(self, callback=None, timeout=None, timeout_callback=None, coroutine=False):
|
||||
"""Send an <iq> stanza over the XML stream.
|
||||
|
||||
A callback handler can be provided that will be executed when the Iq
|
||||
stanza's result reply is received.
|
||||
|
||||
Returns a future which result will be set to the result Iq if it is of type 'get' or 'set'
|
||||
(when it is received), or a future with the result set to None if it has another type.
|
||||
|
||||
Overrides StanzaBase.send
|
||||
|
||||
:param function callback: Optional reference to a stream handler
|
||||
|
@ -218,8 +174,7 @@ class Iq(RootStanza):
|
|||
timeout expires before a response has
|
||||
been received for the originally-sent
|
||||
IQ stanza.
|
||||
:param bool coroutine: This function will return a coroutine if this
|
||||
argument is True.
|
||||
:rtype: asyncio.Future
|
||||
"""
|
||||
if self.stream.session_bind_event.is_set():
|
||||
matcher = MatchIDSender({
|
||||
|
@ -230,36 +185,45 @@ class Iq(RootStanza):
|
|||
else:
|
||||
matcher = MatcherId(self['id'])
|
||||
|
||||
if not coroutine:
|
||||
if callback is not None and self['type'] in ('get', 'set'):
|
||||
handler_name = 'IqCallback_%s' % self['id']
|
||||
if asyncio.iscoroutinefunction(callback):
|
||||
constr = CoroutineCallback
|
||||
else:
|
||||
constr = Callback
|
||||
if timeout_callback:
|
||||
self.callback = callback
|
||||
self.timeout_callback = timeout_callback
|
||||
self.stream.schedule('IqTimeout_%s' % self['id'],
|
||||
timeout,
|
||||
self._fire_timeout,
|
||||
repeat=False)
|
||||
handler = constr(handler_name,
|
||||
matcher,
|
||||
self._handle_result,
|
||||
once=True)
|
||||
else:
|
||||
handler = constr(handler_name,
|
||||
matcher,
|
||||
callback,
|
||||
once=True)
|
||||
self.stream.register_handler(handler)
|
||||
StanzaBase.send(self)
|
||||
return handler_name
|
||||
future = asyncio.Future()
|
||||
|
||||
def callback_success(result):
|
||||
if result['type'] == 'error':
|
||||
future.set_exception(IqError(result))
|
||||
else:
|
||||
return StanzaBase.send(self)
|
||||
future.set_result(result)
|
||||
|
||||
if timeout_callback is not None and timeout is not None:
|
||||
self.stream.cancel_schedule('IqTimeout_%s' % self['id'])
|
||||
if callback is not None:
|
||||
callback(result)
|
||||
|
||||
def callback_timeout():
|
||||
future.set_exception(IqTimeout(self))
|
||||
self.stream.remove_handler('IqCallback_%s' % self['id'])
|
||||
if timeout_callback is not None:
|
||||
timeout_callback(self)
|
||||
|
||||
if self['type'] in ('get', 'set'):
|
||||
handler_name = 'IqCallback_%s' % self['id']
|
||||
if asyncio.iscoroutinefunction(callback):
|
||||
constr = CoroutineCallback
|
||||
else:
|
||||
constr = Callback
|
||||
if timeout_callback is not None and timeout is not None:
|
||||
self.stream.schedule('IqTimeout_%s' % self['id'],
|
||||
timeout,
|
||||
callback_timeout,
|
||||
repeat=False)
|
||||
handler = constr(handler_name,
|
||||
matcher,
|
||||
callback_success,
|
||||
once=True)
|
||||
self.stream.register_handler(handler)
|
||||
else:
|
||||
return self._send_coroutine(timeout=timeout, matcher=matcher)
|
||||
future.set_result(None)
|
||||
StanzaBase.send(self)
|
||||
return future
|
||||
|
||||
def _handle_result(self, iq):
|
||||
# we got the IQ, so don't fire the timeout
|
||||
|
|
|
@ -33,23 +33,18 @@ cls.idle_call = idle_call
|
|||
real_run_once = cls._run_once
|
||||
cls._run_once = my_run_once
|
||||
|
||||
|
||||
def coroutine_wrapper(func):
|
||||
def future_wrapper(func):
|
||||
"""
|
||||
Make sure the result of a function call is a coroutine
|
||||
if the ``coroutine`` keyword argument is true.
|
||||
Make sure the result of a function call is an asyncio.Future()
|
||||
object.
|
||||
"""
|
||||
def wrap_coro(result):
|
||||
if asyncio.iscoroutinefunction(result):
|
||||
return result
|
||||
else:
|
||||
return asyncio.coroutine(lambda: result)()
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if kwargs.get('coroutine', False):
|
||||
return wrap_coro(func(*args, **kwargs))
|
||||
else:
|
||||
return func(*args, **kwargs)
|
||||
result = func(*args, **kwargs)
|
||||
if isinstance(result, asyncio.Future):
|
||||
return result
|
||||
future = asyncio.Future()
|
||||
future.set_result(result)
|
||||
return future
|
||||
|
||||
return wrapper
|
||||
|
|
Loading…
Reference in a new issue