a83c00e933
(eewww)
725 lines
29 KiB
Python
725 lines
29 KiB
Python
"""
|
|
Slixmpp: The Slick XMPP Library
|
|
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
|
|
This file is part of Slixmpp.
|
|
|
|
See the file LICENSE for copying permission.
|
|
"""
|
|
|
|
import unittest
|
|
from queue import Queue
|
|
from xml.parsers.expat import ExpatError
|
|
|
|
from slixmpp.test import TestTransport
|
|
from slixmpp import ClientXMPP, ComponentXMPP
|
|
from slixmpp.stanza import Message, Iq, Presence
|
|
from slixmpp.xmlstream import ET
|
|
from slixmpp.xmlstream import ElementBase
|
|
from slixmpp.xmlstream.tostring import tostring, highlight
|
|
from slixmpp.xmlstream.matcher import StanzaPath, MatcherId, MatchIDSender
|
|
from slixmpp.xmlstream.matcher import MatchXMLMask, MatchXPath
|
|
|
|
import asyncio
|
|
cls = asyncio.get_event_loop().__class__
|
|
|
|
cls.idle_call = lambda self, callback: callback()
|
|
|
|
class SlixTest(unittest.TestCase):
|
|
|
|
"""
|
|
A Slixmpp specific TestCase class that provides
|
|
methods for comparing message, iq, and presence stanzas.
|
|
|
|
Methods:
|
|
Message -- Create a Message stanza object.
|
|
Iq -- Create an Iq stanza object.
|
|
Presence -- Create a Presence stanza object.
|
|
check_jid -- Check a JID and its component parts.
|
|
check -- Compare a stanza against an XML string.
|
|
stream_start -- Initialize a dummy XMPP client.
|
|
stream_close -- Disconnect the XMPP client.
|
|
make_header -- Create a stream header.
|
|
send_header -- Check that the given header has been sent.
|
|
send_feature -- Send a raw XML element.
|
|
send -- Check that the XMPP client sent the given
|
|
generic stanza.
|
|
recv -- Queue data for XMPP client to receive, or
|
|
verify the data that was received from a
|
|
live connection.
|
|
recv_header -- Check that a given stream header
|
|
was received.
|
|
recv_feature -- Check that a given, raw XML element
|
|
was recveived.
|
|
fix_namespaces -- Add top-level namespace to an XML object.
|
|
compare -- Compare XML objects against each other.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
unittest.TestCase.__init__(self, *args, **kwargs)
|
|
self.xmpp = None
|
|
|
|
def parse_xml(self, xml_string):
|
|
try:
|
|
xml = ET.fromstring(xml_string)
|
|
return xml
|
|
except (SyntaxError, ExpatError) as e:
|
|
msg = e.msg if hasattr(e, 'msg') else e.message
|
|
if 'unbound' in msg:
|
|
known_prefixes = {
|
|
'stream': 'http://etherx.jabber.org/streams'}
|
|
|
|
prefix = xml_string.split('<')[1].split(':')[0]
|
|
if prefix in known_prefixes:
|
|
xml_string = '<fixns xmlns:%s="%s">%s</fixns>' % (
|
|
prefix,
|
|
known_prefixes[prefix],
|
|
xml_string)
|
|
xml = self.parse_xml(xml_string)
|
|
xml = list(xml)[0]
|
|
return xml
|
|
else:
|
|
self.fail("XML data was mal-formed:\n%s" % xml_string)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Shortcut methods for creating stanza objects
|
|
|
|
def Message(self, *args, **kwargs):
|
|
"""
|
|
Create a Message stanza.
|
|
|
|
Uses same arguments as StanzaBase.__init__
|
|
|
|
Arguments:
|
|
xml -- An XML object to use for the Message's values.
|
|
"""
|
|
return Message(self.xmpp, *args, **kwargs)
|
|
|
|
def Iq(self, *args, **kwargs):
|
|
"""
|
|
Create an Iq stanza.
|
|
|
|
Uses same arguments as StanzaBase.__init__
|
|
|
|
Arguments:
|
|
xml -- An XML object to use for the Iq's values.
|
|
"""
|
|
return Iq(self.xmpp, *args, **kwargs)
|
|
|
|
def Presence(self, *args, **kwargs):
|
|
"""
|
|
Create a Presence stanza.
|
|
|
|
Uses same arguments as StanzaBase.__init__
|
|
|
|
Arguments:
|
|
xml -- An XML object to use for the Iq's values.
|
|
"""
|
|
return Presence(self.xmpp, *args, **kwargs)
|
|
|
|
def check_jid(self, jid, user=None, domain=None, resource=None,
|
|
bare=None, full=None, string=None):
|
|
"""
|
|
Verify the components of a JID.
|
|
|
|
Arguments:
|
|
jid -- The JID object to test.
|
|
user -- Optional. The user name portion of the JID.
|
|
domain -- Optional. The domain name portion of the JID.
|
|
resource -- Optional. The resource portion of the JID.
|
|
bare -- Optional. The bare JID.
|
|
full -- Optional. The full JID.
|
|
string -- Optional. The string version of the JID.
|
|
"""
|
|
if user is not None:
|
|
self.assertEqual(jid.user, user,
|
|
"User does not match: %s" % jid.user)
|
|
if domain is not None:
|
|
self.assertEqual(jid.domain, domain,
|
|
"Domain does not match: %s" % jid.domain)
|
|
if resource is not None:
|
|
self.assertEqual(jid.resource, resource,
|
|
"Resource does not match: %s" % jid.resource)
|
|
if bare is not None:
|
|
self.assertEqual(jid.bare, bare,
|
|
"Bare JID does not match: %s" % jid.bare)
|
|
if full is not None:
|
|
self.assertEqual(jid.full, full,
|
|
"Full JID does not match: %s" % jid.full)
|
|
if string is not None:
|
|
self.assertEqual(str(jid), string,
|
|
"String does not match: %s" % str(jid))
|
|
|
|
def check_roster(self, owner, jid, name=None, subscription=None,
|
|
afrom=None, ato=None, pending_out=None, pending_in=None,
|
|
groups=None):
|
|
roster = self.xmpp.roster[owner][jid]
|
|
if name is not None:
|
|
self.assertEqual(roster['name'], name,
|
|
"Incorrect name value: %s" % roster['name'])
|
|
if subscription is not None:
|
|
self.assertEqual(roster['subscription'], subscription,
|
|
"Incorrect subscription: %s" % roster['subscription'])
|
|
if afrom is not None:
|
|
self.assertEqual(roster['from'], afrom,
|
|
"Incorrect from state: %s" % roster['from'])
|
|
if ato is not None:
|
|
self.assertEqual(roster['to'], ato,
|
|
"Incorrect to state: %s" % roster['to'])
|
|
if pending_out is not None:
|
|
self.assertEqual(roster['pending_out'], pending_out,
|
|
"Incorrect pending_out state: %s" % roster['pending_out'])
|
|
if pending_in is not None:
|
|
self.assertEqual(roster['pending_in'], pending_out,
|
|
"Incorrect pending_in state: %s" % roster['pending_in'])
|
|
if groups is not None:
|
|
self.assertEqual(roster['groups'], groups,
|
|
"Incorrect groups: %s" % roster['groups'])
|
|
|
|
# ------------------------------------------------------------------
|
|
# Methods for comparing stanza objects to XML strings
|
|
|
|
def check(self, stanza, criteria, method='exact',
|
|
defaults=None, use_values=True):
|
|
"""
|
|
Create and compare several stanza objects to a correct XML string.
|
|
|
|
If use_values is False, tests using stanza.values will not be used.
|
|
|
|
Some stanzas provide default values for some interfaces, but
|
|
these defaults can be problematic for testing since they can easily
|
|
be forgotten when supplying the XML string. A list of interfaces that
|
|
use defaults may be provided and the generated stanzas will use the
|
|
default values for those interfaces if needed.
|
|
|
|
However, correcting the supplied XML is not possible for interfaces
|
|
that add or remove XML elements. Only interfaces that map to XML
|
|
attributes may be set using the defaults parameter. The supplied XML
|
|
must take into account any extra elements that are included by default.
|
|
|
|
Arguments:
|
|
stanza -- The stanza object to test.
|
|
criteria -- An expression the stanza must match against.
|
|
method -- The type of matching to use; one of:
|
|
'exact', 'mask', 'id', 'xpath', and 'stanzapath'.
|
|
Defaults to the value of self.match_method.
|
|
defaults -- A list of stanza interfaces that have default
|
|
values. These interfaces will be set to their
|
|
defaults for the given and generated stanzas to
|
|
prevent unexpected test failures.
|
|
use_values -- Indicates if testing using stanza.values should
|
|
be used. Defaults to True.
|
|
"""
|
|
if method is None and hasattr(self, 'match_method'):
|
|
method = getattr(self, 'match_method')
|
|
|
|
if method != 'exact':
|
|
matchers = {'stanzapath': StanzaPath,
|
|
'xpath': MatchXPath,
|
|
'mask': MatchXMLMask,
|
|
'idsender': MatchIDSender,
|
|
'id': MatcherId}
|
|
Matcher = matchers.get(method, None)
|
|
if Matcher is None:
|
|
raise ValueError("Unknown matching method.")
|
|
test = Matcher(criteria)
|
|
self.assertTrue(test.match(stanza),
|
|
"Stanza did not match using %s method:\n" % method + \
|
|
"Criteria:\n%s\n" % str(criteria) + \
|
|
"Stanza:\n%s" % str(stanza))
|
|
else:
|
|
stanza_class = stanza.__class__
|
|
if not isinstance(criteria, ElementBase):
|
|
xml = self.parse_xml(criteria)
|
|
else:
|
|
xml = criteria.xml
|
|
|
|
# Ensure that top level namespaces are used, even if they
|
|
# were not provided.
|
|
self.fix_namespaces(stanza.xml, 'jabber:client')
|
|
self.fix_namespaces(xml, 'jabber:client')
|
|
|
|
stanza2 = stanza_class(xml=xml)
|
|
|
|
if use_values:
|
|
# Using stanza.values will add XML for any interface that
|
|
# has a default value. We need to set those defaults on
|
|
# the existing stanzas and XML so that they will compare
|
|
# correctly.
|
|
default_stanza = stanza_class()
|
|
if defaults is None:
|
|
known_defaults = {
|
|
Message: ['type'],
|
|
Presence: ['priority']
|
|
}
|
|
defaults = known_defaults.get(stanza_class, [])
|
|
for interface in defaults:
|
|
stanza[interface] = stanza[interface]
|
|
stanza2[interface] = stanza2[interface]
|
|
# Can really only automatically add defaults for top
|
|
# level attribute values. Anything else must be accounted
|
|
# for in the provided XML string.
|
|
if interface not in xml.attrib:
|
|
if interface in default_stanza.xml.attrib:
|
|
value = default_stanza.xml.attrib[interface]
|
|
xml.attrib[interface] = value
|
|
|
|
values = stanza2.values
|
|
stanza3 = stanza_class()
|
|
stanza3.values = values
|
|
|
|
debug = "Three methods for creating stanzas do not match.\n"
|
|
debug += "Given XML:\n%s\n" % highlight(tostring(xml))
|
|
debug += "Given stanza:\n%s\n" % highlight(tostring(stanza.xml))
|
|
debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
|
|
debug += "Second generated stanza:\n%s\n" % highlight(tostring(stanza3.xml))
|
|
result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
|
|
else:
|
|
debug = "Two methods for creating stanzas do not match.\n"
|
|
debug += "Given XML:\n%s\n" % highlight(tostring(xml))
|
|
debug += "Given stanza:\n%s\n" % highlight(tostring(stanza.xml))
|
|
debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
|
|
result = self.compare(xml, stanza.xml, stanza2.xml)
|
|
|
|
self.assertTrue(result, debug)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Methods for simulating stanza streams.
|
|
|
|
def stream_disconnect(self):
|
|
"""
|
|
Simulate a stream disconnection.
|
|
"""
|
|
if self.xmpp:
|
|
self.xmpp.socket.disconnect_error()
|
|
|
|
def stream_start(self, mode='client', skip=True, header=None,
|
|
socket='mock', jid='tester@localhost/resource',
|
|
password='test', server='localhost',
|
|
port=5222, sasl_mech=None,
|
|
plugins=None, plugin_config={}):
|
|
"""
|
|
Initialize an XMPP client or component using a dummy XML stream.
|
|
|
|
Arguments:
|
|
mode -- Either 'client' or 'component'. Defaults to 'client'.
|
|
skip -- Indicates if the first item in the sent queue (the
|
|
stream header) should be removed. Tests that wish
|
|
to test initializing the stream should set this to
|
|
False. Otherwise, the default of True should be used.
|
|
socket -- Either 'mock' or 'live' to indicate if the socket
|
|
should be a dummy, mock socket or a live, functioning
|
|
socket. Defaults to 'mock'.
|
|
jid -- The JID to use for the connection.
|
|
Defaults to 'tester@localhost/resource'.
|
|
password -- The password to use for the connection.
|
|
Defaults to 'test'.
|
|
server -- The name of the XMPP server. Defaults to 'localhost'.
|
|
port -- The port to use when connecting to the server.
|
|
Defaults to 5222.
|
|
plugins -- List of plugins to register. By default, all plugins
|
|
are loaded.
|
|
"""
|
|
if not plugin_config:
|
|
plugin_config = {}
|
|
|
|
if mode == 'client':
|
|
self.xmpp = ClientXMPP(jid, password,
|
|
sasl_mech=sasl_mech,
|
|
plugin_config=plugin_config)
|
|
elif mode == 'component':
|
|
self.xmpp = ComponentXMPP(jid, password,
|
|
server, port,
|
|
plugin_config=plugin_config)
|
|
else:
|
|
raise ValueError("Unknown XMPP connection mode.")
|
|
|
|
self.xmpp.connection_made(TestTransport(self.xmpp))
|
|
self.xmpp.session_bind_event.set()
|
|
# Remove unique ID prefix to make it easier to test
|
|
self.xmpp._id_prefix = ''
|
|
self.xmpp.default_lang = None
|
|
self.xmpp.peer_default_lang = None
|
|
|
|
def new_id():
|
|
self.xmpp._id += 1
|
|
return str(self.xmpp._id)
|
|
|
|
self.xmpp._id = 0
|
|
self.xmpp.new_id = new_id
|
|
|
|
# Must have the stream header ready for xmpp.process() to work.
|
|
if not header:
|
|
header = self.xmpp.stream_header
|
|
|
|
self.xmpp.data_received(header)
|
|
self.wait_for_send_queue()
|
|
|
|
if skip:
|
|
self.xmpp.socket.next_sent()
|
|
if mode == 'component':
|
|
self.xmpp.socket.next_sent()
|
|
|
|
|
|
if plugins is None:
|
|
self.xmpp.register_plugins()
|
|
else:
|
|
for plugin in plugins:
|
|
self.xmpp.register_plugin(plugin)
|
|
|
|
# Some plugins require messages to have ID values. Set
|
|
# this to True in tests related to those plugins.
|
|
self.xmpp.use_message_ids = False
|
|
self.xmpp.use_presence_ids = False
|
|
|
|
def make_header(self, sto='',
|
|
sfrom='',
|
|
sid='',
|
|
stream_ns="http://etherx.jabber.org/streams",
|
|
default_ns="jabber:client",
|
|
default_lang="en",
|
|
version="1.0",
|
|
xml_header=True):
|
|
"""
|
|
Create a stream header to be received by the test XMPP agent.
|
|
|
|
The header must be saved and passed to stream_start.
|
|
|
|
Arguments:
|
|
sto -- The recipient of the stream header.
|
|
sfrom -- The agent sending the stream header.
|
|
sid -- The stream's id.
|
|
stream_ns -- The namespace of the stream's root element.
|
|
default_ns -- The default stanza namespace.
|
|
version -- The stream version.
|
|
xml_header -- Indicates if the XML version header should be
|
|
appended before the stream header.
|
|
"""
|
|
header = '<stream:stream %s>'
|
|
parts = []
|
|
if xml_header:
|
|
header = '<?xml version="1.0"?>' + header
|
|
if sto:
|
|
parts.append('to="%s"' % sto)
|
|
if sfrom:
|
|
parts.append('from="%s"' % sfrom)
|
|
if sid:
|
|
parts.append('id="%s"' % sid)
|
|
if default_lang:
|
|
parts.append('xml:lang="%s"' % default_lang)
|
|
parts.append('version="%s"' % version)
|
|
parts.append('xmlns:stream="%s"' % stream_ns)
|
|
parts.append('xmlns="%s"' % default_ns)
|
|
return header % ' '.join(parts)
|
|
|
|
def recv(self, data, defaults=None, method='exact', use_values=True, timeout=1):
|
|
"""
|
|
Pass data to the dummy XMPP client as if it came from an XMPP server.
|
|
|
|
If using a live connection, verify what the server has sent.
|
|
|
|
Arguments:
|
|
data -- If a dummy socket is being used, the XML that is to
|
|
be received next. Otherwise it is the criteria used
|
|
to match against live data that is received.
|
|
defaults -- A list of stanza interfaces with default values that
|
|
may interfere with comparisons.
|
|
method -- Select the type of comparison to use for
|
|
verifying the received stanza. Options are 'exact',
|
|
'id', 'stanzapath', 'xpath', and 'mask'.
|
|
Defaults to the value of self.match_method.
|
|
use_values -- Indicates if stanza comparisons should test using
|
|
stanza.values. Defaults to True.
|
|
timeout -- Time to wait in seconds for data to be received by
|
|
a live connection.
|
|
"""
|
|
self.xmpp.data_received(data)
|
|
|
|
def recv_header(self, sto='',
|
|
sfrom='',
|
|
sid='',
|
|
stream_ns="http://etherx.jabber.org/streams",
|
|
default_ns="jabber:client",
|
|
version="1.0",
|
|
xml_header=False,
|
|
timeout=1):
|
|
"""
|
|
Check that a given stream header was received.
|
|
|
|
Arguments:
|
|
sto -- The recipient of the stream header.
|
|
sfrom -- The agent sending the stream header.
|
|
sid -- The stream's id. Set to None to ignore.
|
|
stream_ns -- The namespace of the stream's root element.
|
|
default_ns -- The default stanza namespace.
|
|
version -- The stream version.
|
|
xml_header -- Indicates if the XML version header should be
|
|
appended before the stream header.
|
|
timeout -- Length of time to wait in seconds for a
|
|
response.
|
|
"""
|
|
header = self.make_header(sto, sfrom, sid,
|
|
stream_ns=stream_ns,
|
|
default_ns=default_ns,
|
|
version=version,
|
|
xml_header=xml_header)
|
|
recv_header = self.xmpp.socket.next_recv(timeout)
|
|
if recv_header is None:
|
|
raise ValueError("Socket did not return data.")
|
|
|
|
# Apply closing elements so that we can construct
|
|
# XML objects for comparison.
|
|
header2 = header + '</stream:stream>'
|
|
recv_header2 = recv_header + '</stream:stream>'
|
|
|
|
xml = self.parse_xml(header2)
|
|
recv_xml = self.parse_xml(recv_header2)
|
|
|
|
if sid is None:
|
|
# Ignore the id sent by the server since
|
|
# we can't know in advance what it will be.
|
|
if 'id' in recv_xml.attrib:
|
|
del recv_xml.attrib['id']
|
|
|
|
# Ignore the xml:lang attribute for now.
|
|
if 'xml:lang' in recv_xml.attrib:
|
|
del recv_xml.attrib['xml:lang']
|
|
xml_ns = 'http://www.w3.org/XML/1998/namespace'
|
|
if '{%s}lang' % xml_ns in recv_xml.attrib:
|
|
del recv_xml.attrib['{%s}lang' % xml_ns]
|
|
|
|
if list(recv_xml):
|
|
# We received more than just the header
|
|
for xml in recv_xml:
|
|
self.xmpp.data_received(tostring(xml))
|
|
|
|
attrib = recv_xml.attrib
|
|
recv_xml.clear()
|
|
recv_xml.attrib = attrib
|
|
|
|
self.assertTrue(
|
|
self.compare(xml, recv_xml),
|
|
"Stream headers do not match:\nDesired:\n%s\nReceived:\n%s" % (
|
|
'%s %s' % (xml.tag, xml.attrib),
|
|
'%s %s' % (recv_xml.tag, recv_xml.attrib)))
|
|
|
|
def recv_feature(self, data, method='mask', use_values=True, timeout=1):
|
|
"""
|
|
"""
|
|
if method is None and hasattr(self, 'match_method'):
|
|
method = getattr(self, 'match_method')
|
|
|
|
self.xmpp.socket.data_received(data)
|
|
|
|
def send_header(self, sto='',
|
|
sfrom='',
|
|
sid='',
|
|
stream_ns="http://etherx.jabber.org/streams",
|
|
default_ns="jabber:client",
|
|
default_lang="en",
|
|
version="1.0",
|
|
xml_header=False,
|
|
timeout=1):
|
|
"""
|
|
Check that a given stream header was sent.
|
|
|
|
Arguments:
|
|
sto -- The recipient of the stream header.
|
|
sfrom -- The agent sending the stream header.
|
|
sid -- The stream's id.
|
|
stream_ns -- The namespace of the stream's root element.
|
|
default_ns -- The default stanza namespace.
|
|
version -- The stream version.
|
|
xml_header -- Indicates if the XML version header should be
|
|
appended before the stream header.
|
|
timeout -- Length of time to wait in seconds for a
|
|
response.
|
|
"""
|
|
header = self.make_header(sto, sfrom, sid,
|
|
stream_ns=stream_ns,
|
|
default_ns=default_ns,
|
|
default_lang=default_lang,
|
|
version=version,
|
|
xml_header=xml_header)
|
|
sent_header = self.xmpp.socket.next_sent(timeout)
|
|
if sent_header is None:
|
|
raise ValueError("Socket did not return data.")
|
|
|
|
# Apply closing elements so that we can construct
|
|
# XML objects for comparison.
|
|
header2 = header + '</stream:stream>'
|
|
sent_header2 = sent_header + b'</stream:stream>'
|
|
|
|
xml = self.parse_xml(header2)
|
|
sent_xml = self.parse_xml(sent_header2)
|
|
|
|
self.assertTrue(
|
|
self.compare(xml, sent_xml),
|
|
"Stream headers do not match:\nDesired:\n%s\nSent:\n%s" % (
|
|
header, sent_header))
|
|
|
|
def send_feature(self, data, method='mask', use_values=True, timeout=1):
|
|
"""
|
|
"""
|
|
sent_data = self.xmpp.socket.next_sent(timeout)
|
|
xml = self.parse_xml(data)
|
|
sent_xml = self.parse_xml(sent_data)
|
|
if sent_data is None:
|
|
self.fail("No stanza was sent.")
|
|
if method == 'exact':
|
|
self.assertTrue(self.compare(xml, sent_xml),
|
|
"Features do not match.\nDesired:\n%s\nReceived:\n%s" % (
|
|
highlight(tostring(xml)), highlight(tostring(sent_xml))))
|
|
elif method == 'mask':
|
|
matcher = MatchXMLMask(xml)
|
|
self.assertTrue(matcher.match(sent_xml),
|
|
"Stanza did not match using %s method:\n" % method + \
|
|
"Criteria:\n%s\n" % highlight(tostring(xml)) + \
|
|
"Stanza:\n%s" % highlight(tostring(sent_xml)))
|
|
else:
|
|
raise ValueError("Uknown matching method: %s" % method)
|
|
|
|
def send(self, data, defaults=None, use_values=True,
|
|
timeout=.5, method='exact'):
|
|
"""
|
|
Check that the XMPP client sent the given stanza XML.
|
|
|
|
Extracts the next sent stanza and compares it with the given
|
|
XML using check.
|
|
|
|
Arguments:
|
|
stanza_class -- The class of the sent stanza object.
|
|
data -- The XML string of the expected Message stanza,
|
|
or an equivalent stanza object.
|
|
use_values -- Modifies the type of tests used by check_message.
|
|
defaults -- A list of stanza interfaces that have defaults
|
|
values which may interfere with comparisons.
|
|
timeout -- Time in seconds to wait for a stanza before
|
|
failing the check.
|
|
method -- Select the type of comparison to use for
|
|
verifying the sent stanza. Options are 'exact',
|
|
'id', 'stanzapath', 'xpath', and 'mask'.
|
|
Defaults to the value of self.match_method.
|
|
"""
|
|
self.wait_for_send_queue()
|
|
sent = self.xmpp.socket.next_sent(timeout)
|
|
if data is None and sent is None:
|
|
return
|
|
if data is None and sent is not None:
|
|
self.fail("Stanza data was sent: %s" % sent)
|
|
if sent is None:
|
|
self.fail("No stanza was sent.")
|
|
|
|
xml = self.parse_xml(sent)
|
|
self.fix_namespaces(xml, 'jabber:client')
|
|
sent = self.xmpp._build_stanza(xml, 'jabber:client')
|
|
self.check(sent, data,
|
|
method=method,
|
|
defaults=defaults,
|
|
use_values=use_values)
|
|
|
|
def wait_for_send_queue(self):
|
|
loop = asyncio.get_event_loop()
|
|
future = asyncio.ensure_future(self.xmpp.run_filters(), loop=loop)
|
|
queue = self.xmpp.waiting_queue
|
|
print(queue)
|
|
loop.run_until_complete(queue.join())
|
|
future.cancel()
|
|
|
|
def stream_close(self):
|
|
"""
|
|
Disconnect the dummy XMPP client.
|
|
|
|
Can be safely called even if stream_start has not been called.
|
|
|
|
Must be placed in the tearDown method of a test class to ensure
|
|
that the XMPP client is disconnected after an error.
|
|
"""
|
|
if hasattr(self, 'xmpp') and self.xmpp is not None:
|
|
self.xmpp.data_received(self.xmpp.stream_footer)
|
|
self.xmpp.disconnect()
|
|
|
|
# ------------------------------------------------------------------
|
|
# XML Comparison and Cleanup
|
|
|
|
def fix_namespaces(self, xml, ns):
|
|
"""
|
|
Assign a namespace to an element and any children that
|
|
don't have a namespace.
|
|
|
|
Arguments:
|
|
xml -- The XML object to fix.
|
|
ns -- The namespace to add to the XML object.
|
|
"""
|
|
if xml.tag.startswith('{'):
|
|
return
|
|
xml.tag = '{%s}%s' % (ns, xml.tag)
|
|
for child in xml:
|
|
self.fix_namespaces(child, ns)
|
|
|
|
def compare(self, xml, *other):
|
|
"""
|
|
Compare XML objects.
|
|
|
|
Arguments:
|
|
xml -- The XML object to compare against.
|
|
*other -- The list of XML objects to compare.
|
|
"""
|
|
if not other:
|
|
return False
|
|
|
|
# Compare multiple objects
|
|
if len(other) > 1:
|
|
for xml2 in other:
|
|
if not self.compare(xml, xml2):
|
|
return False
|
|
return True
|
|
|
|
other = other[0]
|
|
|
|
# Step 1: Check tags
|
|
if xml.tag != other.tag:
|
|
return False
|
|
|
|
# Step 2: Check attributes
|
|
if xml.attrib != other.attrib:
|
|
return False
|
|
|
|
# Step 3: Check text
|
|
if xml.text is None:
|
|
xml.text = ""
|
|
if other.text is None:
|
|
other.text = ""
|
|
xml.text = xml.text.strip()
|
|
other.text = other.text.strip()
|
|
|
|
if xml.text != other.text:
|
|
return False
|
|
|
|
# Step 4: Check children count
|
|
if len(list(xml)) != len(list(other)):
|
|
return False
|
|
|
|
# Step 5: Recursively check children
|
|
for child in xml:
|
|
child2s = other.findall("%s" % child.tag)
|
|
if child2s is None:
|
|
return False
|
|
for child2 in child2s:
|
|
if self.compare(child, child2):
|
|
break
|
|
else:
|
|
return False
|
|
|
|
# Step 6: Recursively check children the other way.
|
|
for child in other:
|
|
child2s = xml.findall("%s" % child.tag)
|
|
if child2s is None:
|
|
return False
|
|
for child2 in child2s:
|
|
if self.compare(child, child2):
|
|
break
|
|
else:
|
|
return False
|
|
|
|
# Everything matches
|
|
return True
|