Updated SleekTest and related tests.

May now use a component for stream testing.
Methods provided for testing stream headers.
This commit is contained in:
Lance Stout 2010-10-06 18:10:04 -04:00
parent c294c1a85c
commit f234dc02cf
3 changed files with 128 additions and 17 deletions

View file

@ -14,7 +14,7 @@ except ImportError:
import Queue as queue
import sleekxmpp
from sleekxmpp import ClientXMPP
from sleekxmpp import ClientXMPP, ComponentXMPP
from sleekxmpp.stanza import Message, Iq, Presence
from sleekxmpp.xmlstream.stanzabase import registerStanzaPlugin, ET
from sleekxmpp.xmlstream.tostring import tostring
@ -307,7 +307,7 @@ class SleekTest(unittest.TestCase):
return self.checkStanza(Message, msg, xml_string,
defaults=['type'],
use_values = use_values)
use_values=use_values)
def checkIq(self, iq, xml_string, use_values=True):
"""
@ -348,7 +348,7 @@ class SleekTest(unittest.TestCase):
# ------------------------------------------------------------------
# Methods for simulating stanza streams.
def streamStart(self, mode='client', skip=True):
def streamStart(self, mode='client', skip=True, header=None):
"""
Initialize an XMPP client or component using a dummy XML stream.
@ -361,21 +361,30 @@ class SleekTest(unittest.TestCase):
"""
if mode == 'client':
self.xmpp = ClientXMPP('tester@localhost', 'test')
self.xmpp.setSocket(TestSocket())
elif mode == 'component':
self.xmpp = ComponentXMPP('tester.localhost', 'test',
'localhost', 8888)
else:
raise ValueError("Unknown XMPP connection mode.")
self.xmpp.setSocket(TestSocket())
self.xmpp.state.set('reconnect', False)
self.xmpp.state.set('is client', True)
self.xmpp.state.set('connected', True)
# Must have the stream header ready for xmpp.process() to work
self.xmpp.socket.recvData(self.xmpp.stream_header)
# Must have the stream header ready for xmpp.process() to work.
if not header:
header = self.xmpp.stream_header
self.xmpp.socket.recvData(header)
self.xmpp.connectTCP = lambda a, b, c, d: True
self.xmpp.startTLS = lambda: True
self.xmpp.connect = lambda a=None, b=None, c=None, d=None: True
self.xmpp.start_tls = lambda: True
self.xmpp.process(threaded=True)
if skip:
# Clear startup stanzas
self.xmpp.socket.nextSent(timeout=0.01)
if mode == 'component':
self.xmpp.socket.nextSent(timeout=0.01)
def streamRecv(self, data):
"""
@ -388,6 +397,87 @@ class SleekTest(unittest.TestCase):
data = str(data)
self.xmpp.socket.recvData(data)
def makeStreamHeader(self, sto='',
sfrom='',
sid='',
stream_ns="http://etherx.jabber.org/streams",
default_ns="jabber:client",
version="1.0",
xml_header=True):
"""
Create a stream header to be received by the test XMPP agent.
The header must be saved and passed to streamStart.
Arguments:
sto -- The recipient of the stream header.
sfrom -- The agent sending the stream header.
sid -- The stream's id.
stream_ns -- The namespace of the stream's root element.
default_ns -- The default stanza namespace.
version -- The stream version.
xml_header -- Indicates if the XML version header should be
appended before the stream header.
"""
header = '<stream:stream %s>'
if xml_header:
header = '<?xml version="1.0"?>' + header
if sto:
parts.append('to="%s"' % sto)
if sfrom:
parts.append('from="%s"' % sfrom)
if sid:
parts.append('id="%s"' % sid)
parts.append('version="%s"' % version)
parts.append('xmlns:stream="%s"' % stream_ns)
parts.append('xmlns="%s"' % default_ns)
return header % ' '.join(parts)
def streamSendHeader(self, sto='',
sfrom='',
sid='',
stream_ns="http://etherx.jabber.org/streams",
default_ns="jabber:client",
version="1.0",
xml_header=False,
timeout=0.1):
"""
Check that a given stream header was sent.
Arguments:
sto -- The recipient of the stream header.
sfrom -- The agent sending the stream header.
sid -- The stream's id.
stream_ns -- The namespace of the stream's root element.
default_ns -- The default stanza namespace.
version -- The stream version.
xml_header -- Indicates if the XML version header should be
appended before the stream header.
timeout -- Length of time to wait in seconds for a
response.
"""
header = self.makeStreamHeader(sto, sfrom, sid,
stream_ns=stream_ns,
default_ns=default_ns,
version=version,
xml_header=xml_header)
sent_header = self.xmpp.socket.nextSent(timeout)
if sent_header is None:
raise ValueError("Socket did not return data.")
# Apply closing elements so that we can construct
# XML objects for comparison.
header2 += '</stream:stream>'
sent_header2 += '</stream:stream>'
xml = ET.fromstring(header2)
sent_xml = ET.fromstring(sent_header2)
self.failUnless(
self.compare(xml, sent_xml),
"Stream headers do not match:\nDesired:\n%s\nSent:\n%s" % (
header, sent_header))
def streamSendMessage(self, data, use_values=True, timeout=.1):
"""
Check that the XMPP client sent the given stanza XML.

View file

@ -5,7 +5,7 @@ from sleekxmpp.xmlstream.matcher import *
class TestHandlers(SleekTest):
"""
Test that we can simulate and test a stanza stream.
Test using handlers and waiters.
"""
def setUp(self):

View file

@ -7,13 +7,13 @@ class TestStreamTester(SleekTest):
Test that we can simulate and test a stanza stream.
"""
def setUp(self):
self.streamStart()
def tearDown(self):
self.streamClose()
def testEcho(self):
def testClientEcho(self):
"""Test that we can interact with a ClientXMPP instance."""
self.streamStart(mode='client')
def echo(msg):
msg.reply('Thanks for sending: %(body)s' % msg).send()
@ -31,4 +31,25 @@ class TestStreamTester(SleekTest):
</message>
""")
def testComponentEcho(self):
"""Test that we can interact with a ComponentXMPP instance."""
self.streamStart(mode='component')
def echo(msg):
msg.reply('Thanks for sending: %(body)s' % msg).send()
self.xmpp.add_event_handler('message', echo)
self.streamRecv("""
<message to="tester.localhost" from="user@localhost">
<body>Hi!</body>
</message>
""")
self.streamSendMessage("""
<message to="user@localhost" from="tester.localhost">
<body>Thanks for sending: Hi!</body>
</message>
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester)