diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index c5c8581b..7b42b460 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -282,6 +282,8 @@ class XMLStream(object): self.__event_handlers = {} self.__event_handlers_lock = threading.Lock() self.__filters = {'in': [], 'out': [], 'out_sync': []} + self.__thread_count = 0 + self.__thread_cond = threading.Condition() self._use_daemons = False self._id = 0 @@ -650,6 +652,8 @@ class XMLStream(object): if not self.auto_reconnect: self.stop.set() + self._wait_for_threads() + try: self.socket.shutdown(Socket.SHUT_RDWR) self.socket.close() @@ -1174,6 +1178,26 @@ class XMLStream(object): self.send_queue.put(data) return True + def _start_thread(self, name, target, track=True): + self.__thread[name] = threading.Thread(name=name, target=target) + self.__thread[name].daemon = self._use_daemons + self.__thread[name].start() + + with self.__thread_cond: + self.__thread_count += 1 + + def _end_thread(self, name): + with self.__thread_cond: + self.__thread_count -= 1 + log.debug("Stopped %s thread. %s threads remain." % ( + name, self.__thread_count)) + if self.__thread_count == 0: + self.__thread_cond.notify() + + def _wait_for_threads(self): + with self.__thread_cond: + self.__thread_cond.wait() + def process(self, **kwargs): """Initialize the XML streams and begin processing events. @@ -1207,22 +1231,16 @@ class XMLStream(object): else: threaded = kwargs.get('threaded', True) - self.scheduler.process(threaded=True, daemon=self._use_daemons) - - def start_thread(name, target): - self.__thread[name] = threading.Thread(name=name, target=target) - self.__thread[name].daemon = self._use_daemons - self.__thread[name].start() - for t in range(0, HANDLER_THREADS): log.debug("Starting HANDLER THREAD") - start_thread('stream_event_handler_%s' % t, self._event_runner) + self._start_thread('event_thread_%s' % t, self._event_runner) - start_thread('send_thread', self._send_thread) + self._start_thread('send_thread', self._send_thread) + self._start_thread('scheduler_thread', self._scheduler_thread) if threaded: # Run the XML stream in the background for another application. - start_thread('process', self._process) + self._start_thread('read_thread', self._process, track=False) else: self._process() @@ -1466,16 +1484,16 @@ class XMLStream(object): self.exception(e) elif etype == 'quit': log.debug("Quitting event runner thread") - return False + break except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _event_runner") self.event('killed', direct=True) self.disconnect() - return except SystemExit: self.disconnect() self.event_queue.put(('quit', None, None)) - return + + self._end_thread('event runner') def _send_thread(self): """Extract stanzas from the send queue and send them on the stream.""" @@ -1530,6 +1548,12 @@ class XMLStream(object): if not self.stop.is_set(): self.disconnect(self.auto_reconnect) + self._end_thread('send') + + def _scheduler_thread(self): + self.scheduler.process(threaded=False) + self._end_thread('scheduler') + def exception(self, exception): """Process an unknown exception.