Track threads to ensure all have exited when disconnecting.

This commit is contained in:
Lance Stout 2012-04-22 18:13:36 -07:00
parent 913738444e
commit a20a9c505d
3 changed files with 57 additions and 38 deletions

View file

@ -332,6 +332,7 @@ class SleekTest(unittest.TestCase):
# Remove unique ID prefix to make it easier to test # Remove unique ID prefix to make it easier to test
self.xmpp._id_prefix = '' self.xmpp._id_prefix = ''
self.xmpp._disconnect_wait_for_threads = False
# We will use this to wait for the session_start event # We will use this to wait for the session_start event
# for live connections. # for live connections.

View file

@ -139,39 +139,45 @@ class Scheduler(object):
"""Process scheduled tasks.""" """Process scheduled tasks."""
self.run = True self.run = True
try: try:
while self.run and not self.stop.isSet(): while self.run and not self.stop.is_set():
wait = 1 wait = 0.1
updated = False updated = False
if self.schedule: if self.schedule:
wait = self.schedule[0].next - time.time() wait = self.schedule[0].next - time.time()
try: try:
if wait <= 0.0: if wait <= 0.0:
newtask = self.addq.get(False) newtask = self.addq.get(False)
else:
if wait >= 3.0:
wait = 3.0
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
self.schedule_lock.acquire()
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
self.schedule.pop(self.schedule.index(task))
else: else:
updated = True if wait >= 3.0:
self.schedule_lock.acquire() wait = 3.0
self.schedule.append(newtask) newtask = None
finally: elapsed = 0
if updated: while not self.stop.is_set() and \
self.schedule = sorted(self.schedule, newtask is None and \
key=lambda task: task.next) elapsed < wait:
self.schedule_lock.release() newtask = self.addq.get(True, 0.1)
elapsed += 0.1
except queue.Empty:
cleanup = []
self.schedule_lock.acquire()
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule_lock.acquire()
self.schedule.append(newtask)
finally:
if updated:
self.schedule = sorted(self.schedule,
key=lambda task: task.next)
self.schedule_lock.release()
except KeyboardInterrupt: except KeyboardInterrupt:
self.run = False self.run = False
except SystemExit: except SystemExit:

View file

@ -52,7 +52,7 @@ RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the #: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal. #: time between checks for the process stop signal.
WAIT_TIMEOUT = 1 WAIT_TIMEOUT = 0.1
#: The number of threads to use to handle XML stream events. This is not the #: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads. #: same as the number of custom event handling threads.
@ -285,6 +285,7 @@ class XMLStream(object):
self.__thread_count = 0 self.__thread_count = 0
self.__thread_cond = threading.Condition() self.__thread_cond = threading.Condition()
self._use_daemons = False self._use_daemons = False
self._disconnect_wait_for_threads = True
self._id = 0 self._id = 0
self._id_lock = threading.Lock() self._id_lock = threading.Lock()
@ -652,7 +653,8 @@ class XMLStream(object):
if not self.auto_reconnect: if not self.auto_reconnect:
self.stop.set() self.stop.set()
self._wait_for_threads() if self._disconnect_wait_for_threads:
self._wait_for_threads()
try: try:
self.socket.shutdown(Socket.SHUT_RDWR) self.socket.shutdown(Socket.SHUT_RDWR)
@ -1183,8 +1185,9 @@ class XMLStream(object):
self.__thread[name].daemon = self._use_daemons self.__thread[name].daemon = self._use_daemons
self.__thread[name].start() self.__thread[name].start()
with self.__thread_cond: if track:
self.__thread_count += 1 with self.__thread_cond:
self.__thread_count += 1
def _end_thread(self, name): def _end_thread(self, name):
with self.__thread_cond: with self.__thread_cond:
@ -1196,7 +1199,12 @@ class XMLStream(object):
def _wait_for_threads(self): def _wait_for_threads(self):
with self.__thread_cond: with self.__thread_cond:
self.__thread_cond.wait() if self.__thread_count != 0:
log.debug("Waiting for %s threads to exit." %
self.__thread_count)
self.__thread_cond.wait()
if self.__thread_count != 0:
raise Exception("Hanged threads: %s" % threading.enumerate())
def process(self, **kwargs): def process(self, **kwargs):
"""Initialize the XML streams and begin processing events. """Initialize the XML streams and begin processing events.
@ -1501,7 +1509,7 @@ class XMLStream(object):
while not self.stop.is_set(): while not self.stop.is_set():
while not self.stop.is_set() and \ while not self.stop.is_set() and \
not self.session_started_event.is_set(): not self.session_started_event.is_set():
self.session_started_event.wait(timeout=1) self.session_started_event.wait(timeout=0.1)
if self.__failed_send_stanza is not None: if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza data = self.__failed_send_stanza
self.__failed_send_stanza = None self.__failed_send_stanza = None
@ -1541,12 +1549,16 @@ class XMLStream(object):
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
if not self.stop.is_set(): if not self.stop.is_set():
self.__failed_send_stanza = data self.__failed_send_stanza = data
self._end_thread('send')
self.disconnect(self.auto_reconnect, send_close=False) self.disconnect(self.auto_reconnect, send_close=False)
return
except Exception as ex: except Exception as ex:
log.exception('Unexpected error in send thread: %s', ex) log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex) self.exception(ex)
if not self.stop.is_set(): if not self.stop.is_set():
self._end_thread('send')
self.disconnect(self.auto_reconnect) self.disconnect(self.auto_reconnect)
return
self._end_thread('send') self._end_thread('send')