diff --git a/sleekxmpp/plugins/xep_0047/ibb.py b/sleekxmpp/plugins/xep_0047/ibb.py
index c4f46e2f..85415f17 100644
--- a/sleekxmpp/plugins/xep_0047/ibb.py
+++ b/sleekxmpp/plugins/xep_0047/ibb.py
@@ -22,7 +22,7 @@ class xep_0047(base_plugin):
self.stanza = stanza
self.streams = {}
- self.pending_streams = {}
+ self.pending_streams = {3: 5}
self.pending_close_streams = {}
self._stream_lock = threading.Lock()
@@ -63,9 +63,6 @@ class xep_0047(base_plugin):
def open_stream(self, jid, block_size=4096, sid=None, window=1,
ifrom=None, block=True, timeout=None, callback=None):
- if not block and callback is not None:
- callback = lambda resp: self._handle_opened_stream(resp, callback)
-
if sid is None:
sid = str(uuid.uuid4())
@@ -77,29 +74,40 @@ class xep_0047(base_plugin):
iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'iq'
- stream = IBBytestream(self.xmpp, sid, size,
+ stream = IBBytestream(self.xmpp, sid, block_size,
iq['to'], iq['from'], window)
-
+
with self._stream_lock:
self.pending_streams[iq['id']] = stream
-
- resp = iq.send(block=block, timeout=timeout, callback=callback)
+
+ self.pending_streams[iq['id']] = stream
+
if block:
+ resp = iq.send(timeout=timeout)
self._handle_opened_stream(resp)
return stream
+ else:
+ cb = None
+ if callback is not None:
+ def chained(resp):
+ self._handle_opened_stream(resp)
+ callback(resp)
+ cb = chained
+ else:
+ cb = self._handle_opened_stream
+ return iq.send(block=block, timeout=timeout, callback=cb)
- def _handle_opened_stream(self, iq, callback=None):
+
+ def _handle_opened_stream(self, iq):
if iq['type'] == 'result':
with self._stream_lock:
- stream = self.pending_streams[iq['id']]
- stream.sender = iq['to']
- stream.receiver = iq['from']
- stream.stream_started.set()
- self.streams[stream.sid] = stream
- self.xmpp.event('ibb_stream_start', stream)
-
- if callback is not None:
- callback(iq)
+ stream = self.pending_streams.get(iq['id'], None)
+ if stream is not None:
+ stream.sender = iq['to']
+ stream.receiver = iq['from']
+ stream.stream_started.set()
+ self.streams[stream.sid] = stream
+ self.xmpp.event('ibb_stream_start', stream)
with self._stream_lock:
if iq['id'] in self.pending_streams:
@@ -114,7 +122,7 @@ class xep_0047(base_plugin):
if size > self.max_block_size:
raise XMPPError('resource-constraint')
- stream = IBBytestream(self.xmpp, sid, size,
+ stream = IBBytestream(self.xmpp, sid, size,
iq['from'], iq['to'],
self.window_size)
stream.stream_started.set()
diff --git a/sleekxmpp/plugins/xep_0047/stanza.py b/sleekxmpp/plugins/xep_0047/stanza.py
index 2e4f8ccc..f5b77c0e 100644
--- a/sleekxmpp/plugins/xep_0047/stanza.py
+++ b/sleekxmpp/plugins/xep_0047/stanza.py
@@ -53,7 +53,7 @@ class Data(ElementBase):
raise XMPPError('not-acceptable')
def set_data(self, value):
- self.xml.text = to_64(value)
+ self.xml.text = to_b64(value)
def del_data(self):
self.xml.text = ''
diff --git a/tests/test_stanza_xep_0047.py b/tests/test_stanza_xep_0047.py
index 1b212529..6aa2314b 100644
--- a/tests/test_stanza_xep_0047.py
+++ b/tests/test_stanza_xep_0047.py
@@ -73,5 +73,18 @@ class TestIBB(SleekTest):
self.assertTrue(errored, "ABCD?EFGH did not raise base64 error")
+ def testConvertData(self):
+ """Test that data is converted to base64"""
+ iq = Iq()
+ iq['type'] = 'set'
+ iq['ibb_data']['seq'] = 0
+ iq['ibb_data']['data'] = 'sleekxmpp'
+
+ self.check(iq, """
+
+ c2xlZWt4bXBw
+
+ """)
+
suite = unittest.TestLoader().loadTestsFromTestCase(TestIBB)
diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py
new file mode 100644
index 00000000..485dafe5
--- /dev/null
+++ b/tests/test_stream_xep_0047.py
@@ -0,0 +1,98 @@
+import threading
+import time
+
+from sleekxmpp.test import *
+
+
+class TestInBandByteStreams(SleekTest):
+
+ def setUp(self):
+ self.stream_start(plugins=['xep_0047', 'xep_0030'])
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testOpenStream(self):
+ """Test requesting a stream, successfully"""
+
+ events = []
+
+ def on_stream_start(stream):
+ events.append('ibb_stream_start')
+
+
+ self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
+
+ t = threading.Thread(name='open_stream',
+ target=self.xmpp['xep_0047'].open_stream,
+ args=('tester@localhost/receiver',),
+ kwargs={'sid': 'testing'})
+ t.start()
+
+ self.send("""
+
+
+
+ """)
+
+ self.recv("""
+
+ """)
+
+ t.join()
+
+ time.sleep(0.2)
+
+ self.assertEqual(events, ['ibb_stream_start'])
+
+ def testAysncOpenStream(self):
+ """Test requesting a stream, aysnc"""
+
+ events = set()
+
+ def on_stream_start(stream):
+ events.add('ibb_stream_start')
+
+ def stream_callback(iq):
+ events.add('callback')
+
+ self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
+
+ t = threading.Thread(name='open_stream',
+ target=self.xmpp['xep_0047'].open_stream,
+ args=('tester@localhost/receiver',),
+ kwargs={'sid': 'testing',
+ 'block': False,
+ 'callback': stream_callback})
+ t.start()
+
+ self.send("""
+
+
+
+ """)
+
+ self.recv("""
+
+ """)
+
+ t.join()
+
+ time.sleep(0.2)
+
+ self.assertEqual(events, set(['ibb_stream_start', 'callback']))
+
+
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestInBandByteStreams)