diff --git a/slixmpp/plugins/xep_0199/ping.py b/slixmpp/plugins/xep_0199/ping.py index d1a82026..3221ba6a 100644 --- a/slixmpp/plugins/xep_0199/ping.py +++ b/slixmpp/plugins/xep_0199/ping.py @@ -9,7 +9,8 @@ import time import logging -from typing import Optional, Callable +from asyncio import Future +from typing import Optional, Callable, List from slixmpp.jid import JID from slixmpp.stanza import Iq @@ -64,9 +65,10 @@ class XEP_0199(BasePlugin): """ Start the XEP-0199 plugin. """ - register_stanza_plugin(Iq, Ping) + self.__pending_futures: List[Future] = [] + self.xmpp.register_handler( Callback('Ping', StanzaPath('iq@type=get/ping'), @@ -75,7 +77,9 @@ class XEP_0199(BasePlugin): if self.keepalive: self.xmpp.add_event_handler('session_start', self.enable_keepalive) - self.xmpp.add_event_handler('session_end', + self.xmpp.add_event_handler('session_resumed', + self.enable_keepalive) + self.xmpp.add_event_handler('disconnected', self.disable_keepalive) def plugin_end(self): @@ -84,12 +88,23 @@ class XEP_0199(BasePlugin): if self.keepalive: self.xmpp.del_event_handler('session_start', self.enable_keepalive) - self.xmpp.del_event_handler('session_end', + self.xmpp.del_event_handler('session_resumed', + self.enable_keepalive) + self.xmpp.del_event_handler('disconnected', self.disable_keepalive) def session_bind(self, jid): self.xmpp['xep_0030'].add_feature(Ping.namespace) + + def _clear_pending_futures(self): + """Cancel all pending ping futures""" + if self.__pending_futures: + log.debug('Clearing %s pdnding pings', len(self.__pending_futures)) + for future in self.__pending_futures: + future.cancel() + self.__pending_futures.clear() + def enable_keepalive(self, interval=None, timeout=None): if interval: self.interval = interval @@ -97,18 +112,31 @@ class XEP_0199(BasePlugin): self.timeout = timeout self.keepalive = True - handler = lambda event=None: asyncio.ensure_future( - self._keepalive(event), - loop=self.xmpp.loop, - ) + def handler(event=None): + # Cleanup futures + if self.__pending_futures: + tmp_futures = [] + for future in self.__pending_futures[:]: + if not future.done(): + tmp_futures.append(future) + self.__pending_futures = tmp_futures + + future = asyncio.ensure_future( + self._keepalive(event), + loop=self.xmpp.loop, + ) + self.__pending_futures.append(future) self.xmpp.schedule('Ping keepalive', self.interval, handler, repeat=True) def disable_keepalive(self, event=None): + self._clear_pending_futures() self.xmpp.cancel_schedule('Ping keepalive') + session_end = disable_keepalive + async def _keepalive(self, event=None): log.debug("Keepalive ping...") try: diff --git a/slixmpp/stanza/iq.py b/slixmpp/stanza/iq.py index a3f16e2f..0f53425b 100644 --- a/slixmpp/stanza/iq.py +++ b/slixmpp/stanza/iq.py @@ -194,9 +194,11 @@ class Iq(RootStanza): def callback_success(result): type_ = result['type'] if type_ == 'result': - future.set_result(result) + if not future.done(): + future.set_result(result) elif type_ == 'error': - future.set_exception(IqError(result)) + if not future.done(): + future.set_exception(IqError(result)) else: # Most likely an iq addressed to ourself, rearm the callback. handler = constr(handler_name, @@ -212,7 +214,8 @@ class Iq(RootStanza): callback(result) def callback_timeout(): - future.set_exception(IqTimeout(self)) + if not future.done(): + future.set_exception(IqTimeout(self)) self.stream.remove_handler('IqCallback_%s' % self['id']) if timeout_callback is not None: timeout_callback(self)