Updated SleekTest to be able to simulate and test interactions with an XML stream.

This commit is contained in:
Lance Stout 2010-07-14 15:24:37 -04:00
parent 48f0843ace
commit 35212c7991
2 changed files with 179 additions and 5 deletions

View file

@ -7,11 +7,80 @@
"""
import unittest
import socket
try:
import queue
except ImportError:
import Queue as queue
from xml.etree import cElementTree as ET
from sleekxmpp import ClientXMPP
from sleekxmpp import Message, Iq
from sleekxmpp.stanza.presence import Presence
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath
class TestSocket(object):
def __init__(self, *args, **kwargs):
self.socket = socket.socket(*args, **kwargs)
self.recv_queue = queue.Queue()
self.send_queue = queue.Queue()
def __getattr__(self, name):
"""Pass requests through to actual socket"""
# Override a few methods to prevent actual socket connections
overrides = {'connect': lambda *args: None,
'close': lambda *args: None,
'shutdown': lambda *args: None}
return overrides.get(name, getattr(self.socket, name))
# ------------------------------------------------------------------
# Testing Interface
def nextSent(self, timeout=None):
"""Get the next stanza that has been 'sent'"""
args = {'block': False}
if timeout is not None:
args = {'block': True, 'timeout': timeout}
try:
return self.send_queue.get(**args)
except:
return None
def recvData(self, data):
"""Add data to the receiving queue"""
self.recv_queue.put(data)
# ------------------------------------------------------------------
# Socket Interface
def recv(self, *args, **kwargs):
return self.read(block=True)
def send(self, data):
self.send_queue.put(data)
# ------------------------------------------------------------------
# File Socket
def makefile(self, mode='r', bufsize=-1):
"""File socket version to use with ElementTree"""
return self
def read(self, size=4096, block=True, timeout=None):
"""Implement the file socket interface"""
if timeout is not None:
block = True
try:
return self.recv_queue.get(block, timeout)
except:
return None
class TestStream(object):
"""Dummy class to pass a stream object to created stanzas"""
def __init__(self):
self.default_ns = 'jabber:client'
class SleekTest(unittest.TestCase):
"""
@ -27,6 +96,9 @@ class SleekTest(unittest.TestCase):
stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin
stanza.plugin_tag_map[tag] = plugin
# ------------------------------------------------------------------
# Shortcut methods for creating stanza objects
def Message(self, *args, **kwargs):
"""Create a message stanza."""
return Message(None, *args, **kwargs)
@ -39,6 +111,9 @@ class SleekTest(unittest.TestCase):
"""Create a presence stanza."""
return Presence(None, *args, **kwargs)
# ------------------------------------------------------------------
# Methods for comparing stanza objects to XML strings
def checkMessage(self, msg, xml_string, use_values=True):
"""
Create and compare several message stanza objects to a
@ -48,10 +123,12 @@ class SleekTest(unittest.TestCase):
setValues() will not be used.
"""
self.fix_namespaces(msg.xml, 'jabber:client')
debug = "Given Stanza:\n%s\n" % ET.tostring(msg.xml)
xml = ET.fromstring(xml_string)
xml.tag = '{jabber:client}message'
self.fix_namespaces(xml, 'jabber:client')
debug += "XML String:\n%s\n" % ET.tostring(xml)
msg2 = self.Message(xml)
@ -69,8 +146,7 @@ class SleekTest(unittest.TestCase):
debug += "Second Constructed Stanza:\n%s\n" % ET.tostring(msg3.xml)
debug = "Three methods for creating stanza do not match:\n" + debug
self.failUnless(self.compare([xml, msg.xml,
msg2.xml, msg3.xml]),
self.failUnless(self.compare([xml, msg.xml, msg2.xml, msg3.xml]),
debug)
else:
debug = "Two methods for creating stanza do not match:\n" + debug
@ -84,10 +160,12 @@ class SleekTest(unittest.TestCase):
If use_values is False, the test using getValues() and
setValues() will not be used.
"""
self.fix_namespaces(iq.xml, 'jabber:client')
debug = "Given Stanza:\n%s\n" % ET.tostring(iq.xml)
xml = ET.fromstring(xml_string)
xml.tag = '{jabber:client}iq'
self.fix_namespaces(xml, 'jabber:client')
debug += "XML String:\n%s\n" % ET.tostring(xml)
iq2 = self.Iq(xml)
@ -116,6 +194,69 @@ class SleekTest(unittest.TestCase):
"""
pass
# ------------------------------------------------------------------
# Methods for simulating stanza streams.
def streamStart(self, mode='client', skip=True):
if mode == 'client':
self.xmpp = ClientXMPP('tester@localhost', 'test')
self.xmpp.setSocket(TestSocket())
self.xmpp.state.set('reconnect', False)
self.xmpp.state.set('is client', True)
self.xmpp.state.set('connected', True)
# Must have the stream header ready for xmpp.process() to work
self.xmpp.socket.recvData(self.xmpp.stream_header)
self.xmpp.connectTCP = lambda a, b, c, d: True
self.xmpp.startTLS = lambda: True
self.xmpp.process(threaded=True)
if skip:
# Clear startup stanzas
self.xmpp.socket.nextSent(timeout=1)
def streamRecv(self, data):
data = str(data)
self.xmpp.socket.recvData(data)
def streamSendMessage(self, data, use_values=True, timeout=.5):
if isinstance(data, str):
data = self.Message(xml=ET.fromstring(data))
sent = self.xmpp.socket.nextSent(timeout=1)
self.checkMessage(data, sent, use_values)
def streamSendIq(self, data, use_values=True, timeout=.5):
if isinstance(data, str):
data = self.Iq(xml=ET.fromstring(data))
sent = self.xmpp.socket.nextSent(timeout)
self.checkIq(data, sent, use_values)
def streamSendPresence(self, data, use_values=True, timeout=.5):
if isinstance(data, str):
data = self.Presence(xml=ET.fromstring(data))
sent = self.xmpp.socket.nextSent(timeout)
self.checkPresence(data, sent, use_values)
def streamClose(self):
if self.xmpp is not None:
self.xmpp.disconnect()
self.xmpp.socket.recvData(self.xmpp.stream_footer)
# ------------------------------------------------------------------
# XML Comparison and Cleanup
def fix_namespaces(self, xml, ns):
"""
Assign a namespace to an element and any children that
don't have a namespace.
"""
if xml.tag.startswith('{'):
return
xml.tag = '{%s}%s' % (ns, xml.tag)
for child in xml.getchildren():
self.fix_namespaces(child, ns)
def compare(self, xml1, xml2=None):
"""
Compare XML objects.
@ -137,7 +278,6 @@ class SleekTest(unittest.TestCase):
# Step 1: Check tags
if xml1.tag != xml2.tag:
print xml1.tag, xml2.tag
return False
# Step 2: Check attributes

34
tests/test_stream.py Normal file
View file

@ -0,0 +1,34 @@
from sleektest import *
import sleekxmpp.plugins.xep_0033 as xep_0033
class TestStreamTester(SleekTest):
"""
Test that we can simulate and test a stanza stream.
"""
def setUp(self):
self.streamStart()
def tearDown(self):
self.streamClose()
def testEcho(self):
def echo(msg):
msg.reply('Thanks for sending: %(body)s' % msg).send()
self.xmpp.add_event_handler('message', echo)
self.streamRecv("""
<message to="tester@localhost" from="user@localhost">
<body>Hi!</body>
</message>
""")
self.streamSendMessage("""
<message to="user@localhost" from="tester@localhost">
<body>Thanks for sending: Hi!</body>
</message>
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester)