Count and track the main threads, so we can delay disconnecting until all have quit.

This commit is contained in:
Lance Stout 2012-04-21 10:34:09 -07:00
parent 8ee30179ea
commit 913738444e

View file

@ -282,6 +282,8 @@ class XMLStream(object):
self.__event_handlers = {}
self.__event_handlers_lock = threading.Lock()
self.__filters = {'in': [], 'out': [], 'out_sync': []}
self.__thread_count = 0
self.__thread_cond = threading.Condition()
self._use_daemons = False
self._id = 0
@ -650,6 +652,8 @@ class XMLStream(object):
if not self.auto_reconnect:
self.stop.set()
self._wait_for_threads()
try:
self.socket.shutdown(Socket.SHUT_RDWR)
self.socket.close()
@ -1174,6 +1178,26 @@ class XMLStream(object):
self.send_queue.put(data)
return True
def _start_thread(self, name, target, track=True):
self.__thread[name] = threading.Thread(name=name, target=target)
self.__thread[name].daemon = self._use_daemons
self.__thread[name].start()
with self.__thread_cond:
self.__thread_count += 1
def _end_thread(self, name):
with self.__thread_cond:
self.__thread_count -= 1
log.debug("Stopped %s thread. %s threads remain." % (
name, self.__thread_count))
if self.__thread_count == 0:
self.__thread_cond.notify()
def _wait_for_threads(self):
with self.__thread_cond:
self.__thread_cond.wait()
def process(self, **kwargs):
"""Initialize the XML streams and begin processing events.
@ -1207,22 +1231,16 @@ class XMLStream(object):
else:
threaded = kwargs.get('threaded', True)
self.scheduler.process(threaded=True, daemon=self._use_daemons)
def start_thread(name, target):
self.__thread[name] = threading.Thread(name=name, target=target)
self.__thread[name].daemon = self._use_daemons
self.__thread[name].start()
for t in range(0, HANDLER_THREADS):
log.debug("Starting HANDLER THREAD")
start_thread('stream_event_handler_%s' % t, self._event_runner)
self._start_thread('event_thread_%s' % t, self._event_runner)
start_thread('send_thread', self._send_thread)
self._start_thread('send_thread', self._send_thread)
self._start_thread('scheduler_thread', self._scheduler_thread)
if threaded:
# Run the XML stream in the background for another application.
start_thread('process', self._process)
self._start_thread('read_thread', self._process, track=False)
else:
self._process()
@ -1466,16 +1484,16 @@ class XMLStream(object):
self.exception(e)
elif etype == 'quit':
log.debug("Quitting event runner thread")
return False
break
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _event_runner")
self.event('killed', direct=True)
self.disconnect()
return
except SystemExit:
self.disconnect()
self.event_queue.put(('quit', None, None))
return
self._end_thread('event runner')
def _send_thread(self):
"""Extract stanzas from the send queue and send them on the stream."""
@ -1530,6 +1548,12 @@ class XMLStream(object):
if not self.stop.is_set():
self.disconnect(self.auto_reconnect)
self._end_thread('send')
def _scheduler_thread(self):
self.scheduler.process(threaded=False)
self._end_thread('scheduler')
def exception(self, exception):
"""Process an unknown exception.