Fixed line lengths and trailing whitespace.
The pep8 command is now pleased.
This commit is contained in:
parent
7c10ff16fb
commit
6de87a1cbf
1 changed files with 37 additions and 36 deletions
|
@ -33,7 +33,7 @@ if sys.version_info < (3, 0):
|
||||||
# The time in seconds to wait before timing out waiting for response stanzas.
|
# The time in seconds to wait before timing out waiting for response stanzas.
|
||||||
RESPONSE_TIMEOUT = 10
|
RESPONSE_TIMEOUT = 10
|
||||||
|
|
||||||
# The number of threads to use to handle XML stream events. This is not the
|
# The number of threads to use to handle XML stream events. This is not the
|
||||||
# same as the number of custom event handling threads. HANDLER_THREADS must
|
# same as the number of custom event handling threads. HANDLER_THREADS must
|
||||||
# be at least 1.
|
# be at least 1.
|
||||||
HANDLER_THREADS = 1
|
HANDLER_THREADS = 1
|
||||||
|
@ -53,9 +53,9 @@ class XMLStream(object):
|
||||||
"""
|
"""
|
||||||
An XML stream connection manager and event dispatcher.
|
An XML stream connection manager and event dispatcher.
|
||||||
|
|
||||||
The XMLStream class abstracts away the issues of establishing a
|
The XMLStream class abstracts away the issues of establishing a
|
||||||
connection with a server and sending and receiving XML "stanzas".
|
connection with a server and sending and receiving XML "stanzas".
|
||||||
A stanza is a complete XML element that is a direct child of a root
|
A stanza is a complete XML element that is a direct child of a root
|
||||||
document element. Two streams are used, one for each communication
|
document element. Two streams are used, one for each communication
|
||||||
direction, over the same socket. Once the connection is closed, both
|
direction, over the same socket. Once the connection is closed, both
|
||||||
streams should be complete and valid XML documents.
|
streams should be complete and valid XML documents.
|
||||||
|
@ -65,7 +65,7 @@ class XMLStream(object):
|
||||||
to events in a SAX XML parser.
|
to events in a SAX XML parser.
|
||||||
Custom -- Triggered manually.
|
Custom -- Triggered manually.
|
||||||
Scheduled -- Triggered based on time delays.
|
Scheduled -- Triggered based on time delays.
|
||||||
|
|
||||||
Typically, stanzas are first processed by a stream event handler which
|
Typically, stanzas are first processed by a stream event handler which
|
||||||
will then trigger custom events to continue further processing,
|
will then trigger custom events to continue further processing,
|
||||||
especially since custom event handlers may run in individual threads.
|
especially since custom event handlers may run in individual threads.
|
||||||
|
@ -75,7 +75,7 @@ class XMLStream(object):
|
||||||
address -- The hostname and port of the server.
|
address -- The hostname and port of the server.
|
||||||
default_ns -- The default XML namespace that will be applied
|
default_ns -- The default XML namespace that will be applied
|
||||||
to all non-namespaced stanzas.
|
to all non-namespaced stanzas.
|
||||||
event_queue -- A queue of stream, custom, and scheduled
|
event_queue -- A queue of stream, custom, and scheduled
|
||||||
events to be processed.
|
events to be processed.
|
||||||
filesocket -- A filesocket created from the main connection socket.
|
filesocket -- A filesocket created from the main connection socket.
|
||||||
Required for ElementTree.iterparse.
|
Required for ElementTree.iterparse.
|
||||||
|
@ -117,7 +117,7 @@ class XMLStream(object):
|
||||||
send_xml -- Send an XML string on the stream.
|
send_xml -- Send an XML string on the stream.
|
||||||
set_socket -- Set the stream's socket and generate a new
|
set_socket -- Set the stream's socket and generate a new
|
||||||
filesocket.
|
filesocket.
|
||||||
start_stream_handler -- Meant to be overridden.
|
start_stream_handler -- Meant to be overridden.
|
||||||
start_tls -- Establish a TLS connection and restart
|
start_tls -- Establish a TLS connection and restart
|
||||||
the stream.
|
the stream.
|
||||||
"""
|
"""
|
||||||
|
@ -148,12 +148,12 @@ class XMLStream(object):
|
||||||
|
|
||||||
# TODO: Integrate the new state machine.
|
# TODO: Integrate the new state machine.
|
||||||
self.state = StateMachine()
|
self.state = StateMachine()
|
||||||
self.state.addStates({'connected': False,
|
self.state.addStates({'connected': False,
|
||||||
'is client': False,
|
'is client': False,
|
||||||
'ssl': False,
|
'ssl': False,
|
||||||
'tls': False,
|
'tls': False,
|
||||||
'reconnect': True,
|
'reconnect': True,
|
||||||
'processing': False,
|
'processing': False,
|
||||||
'disconnecting': False})
|
'disconnecting': False})
|
||||||
|
|
||||||
self.address = (host, int(port))
|
self.address = (host, int(port))
|
||||||
|
@ -170,7 +170,7 @@ class XMLStream(object):
|
||||||
self.event_queue = queue.Queue()
|
self.event_queue = queue.Queue()
|
||||||
self.send_queue = queue.Queue()
|
self.send_queue = queue.Queue()
|
||||||
self.scheduler = Scheduler(self.event_queue)
|
self.scheduler = Scheduler(self.event_queue)
|
||||||
|
|
||||||
self.namespace_map = {}
|
self.namespace_map = {}
|
||||||
|
|
||||||
self.__thread = {}
|
self.__thread = {}
|
||||||
|
@ -179,7 +179,7 @@ class XMLStream(object):
|
||||||
|
|
||||||
self.run = True
|
self.run = True
|
||||||
|
|
||||||
def connect(self, host='', port=0, use_ssl=False,
|
def connect(self, host='', port=0, use_ssl=False,
|
||||||
use_tls=True, reattempt=True):
|
use_tls=True, reattempt=True):
|
||||||
"""
|
"""
|
||||||
Create a new socket and connect to the server.
|
Create a new socket and connect to the server.
|
||||||
|
@ -224,7 +224,7 @@ class XMLStream(object):
|
||||||
self.state.set('connected', True)
|
self.state.set('connected', True)
|
||||||
return True
|
return True
|
||||||
except socket.error as serr:
|
except socket.error as serr:
|
||||||
error_msg = "Could not connect. Socket Error #%s: %s"
|
error_msg = "Could not connect. Socket Error #%s: %s"
|
||||||
logging.error(error_msg % (serr.errno, serr.strerror))
|
logging.error(error_msg % (serr.errno, serr.strerror))
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
@ -265,8 +265,8 @@ class XMLStream(object):
|
||||||
"""
|
"""
|
||||||
Reset the stream's state and reconnect to the server.
|
Reset the stream's state and reconnect to the server.
|
||||||
"""
|
"""
|
||||||
self.state.set('tls',False)
|
self.state.set('tls', False)
|
||||||
self.state.set('ssl',False)
|
self.state.set('ssl', False)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.connect()
|
self.connect()
|
||||||
|
|
||||||
|
@ -302,8 +302,8 @@ class XMLStream(object):
|
||||||
"""
|
"""
|
||||||
if self.ssl_support:
|
if self.ssl_support:
|
||||||
logging.info("Negotiating TLS")
|
logging.info("Negotiating TLS")
|
||||||
self.socket = ssl.wrap_socket(self.socket,
|
self.socket = ssl.wrap_socket(self.socket,
|
||||||
ssl_version=ssl.PROTOCOL_TLSv1,
|
ssl_version=ssl.PROTOCOL_TLSv1,
|
||||||
do_handshake_on_connect=False)
|
do_handshake_on_connect=False)
|
||||||
self.socket.do_handshake()
|
self.socket.do_handshake()
|
||||||
self.set_socket(self.socket)
|
self.set_socket(self.socket)
|
||||||
|
@ -347,7 +347,7 @@ class XMLStream(object):
|
||||||
|
|
||||||
def register_handler(self, handler, before=None, after=None):
|
def register_handler(self, handler, before=None, after=None):
|
||||||
"""
|
"""
|
||||||
Add a stream event handler that will be executed when a matching
|
Add a stream event handler that will be executed when a matching
|
||||||
stanza is received.
|
stanza is received.
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
|
@ -372,7 +372,7 @@ class XMLStream(object):
|
||||||
idx += 1
|
idx += 1
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def schedule(self, name, seconds, callback, args=None,
|
def schedule(self, name, seconds, callback, args=None,
|
||||||
kwargs=None, repeat=False):
|
kwargs=None, repeat=False):
|
||||||
"""
|
"""
|
||||||
Schedule a callback function to execute after a given delay.
|
Schedule a callback function to execute after a given delay.
|
||||||
|
@ -387,7 +387,7 @@ class XMLStream(object):
|
||||||
repeat -- Flag indicating if the scheduled event should
|
repeat -- Flag indicating if the scheduled event should
|
||||||
be reset and repeat after executing.
|
be reset and repeat after executing.
|
||||||
"""
|
"""
|
||||||
self.scheduler.add(name, seconds, callback, args, kwargs,
|
self.scheduler.add(name, seconds, callback, args, kwargs,
|
||||||
repeat, qpointer=self.event_queue)
|
repeat, qpointer=self.event_queue)
|
||||||
|
|
||||||
def incoming_filter(self, xml):
|
def incoming_filter(self, xml):
|
||||||
|
@ -420,11 +420,11 @@ class XMLStream(object):
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
threaded -- If threaded=True then event dispatcher will run
|
threaded -- If threaded=True then event dispatcher will run
|
||||||
in a separate thread, allowing for the stream to be used
|
in a separate thread, allowing for the stream to be
|
||||||
in the background for another application. Defaults
|
used in the background for another application.
|
||||||
to True.
|
Defaults to True.
|
||||||
|
|
||||||
Event handlers and the send queue will be threaded
|
Event handlers and the send queue will be threaded
|
||||||
regardless of this parameter's value.
|
regardless of this parameter's value.
|
||||||
"""
|
"""
|
||||||
self.scheduler.process(threaded=True)
|
self.scheduler.process(threaded=True)
|
||||||
|
@ -450,7 +450,7 @@ class XMLStream(object):
|
||||||
Start processing the XML streams.
|
Start processing the XML streams.
|
||||||
|
|
||||||
Processing will continue after any recoverable errors
|
Processing will continue after any recoverable errors
|
||||||
if reconnections are allowed.
|
if reconnections are allowed.
|
||||||
"""
|
"""
|
||||||
firstrun = True
|
firstrun = True
|
||||||
|
|
||||||
|
@ -465,10 +465,10 @@ class XMLStream(object):
|
||||||
self.send_raw(self.stream_header)
|
self.send_raw(self.stream_header)
|
||||||
# The call to self.__read_xml will block and prevent
|
# The call to self.__read_xml will block and prevent
|
||||||
# the body of the loop from running until a diconnect
|
# the body of the loop from running until a diconnect
|
||||||
# occurs. After any reconnection, the stream header will
|
# occurs. After any reconnection, the stream header will
|
||||||
# be resent and processing will resume.
|
# be resent and processing will resume.
|
||||||
while self.run and self.__read_xml():
|
while self.run and self.__read_xml():
|
||||||
# Ensure the stream header is sent for any
|
# Ensure the stream header is sent for any
|
||||||
# new connections.
|
# new connections.
|
||||||
if self.state['is client']:
|
if self.state['is client']:
|
||||||
self.send_raw(self.stream_header)
|
self.send_raw(self.stream_header)
|
||||||
|
@ -506,7 +506,7 @@ class XMLStream(object):
|
||||||
Parse the incoming XML stream, raising stream events for
|
Parse the incoming XML stream, raising stream events for
|
||||||
each received stanza.
|
each received stanza.
|
||||||
"""
|
"""
|
||||||
depth = 0
|
depth = 0
|
||||||
root = None
|
root = None
|
||||||
for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')):
|
for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')):
|
||||||
if event == b'start':
|
if event == b'start':
|
||||||
|
@ -532,7 +532,7 @@ class XMLStream(object):
|
||||||
self.__spawn_event(xml)
|
self.__spawn_event(xml)
|
||||||
except RestartStream:
|
except RestartStream:
|
||||||
return True
|
return True
|
||||||
if root:
|
if root:
|
||||||
# Keep the root element empty of children to
|
# Keep the root element empty of children to
|
||||||
# save on memory use.
|
# save on memory use.
|
||||||
root.clear()
|
root.clear()
|
||||||
|
@ -547,8 +547,8 @@ class XMLStream(object):
|
||||||
Arguments:
|
Arguments:
|
||||||
xml -- The XML stanza to analyze.
|
xml -- The XML stanza to analyze.
|
||||||
"""
|
"""
|
||||||
logging.debug("RECV: %s" % tostring(xml,
|
logging.debug("RECV: %s" % tostring(xml,
|
||||||
xmlns=self.default_ns,
|
xmlns=self.default_ns,
|
||||||
stream=self))
|
stream=self))
|
||||||
# Apply any preprocessing filters.
|
# Apply any preprocessing filters.
|
||||||
xml = self.incoming_filter(xml)
|
xml = self.incoming_filter(xml)
|
||||||
|
@ -571,7 +571,7 @@ class XMLStream(object):
|
||||||
stanza_copy = stanza_type(self, copy.deepcopy(xml))
|
stanza_copy = stanza_type(self, copy.deepcopy(xml))
|
||||||
handler.prerun(stanza_copy)
|
handler.prerun(stanza_copy)
|
||||||
self.event_queue.put(('stanza', handler, stanza_copy))
|
self.event_queue.put(('stanza', handler, stanza_copy))
|
||||||
if handler.checkDelete():
|
if handler.checkDelete():
|
||||||
self.__handlers.pop(self.__handlers.index(handler))
|
self.__handlers.pop(self.__handlers.index(handler))
|
||||||
unhandled = False
|
unhandled = False
|
||||||
|
|
||||||
|
@ -608,7 +608,8 @@ class XMLStream(object):
|
||||||
try:
|
try:
|
||||||
handler.run(args[0])
|
handler.run(args[0])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.exception('Error processing event handler: %s' % handler.name)
|
error_msg = 'Error processing event handler: %s'
|
||||||
|
logging.exception(error_msg % handler.name)
|
||||||
args[0].exception(e)
|
args[0].exception(e)
|
||||||
elif etype == 'schedule':
|
elif etype == 'schedule':
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in a new issue