diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 5df656ad..2846d4a4 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -267,10 +267,6 @@ class XMLStream(object): #: :attr:`whitespace_keepalive` is enabled. self.whitespace_keepalive_interval = 300 - #: An :class:`~threading.Event` to signal that the application - #: is stopping, and that all threads should shutdown. - self.stop = threading.Event() - #: An :class:`~threading.Event` to signal receiving a closing #: stream tag from the server. self.stream_end_event = threading.Event() @@ -424,8 +420,6 @@ class XMLStream(object): localhost """ - self.stop.clear() - if host and port: self.address = (host, int(port)) try: @@ -552,7 +546,6 @@ class XMLStream(object): def abort(self): self.session_started_event.clear() - self.set_stop() if self._disconnect_wait_for_threads: self._wait_for_threads() try: @@ -1087,9 +1080,6 @@ class XMLStream(object): if self.__thread_count == 0: self.__thread_cond.notify() - def set_stop(self): - self.stop.set() - def _build_stanza(self, xml, default_ns=None): """Create a stanza object from a given XML object. @@ -1221,69 +1211,6 @@ class XMLStream(object): else: self.exception(e) - def _send_thread(self): - """Extract stanzas from the send queue and send them on the stream.""" - try: - while not self.stop.is_set(): - while not self.stop.is_set() and \ - not self.session_started_event.is_set(): - self.session_started_event.wait(timeout=0.1) # Wait for session start - if self.__failed_send_stanza is not None: - data = self.__failed_send_stanza - self.__failed_send_stanza = None - else: - data = self.send_queue.get() # Wait for data to send - if data is None: - continue - log.debug("SEND: %s", data) - enc_data = data.encode('utf-8') - total = len(enc_data) - sent = 0 - count = 0 - tries = 0 - try: - with self.send_lock: - while sent < total and not self.stop.is_set() and \ - self.session_started_event.is_set(): - try: - sent += self.socket.send(enc_data[sent:]) - count += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise - except ssl.SSLError as serr: - if tries >= self.ssl_retry_max: - log.debug('SSL error: max retries reached') - self.exception(serr) - log.warning("Failed to send %s", data) - if not self.stop.is_set(): - self.disconnect(self.auto_reconnect, - send_close=False) - log.warning('SSL write error: retrying') - if not self.stop.is_set(): - time.sleep(self.ssl_retry_delay) - tries += 1 - if count > 1: - log.debug('SENT: %d chunks', count) - self.send_queue.task_done() - except (Socket.error, ssl.SSLError) as serr: - self.event('socket_error', serr, direct=True) - log.warning("Failed to send %s", data) - if not self.stop.is_set(): - self.__failed_send_stanza = data - self._end_thread('send') - self.disconnect(self.auto_reconnect, send_close=False) - return - except Exception as ex: - log.exception('Unexpected error in send thread: %s', ex) - self.exception(ex) - if not self.stop.is_set(): - self._end_thread('send') - self.disconnect(self.auto_reconnect) - return - - self._end_thread('send') - def exception(self, exception): """Process an unknown exception.