From 373505f48351a310f7d7ddf0da92fd604682daf1 Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Mon, 21 Jul 2014 20:27:53 +0200 Subject: [PATCH] Clean a new bunch of stuf --- examples/register_account.py | 2 +- slixmpp/clientxmpp.py | 1 - slixmpp/componentxmpp.py | 2 +- slixmpp/features/feature_bind/bind.py | 1 - .../features/feature_mechanisms/mechanisms.py | 4 +- slixmpp/features/feature_session/session.py | 1 - slixmpp/features/feature_starttls/starttls.py | 2 +- slixmpp/plugins/xep_0078/legacyauth.py | 1 - slixmpp/plugins/xep_0115/caps.py | 2 +- slixmpp/plugins/xep_0198/stream_management.py | 9 +- slixmpp/stanza/iq.py | 3 - slixmpp/xmlstream/stanzabase.py | 2 +- slixmpp/xmlstream/xmlstream.py | 186 +++--------------- 13 files changed, 37 insertions(+), 179 deletions(-) diff --git a/examples/register_account.py b/examples/register_account.py index 199f697c..bbe1dc53 100755 --- a/examples/register_account.py +++ b/examples/register_account.py @@ -101,7 +101,7 @@ class RegisterBot(slixmpp.ClientXMPP): resp['register']['password'] = self.password try: - resp.send(now=True) + resp.send() logging.info("Account created for %s!" % self.boundjid) except IqError as e: logging.error("Could not register account: %s" % diff --git a/slixmpp/clientxmpp.py b/slixmpp/clientxmpp.py index ae9010d4..fa36ad56 100644 --- a/slixmpp/clientxmpp.py +++ b/slixmpp/clientxmpp.py @@ -144,7 +144,6 @@ class ClientXMPP(BaseXMPP): :param use_ssl: Indicates if the older SSL connection method should be used. Defaults to ``False``. """ - self.session_started_event.clear() # If an address was provided, disable using DNS SRV lookup; # otherwise, use the domain from the client JID with the standard diff --git a/slixmpp/componentxmpp.py b/slixmpp/componentxmpp.py index c640c3dc..632db189 100644 --- a/slixmpp/componentxmpp.py +++ b/slixmpp/componentxmpp.py @@ -143,7 +143,7 @@ class ComponentXMPP(BaseXMPP): handshake = ET.Element('{jabber:component:accept}handshake') handshake.text = hashlib.sha1(pre_hash).hexdigest().lower() - self.send_xml(handshake, now=True) + self.send_xml(handshake) def _handle_handshake(self, xml): """The handshake has been accepted. diff --git a/slixmpp/features/feature_bind/bind.py b/slixmpp/features/feature_bind/bind.py index f636abf9..e26c3ce6 100644 --- a/slixmpp/features/feature_bind/bind.py +++ b/slixmpp/features/feature_bind/bind.py @@ -64,5 +64,4 @@ class FeatureBind(BasePlugin): if 'session' not in self.features['features']: log.debug("Established Session") self.xmpp.sessionstarted = True - self.xmpp.session_started_event.set() self.xmpp.event('session_start') diff --git a/slixmpp/features/feature_mechanisms/mechanisms.py b/slixmpp/features/feature_mechanisms/mechanisms.py index 3cbb83f2..00dd481c 100644 --- a/slixmpp/features/feature_mechanisms/mechanisms.py +++ b/slixmpp/features/feature_mechanisms/mechanisms.py @@ -196,7 +196,7 @@ class FeatureMechanisms(BasePlugin): self.attempted_mechs.add(self.mech.name) self.xmpp.disconnect() else: - resp.send(now=True) + resp.send() return True @@ -217,7 +217,7 @@ class FeatureMechanisms(BasePlugin): else: if resp.get_value() == '': resp.del_value() - resp.send(now=True) + resp.send() def _handle_success(self, stanza): """SASL authentication succeeded. Restart the stream.""" diff --git a/slixmpp/features/feature_session/session.py b/slixmpp/features/feature_session/session.py index 08f7480f..e57405db 100644 --- a/slixmpp/features/feature_session/session.py +++ b/slixmpp/features/feature_session/session.py @@ -51,5 +51,4 @@ class FeatureSession(BasePlugin): log.debug("Established Session") self.xmpp.sessionstarted = True - self.xmpp.session_started_event.set() self.xmpp.event('session_start') diff --git a/slixmpp/features/feature_starttls/starttls.py b/slixmpp/features/feature_starttls/starttls.py index a05f755b..5aad8ed4 100644 --- a/slixmpp/features/feature_starttls/starttls.py +++ b/slixmpp/features/feature_starttls/starttls.py @@ -55,7 +55,7 @@ class FeatureSTARTTLS(BasePlugin): elif self.xmpp.disable_starttls: return False else: - self.xmpp.send(features['starttls'], now=True) + self.xmpp.send(features['starttls']) return True def _handle_starttls_proceed(self, proceed): diff --git a/slixmpp/plugins/xep_0078/legacyauth.py b/slixmpp/plugins/xep_0078/legacyauth.py index d3826e59..8d2ea230 100644 --- a/slixmpp/plugins/xep_0078/legacyauth.py +++ b/slixmpp/plugins/xep_0078/legacyauth.py @@ -141,7 +141,6 @@ class XEP_0078(BasePlugin): log.debug("Established Session") self.xmpp.sessionstarted = True - self.xmpp.session_started_event.set() self.xmpp.event('session_start') return True diff --git a/slixmpp/plugins/xep_0115/caps.py b/slixmpp/plugins/xep_0115/caps.py index 378c6ae0..c548de11 100644 --- a/slixmpp/plugins/xep_0115/caps.py +++ b/slixmpp/plugins/xep_0115/caps.py @@ -305,7 +305,7 @@ class XEP_0115(BasePlugin): self.cache_caps(ver, info) self.assign_verstring(jid, ver) - if self.xmpp.session_started_event.is_set() and self.broadcast: + if self.xmpp.sessionstarted and self.broadcast: if self.xmpp.is_component or preserve: for contact in self.xmpp.roster[jid]: self.xmpp.roster[jid][contact].send_last_presence() diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py index 64052fc5..acf37cd7 100644 --- a/slixmpp/plugins/xep_0198/stream_management.py +++ b/slixmpp/plugins/xep_0198/stream_management.py @@ -173,7 +173,7 @@ class XEP_0198(BasePlugin): ack = stanza.Ack(self.xmpp) with self.handled_lock: ack['h'] = self.handled - self.xmpp.send_raw(str(ack), now=True) + self.xmpp.send_raw(str(ack)) def request_ack(self, e=None): """Request an ack from the server.""" @@ -199,14 +199,14 @@ class XEP_0198(BasePlugin): self.enabled.set() enable = stanza.Enable(self.xmpp) enable['resume'] = self.allow_resume - enable.send(now=True) + enable.send() self.handled = 0 elif self.sm_id and self.allow_resume: self.enabled.set() resume = stanza.Resume(self.xmpp) resume['h'] = self.handled resume['previd'] = self.sm_id - resume.send(now=True) + resume.send() # Wait for a response before allowing stream feature processing # to continue. The actual result processing will be done in the @@ -239,8 +239,7 @@ class XEP_0198(BasePlugin): self.xmpp.features.add('stream_management') self._handle_ack(stanza) for id, stanza in self.unacked_queue: - self.xmpp.send(stanza, now=True, use_filters=False) - self.xmpp.session_started_event.set() + self.xmpp.send(stanza, use_filters=False) self.xmpp.event('session_resumed', stanza) def _handle_failed(self, stanza): diff --git a/slixmpp/stanza/iq.py b/slixmpp/stanza/iq.py index 4be819b0..605e226a 100644 --- a/slixmpp/stanza/iq.py +++ b/slixmpp/stanza/iq.py @@ -191,9 +191,6 @@ class Iq(RootStanza): stanza. Only called if there is a callback parameter (and therefore are in async mode). """ - if timeout is None: - timeout = self.stream.response_timeout - if self.stream.session_bind_event.is_set(): matcher = MatchIDSender({ 'id': self['id'], diff --git a/slixmpp/xmlstream/stanzabase.py b/slixmpp/xmlstream/stanzabase.py index 0921dde4..75555c34 100644 --- a/slixmpp/xmlstream/stanzabase.py +++ b/slixmpp/xmlstream/stanzabase.py @@ -1580,7 +1580,7 @@ class StanzaBase(ElementBase): stanza sent immediately. Useful for stream initialization. Defaults to ``False``. """ - self.stream.send(self, now=now) + self.stream.send(self) def __copy__(self): """Return a copy of the stanza object that does not share the diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 2846d4a4..81b5b55f 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -45,35 +45,6 @@ from slixmpp.xmlstream.resolver import resolve, default_resolver #: The time in seconds to wait before timing out waiting for response stanzas. RESPONSE_TIMEOUT = 30 -#: The time in seconds to wait for events from the event queue, and also the -#: time between checks for the process stop signal. -WAIT_TIMEOUT = 1.0 - -#: The number of threads to use to handle XML stream events. This is not the -#: same as the number of custom event handling threads. -#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations -#: with a GIL, this should be left at 1, but for implemetnations without -#: a GIL increasing this value can provide better performance. -HANDLER_THREADS = 1 - -#: The time in seconds to delay between attempts to resend data -#: after an SSL error. -SSL_RETRY_DELAY = 0.5 - -#: The maximum number of times to attempt resending data due to -#: an SSL error. -SSL_RETRY_MAX = 10 - -#: Maximum time to delay between connection attempts is one hour. -RECONNECT_MAX_DELAY = 600 - -#: Maximum number of attempts to connect to the server before quitting -#: and raising a 'connect_failed' event. Setting this to ``None`` will -#: allow infinite reconnection attempts, and using ``0`` will disable -#: reconnections. Defaults to ``None``. -RECONNECT_MAX_ATTEMPTS = None - - log = logging.getLogger(__name__) @@ -83,6 +54,11 @@ class RestartStream(Exception): resending the stream header. """ +class NotConnectedError(Exception): + """ + Raised when we try to send something over the wire but we are not + connected. + """ class XMLStream(object): """ @@ -166,36 +142,11 @@ class XMLStream(object): self._der_cert = None - #: The time in seconds to wait for events from the event queue, - #: and also the time between checks for the process stop signal. - self.wait_timeout = WAIT_TIMEOUT - - #: The time in seconds to wait before timing out waiting - #: for response stanzas. - self.response_timeout = RESPONSE_TIMEOUT - #: The current amount to time to delay attempting to reconnect. #: This value doubles (with some jitter) with each failed #: connection attempt up to :attr:`reconnect_max_delay` seconds. self.reconnect_delay = None - #: Maximum time to delay between connection attempts is one hour. - self.reconnect_max_delay = RECONNECT_MAX_DELAY - - #: Maximum number of attempts to connect to the server before - #: quitting and raising a 'connect_failed' event. Setting to - #: ``None`` allows infinite reattempts, while setting it to ``0`` - #: will disable reconnection attempts. Defaults to ``None``. - self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS - - #: The time in seconds to delay between attempts to resend data - #: after an SSL error. - self.ssl_retry_max = SSL_RETRY_MAX - - #: The maximum number of times to attempt resending data due to - #: an SSL error. - self.ssl_retry_delay = SSL_RETRY_DELAY - #: The connection state machine tracks if the stream is #: ``'connected'`` or ``'disconnected'``. self.state = StateMachine(('disconnected', 'connected')) @@ -267,20 +218,6 @@ class XMLStream(object): #: :attr:`whitespace_keepalive` is enabled. self.whitespace_keepalive_interval = 300 - #: An :class:`~threading.Event` to signal receiving a closing - #: stream tag from the server. - self.stream_end_event = threading.Event() - self.stream_end_event.set() - - #: An :class:`~threading.Event` to signal the start of a stream - #: session. Until this event fires, the send queue is not used - #: and data is sent immediately over the wire. - self.session_started_event = threading.Event() - - #: The default time in seconds to wait for a session to start - #: after connecting before reconnecting and trying again. - self.session_timeout = 45 - #: Flag for controlling if the session can be considered ended #: if the connection is terminated. self.end_session_on_disconnect = True @@ -312,10 +249,6 @@ class XMLStream(object): #: We use an ID prefix to ensure that all ID values are unique. self._id_prefix = '%s-' % uuid.uuid4() - #: The :attr:`auto_reconnnect` setting controls whether or not - #: the stream will be restarted in the event of an error. - self.auto_reconnect = True - #: The :attr:`disconnect_wait` setting is the default value #: for controlling if the system waits for the send queue to #: empty before ending the stream. This may be overridden by @@ -331,7 +264,6 @@ class XMLStream(object): #: ``_xmpp-client._tcp`` service. self.dns_service = None - self.add_event_handler('connected', self._session_timeout_check) self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('session_start', self._start_keepalive) self.add_event_handler('session_start', self._cert_expiration) @@ -887,12 +819,11 @@ class XMLStream(object): # If the handler is disposable, we will go ahead and # remove it now instead of waiting for it to be # processed in the queue. - with self.__event_handlers_lock: - try: - h_index = self.__event_handlers[name].index(handler) - self.__event_handlers[name].pop(h_index) - except: - pass + try: + h_index = self.__event_handlers[name].index(handler) + self.__event_handlers[name].pop(h_index) + except: + pass def schedule(self, name, seconds, callback, args=tuple(), kwargs={}, repeat=False): @@ -954,34 +885,18 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None, now=False, use_filters=True): + def send(self, data, use_filters=True): """A wrapper for :meth:`send_raw()` for sending stanza objects. May optionally block until an expected response is received. :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` stanza to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. :param bool use_filters: Indicates if outgoing filters should be applied to the given stanza data. Disabling filters is useful when resending stanzas. Defaults to ``True``. """ - if timeout is None: - timeout = self.response_timeout - if hasattr(mask, 'xml'): - mask = mask.xml - if isinstance(data, ElementBase): if use_filters: for filter in self.__filters['out']: @@ -989,61 +904,37 @@ class XMLStream(object): if data is None: return - if mask is not None: - log.warning("Use of send mask waiters is deprecated.") - wait_for = Waiter("SendWait_%s" % self.new_id(), - MatchXMLMask(mask)) - self.register_handler(wait_for) - if isinstance(data, ElementBase): - with self.send_queue_lock: - if use_filters: - for filter in self.__filters['out_sync']: - data = filter(data) - if data is None: - return - str_data = tostring(data.xml, xmlns=self.default_ns, - stream=self, - top_level=True) - self.send_raw(str_data) + if use_filters: + for filter in self.__filters['out_sync']: + data = filter(data) + if data is None: + return + str_data = tostring(data.xml, xmlns=self.default_ns, + stream=self, + top_level=True) + self.send_raw(str_data) else: self.send_raw(data) - if mask is not None: - return wait_for.wait(timeout) - def send_xml(self, data, mask=None, timeout=None, now=False): - """Send an XML object on the stream, and optionally wait - for a response. + def send_xml(self, data): + """Send an XML object on the stream :param data: The :class:`~xml.etree.ElementTree.Element` XML object to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. """ - if timeout is None: - timeout = self.response_timeout - return self.send(tostring(data), mask, timeout, now) + return self.send(tostring(data)) def send_raw(self, data): """Send raw data across the stream. :param string data: Any bytes or utf-8 string value. """ + if not self.transport: + raise NotConnectedError() if isinstance(data, str): data = data.encode('utf-8') - if not self.transport: - logger.error("Cannot send data, we are not connected.") - else: - self.transport.write(data) + self.transport.write(data) def _start_thread(self, name, target, track=True): self.__thread[name] = threading.Thread(name=name, target=target) @@ -1055,31 +946,6 @@ class XMLStream(object): with self.__thread_cond: self.__thread_count += 1 - def _end_thread(self, name, early=False): - with self.__thread_cond: - curr_thread = threading.current_thread().name - if curr_thread in self.__active_threads: - self.__thread_count -= 1 - self.__active_threads.remove(curr_thread) - - if early: - log.debug('Threading deadlock prevention!') - log.debug(("Marked %s thread as ended due to " + \ - "disconnect() call. %s threads remain.") % ( - name, self.__thread_count)) - else: - log.debug("Stopped %s thread. %s threads remain." % ( - name, self.__thread_count)) - - else: - log.debug(("Finished exiting %s thread after early " + \ - "termination from disconnect() call. " + \ - "%s threads remain.") % ( - name, self.__thread_count)) - - if self.__thread_count == 0: - self.__thread_cond.notify() - def _build_stanza(self, xml, default_ns=None): """Create a stanza object from a given XML object.