Allow event handlers to be coroutine functions
And do not copy data when running events with XMLStream.event()
This commit is contained in:
parent
92e4bc752a
commit
2b3b86e281
3 changed files with 100 additions and 12 deletions
|
@ -7,9 +7,10 @@
|
|||
"""
|
||||
|
||||
from slixmpp.xmlstream.handler.callback import Callback
|
||||
from slixmpp.xmlstream.handler.coroutine_callback import CoroutineCallback
|
||||
from slixmpp.xmlstream.handler.collector import Collector
|
||||
from slixmpp.xmlstream.handler.waiter import Waiter
|
||||
from slixmpp.xmlstream.handler.xmlcallback import XMLCallback
|
||||
from slixmpp.xmlstream.handler.xmlwaiter import XMLWaiter
|
||||
|
||||
__all__ = ['Callback', 'Waiter', 'XMLCallback', 'XMLWaiter']
|
||||
__all__ = ['Callback', 'CoroutineCallback', 'Waiter', 'XMLCallback', 'XMLWaiter']
|
||||
|
|
84
slixmpp/xmlstream/handler/coroutine_callback.py
Normal file
84
slixmpp/xmlstream/handler/coroutine_callback.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
slixmpp.xmlstream.handler.callback
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Part of Slixmpp: The Slick XMPP Library
|
||||
|
||||
:copyright: (c) 2011 Nathanael C. Fritz
|
||||
:license: MIT, see LICENSE for more details
|
||||
"""
|
||||
|
||||
from slixmpp.xmlstream.handler.base import BaseHandler
|
||||
from slixmpp.xmlstream.asyncio import asyncio
|
||||
|
||||
|
||||
class CoroutineCallback(BaseHandler):
|
||||
|
||||
"""
|
||||
The Callback handler will execute a callback function with
|
||||
matched stanzas.
|
||||
|
||||
The handler may execute the callback either during stream
|
||||
processing or during the main event loop.
|
||||
|
||||
The event will be scheduled to be run soon in the event loop instead
|
||||
of immediately.
|
||||
|
||||
:param string name: The name of the handler.
|
||||
:param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase`
|
||||
derived object for matching stanza objects.
|
||||
:param pointer: The function to execute during callback. If ``pointer``
|
||||
is not a coroutine, this function will raise a ValueError.
|
||||
:param bool once: Indicates if the handler should be used only
|
||||
once. Defaults to False.
|
||||
:param bool instream: Indicates if the callback should be executed
|
||||
during stream processing instead of in the
|
||||
main event loop.
|
||||
:param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
|
||||
instance this handler should monitor.
|
||||
"""
|
||||
|
||||
def __init__(self, name, matcher, pointer, once=False,
|
||||
instream=False, stream=None):
|
||||
BaseHandler.__init__(self, name, matcher, stream)
|
||||
if not asyncio.iscoroutinefunction(pointer):
|
||||
raise ValueError("Given function is not a coroutine")
|
||||
|
||||
@asyncio.coroutine
|
||||
def pointer_wrapper(stanza, *args, **kwargs):
|
||||
try:
|
||||
yield from pointer(stanza, *args, **kwargs)
|
||||
except Exception as e:
|
||||
stanza.exception(e)
|
||||
|
||||
self._pointer = pointer_wrapper
|
||||
self._once = once
|
||||
self._instream = instream
|
||||
|
||||
def prerun(self, payload):
|
||||
"""Execute the callback during stream processing, if
|
||||
the callback was created with ``instream=True``.
|
||||
|
||||
:param payload: The matched
|
||||
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object.
|
||||
"""
|
||||
if self._once:
|
||||
self._destroy = True
|
||||
if self._instream:
|
||||
self.run(payload, True)
|
||||
|
||||
def run(self, payload, instream=False):
|
||||
"""Execute the callback function with the matched stanza payload.
|
||||
|
||||
:param payload: The matched
|
||||
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object.
|
||||
:param bool instream: Force the handler to execute during stream
|
||||
processing. This should only be used by
|
||||
:meth:`prerun()`. Defaults to ``False``.
|
||||
"""
|
||||
if not self._instream or instream:
|
||||
asyncio.async(self._pointer(payload))
|
||||
if self._once:
|
||||
self._destroy = True
|
||||
del self._pointer
|
|
@ -13,7 +13,6 @@
|
|||
"""
|
||||
|
||||
import functools
|
||||
import copy
|
||||
import logging
|
||||
import socket as Socket
|
||||
import ssl
|
||||
|
@ -705,18 +704,22 @@ class XMLStream(asyncio.BaseProtocol):
|
|||
|
||||
handlers = self.__event_handlers.get(name, [])
|
||||
for handler in handlers:
|
||||
#TODO: Data should not be copied, but should be read only,
|
||||
# but this might break current code so it's left for future.
|
||||
handler_callback, disposable = handler
|
||||
|
||||
out_data = copy.copy(data) if len(handlers) > 1 else data
|
||||
old_exception = getattr(data, 'exception', None)
|
||||
try:
|
||||
handler_callback(out_data)
|
||||
except Exception as e:
|
||||
if old_exception:
|
||||
old_exception(e)
|
||||
else:
|
||||
# If the callback is a coroutine, schedule it instead of
|
||||
# running it directly
|
||||
if asyncio.iscoroutinefunction(handler_callback):
|
||||
@asyncio.coroutine
|
||||
def handler_callback_routine(cb):
|
||||
try:
|
||||
yield from cb(data)
|
||||
except Exception as e:
|
||||
self.exception(e)
|
||||
asyncio.async(handler_callback_routine(handler_callback))
|
||||
else:
|
||||
try:
|
||||
handler_callback(data)
|
||||
except Exception as e:
|
||||
self.exception(e)
|
||||
if disposable:
|
||||
# If the handler is disposable, we will go ahead and
|
||||
|
|
Loading…
Reference in a new issue