From 45f7cb8bda42512bad70b6262483ac435209d9c3 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:18:34 +0200 Subject: [PATCH 01/12] XEP-0047: prevent tracebacks in stanza reading. --- slixmpp/plugins/xep_0047/stanza.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py index 62199077..7f8ff0ba 100644 --- a/slixmpp/plugins/xep_0047/stanza.py +++ b/slixmpp/plugins/xep_0047/stanza.py @@ -24,7 +24,7 @@ class Open(ElementBase): interfaces = set(('block_size', 'sid', 'stanza')) def get_block_size(self): - return int(self._get_attr('block-size')) + return int(self._get_attr('block-size', '0')) def set_block_size(self, value): self._set_attr('block-size', str(value)) @@ -47,7 +47,10 @@ class Data(ElementBase): self._set_attr('seq', str(value)) def get_data(self): - b64_data = self.xml.text.strip() + text = self.xml.text + if not text: + raise XMPPError('not-acceptable', 'IBB data element is empty.') + b64_data = text.strip() if VALID_B64.match(b64_data).group() == b64_data: return from_b64(b64_data) else: From c1f23b566b447dbaa4a4ea6942b61128b3161c10 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:24:44 +0200 Subject: [PATCH 02/12] XEP-0047: remove now-useless threading locks. --- slixmpp/plugins/xep_0047/ibb.py | 33 +++++++++--------------------- slixmpp/plugins/xep_0047/stream.py | 19 +++++++---------- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index 87cd1f51..aa4e6afc 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -1,6 +1,5 @@ import uuid import logging -import threading from slixmpp import Message, Iq from slixmpp.exceptions import XMPPError @@ -30,10 +29,6 @@ class XEP_0047(BasePlugin): def plugin_init(self): self._streams = {} self._pending_streams = {} - self._pending_lock = threading.Lock() - self._stream_lock = threading.Lock() - - self._preauthed_sids_lock = threading.Lock() self._preauthed_sids = {} register_stanza_plugin(Iq, Open) @@ -85,9 +80,8 @@ class XEP_0047(BasePlugin): self._streams[(jid, sid, peer_jid)] = stream def _del_stream(self, jid, sid, peer_jid, data): - with self._stream_lock: - if (jid, sid, peer_jid) in self._streams: - del self._streams[(jid, sid, peer_jid)] + if (jid, sid, peer_jid) in self._streams: + del self._streams[(jid, sid, peer_jid)] def _accept_stream(self, iq): receiver = iq['to'] @@ -105,15 +99,13 @@ class XEP_0047(BasePlugin): return False def _authorized_sid(self, jid, sid, ifrom, iq): - with self._preauthed_sids_lock: - if (jid, sid, ifrom) in self._preauthed_sids: - del self._preauthed_sids[(jid, sid, ifrom)] - return True - return False + if (jid, sid, ifrom) in self._preauthed_sids: + del self._preauthed_sids[(jid, sid, ifrom)] + return True + return False def _preauthorize_sid(self, jid, sid, ifrom, data): - with self._preauthed_sids_lock: - self._preauthed_sids[(jid, sid, ifrom)] = True + self._preauthed_sids[(jid, sid, ifrom)] = True def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False, ifrom=None, timeout=None, callback=None): @@ -134,9 +126,6 @@ class XEP_0047(BasePlugin): iq['from'], iq['to'], window, use_messages) - with self._stream_lock: - self._pending_streams[iq['id']] = stream - self._pending_streams[iq['id']] = stream cb = None @@ -151,8 +140,7 @@ class XEP_0047(BasePlugin): def _handle_opened_stream(self, iq): if iq['type'] == 'result': - with self._stream_lock: - stream = self._pending_streams.get(iq['id'], None) + stream = self._pending_streams.get(iq['id'], None) if stream is not None: log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) stream.self_jid = iq['to'] @@ -162,9 +150,8 @@ class XEP_0047(BasePlugin): self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) - with self._stream_lock: - if iq['id'] in self._pending_streams: - del self._pending_streams[iq['id']] + if iq['id'] in self._pending_streams: + del self._pending_streams[iq['id']] def _handle_open_request(self, iq): sid = iq['ibb_open']['sid'] diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 817f96a1..e15a66be 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -27,9 +27,6 @@ class IBBytestream(object): self.send_seq = -1 self.recv_seq = -1 - self._send_seq_lock = threading.Lock() - self._recv_seq_lock = threading.Lock() - self.stream_started = threading.Event() self.stream_in_closed = threading.Event() self.stream_out_closed = threading.Event() @@ -47,9 +44,8 @@ class IBBytestream(object): raise socket.error data = data[0:self.block_size] self.send_window.acquire() - with self._send_seq_lock: - self.send_seq = (self.send_seq + 1) % 65535 - seq = self.send_seq + self.send_seq = (self.send_seq + 1) % 65535 + seq = self.send_seq if self.use_messages: msg = self.xmpp.Message() msg['to'] = self.peer_jid @@ -87,12 +83,11 @@ class IBBytestream(object): self.close() def _recv_data(self, stanza): - with self._recv_seq_lock: - new_seq = stanza['ibb_data']['seq'] - if new_seq != (self.recv_seq + 1) % 65535: - self.close() - raise XMPPError('unexpected-request') - self.recv_seq = new_seq + new_seq = stanza['ibb_data']['seq'] + if new_seq != (self.recv_seq + 1) % 65535: + self.close() + raise XMPPError('unexpected-request') + self.recv_seq = new_seq data = stanza['ibb_data']['data'] if len(data) > self.block_size: From aa022204ee6f915fbc4668d5904e334b2a8f7041 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:33:54 +0200 Subject: [PATCH 03/12] =?UTF-8?q?XEP-0047:=20don=E2=80=99t=20answer=20with?= =?UTF-8?q?=20an=20unauthorized=20error=20when=20block-size=20is=20too=20b?= =?UTF-8?q?ig.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- slixmpp/plugins/xep_0047/ibb.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index aa4e6afc..958d1544 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -94,8 +94,7 @@ class XEP_0047(BasePlugin): def _authorized(self, jid, sid, ifrom, iq): if self.auto_accept: - if iq['ibb_open']['block_size'] <= self.max_block_size: - return True + return True return False def _authorized_sid(self, jid, sid, ifrom, iq): From 990113f8e745b810f91f5c8e52967d6c5d32c4df Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:38:11 +0200 Subject: [PATCH 04/12] XEP-0047: return the correct error type on not-acceptable (example 5). --- slixmpp/plugins/xep_0047/ibb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index 958d1544..cbb13de5 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -162,7 +162,7 @@ class XEP_0047(BasePlugin): raise XMPPError(etype='modify', condition='bad-request') if not self._accept_stream(iq): - raise XMPPError(etype='modify', condition='not-acceptable') + raise XMPPError(etype='cancel', condition='not-acceptable') if size > self.max_block_size: raise XMPPError('resource-constraint') From ce085bf4f4714d6073e297b599d39b42fcfe218f Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:40:50 +0200 Subject: [PATCH 05/12] XEP-0047: announce the correct stanza type if message is selected. --- slixmpp/plugins/xep_0047/ibb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index cbb13de5..de801877 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -119,7 +119,7 @@ class XEP_0047(BasePlugin): iq['from'] = ifrom iq['ibb_open']['block_size'] = block_size iq['ibb_open']['sid'] = sid - iq['ibb_open']['stanza'] = 'iq' + iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' stream = IBBytestream(self.xmpp, sid, block_size, iq['from'], iq['to'], window, From eb4e09b0ca2f699405c448ae7e4b818ff72ea301 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:46:50 +0200 Subject: [PATCH 06/12] XEP-0047: allow only one window over the stream. --- slixmpp/plugins/xep_0047/ibb.py | 9 +++------ slixmpp/plugins/xep_0047/stream.py | 16 +--------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index de801877..d1c708ed 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -22,7 +22,6 @@ class XEP_0047(BasePlugin): default_config = { 'block_size': 4096, 'max_block_size': 8192, - 'window_size': 1, 'auto_accept': False, } @@ -106,7 +105,7 @@ class XEP_0047(BasePlugin): def _preauthorize_sid(self, jid, sid, ifrom, data): self._preauthed_sids[(jid, sid, ifrom)] = True - def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False, + def open_stream(self, jid, block_size=None, sid=None, use_messages=False, ifrom=None, timeout=None, callback=None): if sid is None: sid = str(uuid.uuid4()) @@ -122,8 +121,7 @@ class XEP_0047(BasePlugin): iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' stream = IBBytestream(self.xmpp, sid, block_size, - iq['from'], iq['to'], window, - use_messages) + iq['from'], iq['to'], use_messages) self._pending_streams[iq['id']] = stream @@ -168,8 +166,7 @@ class XEP_0047(BasePlugin): raise XMPPError('resource-constraint') stream = IBBytestream(self.xmpp, sid, size, - iq['to'], iq['from'], - self.window_size) + iq['to'], iq['from']) stream.stream_started.set() self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) iq.reply().send() diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index e15a66be..9c9d82a5 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -12,11 +12,10 @@ log = logging.getLogger(__name__) class IBBytestream(object): - def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False): + def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): self.xmpp = xmpp self.sid = sid self.block_size = block_size - self.window_size = window_size self.use_messages = use_messages if jid is None: @@ -33,17 +32,11 @@ class IBBytestream(object): self.recv_queue = Queue() - self.send_window = threading.BoundedSemaphore(value=self.window_size) - self.window_ids = set() - self.window_empty = threading.Event() - self.window_empty.set() - def send(self, data): if not self.stream_started.is_set() or \ self.stream_out_closed.is_set(): raise socket.error data = data[0:self.block_size] - self.send_window.acquire() self.send_seq = (self.send_seq + 1) % 65535 seq = self.send_seq if self.use_messages: @@ -55,7 +48,6 @@ class IBBytestream(object): msg['ibb_data']['seq'] = seq msg['ibb_data']['data'] = data msg.send() - self.send_window.release() else: iq = self.xmpp.Iq() iq['type'] = 'set' @@ -64,8 +56,6 @@ class IBBytestream(object): iq['ibb_data']['sid'] = self.sid iq['ibb_data']['seq'] = seq iq['ibb_data']['data'] = data - self.window_empty.clear() - self.window_ids.add(iq['id']) iq.send(callback=self._recv_ack) return len(data) @@ -75,10 +65,6 @@ class IBBytestream(object): sent_len += self.send(data[sent_len:]) def _recv_ack(self, iq): - self.window_ids.remove(iq['id']) - if not self.window_ids: - self.window_empty.set() - self.send_window.release() if iq['type'] == 'error': self.close() From d34ddf33db361eb39193ca4231bc56c60af5ddac Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 17:54:16 +0200 Subject: [PATCH 07/12] XEP-0047: replace threading events with simple booleans. --- slixmpp/plugins/xep_0047/ibb.py | 4 ++-- slixmpp/plugins/xep_0047/stream.py | 24 +++++++++++------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index d1c708ed..be452877 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -142,7 +142,7 @@ class XEP_0047(BasePlugin): log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) stream.self_jid = iq['to'] stream.peer_jid = iq['from'] - stream.stream_started.set() + stream.stream_started = True self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) @@ -167,7 +167,7 @@ class XEP_0047(BasePlugin): stream = IBBytestream(self.xmpp, sid, size, iq['to'], iq['from']) - stream.stream_started.set() + stream.stream_started = True self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) iq.reply().send() diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 9c9d82a5..9b7426a9 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,5 +1,4 @@ import socket -import threading import logging from queue import Queue @@ -26,15 +25,14 @@ class IBBytestream(object): self.send_seq = -1 self.recv_seq = -1 - self.stream_started = threading.Event() - self.stream_in_closed = threading.Event() - self.stream_out_closed = threading.Event() + self.stream_started = False + self.stream_in_closed = False + self.stream_out_closed = False self.recv_queue = Queue() def send(self, data): - if not self.stream_started.is_set() or \ - self.stream_out_closed.is_set(): + if not self.stream_started or self.stream_out_closed: raise socket.error data = data[0:self.block_size] self.send_seq = (self.send_seq + 1) % 65535 @@ -90,8 +88,7 @@ class IBBytestream(object): return self.read(block=True) def read(self, block=True, timeout=None, **kwargs): - if not self.stream_started.is_set() or \ - self.stream_in_closed.is_set(): + if not self.stream_started or self.stream_in_closed: raise socket.error if timeout is not None: block = True @@ -106,14 +103,15 @@ class IBBytestream(object): iq['to'] = self.peer_jid iq['from'] = self.self_jid iq['ibb_close']['sid'] = self.sid - self.stream_out_closed.set() - iq.send(block=False, - callback=lambda x: self.stream_in_closed.set()) + self.stream_out_closed = True + def _close_stream(_): + self.stream_in_closed = True + iq.send(block=False, callback=_close_stream) self.xmpp.event('ibb_stream_end', self) def _closed(self, iq): - self.stream_in_closed.set() - self.stream_out_closed.set() + self.stream_in_closed = True + self.stream_out_closed = True iq.reply().send() self.xmpp.event('ibb_stream_end', self) From ac31913a653e42d288a0ae477866ac06ad9bf0a3 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 18:33:36 +0200 Subject: [PATCH 08/12] XEP-0047: make open_stream() return a future that will be set to the stream object. --- slixmpp/plugins/xep_0047/ibb.py | 39 +++++++++++++-------------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index be452877..52d7fbe5 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -1,3 +1,4 @@ +import asyncio import uuid import logging @@ -27,7 +28,6 @@ class XEP_0047(BasePlugin): def plugin_init(self): self._streams = {} - self._pending_streams = {} self._preauthed_sids = {} register_stanza_plugin(Iq, Open) @@ -123,32 +123,23 @@ class XEP_0047(BasePlugin): stream = IBBytestream(self.xmpp, sid, block_size, iq['from'], iq['to'], use_messages) - self._pending_streams[iq['id']] = stream + stream_future = asyncio.Future() - cb = None - if callback is not None: - def chained(resp): - self._handle_opened_stream(resp) - callback(resp) - cb = chained - else: - cb = self._handle_opened_stream - return iq.send(timeout=timeout, callback=cb) + def _handle_opened_stream(iq): + log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) + stream.self_jid = iq['to'] + stream.peer_jid = iq['from'] + stream.stream_started = True + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + stream_future.set_result(stream) + if callback is not None: + callback(stream) + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) - def _handle_opened_stream(self, iq): - if iq['type'] == 'result': - stream = self._pending_streams.get(iq['id'], None) - if stream is not None: - log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) - stream.self_jid = iq['to'] - stream.peer_jid = iq['from'] - stream.stream_started = True - self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) - self.xmpp.event('ibb_stream_start', stream) - self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) + iq.send(timeout=timeout, callback=_handle_opened_stream) - if iq['id'] in self._pending_streams: - del self._pending_streams[iq['id']] + return stream_future def _handle_open_request(self, iq): sid = iq['ibb_open']['sid'] From 766d0dfd405fd1e0c6892d1bd3be9b599648f5d2 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 18:52:30 +0200 Subject: [PATCH 09/12] =?UTF-8?q?XEP-0047:=20use=20asyncio=E2=80=99s=20Que?= =?UTF-8?q?ue=20implementation,=20to=20prevent=20any=20possibility=20of=20?= =?UTF-8?q?deadlock.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- slixmpp/plugins/xep_0047/stream.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 9b7426a9..3b8c013d 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,6 +1,6 @@ +import asyncio import socket import logging -from queue import Queue from slixmpp.stanza import Iq from slixmpp.exceptions import XMPPError @@ -29,7 +29,7 @@ class IBBytestream(object): self.stream_in_closed = False self.stream_out_closed = False - self.recv_queue = Queue() + self.recv_queue = asyncio.Queue() def send(self, data): if not self.stream_started or self.stream_out_closed: @@ -78,24 +78,19 @@ class IBBytestream(object): self.close() raise XMPPError('not-acceptable') - self.recv_queue.put(data) + self.recv_queue.put_nowait(data) self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) if isinstance(stanza, Iq): stanza.reply().send() def recv(self, *args, **kwargs): - return self.read(block=True) + return self.read() - def read(self, block=True, timeout=None, **kwargs): + def read(self): if not self.stream_started or self.stream_in_closed: raise socket.error - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None + return self.recv_queue.get_nowait() def close(self): iq = self.xmpp.Iq() @@ -106,7 +101,7 @@ class IBBytestream(object): self.stream_out_closed = True def _close_stream(_): self.stream_in_closed = True - iq.send(block=False, callback=_close_stream) + iq.send(callback=_close_stream) self.xmpp.event('ibb_stream_end', self) def _closed(self, iq): From 058c5307877c4e0d8cf8cbef1b83bbd187de1bc4 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 18:56:43 +0200 Subject: [PATCH 10/12] XEP-0047: prevent any unneededly large or useless bytes slice. --- slixmpp/plugins/xep_0047/stream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 3b8c013d..eee6bd0e 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -34,7 +34,8 @@ class IBBytestream(object): def send(self, data): if not self.stream_started or self.stream_out_closed: raise socket.error - data = data[0:self.block_size] + if len(data) > self.block_size: + data = data[:self.block_size] self.send_seq = (self.send_seq + 1) % 65535 seq = self.send_seq if self.use_messages: @@ -60,7 +61,7 @@ class IBBytestream(object): def sendall(self, data): sent_len = 0 while sent_len < len(data): - sent_len += self.send(data[sent_len:]) + sent_len += self.send(data[sent_len:self.block_size]) def _recv_ack(self, iq): if iq['type'] == 'error': From 4415d3be1ab10717e1bd3c3fde68b4c04932adda Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Tue, 14 Apr 2015 19:19:46 +0200 Subject: [PATCH 11/12] XEP-0047: use coroutines for send(), sendall() and the new sendfile(). --- slixmpp/plugins/xep_0047/stream.py | 27 +++++++++++++++++---------- tests/test_stream_xep_0047.py | 4 +++- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index eee6bd0e..3be894eb 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -31,7 +31,8 @@ class IBBytestream(object): self.recv_queue = asyncio.Queue() - def send(self, data): + @asyncio.coroutine + def send(self, data, timeout=None): if not self.stream_started or self.stream_out_closed: raise socket.error if len(data) > self.block_size: @@ -55,17 +56,22 @@ class IBBytestream(object): iq['ibb_data']['sid'] = self.sid iq['ibb_data']['seq'] = seq iq['ibb_data']['data'] = data - iq.send(callback=self._recv_ack) + yield from iq.send(timeout=timeout) return len(data) - def sendall(self, data): + @asyncio.coroutine + def sendall(self, data, timeout=None): sent_len = 0 while sent_len < len(data): - sent_len += self.send(data[sent_len:self.block_size]) + sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout) - def _recv_ack(self, iq): - if iq['type'] == 'error': - self.close() + @asyncio.coroutine + def sendfile(self, file, timeout=None): + while True: + data = file.read(self.block_size) + if not data: + break + yield from self.send(data, timeout=timeout) def _recv_data(self, stanza): new_seq = stanza['ibb_data']['seq'] @@ -80,7 +86,7 @@ class IBBytestream(object): raise XMPPError('not-acceptable') self.recv_queue.put_nowait(data) - self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) + self.xmpp.event('ibb_stream_data', self) if isinstance(stanza, Iq): stanza.reply().send() @@ -93,7 +99,7 @@ class IBBytestream(object): raise socket.error return self.recv_queue.get_nowait() - def close(self): + def close(self, timeout=None): iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = self.peer_jid @@ -102,8 +108,9 @@ class IBBytestream(object): self.stream_out_closed = True def _close_stream(_): self.stream_in_closed = True - iq.send(callback=_close_stream) + future = iq.send(timeout=timeout, callback=_close_stream) self.xmpp.event('ibb_stream_end', self) + return future def _closed(self, iq): self.stream_in_closed = True diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py index 2cc43823..ecba2445 100644 --- a/tests/test_stream_xep_0047.py +++ b/tests/test_stream_xep_0047.py @@ -1,3 +1,4 @@ +import asyncio import threading import time @@ -78,6 +79,7 @@ class TestInBandByteStreams(SlixTest): self.assertEqual(events, set(['ibb_stream_start', 'callback'])) + @asyncio.coroutine def testSendData(self): """Test sending data over an in-band bytestream.""" @@ -115,7 +117,7 @@ class TestInBandByteStreams(SlixTest): # Test sending data out - stream.send("Testing") + yield from stream.send("Testing") self.send(""" Date: Fri, 10 Apr 2015 22:29:44 +0200 Subject: [PATCH 12/12] XEP-0047: fix examples. --- examples/ibb_transfer/ibb_receiver.py | 48 ++++++++++++--------------- examples/ibb_transfer/ibb_sender.py | 47 ++++++++++++++++---------- 2 files changed, 51 insertions(+), 44 deletions(-) diff --git a/examples/ibb_transfer/ibb_receiver.py b/examples/ibb_transfer/ibb_receiver.py index 46dec047..e934f295 100755 --- a/examples/ibb_transfer/ibb_receiver.py +++ b/examples/ibb_transfer/ibb_receiver.py @@ -22,13 +22,10 @@ class IBBReceiver(slixmpp.ClientXMPP): A basic example of creating and using an in-band bytestream. """ - def __init__(self, jid, password): + def __init__(self, jid, password, filename): slixmpp.ClientXMPP.__init__(self, jid, password) - self.register_plugin('xep_0030') # Service Discovery - self.register_plugin('xep_0047', { - 'auto_accept': True - }) # In-band Bytestreams + self.file = open(filename, 'wb') # The session_start event will be triggered when # the bot establishes its connection with the server @@ -39,6 +36,7 @@ class IBBReceiver(slixmpp.ClientXMPP): self.add_event_handler("ibb_stream_start", self.stream_opened) self.add_event_handler("ibb_stream_data", self.stream_data) + self.add_event_handler("ibb_stream_end", self.stream_closed) def start(self, event): """ @@ -56,29 +54,16 @@ class IBBReceiver(slixmpp.ClientXMPP): self.send_presence() self.get_roster() - def accept_stream(self, iq): - """ - Check that it is ok to accept a stream request. - - Controlling stream acceptance can be done via either: - - setting 'auto_accept' to False in the plugin - configuration. The default is True. - - setting 'accept_stream' to a function which accepts - an Iq stanza as its argument, like this one. - - The accept_stream function will be used if it exists, and the - auto_accept value will be used otherwise. - """ - return True - def stream_opened(self, stream): print('Stream opened: %s from %s' % (stream.sid, stream.peer_jid)) - # You could run a loop reading from the stream using stream.recv(), - # or use the ibb_stream_data event. + def stream_data(self, stream): + self.file.write(stream.read()) - def stream_data(self, event): - print(event['data']) + def stream_closed(self, stream): + print('Stream closed: %s from %s' % (stream.sid, stream.peer_jid)) + self.file.close() + self.disconnect() if __name__ == '__main__': # Setup the command line arguments. @@ -97,6 +82,8 @@ if __name__ == '__main__': help="JID to use") parser.add_argument("-p", "--password", dest="password", help="password to use") + parser.add_argument("-o", "--out", dest="filename", + help="file to save to") args = parser.parse_args() @@ -108,9 +95,18 @@ if __name__ == '__main__': args.jid = input("Username: ") if args.password is None: args.password = getpass("Password: ") + if args.filename is None: + args.filename = input("File path: ") - xmpp = IBBReceiver(args.jid, args.password) + # Setup the IBBReceiver and register plugins. Note that while plugins may + # have interdependencies, the order in which you register them does + # not matter. + xmpp = IBBReceiver(args.jid, args.password, args.filename) + xmpp.register_plugin('xep_0030') # Service Discovery + xmpp.register_plugin('xep_0047', { + 'auto_accept': True + }) # In-band Bytestreams # Connect to the XMPP server and start processing XMPP stanzas. xmpp.connect() - xmpp.process() + xmpp.process(forever=False) diff --git a/examples/ibb_transfer/ibb_sender.py b/examples/ibb_transfer/ibb_sender.py index c7e87bb4..f1c0cab2 100755 --- a/examples/ibb_transfer/ibb_sender.py +++ b/examples/ibb_transfer/ibb_sender.py @@ -9,11 +9,13 @@ See the file LICENSE for copying permission. """ +import asyncio import logging from getpass import getpass from argparse import ArgumentParser import slixmpp +from slixmpp.exceptions import IqError, IqTimeout class IBBSender(slixmpp.ClientXMPP): @@ -22,11 +24,13 @@ class IBBSender(slixmpp.ClientXMPP): A basic example of creating and using an in-band bytestream. """ - def __init__(self, jid, password, receiver, filename): + def __init__(self, jid, password, receiver, filename, use_messages=False): slixmpp.ClientXMPP.__init__(self, jid, password) self.receiver = receiver - self.filename = filename + + self.file = open(filename, 'rb') + self.use_messages = use_messages # The session_start event will be triggered when # the bot establishes its connection with the server @@ -35,6 +39,7 @@ class IBBSender(slixmpp.ClientXMPP): # our roster. self.add_event_handler("session_start", self.start) + @asyncio.coroutine def start(self, event): """ Process the session_start event. @@ -51,15 +56,22 @@ class IBBSender(slixmpp.ClientXMPP): self.send_presence() self.get_roster() - # For the purpose of demonstration, we'll set a very small block - # size. The default block size is 4096. We'll also use a window - # allowing sending multiple blocks at a time; in this case, three - # block transfers may be in progress at any time. - stream = self['xep_0047'].open_stream(self.receiver) + try: + # Open the IBB stream in which to write to. + stream = yield from self['xep_0047'].open_stream(self.receiver, use_messages=self.use_messages) - with open(self.filename) as f: - data = f.read() - stream.sendall(data) + # If you want to send in-memory bytes, use stream.sendall() instead. + yield from stream.sendfile(self.file, timeout=10) + + # And finally close the stream. + yield from stream.close(timeout=10) + except (IqError, IqTimeout): + print('File transfer errored') + else: + print('File transfer finished') + finally: + self.file.close() + self.disconnect() if __name__ == '__main__': @@ -80,9 +92,11 @@ if __name__ == '__main__': parser.add_argument("-p", "--password", dest="password", help="password to use") parser.add_argument("-r", "--receiver", dest="receiver", - help="JID to use") + help="JID of the receiver") parser.add_argument("-f", "--file", dest="filename", - help="JID to use") + help="file to send") + parser.add_argument("-m", "--use-messages", action="store_true", + help="use messages instead of iqs for file transfer") args = parser.parse_args() @@ -99,16 +113,13 @@ if __name__ == '__main__': if args.filename is None: args.filename = input("File path: ") - # Setup the EchoBot and register plugins. Note that while plugins may + # Setup the IBBSender and register plugins. Note that while plugins may # have interdependencies, the order in which you register them does # not matter. - xmpp = IBBSender(args.jid, args.password, args.receiver, args.filename) + xmpp = IBBSender(args.jid, args.password, args.receiver, args.filename, args.use_messages) xmpp.register_plugin('xep_0030') # Service Discovery - xmpp.register_plugin('xep_0004') # Data Forms xmpp.register_plugin('xep_0047') # In-band Bytestreams - xmpp.register_plugin('xep_0060') # PubSub - xmpp.register_plugin('xep_0199') # XMPP Ping # Connect to the XMPP server and start processing XMPP stanzas. xmpp.connect() - xmpp.process() + xmpp.process(forever=False)