# -*- coding: utf-8 -*-
"""
    Slixmpp: The Slick XMPP Library
    Implementation of xeps for Internet of Things
    http://wiki.xmpp.org/web/Tech_pages/IoT_systems
    Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
    This file is part of Slixmpp.

    See the file LICENSE for copying permission.
"""

import sys
import datetime
import time
import threading

from slixmpp.test import *
from slixmpp.xmlstream import ElementBase
from slixmpp.plugins.xep_0325.device import Device


class TestStreamControl(SlixTest):
    """
    Stream tests that drive the XEP-0325 plugin.

    Each test starts a test stream with the xep_0030 and xep_0325
    plugins loaded, then either registers Device nodes and feeds
    stanzas through recv()/send(), or calls the plugin's set_request()
    API and checks what the callback was given.
    """

    def setUp(self):
        pass

    def _time_now(self):
        # Current local time as an ISO-8601 string, truncated to seconds.
        now = datetime.datetime.now()
        return now.replace(microsecond=0).isoformat()

    # ------------------------------------------------------------------
    # Shared setup helpers
    # ------------------------------------------------------------------

    def _start_control_stream(self, mode):
        """Start the test stream in `mode` with xep_0030 and xep_0325."""
        self.stream_start(mode=mode, plugins=['xep_0030', 'xep_0325'])

    @staticmethod
    def _build_device(node_id, control_fields):
        """
        Create a Device named `node_id` and add one control field per
        (name, typename, value) tuple in `control_fields`, in order.
        """
        device = Device(node_id)
        for field_name, field_type, field_value in control_fields:
            device._add_control_field(name=field_name,
                                      typename=field_type,
                                      value=field_value)
        return device

    def _register_device(self, node_id, device):
        """Register `device` under `node_id` with a 0.5 commTimeout."""
        self.xmpp['xep_0325'].register_node(nodeId=node_id,
                                            device=device,
                                            commTimeout=0.5)

    def _issue_set_request(self):
        """
        Call set_request() with two fields and two node ids.

        Returns the list that the callback appends each `result` to.
        """
        outcomes = []

        # Parameter names are kept as-is: the plugin may pass them by keyword.
        def record_result(from_jid, result, nodeIds=None, fields=None, error_msg=None):
            outcomes.append(result)

        requested_fields = [
            ("Temperature", "double", "20.5"),
            ("TemperatureAlarmSetting", "string", "High"),
        ]
        self.xmpp['xep_0325'].set_request(from_jid="tester@localhost",
                                          to_jid="you@google.com",
                                          fields=requested_fields,
                                          nodeIds=['Device33', 'Device22'],
                                          callback=record_result)
        return outcomes

    # ------------------------------------------------------------------
    # Set requests against registered devices (component mode)
    # ------------------------------------------------------------------

    def testRequestSetOk(self):
        # One device, one field; after recv/send the field must read "17".
        self._start_control_stream('component')

        thermostat = self._build_device("Device22", [
            ("Temperature", "int", "15"),
        ])
        self._register_device("Device22", thermostat)

        self.recv(""" """)
        self.send(""" """)

        self.assertEqual(thermostat._get_field_value("Temperature"), "17")

    def testRequestSetMulti(self):
        # Two devices with two fields each, exercised by two exchanges.
        self._start_control_stream('component')

        first = self._build_device("Device22", [
            ("Temperature", "int", "15"),
            ("Startup", "date", "2013-01-03"),
        ])
        second = self._build_device("Device23", [
            ("Temperature", "int", "19"),
            ("Startup", "date", "2013-01-09"),
        ])
        self._register_device("Device22", first)
        self._register_device("Device23", second)

        # First exchange: only the first device's Temperature changes.
        self.recv(""" """)
        self.send(""" """)

        self.assertEqual(first._get_field_value("Temperature"), "17")
        self.assertEqual(second._get_field_value("Temperature"), "19")

        # Second exchange: both fields end up equal on both devices.
        self.recv(""" """)
        self.send(""" """)

        self.assertEqual(first._get_field_value("Temperature"), "20")
        self.assertEqual(second._get_field_value("Temperature"), "20")
        self.assertEqual(first._get_field_value("Startup"), "2013-02-01")
        self.assertEqual(second._get_field_value("Startup"), "2013-02-01")

    def testRequestSetFail(self):
        # The existing field keeps its value and no "Voltage" field appears.
        self._start_control_stream('component')

        thermostat = self._build_device("Device23", [
            ("Temperature", "int", "15"),
        ])
        self._register_device("Device23", thermostat)

        self.recv(""" """)
        self.send(""" Invalid field Voltage """)

        self.assertEqual(thermostat._get_field_value("Temperature"), "15")
        self.assertFalse(thermostat.has_control_field("Voltage", "int"))

    def testDirectSetOk(self):
        # No send() is expected here; the test sleeps before checking.
        self._start_control_stream('component')

        thermostat = self._build_device("Device22", [
            ("Temperature", "int", "15"),
        ])
        self._register_device("Device22", thermostat)

        self.recv(""" """)

        time.sleep(0.5)

        self.assertEqual(thermostat._get_field_value("Temperature"), "17")

    def testDirectSetFail(self):
        # The field stays at "15" and no "Voltage" field is created.
        self._start_control_stream('component')

        thermostat = self._build_device("Device22", [
            ("Temperature", "int", "15"),
        ])
        self._register_device("Device22", thermostat)

        self.recv(""" """)

        self.assertEqual(thermostat._get_field_value("Temperature"), "15")
        self.assertFalse(thermostat.has_control_field("Voltage", "int"))

    # ------------------------------------------------------------------
    # Client-side set_request() API
    # ------------------------------------------------------------------

    def testRequestSetOkAPI(self):
        # The callback must have been given exactly one "OK" result.
        self._start_control_stream('client')

        outcomes = self._issue_set_request()

        self.send(""" """)
        self.recv(""" """)

        self.assertEqual(outcomes, ["OK"])

    def testRequestSetErrorAPI(self):
        # The callback must have been given exactly one "OtherError" result.
        self._start_control_stream('client')

        outcomes = self._issue_set_request()

        self.send(""" """)
        self.recv(""" Sensor error """)

        self.assertEqual(outcomes, ["OtherError"])

    # ------------------------------------------------------------------
    # Service discovery
    # ------------------------------------------------------------------

    def testServiceDiscoveryClient(self):
        self._start_control_stream('client')

        self.recv(""" """)
        self.send(""" """)

    def testServiceDiscoveryComponent(self):
        self._start_control_stream('component')

        self.recv(""" """)
        self.send(""" """)


suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamControl)