An other cleanup of xmlstream.py

Remove some useless things (like handling signals, managing the threads,
etc), add some comment to recently added/fixed methods…
This commit is contained in:
Florent Le Coz 2014-07-22 02:58:34 +02:00
parent d3b56a5d94
commit ede9dcd18f

View file

@ -23,7 +23,6 @@ import signal
import socket as Socket
import ssl
import sys
import threading
import time
import random
import weakref
@ -46,7 +45,6 @@ RESPONSE_TIMEOUT = 30
log = logging.getLogger(__name__)
class NotConnectedError(Exception):
"""
Raised when we try to send something over the wire but we are not
@ -205,28 +203,15 @@ class XMLStream(object):
#: if the connection is terminated.
self.end_session_on_disconnect = True
#: A queue of string data to be sent over the stream.
self.send_queue = Queue()
self.send_queue_lock = threading.Lock()
self.send_lock = threading.RLock()
self.__failed_send_stanza = None
#: A mapping of XML namespaces to well-known prefixes.
self.namespace_map = {StanzaBase.xml_ns: 'xml'}
self.__thread = {}
self.__root_stanza = []
self.__handlers = []
self.__event_handlers = {}
self.__filters = {'in': [], 'out': [], 'out_sync': []}
self.__thread_count = 0
self.__thread_cond = threading.Condition()
self.__active_threads = set()
self._use_daemons = False
self._id = 0
self._id_lock = threading.Lock()
#: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4()
@ -241,53 +226,6 @@ class XMLStream(object):
self.add_event_handler('disconnected', self._remove_schedules)
self.add_event_handler('session_start', self._start_keepalive)
self.add_event_handler('session_start', self._cert_expiration)
def use_signals(self, signals=None):
"""Register signal handlers for ``SIGHUP`` and ``SIGTERM``.
By using signals, a ``'killed'`` event will be raised when the
application is terminated.
If a signal handler already existed, it will be executed first,
before the ``'killed'`` event is raised.
:param list signals: A list of signal names to be monitored.
Defaults to ``['SIGHUP', 'SIGTERM']``.
"""
if signals is None:
signals = ['SIGHUP', 'SIGTERM']
existing_handlers = {}
for sig_name in signals:
if hasattr(signal, sig_name):
sig = getattr(signal, sig_name)
handler = signal.getsignal(sig)
if handler:
existing_handlers[sig] = handler
def handle_kill(signum, frame):
"""
Capture kill event and disconnect cleanly after first
spawning the ``'killed'`` event.
"""
if signum in existing_handlers and \
existing_handlers[signum] != handle_kill:
existing_handlers[signum](signum, frame)
self.event("killed", direct=True)
self.disconnect()
try:
for sig_name in signals:
if hasattr(signal, sig_name):
sig = getattr(signal, sig_name)
signal.signal(sig, handle_kill)
self.__signals_installed = True
except:
log.debug("Can not set interrupt signal handlers. " + \
"Slixmpp is not running from a main thread.")
def new_id(self):
"""Generate and return a new stream ID in hexadecimal form.
@ -296,9 +234,8 @@ class XMLStream(object):
ID values. Using this method ensures that all new ID values
are unique in this stream.
"""
with self._id_lock:
self._id += 1
return self.get_id()
self._id += 1
return self.get_id()
def get_id(self):
"""Return the current unique stream ID in hexadecimal form."""
@ -350,11 +287,16 @@ class XMLStream(object):
asyncio.async(connect_routine)
def init_parser(self):
"""init the XML parser. The parser must always be reset for each new
connexion
"""
self.xml_depth = 0
self.xml_root = None
self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end"))
def connection_made(self, transport):
"""Called when the TCP connection has been established with the server
"""
self.event("connected")
self.transport = transport
self.socket = self.transport.get_extra_info("socket")
@ -362,6 +304,12 @@ class XMLStream(object):
self.send_raw(self.stream_header)
def data_received(self, data):
"""Called when incoming data is received on the socket.
We feed that data to the parser and the see if this produced any XML
event. This could trigger one or more event (a stanza is received,
the stream is opened, etc).
"""
self.parser.feed(data)
for event, xml in self.parser.read_events():
if event == 'start':
@ -372,6 +320,7 @@ class XMLStream(object):
stream=self,
top_level=True,
open_only=True))
self.start_stream_handler(self.xml_root)
self.xml_depth += 1
if event == 'end':
self.xml_depth -= 1
@ -381,8 +330,8 @@ class XMLStream(object):
log.debug("End of stream received")
self.abort()
elif self.xml_depth == 1:
# We only raise events for stanzas that are direct
# children of the root element.
# A stanza is an XML element that is a direct child of
# the root element, hence the check of depth == 1
self.__spawn_event(xml)
if self.xml_root is not None:
# Keep the root element empty of children to
@ -390,28 +339,31 @@ class XMLStream(object):
self.xml_root.clear()
def eof_received(self):
"""
When the TCP connection is properly closed by the remote end
"""When the TCP connection is properly closed by the remote end
"""
log.debug("eof_received")
def connection_lost(self, exception):
"""On any kind of disconnection, initiated by us or not. This signals the
closure of the TCP connection
"""
On any kind of disconnection
"""
log.warning("connection_lost: %s", (exception,))
log.info("connection_lost: %s", (exception,))
self.event("disconnected")
if self.end_session_on_disconnect:
self.event('session_end')
# All these objects are associated with one TCP connection. Since
# we are not connected anymore, destroy them
self.parser = None
self.transport = None
self.socket = None
self.event("disconnected")
def disconnect(self, wait=2.0):
"""Close the XML stream and wait for an acknowldgement from the server for
at most `wait` seconds. After the given number of seconds has
passed without a response from the serveur, abort() is called. If
wait is 0.0, this is equivalent to calling abort() directly.
passed without a response from the serveur, or when the server
successfuly responds with a closure of its own stream, abort() is
called. If wait is 0.0, this is almost equivalent to calling abort()
directly.
Does nothing if we are not connected.
@ -429,7 +381,7 @@ class XMLStream(object):
"""
if self.transport:
self.transport.abort()
self.event("killed", direct=True)
self.event("killed")
def reconnect(self, wait=2.0):
"""Calls disconnect(), and once we are disconnected (after the timeout, or
@ -475,40 +427,6 @@ class XMLStream(object):
server_hostname=self.address[0])
asyncio.async(ssl_connect_routine)
def _cert_expiration(self, event):
"""Schedule an event for when the TLS certificate expires."""
if not self._der_cert:
log.warn("TLS or SSL was enabled, but no certificate was found.")
return
def restart():
if not self.event_handled('ssl_expired_cert'):
log.warn("The server certificate has expired. Restarting.")
self.reconnect()
else:
pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert)
self.event('ssl_expired_cert', pem_cert)
cert_ttl = cert.get_ttl(self._der_cert)
if cert_ttl is None:
return
if cert_ttl.days < 0:
log.warn('CERT: Certificate has expired.')
restart()
try:
total_seconds = cert_ttl.total_seconds()
except AttributeError:
# for Python < 2.7
total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6
log.info('CERT: Time until certificate expiration: %s' % cert_ttl)
self.schedule('Certificate Expiration',
total_seconds,
restart)
def _start_keepalive(self, event):
"""Begin sending whitespace periodically to keep the connection alive.
@ -596,39 +514,6 @@ class XMLStream(object):
"""Remove an incoming or outgoing filter."""
self.__filters[mode].remove(handler)
def add_handler(self, mask, pointer, name=None, disposable=False,
threaded=False, filter=False, instream=False):
"""A shortcut method for registering a handler using XML masks.
The use of :meth:`register_handler()` is preferred.
:param mask: An XML snippet matching the structure of the
stanzas that will be passed to this handler.
:param pointer: The handler function itself.
:parm name: A unique name for the handler. A name will
be generated if one is not provided.
:param disposable: Indicates if the handler should be discarded
after one use.
:param threaded: **DEPRECATED**.
Remains for backwards compatibility.
:param filter: **DEPRECATED**.
Remains for backwards compatibility.
:param instream: Indicates if the handler should execute during
stream processing and not during normal event
processing.
"""
# To prevent circular dependencies, we must load the matcher
# and handler classes here.
if name is None:
name = 'add_handler_%s' % self.new_id()
self.register_handler(
XMLCallback(name,
MatchXMLMask(mask, self.default_ns),
pointer,
once=disposable,
instream=instream))
def register_handler(self, handler, before=None, after=None):
"""Add a stream event handler that will be executed when a matching
stanza is received.
@ -688,22 +573,19 @@ class XMLStream(object):
else:
return next(self.dns_answers)
def add_event_handler(self, name, pointer,
threaded=False, disposable=False):
def add_event_handler(self, name, pointer, disposable=False):
"""Add a custom event handler that will be executed whenever
its event is manually triggered.
:param name: The name of the event that will trigger
this handler.
:param pointer: The function to execute.
:param threaded: If set to ``True``, the handler will execute
in its own thread. Defaults to ``False``.
:param disposable: If set to ``True``, the handler will be
discarded after one use. Defaults to ``False``.
"""
if not name in self.__event_handlers:
self.__event_handlers[name] = []
self.__event_handlers[name].append((pointer, threaded, disposable))
self.__event_handlers[name].append((pointer, disposable))
def del_event_handler(self, name, pointer):
"""Remove a function as a handler for an event.
@ -747,30 +629,32 @@ class XMLStream(object):
for handler in handlers:
#TODO: Data should not be copied, but should be read only,
# but this might break current code so it's left for future.
handler_callback, disposable = handler
out_data = copy.copy(data) if len(handlers) > 1 else data
old_exception = getattr(data, 'exception', None)
if direct:
try:
handler[0](out_data)
handler_callback(out_data)
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg, str(handler[0]))
log.exception(error_msg, str(handler_callback))
if old_exception:
old_exception(e)
else:
self.exception(e)
else:
self.run_event(('event', handler, out_data))
if handler[2]:
if disposable:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
# processed in the queue.
try:
h_index = self.__event_handlers[name].index(handler)
self.__event_handlers[name].pop(h_index)
except:
pass
else:
self.__event_handlers[name].pop(h_index)
def schedule(self, name, seconds, callback, args=tuple(),
kwargs={}, repeat=False):
@ -877,22 +761,13 @@ class XMLStream(object):
:param string data: Any bytes or utf-8 string value.
"""
print("SEND: %s" % (data))
if not self.transport:
raise NotConnectedError()
if isinstance(data, str):
data = data.encode('utf-8')
self.transport.write(data)
def _start_thread(self, name, target, track=True):
self.__thread[name] = threading.Thread(name=name, target=target)
self.__thread[name].daemon = self._use_daemons
self.__thread[name].start()
if track:
self.__active_threads.add(name)
with self.__thread_cond:
self.__thread_count += 1
def _build_stanza(self, xml, default_ns=None):
"""Create a stanza object from a given XML object.
@ -965,25 +840,6 @@ class XMLStream(object):
if unhandled:
stanza.unhandled()
def _threaded_event_wrapper(self, func, args):
"""Capture exceptions for event handlers that run
in individual threads.
:param func: The event handler to execute.
:param args: Arguments to the event handler.
"""
# this is always already copied before this is invoked
orig = args[0]
try:
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg, str(func))
if hasattr(orig, 'exception'):
orig.exception(e)
else:
self.exception(e)
def run_event(self, event):
etype, handler = event[0:2]
args = event[2:]
@ -1005,17 +861,9 @@ class XMLStream(object):
log.exception('Error processing scheduled task')
self.exception(e)
elif etype == 'event':
func, threaded, disposable = handler
func, disposable = handler
try:
if threaded:
x = threading.Thread(
name="Event_%s" % str(func),
target=self._threaded_event_wrapper,
args=(func, args))
x.daemon = self._use_daemons
x.start()
else:
func(*args)
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg, str(func))