XEP-0047: API changes

and fix unit tests broken for years.
This commit is contained in:
mathieui 2021-02-14 12:11:58 +01:00
parent 13de36baa1
commit d17967f58e
3 changed files with 106 additions and 32 deletions

View file

@ -8,11 +8,76 @@ XEP-0047: In-band Bytestreams
:members: :members:
:exclude-members: session_bind, plugin_init, plugin_end :exclude-members: session_bind, plugin_init, plugin_end
.. module:: slixmpp.plugins.xep_0047
.. autoclass:: IBBytestream .. autoclass:: IBBytestream
:members: :members:
Internal API methods
--------------------
The API here is used to manage streams and authorize. The default handlers
work with the config parameters.
.. glossary::
authorized_sid (0047 version)
- **jid**: :class:`~.JID` receiving the stream initiation.
- **node**: stream id
- **ifrom**: who the stream is from.
- **args**: :class:`~.Iq` of the stream request.
- **returns**: ``True`` if the stream should be accepted,
``False`` otherwise.
Check if the stream should be accepted. Uses
the information setup by :term:`preauthorize_sid (0047 version)`
by default.
authorized (0047 version)
- **jid**: :class:`~.JID` receiving the stream initiation.
- **node**: stream id
- **ifrom**: who the stream is from.
- **args**: :class:`~.Iq` of the stream request.
- **returns**: ``True`` if the stream should be accepted,
``False`` otherwise.
A fallback handler (run after :term:`authorized_sid (0047 version)`)
to check if a stream should be accepted. Uses the ``auto_accept``
parameter by default.
preauthorize_sid (0047 version)
- **jid**: :class:`~.JID` receiving the stream initiation.
- **node**: stream id
- **ifrom**: who the stream will be from.
- **args**: Unused.
Register a stream id to be accepted automatically (called from
other plugins such as XEP-0095).
get_stream
- **jid**: :class:`~.JID` of local receiver.
- **node**: stream id
- **ifrom**: who the stream is from.
- **args**: unused
- **returns**: :class:`~.IBBytestream`
Return a currently opened stream between two JIDs.
set_stream
- **jid**: :class:`~.JID` of local receiver.
- **node**: stream id
- **ifrom**: who the stream is from.
- **args**: unused
Register an opened stream between two JIDs.
del_stream
- **jid**: :class:`~.JID` of local receiver.
- **node**: stream id
- **ifrom**: who the stream is from.
- **args**: unused
Delete a stream between two JIDs.
Stanza elements Stanza elements
--------------- ---------------

View file

@ -1,7 +1,6 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp # This file is part of Slixmpp
# See the file LICENSE for copying permission # See the file LICENSE for copying permission
import asyncio
import uuid import uuid
import logging import logging
@ -13,7 +12,7 @@ from typing import (
from slixmpp import JID from slixmpp import JID
from slixmpp.stanza import Message, Iq from slixmpp.stanza import Message, Iq
from slixmpp.exceptions import XMPPError from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins import BasePlugin from slixmpp.plugins import BasePlugin
@ -41,6 +40,12 @@ class XEP_0047(BasePlugin):
- ``auto_accept`` (default: ``False``): if incoming streams should be - ``auto_accept`` (default: ``False``): if incoming streams should be
accepted automatically. accepted automatically.
- :term:`authorized (0047 version)`
- :term:`authorized_sid (0047 version)`
- :term:`preauthorize_sid (0047 version)`
- :term:`get_stream`
- :term:`set_stream`
- :term:`del_stream`
""" """
name = 'xep_0047' name = 'xep_0047'
@ -62,22 +67,22 @@ class XEP_0047(BasePlugin):
register_stanza_plugin(Iq, Data) register_stanza_plugin(Iq, Data)
register_stanza_plugin(Message, Data) register_stanza_plugin(Message, Data)
self.xmpp.register_handler(Callback( self.xmpp.register_handler(CoroutineCallback(
'IBB Open', 'IBB Open',
StanzaPath('iq@type=set/ibb_open'), StanzaPath('iq@type=set/ibb_open'),
self._handle_open_request)) self._handle_open_request))
self.xmpp.register_handler(Callback( self.xmpp.register_handler(CoroutineCallback(
'IBB Close', 'IBB Close',
StanzaPath('iq@type=set/ibb_close'), StanzaPath('iq@type=set/ibb_close'),
self._handle_close)) self._handle_close))
self.xmpp.register_handler(Callback( self.xmpp.register_handler(CoroutineCallback(
'IBB Data', 'IBB Data',
StanzaPath('iq@type=set/ibb_data'), StanzaPath('iq@type=set/ibb_data'),
self._handle_data)) self._handle_data))
self.xmpp.register_handler(Callback( self.xmpp.register_handler(CoroutineCallback(
'IBB Message Data', 'IBB Message Data',
StanzaPath('message/ibb_data'), StanzaPath('message/ibb_data'),
self._handle_data)) self._handle_data))
@ -109,14 +114,14 @@ class XEP_0047(BasePlugin):
if (jid, sid, peer_jid) in self._streams: if (jid, sid, peer_jid) in self._streams:
del self._streams[(jid, sid, peer_jid)] del self._streams[(jid, sid, peer_jid)]
def _accept_stream(self, iq): async def _accept_stream(self, iq):
receiver = iq['to'] receiver = iq['to']
sender = iq['from'] sender = iq['from']
sid = iq['ibb_open']['sid'] sid = iq['ibb_open']['sid']
if self.api['authorized_sid'](receiver, sid, sender, iq): if await self.api['authorized_sid'](receiver, sid, sender, iq):
return True return True
return self.api['authorized'](receiver, sid, sender, iq) return await self.api['authorized'](receiver, sid, sender, iq)
def _authorized(self, jid, sid, ifrom, iq): def _authorized(self, jid, sid, ifrom, iq):
if self.auto_accept: if self.auto_accept:
@ -169,14 +174,14 @@ class XEP_0047(BasePlugin):
stream.self_jid = result['to'] stream.self_jid = result['to']
stream.peer_jid = result['from'] stream.peer_jid = result['from']
stream.stream_started = True stream.stream_started = True
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) await self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
if callback is not None: if callback is not None:
self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True) self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True)
self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
return stream return stream
def _handle_open_request(self, iq: Iq): async def _handle_open_request(self, iq: Iq):
sid = iq['ibb_open']['sid'] sid = iq['ibb_open']['sid']
size = iq['ibb_open']['block_size'] or self.block_size size = iq['ibb_open']['block_size'] or self.block_size
@ -185,7 +190,7 @@ class XEP_0047(BasePlugin):
if not sid: if not sid:
raise XMPPError(etype='modify', condition='bad-request') raise XMPPError(etype='modify', condition='bad-request')
if not self._accept_stream(iq): if not await self._accept_stream(iq):
raise XMPPError(etype='cancel', condition='not-acceptable') raise XMPPError(etype='cancel', condition='not-acceptable')
if size > self.max_block_size: if size > self.max_block_size:
@ -194,25 +199,25 @@ class XEP_0047(BasePlugin):
stream = IBBytestream(self.xmpp, sid, size, stream = IBBytestream(self.xmpp, sid, size,
iq['to'], iq['from']) iq['to'], iq['from'])
stream.stream_started = True stream.stream_started = True
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) await self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
iq.reply().send() iq.reply().send()
self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
def _handle_data(self, stanza: Union[Iq, Message]): async def _handle_data(self, stanza: Union[Iq, Message]):
sid = stanza['ibb_data']['sid'] sid = stanza['ibb_data']['sid']
stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) stream = await self.api['get_stream'](stanza['to'], sid, stanza['from'])
if stream is not None and stanza['from'] == stream.peer_jid: if stream is not None and stanza['from'] == stream.peer_jid:
stream._recv_data(stanza) stream._recv_data(stanza)
else: else:
raise XMPPError('item-not-found') raise XMPPError('item-not-found')
def _handle_close(self, iq: Iq): async def _handle_close(self, iq: Iq):
sid = iq['ibb_close']['sid'] sid = iq['ibb_close']['sid']
stream = self.api['get_stream'](iq['to'], sid, iq['from']) stream = await self.api['get_stream'](iq['to'], sid, iq['from'])
if stream is not None and iq['from'] == stream.peer_jid: if stream is not None and iq['from'] == stream.peer_jid:
stream._closed(iq) stream._closed(iq)
self.api['del_stream'](stream.self_jid, stream.sid, stream.peer_jid) await self.api['del_stream'](stream.self_jid, stream.sid, stream.peer_jid)
else: else:
raise XMPPError('item-not-found') raise XMPPError('item-not-found')

View file

@ -14,7 +14,7 @@ class TestInBandByteStreams(SlixTest):
def tearDown(self): def tearDown(self):
self.stream_close() self.stream_close()
async def testOpenStream(self): def testOpenStream(self):
"""Test requesting a stream, successfully""" """Test requesting a stream, successfully"""
events = [] events = []
@ -25,8 +25,9 @@ class TestInBandByteStreams(SlixTest):
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', self.xmpp.wrap(self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
sid='testing') sid='testing'))
self.wait_()
self.send(""" self.send("""
<iq type="set" to="tester@localhost/receiver" id="1"> <iq type="set" to="tester@localhost/receiver" id="1">
@ -45,7 +46,7 @@ class TestInBandByteStreams(SlixTest):
self.assertEqual(events, ['ibb_stream_start']) self.assertEqual(events, ['ibb_stream_start'])
async def testAysncOpenStream(self): def testAysncOpenStream(self):
"""Test requesting a stream, aysnc""" """Test requesting a stream, aysnc"""
events = set() events = set()
@ -58,9 +59,10 @@ class TestInBandByteStreams(SlixTest):
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', self.xmpp.wrap(self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
sid='testing', sid='testing',
callback=stream_callback) callback=stream_callback))
self.wait_()
self.send(""" self.send("""
<iq type="set" to="tester@localhost/receiver" id="1"> <iq type="set" to="tester@localhost/receiver" id="1">
@ -79,7 +81,7 @@ class TestInBandByteStreams(SlixTest):
self.assertEqual(events, {'ibb_stream_start', 'callback'}) self.assertEqual(events, {'ibb_stream_start', 'callback'})
async def testSendData(self): def testSendData(self):
"""Test sending data over an in-band bytestream.""" """Test sending data over an in-band bytestream."""
streams = [] streams = []
@ -89,13 +91,14 @@ class TestInBandByteStreams(SlixTest):
streams.append(stream) streams.append(stream)
def on_stream_data(d): def on_stream_data(d):
data.append(d['data']) data.append(d.read())
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
self.xmpp.add_event_handler('ibb_stream_data', on_stream_data) self.xmpp.add_event_handler('ibb_stream_data', on_stream_data)
self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', self.xmpp.wrap(self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
sid='testing') sid='testing'))
self.wait_()
self.send(""" self.send("""
<iq type="set" to="tester@localhost/receiver" id="1"> <iq type="set" to="tester@localhost/receiver" id="1">
@ -116,7 +119,8 @@ class TestInBandByteStreams(SlixTest):
# Test sending data out # Test sending data out
await stream.send("Testing") self.xmpp.wrap(stream.send("Testing"))
self.wait_()
self.send(""" self.send("""
<iq type="set" id="2" <iq type="set" id="2"