This commit is contained in:
mathieui 2015-04-19 20:53:35 +02:00
commit 0305ce66b7
No known key found for this signature in database
GPG key ID: C59F84CEEFD616E3
6 changed files with 131 additions and 163 deletions

View file

@ -22,13 +22,10 @@ class IBBReceiver(slixmpp.ClientXMPP):
A basic example of creating and using an in-band bytestream. A basic example of creating and using an in-band bytestream.
""" """
def __init__(self, jid, password): def __init__(self, jid, password, filename):
slixmpp.ClientXMPP.__init__(self, jid, password) slixmpp.ClientXMPP.__init__(self, jid, password)
self.register_plugin('xep_0030') # Service Discovery self.file = open(filename, 'wb')
self.register_plugin('xep_0047', {
'auto_accept': True
}) # In-band Bytestreams
# The session_start event will be triggered when # The session_start event will be triggered when
# the bot establishes its connection with the server # the bot establishes its connection with the server
@ -39,6 +36,7 @@ class IBBReceiver(slixmpp.ClientXMPP):
self.add_event_handler("ibb_stream_start", self.stream_opened) self.add_event_handler("ibb_stream_start", self.stream_opened)
self.add_event_handler("ibb_stream_data", self.stream_data) self.add_event_handler("ibb_stream_data", self.stream_data)
self.add_event_handler("ibb_stream_end", self.stream_closed)
def start(self, event): def start(self, event):
""" """
@ -56,29 +54,16 @@ class IBBReceiver(slixmpp.ClientXMPP):
self.send_presence() self.send_presence()
self.get_roster() self.get_roster()
def accept_stream(self, iq):
"""
Check that it is ok to accept a stream request.
Controlling stream acceptance can be done via either:
- setting 'auto_accept' to False in the plugin
configuration. The default is True.
- setting 'accept_stream' to a function which accepts
an Iq stanza as its argument, like this one.
The accept_stream function will be used if it exists, and the
auto_accept value will be used otherwise.
"""
return True
def stream_opened(self, stream): def stream_opened(self, stream):
print('Stream opened: %s from %s' % (stream.sid, stream.peer_jid)) print('Stream opened: %s from %s' % (stream.sid, stream.peer_jid))
# You could run a loop reading from the stream using stream.recv(), def stream_data(self, stream):
# or use the ibb_stream_data event. self.file.write(stream.read())
def stream_data(self, event): def stream_closed(self, stream):
print(event['data']) print('Stream closed: %s from %s' % (stream.sid, stream.peer_jid))
self.file.close()
self.disconnect()
if __name__ == '__main__': if __name__ == '__main__':
# Setup the command line arguments. # Setup the command line arguments.
@ -97,6 +82,8 @@ if __name__ == '__main__':
help="JID to use") help="JID to use")
parser.add_argument("-p", "--password", dest="password", parser.add_argument("-p", "--password", dest="password",
help="password to use") help="password to use")
parser.add_argument("-o", "--out", dest="filename",
help="file to save to")
args = parser.parse_args() args = parser.parse_args()
@ -108,9 +95,18 @@ if __name__ == '__main__':
args.jid = input("Username: ") args.jid = input("Username: ")
if args.password is None: if args.password is None:
args.password = getpass("Password: ") args.password = getpass("Password: ")
if args.filename is None:
args.filename = input("File path: ")
xmpp = IBBReceiver(args.jid, args.password) # Setup the IBBReceiver and register plugins. Note that while plugins may
# have interdependencies, the order in which you register them does
# not matter.
xmpp = IBBReceiver(args.jid, args.password, args.filename)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0047', {
'auto_accept': True
}) # In-band Bytestreams
# Connect to the XMPP server and start processing XMPP stanzas. # Connect to the XMPP server and start processing XMPP stanzas.
xmpp.connect() xmpp.connect()
xmpp.process() xmpp.process(forever=False)

View file

@ -9,11 +9,13 @@
See the file LICENSE for copying permission. See the file LICENSE for copying permission.
""" """
import asyncio
import logging import logging
from getpass import getpass from getpass import getpass
from argparse import ArgumentParser from argparse import ArgumentParser
import slixmpp import slixmpp
from slixmpp.exceptions import IqError, IqTimeout
class IBBSender(slixmpp.ClientXMPP): class IBBSender(slixmpp.ClientXMPP):
@ -22,11 +24,13 @@ class IBBSender(slixmpp.ClientXMPP):
A basic example of creating and using an in-band bytestream. A basic example of creating and using an in-band bytestream.
""" """
def __init__(self, jid, password, receiver, filename): def __init__(self, jid, password, receiver, filename, use_messages=False):
slixmpp.ClientXMPP.__init__(self, jid, password) slixmpp.ClientXMPP.__init__(self, jid, password)
self.receiver = receiver self.receiver = receiver
self.filename = filename
self.file = open(filename, 'rb')
self.use_messages = use_messages
# The session_start event will be triggered when # The session_start event will be triggered when
# the bot establishes its connection with the server # the bot establishes its connection with the server
@ -35,6 +39,7 @@ class IBBSender(slixmpp.ClientXMPP):
# our roster. # our roster.
self.add_event_handler("session_start", self.start) self.add_event_handler("session_start", self.start)
@asyncio.coroutine
def start(self, event): def start(self, event):
""" """
Process the session_start event. Process the session_start event.
@ -51,15 +56,22 @@ class IBBSender(slixmpp.ClientXMPP):
self.send_presence() self.send_presence()
self.get_roster() self.get_roster()
# For the purpose of demonstration, we'll set a very small block try:
# size. The default block size is 4096. We'll also use a window # Open the IBB stream in which to write to.
# allowing sending multiple blocks at a time; in this case, three stream = yield from self['xep_0047'].open_stream(self.receiver, use_messages=self.use_messages)
# block transfers may be in progress at any time.
stream = self['xep_0047'].open_stream(self.receiver)
with open(self.filename) as f: # If you want to send in-memory bytes, use stream.sendall() instead.
data = f.read() yield from stream.sendfile(self.file, timeout=10)
stream.sendall(data)
# And finally close the stream.
yield from stream.close(timeout=10)
except (IqError, IqTimeout):
print('File transfer errored')
else:
print('File transfer finished')
finally:
self.file.close()
self.disconnect()
if __name__ == '__main__': if __name__ == '__main__':
@ -80,9 +92,11 @@ if __name__ == '__main__':
parser.add_argument("-p", "--password", dest="password", parser.add_argument("-p", "--password", dest="password",
help="password to use") help="password to use")
parser.add_argument("-r", "--receiver", dest="receiver", parser.add_argument("-r", "--receiver", dest="receiver",
help="JID to use") help="JID of the receiver")
parser.add_argument("-f", "--file", dest="filename", parser.add_argument("-f", "--file", dest="filename",
help="JID to use") help="file to send")
parser.add_argument("-m", "--use-messages", action="store_true",
help="use messages instead of iqs for file transfer")
args = parser.parse_args() args = parser.parse_args()
@ -99,16 +113,13 @@ if __name__ == '__main__':
if args.filename is None: if args.filename is None:
args.filename = input("File path: ") args.filename = input("File path: ")
# Setup the EchoBot and register plugins. Note that while plugins may # Setup the IBBSender and register plugins. Note that while plugins may
# have interdependencies, the order in which you register them does # have interdependencies, the order in which you register them does
# not matter. # not matter.
xmpp = IBBSender(args.jid, args.password, args.receiver, args.filename) xmpp = IBBSender(args.jid, args.password, args.receiver, args.filename, args.use_messages)
xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0004') # Data Forms
xmpp.register_plugin('xep_0047') # In-band Bytestreams xmpp.register_plugin('xep_0047') # In-band Bytestreams
xmpp.register_plugin('xep_0060') # PubSub
xmpp.register_plugin('xep_0199') # XMPP Ping
# Connect to the XMPP server and start processing XMPP stanzas. # Connect to the XMPP server and start processing XMPP stanzas.
xmpp.connect() xmpp.connect()
xmpp.process() xmpp.process(forever=False)

View file

@ -1,6 +1,6 @@
import asyncio
import uuid import uuid
import logging import logging
import threading
from slixmpp import Message, Iq from slixmpp import Message, Iq
from slixmpp.exceptions import XMPPError from slixmpp.exceptions import XMPPError
@ -23,17 +23,11 @@ class XEP_0047(BasePlugin):
default_config = { default_config = {
'block_size': 4096, 'block_size': 4096,
'max_block_size': 8192, 'max_block_size': 8192,
'window_size': 1,
'auto_accept': False, 'auto_accept': False,
} }
def plugin_init(self): def plugin_init(self):
self._streams = {} self._streams = {}
self._pending_streams = {}
self._pending_lock = threading.Lock()
self._stream_lock = threading.Lock()
self._preauthed_sids_lock = threading.Lock()
self._preauthed_sids = {} self._preauthed_sids = {}
register_stanza_plugin(Iq, Open) register_stanza_plugin(Iq, Open)
@ -85,9 +79,8 @@ class XEP_0047(BasePlugin):
self._streams[(jid, sid, peer_jid)] = stream self._streams[(jid, sid, peer_jid)] = stream
def _del_stream(self, jid, sid, peer_jid, data): def _del_stream(self, jid, sid, peer_jid, data):
with self._stream_lock: if (jid, sid, peer_jid) in self._streams:
if (jid, sid, peer_jid) in self._streams: del self._streams[(jid, sid, peer_jid)]
del self._streams[(jid, sid, peer_jid)]
def _accept_stream(self, iq): def _accept_stream(self, iq):
receiver = iq['to'] receiver = iq['to']
@ -100,22 +93,19 @@ class XEP_0047(BasePlugin):
def _authorized(self, jid, sid, ifrom, iq): def _authorized(self, jid, sid, ifrom, iq):
if self.auto_accept: if self.auto_accept:
if iq['ibb_open']['block_size'] <= self.max_block_size: return True
return True
return False return False
def _authorized_sid(self, jid, sid, ifrom, iq): def _authorized_sid(self, jid, sid, ifrom, iq):
with self._preauthed_sids_lock: if (jid, sid, ifrom) in self._preauthed_sids:
if (jid, sid, ifrom) in self._preauthed_sids: del self._preauthed_sids[(jid, sid, ifrom)]
del self._preauthed_sids[(jid, sid, ifrom)] return True
return True return False
return False
def _preauthorize_sid(self, jid, sid, ifrom, data): def _preauthorize_sid(self, jid, sid, ifrom, data):
with self._preauthed_sids_lock: self._preauthed_sids[(jid, sid, ifrom)] = True
self._preauthed_sids[(jid, sid, ifrom)] = True
def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False, def open_stream(self, jid, block_size=None, sid=None, use_messages=False,
ifrom=None, timeout=None, callback=None): ifrom=None, timeout=None, callback=None):
if sid is None: if sid is None:
sid = str(uuid.uuid4()) sid = str(uuid.uuid4())
@ -128,43 +118,28 @@ class XEP_0047(BasePlugin):
iq['from'] = ifrom iq['from'] = ifrom
iq['ibb_open']['block_size'] = block_size iq['ibb_open']['block_size'] = block_size
iq['ibb_open']['sid'] = sid iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'iq' iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq'
stream = IBBytestream(self.xmpp, sid, block_size, stream = IBBytestream(self.xmpp, sid, block_size,
iq['from'], iq['to'], window, iq['from'], iq['to'], use_messages)
use_messages)
with self._stream_lock: stream_future = asyncio.Future()
self._pending_streams[iq['id']] = stream
self._pending_streams[iq['id']] = stream def _handle_opened_stream(iq):
log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from'])
stream.self_jid = iq['to']
stream.peer_jid = iq['from']
stream.stream_started = True
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
stream_future.set_result(stream)
if callback is not None:
callback(stream)
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
cb = None iq.send(timeout=timeout, callback=_handle_opened_stream)
if callback is not None:
def chained(resp):
self._handle_opened_stream(resp)
callback(resp)
cb = chained
else:
cb = self._handle_opened_stream
return iq.send(timeout=timeout, callback=cb)
def _handle_opened_stream(self, iq): return stream_future
if iq['type'] == 'result':
with self._stream_lock:
stream = self._pending_streams.get(iq['id'], None)
if stream is not None:
log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from'])
stream.self_jid = iq['to']
stream.peer_jid = iq['from']
stream.stream_started.set()
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
with self._stream_lock:
if iq['id'] in self._pending_streams:
del self._pending_streams[iq['id']]
def _handle_open_request(self, iq): def _handle_open_request(self, iq):
sid = iq['ibb_open']['sid'] sid = iq['ibb_open']['sid']
@ -176,15 +151,14 @@ class XEP_0047(BasePlugin):
raise XMPPError(etype='modify', condition='bad-request') raise XMPPError(etype='modify', condition='bad-request')
if not self._accept_stream(iq): if not self._accept_stream(iq):
raise XMPPError(etype='modify', condition='not-acceptable') raise XMPPError(etype='cancel', condition='not-acceptable')
if size > self.max_block_size: if size > self.max_block_size:
raise XMPPError('resource-constraint') raise XMPPError('resource-constraint')
stream = IBBytestream(self.xmpp, sid, size, stream = IBBytestream(self.xmpp, sid, size,
iq['to'], iq['from'], iq['to'], iq['from'])
self.window_size) stream.stream_started = True
stream.stream_started.set()
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
iq.reply().send() iq.reply().send()

View file

@ -24,7 +24,7 @@ class Open(ElementBase):
interfaces = set(('block_size', 'sid', 'stanza')) interfaces = set(('block_size', 'sid', 'stanza'))
def get_block_size(self): def get_block_size(self):
return int(self._get_attr('block-size')) return int(self._get_attr('block-size', '0'))
def set_block_size(self, value): def set_block_size(self, value):
self._set_attr('block-size', str(value)) self._set_attr('block-size', str(value))
@ -47,7 +47,10 @@ class Data(ElementBase):
self._set_attr('seq', str(value)) self._set_attr('seq', str(value))
def get_data(self): def get_data(self):
b64_data = self.xml.text.strip() text = self.xml.text
if not text:
raise XMPPError('not-acceptable', 'IBB data element is empty.')
b64_data = text.strip()
if VALID_B64.match(b64_data).group() == b64_data: if VALID_B64.match(b64_data).group() == b64_data:
return from_b64(b64_data) return from_b64(b64_data)
else: else:

View file

@ -1,7 +1,6 @@
import asyncio
import socket import socket
import threading
import logging import logging
from queue import Queue
from slixmpp.stanza import Iq from slixmpp.stanza import Iq
from slixmpp.exceptions import XMPPError from slixmpp.exceptions import XMPPError
@ -12,11 +11,10 @@ log = logging.getLogger(__name__)
class IBBytestream(object): class IBBytestream(object):
def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False): def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False):
self.xmpp = xmpp self.xmpp = xmpp
self.sid = sid self.sid = sid
self.block_size = block_size self.block_size = block_size
self.window_size = window_size
self.use_messages = use_messages self.use_messages = use_messages
if jid is None: if jid is None:
@ -27,29 +25,20 @@ class IBBytestream(object):
self.send_seq = -1 self.send_seq = -1
self.recv_seq = -1 self.recv_seq = -1
self._send_seq_lock = threading.Lock() self.stream_started = False
self._recv_seq_lock = threading.Lock() self.stream_in_closed = False
self.stream_out_closed = False
self.stream_started = threading.Event() self.recv_queue = asyncio.Queue()
self.stream_in_closed = threading.Event()
self.stream_out_closed = threading.Event()
self.recv_queue = Queue() @asyncio.coroutine
def send(self, data, timeout=None):
self.send_window = threading.BoundedSemaphore(value=self.window_size) if not self.stream_started or self.stream_out_closed:
self.window_ids = set()
self.window_empty = threading.Event()
self.window_empty.set()
def send(self, data):
if not self.stream_started.is_set() or \
self.stream_out_closed.is_set():
raise socket.error raise socket.error
data = data[0:self.block_size] if len(data) > self.block_size:
self.send_window.acquire() data = data[:self.block_size]
with self._send_seq_lock: self.send_seq = (self.send_seq + 1) % 65535
self.send_seq = (self.send_seq + 1) % 65535 seq = self.send_seq
seq = self.send_seq
if self.use_messages: if self.use_messages:
msg = self.xmpp.Message() msg = self.xmpp.Message()
msg['to'] = self.peer_jid msg['to'] = self.peer_jid
@ -59,7 +48,6 @@ class IBBytestream(object):
msg['ibb_data']['seq'] = seq msg['ibb_data']['seq'] = seq
msg['ibb_data']['data'] = data msg['ibb_data']['data'] = data
msg.send() msg.send()
self.send_window.release()
else: else:
iq = self.xmpp.Iq() iq = self.xmpp.Iq()
iq['type'] = 'set' iq['type'] = 'set'
@ -68,71 +56,65 @@ class IBBytestream(object):
iq['ibb_data']['sid'] = self.sid iq['ibb_data']['sid'] = self.sid
iq['ibb_data']['seq'] = seq iq['ibb_data']['seq'] = seq
iq['ibb_data']['data'] = data iq['ibb_data']['data'] = data
self.window_empty.clear() yield from iq.send(timeout=timeout)
self.window_ids.add(iq['id'])
iq.send(callback=self._recv_ack)
return len(data) return len(data)
def sendall(self, data): @asyncio.coroutine
def sendall(self, data, timeout=None):
sent_len = 0 sent_len = 0
while sent_len < len(data): while sent_len < len(data):
sent_len += self.send(data[sent_len:]) sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout)
def _recv_ack(self, iq): @asyncio.coroutine
self.window_ids.remove(iq['id']) def sendfile(self, file, timeout=None):
if not self.window_ids: while True:
self.window_empty.set() data = file.read(self.block_size)
self.send_window.release() if not data:
if iq['type'] == 'error': break
self.close() yield from self.send(data, timeout=timeout)
def _recv_data(self, stanza): def _recv_data(self, stanza):
with self._recv_seq_lock: new_seq = stanza['ibb_data']['seq']
new_seq = stanza['ibb_data']['seq'] if new_seq != (self.recv_seq + 1) % 65535:
if new_seq != (self.recv_seq + 1) % 65535: self.close()
self.close() raise XMPPError('unexpected-request')
raise XMPPError('unexpected-request') self.recv_seq = new_seq
self.recv_seq = new_seq
data = stanza['ibb_data']['data'] data = stanza['ibb_data']['data']
if len(data) > self.block_size: if len(data) > self.block_size:
self.close() self.close()
raise XMPPError('not-acceptable') raise XMPPError('not-acceptable')
self.recv_queue.put(data) self.recv_queue.put_nowait(data)
self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) self.xmpp.event('ibb_stream_data', self)
if isinstance(stanza, Iq): if isinstance(stanza, Iq):
stanza.reply().send() stanza.reply().send()
def recv(self, *args, **kwargs): def recv(self, *args, **kwargs):
return self.read(block=True) return self.read()
def read(self, block=True, timeout=None, **kwargs): def read(self):
if not self.stream_started.is_set() or \ if not self.stream_started or self.stream_in_closed:
self.stream_in_closed.is_set():
raise socket.error raise socket.error
if timeout is not None: return self.recv_queue.get_nowait()
block = True
try:
return self.recv_queue.get(block, timeout)
except:
return None
def close(self): def close(self, timeout=None):
iq = self.xmpp.Iq() iq = self.xmpp.Iq()
iq['type'] = 'set' iq['type'] = 'set'
iq['to'] = self.peer_jid iq['to'] = self.peer_jid
iq['from'] = self.self_jid iq['from'] = self.self_jid
iq['ibb_close']['sid'] = self.sid iq['ibb_close']['sid'] = self.sid
self.stream_out_closed.set() self.stream_out_closed = True
iq.send(block=False, def _close_stream(_):
callback=lambda x: self.stream_in_closed.set()) self.stream_in_closed = True
future = iq.send(timeout=timeout, callback=_close_stream)
self.xmpp.event('ibb_stream_end', self) self.xmpp.event('ibb_stream_end', self)
return future
def _closed(self, iq): def _closed(self, iq):
self.stream_in_closed.set() self.stream_in_closed = True
self.stream_out_closed.set() self.stream_out_closed = True
iq.reply().send() iq.reply().send()
self.xmpp.event('ibb_stream_end', self) self.xmpp.event('ibb_stream_end', self)

View file

@ -1,3 +1,4 @@
import asyncio
import threading import threading
import time import time
@ -78,6 +79,7 @@ class TestInBandByteStreams(SlixTest):
self.assertEqual(events, set(['ibb_stream_start', 'callback'])) self.assertEqual(events, set(['ibb_stream_start', 'callback']))
@asyncio.coroutine
def testSendData(self): def testSendData(self):
"""Test sending data over an in-band bytestream.""" """Test sending data over an in-band bytestream."""
@ -115,7 +117,7 @@ class TestInBandByteStreams(SlixTest):
# Test sending data out # Test sending data out
stream.send("Testing") yield from stream.send("Testing")
self.send(""" self.send("""
<iq type="set" id="2" <iq type="set" id="2"