XEP-0047: Better typing, docs, coroutine.

- Add a gather() shortcut to buffer all data received in a stream
- Fix a bug in sendall that happens if the data is above the block
  size.
This commit is contained in:
mathieui 2021-02-13 16:13:30 +01:00
parent ace82c9bc4
commit 02e0afbf0f
4 changed files with 143 additions and 37 deletions

View file

@ -8,6 +8,10 @@ XEP-0047: In-band Bytestreams
:members: :members:
:exclude-members: session_bind, plugin_init, plugin_end :exclude-members: session_bind, plugin_init, plugin_end
.. module:: slixmpp.plugins.xep_0047
.. autoclass:: IBBytestream
:members:
Stanza elements Stanza elements
--------------- ---------------

View file

@ -1,8 +1,17 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import asyncio import asyncio
import uuid import uuid
import logging import logging
from slixmpp import Message, Iq from typing import (
Optional,
Union,
)
from slixmpp import JID
from slixmpp.stanza import Message, Iq
from slixmpp.exceptions import XMPPError from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
@ -15,9 +24,27 @@ log = logging.getLogger(__name__)
class XEP_0047(BasePlugin): class XEP_0047(BasePlugin):
"""
XEP-0047: In-Band Bytestreams
Events registered by this plugin:
- :term:`ibb_stream_start`
- :term:`ibb_stream_end`
- :term:`ibb_stream_data`
- :term:`stream:[stream id]:[peer jid]`
Plugin Parameters:
- ``block_size`` (default: ``4096``): default block size to negociate
- ``max_block_size`` (default: ``8192``): max block size to accept
- ``auto_accept`` (default: ``False``): if incoming streams should be
accepted automatically.
"""
name = 'xep_0047' name = 'xep_0047'
description = 'XEP-0047: In-band Bytestreams' description = 'XEP-0047: In-Band Bytestreams'
dependencies = {'xep_0030'} dependencies = {'xep_0030'}
stanza = stanza stanza = stanza
default_config = { default_config = {
@ -105,17 +132,29 @@ class XEP_0047(BasePlugin):
def _preauthorize_sid(self, jid, sid, ifrom, data): def _preauthorize_sid(self, jid, sid, ifrom, data):
self._preauthed_sids[(jid, sid, ifrom)] = True self._preauthed_sids[(jid, sid, ifrom)] = True
def open_stream(self, jid, block_size=None, sid=None, use_messages=False, async def open_stream(self, jid: JID, *, block_size: Optional[int] = None,
ifrom=None, timeout=None, callback=None): sid: Optional[str] = None, use_messages: bool = False,
ifrom: Optional[JID] = None,
**iqkwargs) -> IBBytestream:
"""Open an IBB stream with a peer JID.
.. versionchanged:: 1.8.0
This function is now a coroutine and must be awaited.
All parameters except ``jid`` are keyword-args only.
:param jid: The remote JID to initiate the stream with.
:param block_size: The block size to advertise.
:param sid: The IBB stream id (if not provided, will be auto-generated).
:param use_messages: If the stream should use message stanzas instead of iqs.
:returns: The opened byte stream with the remote JID
:raises .IqError: When the remote entity denied the stream.
"""
if sid is None: if sid is None:
sid = str(uuid.uuid4()) sid = str(uuid.uuid4())
if block_size is None: if block_size is None:
block_size = self.block_size block_size = self.block_size
iq = self.xmpp.Iq() iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
iq['type'] = 'set'
iq['to'] = jid
iq['from'] = ifrom
iq['ibb_open']['block_size'] = block_size iq['ibb_open']['block_size'] = block_size
iq['ibb_open']['sid'] = sid iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq'
@ -123,25 +162,21 @@ class XEP_0047(BasePlugin):
stream = IBBytestream(self.xmpp, sid, block_size, stream = IBBytestream(self.xmpp, sid, block_size,
iq['from'], iq['to'], use_messages) iq['from'], iq['to'], use_messages)
stream_future = asyncio.Future() callback = iqkwargs.pop('callback', None)
result = await iq.send(**iqkwargs)
def _handle_opened_stream(iq): log.debug('IBB stream (%s) accepted by %s', stream.sid, result['from'])
log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) stream.self_jid = result['to']
stream.self_jid = iq['to'] stream.peer_jid = result['from']
stream.peer_jid = iq['from'] stream.stream_started = True
stream.stream_started = True self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) if callback is not None:
stream_future.set_result(stream) self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True)
if callback is not None: self.xmpp.event('ibb_stream_start', stream)
callback(stream) self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
self.xmpp.event('ibb_stream_start', stream) return stream
self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
iq.send(timeout=timeout, callback=_handle_opened_stream) def _handle_open_request(self, iq: Iq):
return stream_future
def _handle_open_request(self, iq):
sid = iq['ibb_open']['sid'] sid = iq['ibb_open']['sid']
size = iq['ibb_open']['block_size'] or self.block_size size = iq['ibb_open']['block_size'] or self.block_size
@ -165,7 +200,7 @@ class XEP_0047(BasePlugin):
self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
def _handle_data(self, stanza): def _handle_data(self, stanza: Union[Iq, Message]):
sid = stanza['ibb_data']['sid'] sid = stanza['ibb_data']['sid']
stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) stream = self.api['get_stream'](stanza['to'], sid, stanza['from'])
if stream is not None and stanza['from'] == stream.peer_jid: if stream is not None and stanza['from'] == stream.peer_jid:
@ -173,7 +208,7 @@ class XEP_0047(BasePlugin):
else: else:
raise XMPPError('item-not-found') raise XMPPError('item-not-found')
def _handle_close(self, iq): def _handle_close(self, iq: Iq):
sid = iq['ibb_close']['sid'] sid = iq['ibb_close']['sid']
stream = self.api['get_stream'](iq['to'], sid, iq['from']) stream = self.api['get_stream'](iq['to'], sid, iq['from'])
if stream is not None and iq['from'] == stream.peer_jid: if stream is not None and iq['from'] == stream.peer_jid:

View file

@ -1,3 +1,6 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import re import re
import base64 import base64

View file

@ -1,17 +1,32 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import asyncio import asyncio
import socket import socket
import logging import logging
from slixmpp.stanza import Iq from typing import (
from slixmpp.exceptions import XMPPError Optional,
IO,
Union,
)
from slixmpp import JID
from slixmpp.stanza import Iq, Message
from slixmpp.exceptions import XMPPError, IqTimeout
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class IBBytestream(object): class IBBytestream(object):
"""XEP-0047 Stream abstraction. Created by the ibb plugin automatically.
def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): Provides send methods and triggers :term:`ibb_stream_data` events.
"""
def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID,
use_messages: bool = False):
self.xmpp = xmpp self.xmpp = xmpp
self.sid = sid self.sid = sid
self.block_size = block_size self.block_size = block_size
@ -31,7 +46,12 @@ class IBBytestream(object):
self.recv_queue = asyncio.Queue() self.recv_queue = asyncio.Queue()
async def send(self, data, timeout=None): async def send(self, data: bytes, timeout: Optional[int] = None) -> int:
"""Send a single block of data.
:param data: Data to send (will be truncated if above block size).
:returns: Number of bytes sent.
"""
if not self.stream_started or self.stream_out_closed: if not self.stream_started or self.stream_out_closed:
raise socket.error raise socket.error
if len(data) > self.block_size: if len(data) > self.block_size:
@ -58,19 +78,62 @@ class IBBytestream(object):
await iq.send(timeout=timeout) await iq.send(timeout=timeout)
return len(data) return len(data)
async def sendall(self, data, timeout=None): async def sendall(self, data: bytes, timeout: Optional[int] = None):
"""Send all the contents of ``data`` in chunks.
:param data: Raw data to send.
"""
sent_len = 0 sent_len = 0
while sent_len < len(data): while sent_len < len(data):
sent_len += await self.send(data[sent_len:self.block_size], timeout=timeout) sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout)
async def sendfile(self, file, timeout=None): async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes:
"""Gather all data sent on a stream until it is closed, and return it.
.. versionadded:: 1.8.0
:param max_data: Max number of bytes to receive. (received data may be
over this limit depending on block_size)
:param timeout: Timeout after which an error will be raised.
:raises .IqTimeout: If the timeout is reached.
:returns: All bytes accumulated in the stream.
"""
result = b''
end_future = asyncio.Future()
def on_close(stream):
if stream is self:
end_future.set_result(True)
def on_data(stream):
nonlocal result
if stream is self:
result += stream.read()
if max_data and len(result) > max_data:
end_future.set_result(True)
self.xmpp.add_event_handler('ibb_stream_end', on_close)
self.xmpp.add_event_handler('ibb_stream_data', on_data)
try:
await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop)
except asyncio.TimeoutError:
raise IqTimeout(result)
finally:
self.xmpp.del_event_handler('ibb_stream_end', on_close)
self.xmpp.del_event_handler('ibb_stream_data', on_data)
return result
async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None):
"""Send the contents of a file over the wire, in chunks.
:param file: The opened file (or file-like) object, in bytes mode."""
while True: while True:
data = file.read(self.block_size) data = file.read(self.block_size)
if not data: if not data:
break break
await self.send(data, timeout=timeout) await self.send(data, timeout=timeout)
def _recv_data(self, stanza): def _recv_data(self, stanza: Union[Message, Iq]):
new_seq = stanza['ibb_data']['seq'] new_seq = stanza['ibb_data']['seq']
if new_seq != (self.recv_seq + 1) % 65536: if new_seq != (self.recv_seq + 1) % 65536:
self.close() self.close()
@ -96,7 +159,8 @@ class IBBytestream(object):
raise socket.error raise socket.error
return self.recv_queue.get_nowait() return self.recv_queue.get_nowait()
def close(self, timeout=None): def close(self, timeout: Optional[int] = None) -> asyncio.Future:
"""Close the stream."""
iq = self.xmpp.Iq() iq = self.xmpp.Iq()
iq['type'] = 'set' iq['type'] = 'set'
iq['to'] = self.peer_jid iq['to'] = self.peer_jid
@ -109,7 +173,7 @@ class IBBytestream(object):
self.xmpp.event('ibb_stream_end', self) self.xmpp.event('ibb_stream_end', self)
return future return future
def _closed(self, iq): def _closed(self, iq: Iq):
self.stream_in_closed = True self.stream_in_closed = True
self.stream_out_closed = True self.stream_out_closed = True
iq.reply().send() iq.reply().send()