# -*- coding: utf-8 -*-
"""
Slixmpp: The Slick XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import sys
import datetime
import time
import threading
from slixmpp.test import *
from slixmpp.xmlstream import ElementBase
from slixmpp.plugins.xep_0325.device import Device
class TestStreamControl(SlixTest):
"""
Test using the XEP-0325 plugin.
"""
def setUp(self):
pass
def _time_now(self):
return datetime.datetime.now().replace(microsecond=0).isoformat()
def testRequestSetOk(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22")
myDevice._add_control_field(name="Temperature", typename="int", value="15")
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
self.recv("""
""")
self.send("""
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "17")
def testRequestSetMulti(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22")
myDevice._add_control_field(name="Temperature", typename="int", value="15")
myDevice._add_control_field(name="Startup", typename="date", value="2013-01-03")
myDevice2 = Device("Device23")
myDevice2._add_control_field(name="Temperature", typename="int", value="19")
myDevice2._add_control_field(name="Startup", typename="date", value="2013-01-09")
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
self.xmpp['xep_0325'].register_node(nodeId="Device23", device=myDevice2, commTimeout=0.5)
self.recv("""
""")
self.send("""
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "17")
self.assertEqual(myDevice2._get_field_value("Temperature"), "19")
self.recv("""
""")
self.send("""
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "20")
self.assertEqual(myDevice2._get_field_value("Temperature"), "20")
self.assertEqual(myDevice._get_field_value("Startup"), "2013-02-01")
self.assertEqual(myDevice2._get_field_value("Startup"), "2013-02-01")
def testRequestSetFail(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device23")
myDevice._add_control_field(name="Temperature", typename="int", value="15")
self.xmpp['xep_0325'].register_node(nodeId="Device23", device=myDevice, commTimeout=0.5)
self.recv("""
""")
self.send("""
Invalid field Voltage
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "15")
self.assertFalse(myDevice.has_control_field("Voltage", "int"))
def testDirectSetOk(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22")
myDevice._add_control_field(name="Temperature", typename="int", value="15")
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
self.recv("""
""")
time.sleep(0.5)
self.assertEqual(myDevice._get_field_value("Temperature"), "17")
def testDirectSetFail(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22")
myDevice._add_control_field(name="Temperature", typename="int", value="15")
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
self.recv("""
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "15");
self.assertFalse(myDevice.has_control_field("Voltage", "int"));
def testRequestSetOkAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0325'])
results = []
def my_callback(from_jid, result, nodeIds=None, fields=None, error_msg=None):
results.append(result)
fields = []
fields.append(("Temperature", "double", "20.5"))
fields.append(("TemperatureAlarmSetting", "string", "High"))
self.xmpp['xep_0325'].set_request(from_jid="tester@localhost", to_jid="you@google.com", fields=fields, nodeIds=['Device33', 'Device22'], callback=my_callback)
self.send("""
""")
self.recv("""
""")
self.assertEqual(results, ["OK"]);
def testRequestSetErrorAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0325'])
results = []
def my_callback(from_jid, result, nodeIds=None, fields=None, error_msg=None):
results.append(result)
fields = []
fields.append(("Temperature", "double", "20.5"))
fields.append(("TemperatureAlarmSetting", "string", "High"))
self.xmpp['xep_0325'].set_request(from_jid="tester@localhost", to_jid="you@google.com", fields=fields, nodeIds=['Device33', 'Device22'], callback=my_callback)
self.send("""
""")
self.recv("""
Sensor error
""")
self.assertEqual(results, ["OtherError"])
def testServiceDiscoveryClient(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0325'])
self.recv("""
""")
self.send("""
""")
def testServiceDiscoveryComponent(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
self.recv("""
""")
self.send("""
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamControl)