diff --git a/slixmpp/xmlstream/handler/__init__.py b/slixmpp/xmlstream/handler/__init__.py index 31de9dfc..51a7ca6a 100644 --- a/slixmpp/xmlstream/handler/__init__.py +++ b/slixmpp/xmlstream/handler/__init__.py @@ -7,9 +7,10 @@ """ from slixmpp.xmlstream.handler.callback import Callback +from slixmpp.xmlstream.handler.coroutine_callback import CoroutineCallback from slixmpp.xmlstream.handler.collector import Collector from slixmpp.xmlstream.handler.waiter import Waiter from slixmpp.xmlstream.handler.xmlcallback import XMLCallback from slixmpp.xmlstream.handler.xmlwaiter import XMLWaiter -__all__ = ['Callback', 'Waiter', 'XMLCallback', 'XMLWaiter'] +__all__ = ['Callback', 'CoroutineCallback', 'Waiter', 'XMLCallback', 'XMLWaiter'] diff --git a/slixmpp/xmlstream/handler/coroutine_callback.py b/slixmpp/xmlstream/handler/coroutine_callback.py new file mode 100644 index 00000000..8ad9572e --- /dev/null +++ b/slixmpp/xmlstream/handler/coroutine_callback.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +""" + slixmpp.xmlstream.handler.callback + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Part of Slixmpp: The Slick XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from slixmpp.xmlstream.handler.base import BaseHandler +from slixmpp.xmlstream.asyncio import asyncio + + +class CoroutineCallback(BaseHandler): + + """ + The Callback handler will execute a callback function with + matched stanzas. + + The handler may execute the callback either during stream + processing or during the main event loop. + + The event will be scheduled to be run soon in the event loop instead + of immediately. + + :param string name: The name of the handler. + :param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase` + derived object for matching stanza objects. + :param pointer: The function to execute during callback. If ``pointer`` + is not a coroutine, this function will raise a ValueError. + :param bool once: Indicates if the handler should be used only + once. Defaults to False. + :param bool instream: Indicates if the callback should be executed + during stream processing instead of in the + main event loop. + :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream` + instance this handler should monitor. + """ + + def __init__(self, name, matcher, pointer, once=False, + instream=False, stream=None): + BaseHandler.__init__(self, name, matcher, stream) + if not asyncio.iscoroutinefunction(pointer): + raise ValueError("Given function is not a coroutine") + + @asyncio.coroutine + def pointer_wrapper(stanza, *args, **kwargs): + try: + yield from pointer(stanza, *args, **kwargs) + except Exception as e: + stanza.exception(e) + + self._pointer = pointer_wrapper + self._once = once + self._instream = instream + + def prerun(self, payload): + """Execute the callback during stream processing, if + the callback was created with ``instream=True``. + + :param payload: The matched + :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. + """ + if self._once: + self._destroy = True + if self._instream: + self.run(payload, True) + + def run(self, payload, instream=False): + """Execute the callback function with the matched stanza payload. + + :param payload: The matched + :class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. + :param bool instream: Force the handler to execute during stream + processing. This should only be used by + :meth:`prerun()`. Defaults to ``False``. + """ + if not self._instream or instream: + asyncio.async(self._pointer(payload)) + if self._once: + self._destroy = True + del self._pointer diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 2e109792..573ca829 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -13,7 +13,6 @@ """ import functools -import copy import logging import socket as Socket import ssl @@ -705,18 +704,22 @@ class XMLStream(asyncio.BaseProtocol): handlers = self.__event_handlers.get(name, []) for handler in handlers: - #TODO: Data should not be copied, but should be read only, - # but this might break current code so it's left for future. handler_callback, disposable = handler - out_data = copy.copy(data) if len(handlers) > 1 else data - old_exception = getattr(data, 'exception', None) - try: - handler_callback(out_data) - except Exception as e: - if old_exception: - old_exception(e) - else: + # If the callback is a coroutine, schedule it instead of + # running it directly + if asyncio.iscoroutinefunction(handler_callback): + @asyncio.coroutine + def handler_callback_routine(cb): + try: + yield from cb(data) + except Exception as e: + self.exception(e) + asyncio.async(handler_callback_routine(handler_callback)) + else: + try: + handler_callback(data) + except Exception as e: self.exception(e) if disposable: # If the handler is disposable, we will go ahead and