diff --git a/docs/api/plugins/xep_0047.rst b/docs/api/plugins/xep_0047.rst index 4efded9b..c8aea741 100644 --- a/docs/api/plugins/xep_0047.rst +++ b/docs/api/plugins/xep_0047.rst @@ -8,6 +8,10 @@ XEP-0047: In-band Bytestreams :members: :exclude-members: session_bind, plugin_init, plugin_end +.. module:: slixmpp.plugins.xep_0047 + +.. autoclass:: IBBytestream + :members: Stanza elements --------------- diff --git a/docs/api/plugins/xep_0363.rst b/docs/api/plugins/xep_0363.rst index ebbfdba1..4bbf95fa 100644 --- a/docs/api/plugins/xep_0363.rst +++ b/docs/api/plugins/xep_0363.rst @@ -8,6 +8,12 @@ XEP-0363: HTTP File Upload :members: :exclude-members: session_bind, plugin_init, plugin_end +.. autoclass:: UploadServiceNotFound + +.. autoclass:: FileTooBig + +.. autoclass:: HTTPError + Stanza elements --------------- diff --git a/itests/test_bob.py b/itests/test_bob.py new file mode 100644 index 00000000..d0827df0 --- /dev/null +++ b/itests/test_bob.py @@ -0,0 +1,35 @@ +import asyncio +import unittest +from slixmpp.test.integration import SlixIntegration + + +class TestBOB(SlixIntegration): + async def asyncSetUp(self): + await super().asyncSetUp() + self.add_client( + self.envjid('CI_ACCOUNT1'), + self.envstr('CI_ACCOUNT1_PASSWORD'), + ) + self.add_client( + self.envjid('CI_ACCOUNT2'), + self.envstr('CI_ACCOUNT2_PASSWORD'), + ) + self.register_plugins(['xep_0231']) + self.data = b'to' * 257 + await self.connect_clients() + + async def test_bob(self): + """Check we can send and receive a BOB.""" + cid = self.clients[0]['xep_0231'].set_bob( + self.data, + 'image/jpeg', + ) + recv = await self.clients[1]['xep_0231'].get_bob( + jid=self.clients[0].boundjid, + cid=cid, + ) + + self.assertEqual(self.data, recv['bob']['data']) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestBOB) diff --git a/itests/test_httpupload.py b/itests/test_httpupload.py new file mode 100644 index 00000000..09e85c1d --- /dev/null +++ b/itests/test_httpupload.py @@ -0,0 +1,37 @@ +try: + import aiohttp +except ImportError: + aiohttp = None +import unittest +from io import BytesIO +from slixmpp.test.integration import SlixIntegration + + +class TestHTTPUpload(SlixIntegration): + async def asyncSetUp(self): + await super().asyncSetUp() + self.add_client( + self.envjid('CI_ACCOUNT1'), + self.envstr('CI_ACCOUNT1_PASSWORD'), + ) + self.register_plugins(['xep_0363']) + # Minimal data, we do not want to clutter the remote server + self.data = b'tototo' + await self.connect_clients() + + + @unittest.skipIf(aiohttp is None, "aiohttp is not installed") + async def test_httpupload(self): + """Check we can upload a file properly.""" + url = await self.clients[0]['xep_0363'].upload_file( + 'toto.txt', + input_file=BytesIO(self.data), + size=len(self.data), + ) + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + text = await resp.text() + self.assertEqual(text.encode('utf-8'), self.data) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestHTTPUpload) diff --git a/itests/test_ibb.py b/itests/test_ibb.py new file mode 100644 index 00000000..7cda8e22 --- /dev/null +++ b/itests/test_ibb.py @@ -0,0 +1,40 @@ +import asyncio +import unittest +from slixmpp.test.integration import SlixIntegration + + +class TestIBB(SlixIntegration): + async def asyncSetUp(self): + await super().asyncSetUp() + self.add_client( + self.envjid('CI_ACCOUNT1'), + self.envstr('CI_ACCOUNT1_PASSWORD'), + ) + self.add_client( + self.envjid('CI_ACCOUNT2'), + self.envstr('CI_ACCOUNT2_PASSWORD'), + ) + config = {'block_size': 256, 'auto_accept': True} + self.register_plugins(['xep_0047'], [config]) + self.data = b'to' * 257 + await self.connect_clients() + + async def test_ibb(self): + """Check we can send and receive data through ibb""" + coro_in = self.clients[1].wait_until('ibb_stream_start') + coro_out = self.clients[0]['xep_0047'].open_stream( + self.clients[1].boundjid, + sid='toto' + ) + instream, outstream = await asyncio.gather(coro_in, coro_out) + + async def send_and_close(): + await outstream.sendall(self.data) + await outstream.close() + + in_data, _ = await asyncio.gather(instream.gather(), send_and_close()) + + self.assertEqual(self.data, in_data) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestIBB) diff --git a/itests/test_pep.py b/itests/test_pep.py new file mode 100644 index 00000000..a674a348 --- /dev/null +++ b/itests/test_pep.py @@ -0,0 +1,64 @@ +import asyncio +import unittest +from uuid import uuid4 +from slixmpp.exceptions import IqError +from slixmpp.test.integration import SlixIntegration +from slixmpp.xmlstream import ElementBase, register_stanza_plugin +from slixmpp.plugins.xep_0060.stanza import Item + +class Mystanza(ElementBase): + namespace = 'random-ns' + name = 'mystanza' + plugin_attrib = 'mystanza' + interfaces = {'test'} + +register_stanza_plugin(Item, Mystanza) + +class TestPEP(SlixIntegration): + async def asyncSetUp(self): + await super().asyncSetUp() + self.add_client( + self.envjid('CI_ACCOUNT1'), + self.envstr('CI_ACCOUNT1_PASSWORD'), + ) + self.add_client( + self.envjid('CI_ACCOUNT2'), + self.envstr('CI_ACCOUNT2_PASSWORD'), + ) + self.register_plugins(['xep_0222', 'xep_0223']) + for client in self.clients: + client.auto_authorize = True + await self.connect_clients() + + async def test_pep_public(self): + """Check we can get and set public PEP data""" + stanza = Mystanza() + stanza['test'] = str(uuid4().hex) + await self.clients[0]['xep_0222'].store(stanza, id='toto') + fetched = await self.clients[0]['xep_0222'].retrieve( + stanza.namespace, + ) + fetched_stanza = fetched['pubsub']['items']['item']['mystanza'] + self.assertEqual(fetched_stanza['test'], stanza['test']) + + async def test_pep_private(self): + """Check we can get and set private PEP data""" + stanza = Mystanza() + stanza['test'] = str(uuid4().hex) + await self.clients[0]['xep_0223'].store( + stanza, node='private-random', id='toto' + ) + fetched = await self.clients[0]['xep_0223'].retrieve( + 'private-random', + ) + fetched_stanza = fetched['pubsub']['items']['item']['mystanza'] + self.assertEqual(fetched_stanza['test'], stanza['test']) + + with self.assertRaises(IqError): + fetched = await self.clients[1]['xep_0060'].get_item( + jid=self.clients[0].boundjid.bare, + node='private-random', + item_id='toto', + ) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestPEP) diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py index bd96eca2..ec08a8b3 100644 --- a/slixmpp/plugins/xep_0047/ibb.py +++ b/slixmpp/plugins/xep_0047/ibb.py @@ -1,8 +1,17 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import asyncio import uuid import logging -from slixmpp import Message, Iq +from typing import ( + Optional, + Union, +) + +from slixmpp import JID +from slixmpp.stanza import Message, Iq from slixmpp.exceptions import XMPPError from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.matcher import StanzaPath @@ -15,9 +24,27 @@ log = logging.getLogger(__name__) class XEP_0047(BasePlugin): + """ + XEP-0047: In-Band Bytestreams + + Events registered by this plugin: + + - :term:`ibb_stream_start` + - :term:`ibb_stream_end` + - :term:`ibb_stream_data` + - :term:`stream:[stream id]:[peer jid]` + + Plugin Parameters: + + - ``block_size`` (default: ``4096``): default block size to negociate + - ``max_block_size`` (default: ``8192``): max block size to accept + - ``auto_accept`` (default: ``False``): if incoming streams should be + accepted automatically. + + """ name = 'xep_0047' - description = 'XEP-0047: In-band Bytestreams' + description = 'XEP-0047: In-Band Bytestreams' dependencies = {'xep_0030'} stanza = stanza default_config = { @@ -105,17 +132,29 @@ class XEP_0047(BasePlugin): def _preauthorize_sid(self, jid, sid, ifrom, data): self._preauthed_sids[(jid, sid, ifrom)] = True - def open_stream(self, jid, block_size=None, sid=None, use_messages=False, - ifrom=None, timeout=None, callback=None): + async def open_stream(self, jid: JID, *, block_size: Optional[int] = None, + sid: Optional[str] = None, use_messages: bool = False, + ifrom: Optional[JID] = None, + **iqkwargs) -> IBBytestream: + """Open an IBB stream with a peer JID. + + .. versionchanged:: 1.8.0 + This function is now a coroutine and must be awaited. + All parameters except ``jid`` are keyword-args only. + + :param jid: The remote JID to initiate the stream with. + :param block_size: The block size to advertise. + :param sid: The IBB stream id (if not provided, will be auto-generated). + :param use_messages: If the stream should use message stanzas instead of iqs. + :returns: The opened byte stream with the remote JID + :raises .IqError: When the remote entity denied the stream. + """ if sid is None: sid = str(uuid.uuid4()) if block_size is None: block_size = self.block_size - iq = self.xmpp.Iq() - iq['type'] = 'set' - iq['to'] = jid - iq['from'] = ifrom + iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom) iq['ibb_open']['block_size'] = block_size iq['ibb_open']['sid'] = sid iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' @@ -123,25 +162,21 @@ class XEP_0047(BasePlugin): stream = IBBytestream(self.xmpp, sid, block_size, iq['from'], iq['to'], use_messages) - stream_future = asyncio.Future() + callback = iqkwargs.pop('callback', None) + result = await iq.send(**iqkwargs) - def _handle_opened_stream(iq): - log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) - stream.self_jid = iq['to'] - stream.peer_jid = iq['from'] - stream.stream_started = True - self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) - stream_future.set_result(stream) - if callback is not None: - callback(stream) - self.xmpp.event('ibb_stream_start', stream) - self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) + log.debug('IBB stream (%s) accepted by %s', stream.sid, result['from']) + stream.self_jid = result['to'] + stream.peer_jid = result['from'] + stream.stream_started = True + self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) + if callback is not None: + self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True) + self.xmpp.event('ibb_stream_start', stream) + self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) + return stream - iq.send(timeout=timeout, callback=_handle_opened_stream) - - return stream_future - - def _handle_open_request(self, iq): + def _handle_open_request(self, iq: Iq): sid = iq['ibb_open']['sid'] size = iq['ibb_open']['block_size'] or self.block_size @@ -165,7 +200,7 @@ class XEP_0047(BasePlugin): self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) - def _handle_data(self, stanza): + def _handle_data(self, stanza: Union[Iq, Message]): sid = stanza['ibb_data']['sid'] stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) if stream is not None and stanza['from'] == stream.peer_jid: @@ -173,7 +208,7 @@ class XEP_0047(BasePlugin): else: raise XMPPError('item-not-found') - def _handle_close(self, iq): + def _handle_close(self, iq: Iq): sid = iq['ibb_close']['sid'] stream = self.api['get_stream'](iq['to'], sid, iq['from']) if stream is not None and iq['from'] == stream.peer_jid: diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py index 5c47508a..ac2935ac 100644 --- a/slixmpp/plugins/xep_0047/stanza.py +++ b/slixmpp/plugins/xep_0047/stanza.py @@ -1,3 +1,6 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import re import base64 diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py index 535ba82b..f020ea68 100644 --- a/slixmpp/plugins/xep_0047/stream.py +++ b/slixmpp/plugins/xep_0047/stream.py @@ -1,17 +1,32 @@ +# Slixmpp: The Slick XMPP Library +# This file is part of Slixmpp +# See the file LICENSE for copying permission import asyncio import socket import logging -from slixmpp.stanza import Iq -from slixmpp.exceptions import XMPPError +from typing import ( + Optional, + IO, + Union, +) + +from slixmpp import JID +from slixmpp.stanza import Iq, Message +from slixmpp.exceptions import XMPPError, IqTimeout log = logging.getLogger(__name__) class IBBytestream(object): + """XEP-0047 Stream abstraction. Created by the ibb plugin automatically. - def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): + Provides send methods and triggers :term:`ibb_stream_data` events. + """ + + def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID, + use_messages: bool = False): self.xmpp = xmpp self.sid = sid self.block_size = block_size @@ -31,7 +46,12 @@ class IBBytestream(object): self.recv_queue = asyncio.Queue() - async def send(self, data, timeout=None): + async def send(self, data: bytes, timeout: Optional[int] = None) -> int: + """Send a single block of data. + + :param data: Data to send (will be truncated if above block size). + :returns: Number of bytes sent. + """ if not self.stream_started or self.stream_out_closed: raise socket.error if len(data) > self.block_size: @@ -58,19 +78,62 @@ class IBBytestream(object): await iq.send(timeout=timeout) return len(data) - async def sendall(self, data, timeout=None): + async def sendall(self, data: bytes, timeout: Optional[int] = None): + """Send all the contents of ``data`` in chunks. + + :param data: Raw data to send. + """ sent_len = 0 while sent_len < len(data): - sent_len += await self.send(data[sent_len:self.block_size], timeout=timeout) + sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout) - async def sendfile(self, file, timeout=None): + async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes: + """Gather all data sent on a stream until it is closed, and return it. + + .. versionadded:: 1.8.0 + + :param max_data: Max number of bytes to receive. (received data may be + over this limit depending on block_size) + :param timeout: Timeout after which an error will be raised. + :raises .IqTimeout: If the timeout is reached. + :returns: All bytes accumulated in the stream. + """ + result = b'' + end_future = asyncio.Future() + + def on_close(stream): + if stream is self: + end_future.set_result(True) + + def on_data(stream): + nonlocal result + if stream is self: + result += stream.read() + if max_data and len(result) > max_data: + end_future.set_result(True) + + self.xmpp.add_event_handler('ibb_stream_end', on_close) + self.xmpp.add_event_handler('ibb_stream_data', on_data) + try: + await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop) + except asyncio.TimeoutError: + raise IqTimeout(result) + finally: + self.xmpp.del_event_handler('ibb_stream_end', on_close) + self.xmpp.del_event_handler('ibb_stream_data', on_data) + return result + + async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None): + """Send the contents of a file over the wire, in chunks. + + :param file: The opened file (or file-like) object, in bytes mode.""" while True: data = file.read(self.block_size) if not data: break await self.send(data, timeout=timeout) - def _recv_data(self, stanza): + def _recv_data(self, stanza: Union[Message, Iq]): new_seq = stanza['ibb_data']['seq'] if new_seq != (self.recv_seq + 1) % 65536: self.close() @@ -96,7 +159,8 @@ class IBBytestream(object): raise socket.error return self.recv_queue.get_nowait() - def close(self, timeout=None): + def close(self, timeout: Optional[int] = None) -> asyncio.Future: + """Close the stream.""" iq = self.xmpp.Iq() iq['type'] = 'set' iq['to'] = self.peer_jid @@ -109,7 +173,7 @@ class IBBytestream(object): self.xmpp.event('ibb_stream_end', self) return future - def _closed(self, iq): + def _closed(self, iq: Iq): self.stream_in_closed = True self.stream_out_closed = True iq.reply().send() diff --git a/slixmpp/plugins/xep_0222.py b/slixmpp/plugins/xep_0222.py index 6b612e14..6a371046 100644 --- a/slixmpp/plugins/xep_0222.py +++ b/slixmpp/plugins/xep_0222.py @@ -5,6 +5,7 @@ # See the file LICENSE for copying permission. import logging +from asyncio import Future from typing import Optional, Callable, List from slixmpp import JID from slixmpp.xmlstream import register_stanza_plugin, ElementBase @@ -28,9 +29,11 @@ class XEP_0222(BasePlugin): profile = {'pubsub#persist_items': True, 'pubsub#send_last_published_item': 'never'} - def configure(self, node, ifrom=None, callback=None, timeout=None): + def configure(self, node: str, **iqkwargs) -> Future: """ Update a node's configuration to match the public storage profile. + + :param node: Node to set the configuration at. """ config = self.xmpp['xep_0004'].Form() config['type'] = 'submit' @@ -38,29 +41,26 @@ class XEP_0222(BasePlugin): for field, value in self.profile.items(): config.add_field(var=field, value=value) - return self.xmpp['xep_0060'].set_node_config(None, node, config, - ifrom=ifrom, - callback=callback, - timeout=timeout) + return self.xmpp['xep_0060'].set_node_config( + jid=None, node=node, config=config, **iqkwargs + ) def store(self, stanza: ElementBase, node: Optional[str] = None, - id: Optional[str] = None, ifrom: Optional[JID] = None, - options: Optional[Form] = None, - callback: Optional[Callable] = None, - timeout: Optional[int] = None): + id: Optional[str] = None, **pubsubkwargs) -> Future: """ Store public data via PEP. This is just a (very) thin wrapper around the XEP-0060 publish() method to set the defaults expected by PEP. - :param stanza: The private content to store. + :param stanza: The public content to store. :param node: The node to publish the content to. If not specified, the stanza's namespace will be used. :param id: Optionally specify the ID of the item. :param options: Publish options to use, which will be modified to fit the persistent storage option profile. """ + options = pubsubkwargs.pop('options', None) if not options: options = self.xmpp['xep_0004'].stanza.Form() options['type'] = 'submit' @@ -75,17 +75,12 @@ class XEP_0222(BasePlugin): options.add_field(var=field) options.get_fields()[field]['value'] = value - return self.xmpp['xep_0163'].publish(stanza, node, - options=options, - ifrom=ifrom, - callback=callback, - timeout=timeout) + pubsubkwargs['options'] = options + + return self.xmpp['xep_0163'].publish(stanza, node, id=id, **pubsubkwargs) def retrieve(self, node: str, id: Optional[str] = None, - item_ids: Optional[List[str]] = None, - ifrom: Optional[JID] = None, - callback: Optional[Callable] = None, - timeout: Optional[int] = None): + item_ids: Optional[List[str]] = None, **iqkwargs) -> Future: """ Retrieve public data via PEP. @@ -96,23 +91,17 @@ class XEP_0222(BasePlugin): :param id: Optionally specify the ID of the item. :param item_ids: Specify a group of IDs. If id is also specified, it will be included in item_ids. - :param ifrom: Specify the sender's JID. - :param timeout: The length of time (in seconds) to wait for a response - before exiting the send call if blocking is used. - Defaults to slixmpp.xmlstream.RESPONSE_TIMEOUT - :param callback: Optional reference to a stream handler function. Will - be executed when a reply stanza is received. """ if item_ids is None: item_ids = [] if id is not None: item_ids.append(id) - return self.xmpp['xep_0060'].get_items(None, node, - item_ids=item_ids, - ifrom=ifrom, - callback=callback, - timeout=timeout) + return self.xmpp['xep_0060'].get_items( + jid=None, node=node, + item_ids=item_ids, + **iqkwargs + ) register_plugin(XEP_0222) diff --git a/slixmpp/plugins/xep_0223.py b/slixmpp/plugins/xep_0223.py index 27437913..6ed39285 100644 --- a/slixmpp/plugins/xep_0223.py +++ b/slixmpp/plugins/xep_0223.py @@ -5,6 +5,7 @@ # See the file LICENSE for copying permission. import logging +from asyncio import Future from typing import Optional, Callable, List from slixmpp import JID from slixmpp.xmlstream import register_stanza_plugin, ElementBase @@ -28,28 +29,24 @@ class XEP_0223(BasePlugin): profile = {'pubsub#persist_items': True, 'pubsub#access_model': 'whitelist'} - def configure(self, node, ifrom=None, callback=None, timeout=None): + def configure(self, node: str, **iqkwargs) -> Future: """ - Update a node's configuration to match the public storage profile. + Update a node's configuration to match the private storage profile. + + :param node: Node to set the configuration at. """ - # TODO: that cannot possibly work, why is this here? config = self.xmpp['xep_0004'].Form() config['type'] = 'submit' for field, value in self.profile.items(): config.add_field(var=field, value=value) - return self.xmpp['xep_0060'].set_node_config(None, node, config, - ifrom=ifrom, - callback=callback, - timeout=timeout) + return self.xmpp['xep_0060'].set_node_config( + jid=None, node=node, config=config, **iqkwargs + ) def store(self, stanza: ElementBase, node: Optional[str] = None, - id: Optional[str] = None, ifrom: Optional[JID] = None, - options: Optional[Form] = None, - callback: Optional[Callable] = None, - timeout: Optional[int] = None, - timeout_callback: Optional[Callable] = None): + id: Optional[str] = None, **pubsubkwargs) -> Future: """ Store private data via PEP. @@ -63,6 +60,7 @@ class XEP_0223(BasePlugin): :param options: Publish options to use, which will be modified to fit the persistent storage option profile. """ + options = pubsubkwargs.pop('options', None) if not options: options = self.xmpp['xep_0004'].stanza.Form() options['type'] = 'submit' @@ -77,17 +75,11 @@ class XEP_0223(BasePlugin): options.add_field(var=field) options.get_fields()[field]['value'] = value - return self.xmpp['xep_0163'].publish(stanza, node, options=options, - ifrom=ifrom, callback=callback, - timeout=timeout, - timeout_callback=timeout_callback) + pubsubkwargs['options'] = options + return self.xmpp['xep_0163'].publish(stanza, node, id=id, **pubsubkwargs) def retrieve(self, node: str, id: Optional[str] = None, - item_ids: Optional[List[str]] = None, - ifrom: Optional[JID] = None, - callback: Optional[Callable] = None, - timeout: Optional[int] = None, - timeout_callback: Optional[Callable] = None): + item_ids: Optional[List[str]] = None, **iqkwargs) -> Future: """ Retrieve private data via PEP. @@ -98,22 +90,17 @@ class XEP_0223(BasePlugin): :param id: Optionally specify the ID of the item. :param item_ids: Specify a group of IDs. If id is also specified, it will be included in item_ids. - :param ifrom: Specify the sender's JID. - :param timeout: The length of time (in seconds) to wait for a response - before exiting the send call if blocking is used. - Defaults to slixmpp.xmlstream.RESPONSE_TIMEOUT - :param callback: Optional reference to a stream handler function. Will - be executed when a reply stanza is received. """ if item_ids is None: item_ids = [] if id is not None: item_ids.append(id) - return self.xmpp['xep_0060'].get_items(None, node, - item_ids=item_ids, ifrom=ifrom, - callback=callback, timeout=timeout, - timeout_callback=timeout_callback) + return self.xmpp['xep_0060'].get_items( + jid=None, node=node, + item_ids=item_ids, + **iqkwargs + ) register_plugin(XEP_0223) diff --git a/slixmpp/plugins/xep_0231/bob.py b/slixmpp/plugins/xep_0231/bob.py index b7e990b3..e554c38c 100644 --- a/slixmpp/plugins/xep_0231/bob.py +++ b/slixmpp/plugins/xep_0231/bob.py @@ -1,4 +1,3 @@ - # Slixmpp: The Slick XMPP Library # Copyright (C) 2012 Nathanael C. Fritz, # Emmanuel Gil Peyrot @@ -7,7 +6,10 @@ import logging import hashlib -from slixmpp import future_wrapper +from asyncio import Future +from typing import Optional + +from slixmpp import future_wrapper, JID from slixmpp.stanza import Iq, Message, Presence from slixmpp.exceptions import XMPPError from slixmpp.xmlstream.handler import Callback @@ -65,7 +67,20 @@ class XEP_0231(BasePlugin): def session_bind(self, jid): self.xmpp['xep_0030'].add_feature('urn:xmpp:bob') - def set_bob(self, data, mtype, cid=None, max_age=None): + def set_bob(self, data: bytes, mtype: str, cid: Optional[str] = None, + max_age: Optional[int] = None) -> str: + """Register a blob of binary data as a BOB. + + .. versionchanged:: 1.8.0 + If ``max_age`` is specified, the registered data will be destroyed + after that time. + + :param data: Data to register. + :param mtype: Mime Type of the data (e.g. ``image/jpeg``). + :param cid: Content-ID (will be auto-generated if left out). + :param max_age: Duration of content availability. + :returns: The cid value. + """ if cid is None: cid = 'sha1+%s@bob.xmpp.org' % hashlib.sha1(data).hexdigest() @@ -76,12 +91,24 @@ class XEP_0231(BasePlugin): bob['max_age'] = max_age self.api['set_bob'](args=bob) - + # Schedule destruction of the data + if max_age is not None and max_age > 0: + self.xmpp.loop.call_later(max_age, self.del_bob, cid) return cid @future_wrapper - def get_bob(self, jid=None, cid=None, cached=True, ifrom=None, - timeout=None, callback=None): + def get_bob(self, jid: Optional[JID] = None, cid: Optional[str] = None, + cached: bool = True, ifrom: Optional[JID] = None, + **iqkwargs) -> Future: + """Get a BOB. + + .. versionchanged:: 1.8.0 + Results not in cache do not raise an error when ``cached`` is True. + + :param jid: JID to fetch the BOB from. + :param cid: Content ID (actually required). + :param cached: To fetch the BOB from the local cache first (from CID only) + """ if cached: data = self.api['get_bob'](None, None, ifrom, args=cid) if data is not None: @@ -91,17 +118,14 @@ class XEP_0231(BasePlugin): return iq return data - iq = self.xmpp.Iq() - iq['to'] = jid - iq['from'] = ifrom - iq['type'] = 'get' + iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom) iq['bob']['cid'] = cid - return iq.send(timeout=timeout, callback=callback) + return iq.send(**iqkwargs) - def del_bob(self, cid): + def del_bob(self, cid: str): self.api['del_bob'](args=cid) - def _handle_bob_iq(self, iq): + def _handle_bob_iq(self, iq: Iq): cid = iq['bob']['cid'] if iq['type'] == 'result': @@ -131,7 +155,6 @@ class XEP_0231(BasePlugin): def _get_bob(self, jid, node, ifrom, cid): if cid in self._cids: return self._cids[cid] - raise XMPPError('item-not-found') def _del_bob(self, jid, node, ifrom, cid): if cid in self._cids: diff --git a/slixmpp/plugins/xep_0280/carbons.py b/slixmpp/plugins/xep_0280/carbons.py index 6a35bf84..c67e3fd9 100644 --- a/slixmpp/plugins/xep_0280/carbons.py +++ b/slixmpp/plugins/xep_0280/carbons.py @@ -5,7 +5,10 @@ # See the file LICENSE for copying permissio import logging -import slixmpp +from asyncio import Future +from typing import Optional + +from slixmpp import JID from slixmpp.stanza import Message, Iq from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.matcher import StanzaPath @@ -21,6 +24,11 @@ class XEP_0280(BasePlugin): """ XEP-0280 Message Carbons + + Events triggered by this plugin: + + - :term:`carbon_received` + - :term:`carbon_sent` """ name = 'xep_0280' @@ -57,28 +65,22 @@ class XEP_0280(BasePlugin): def session_bind(self, jid): self.xmpp.plugin['xep_0030'].add_feature('urn:xmpp:carbons:2') - def _handle_carbon_received(self, msg): + def _handle_carbon_received(self, msg: Message): if msg['from'].bare == self.xmpp.boundjid.bare: self.xmpp.event('carbon_received', msg) - def _handle_carbon_sent(self, msg): + def _handle_carbon_sent(self, msg: Message): if msg['from'].bare == self.xmpp.boundjid.bare: self.xmpp.event('carbon_sent', msg) - def enable(self, ifrom=None, timeout=None, callback=None, - timeout_callback=None): - iq = self.xmpp.Iq() - iq['type'] = 'set' - iq['from'] = ifrom + def enable(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future: + """Enable carbons.""" + iq = self.xmpp.make_iq_set(ifrom=ifrom) iq.enable('carbon_enable') - return iq.send(timeout_callback=timeout_callback, timeout=timeout, - callback=callback) + return iq.send(**iqkwargs) - def disable(self, ifrom=None, timeout=None, callback=None, - timeout_callback=None): - iq = self.xmpp.Iq() - iq['type'] = 'set' - iq['from'] = ifrom + def disable(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future: + """Disable carbons.""" + iq = self.xmpp.make_iq_set(ifrom=ifrom) iq.enable('carbon_disable') - return iq.send(timeout_callback=timeout_callback, timeout=timeout, - callback=callback) + return iq.send(**iqkwargs) diff --git a/slixmpp/plugins/xep_0363/__init__.py b/slixmpp/plugins/xep_0363/__init__.py index 0ed1d7c8..f693eb92 100644 --- a/slixmpp/plugins/xep_0363/__init__.py +++ b/slixmpp/plugins/xep_0363/__init__.py @@ -1,4 +1,3 @@ - # slixmpp: The Slick XMPP Library # Copyright (C) 2018 Emmanuel Gil Peyrot # This file is part of slixmpp. @@ -6,6 +5,12 @@ from slixmpp.plugins.base import register_plugin from slixmpp.plugins.xep_0363.stanza import Request, Slot, Put, Get, Header -from slixmpp.plugins.xep_0363.http_upload import XEP_0363 +from slixmpp.plugins.xep_0363.http_upload import ( + XEP_0363, + UploadServiceNotFound, + FileTooBig, + HTTPError, + FileUploadError, +) register_plugin(XEP_0363) diff --git a/slixmpp/plugins/xep_0363/http_upload.py b/slixmpp/plugins/xep_0363/http_upload.py index 04b066cd..bae3ee7d 100644 --- a/slixmpp/plugins/xep_0363/http_upload.py +++ b/slixmpp/plugins/xep_0363/http_upload.py @@ -1,18 +1,21 @@ -""" - slixmpp: The Slick XMPP Library - Copyright (C) 2018 Emmanuel Gil Peyrot - This file is part of slixmpp. - - See the file LICENSE for copying permission. -""" +# slixmpp: The Slick XMPP Library +# Copyright (C) 2018 Emmanuel Gil Peyrot +# This file is part of slixmpp. +# See the file LICENSE for copying permission. import logging import os.path from aiohttp import ClientSession +from asyncio import Future from mimetypes import guess_type +from typing import ( + Optional, + IO, +) -from slixmpp import Iq, __version__ +from slixmpp import JID, __version__ +from slixmpp.stanza import Iq from slixmpp.plugins import BasePlugin from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream.handler import Callback @@ -25,19 +28,39 @@ class FileUploadError(Exception): pass class UploadServiceNotFound(FileUploadError): - pass + """ + Raised if no upload service can be found. + """ class FileTooBig(FileUploadError): + """ + Raised if the file size is above advertised server limits. + + args: + + - size of the file + - max file size allowed + """ def __str__(self): return 'File size too large: {} (max: {} bytes)' \ .format(self.args[0], self.args[1]) class HTTPError(FileUploadError): + """ + Raised when we receive an HTTP error response during upload. + + args: + + - HTTP Error code + - Content of the HTTP response + """ def __str__(self): return 'Could not upload file: %d (%s)' % (self.args[0], self.args[1]) class XEP_0363(BasePlugin): - ''' This plugin only supports Python 3.5+ ''' + """ + XEP-0363: HTTP File Upload + """ name = 'xep_0363' description = 'XEP-0363: HTTP File Upload' @@ -62,9 +85,7 @@ class XEP_0363(BasePlugin): self._handle_request)) def plugin_end(self): - self._http_session.close() self.xmpp.remove_handler('HTTP Upload Request') - self.xmpp.remove_handler('HTTP Upload Slot') self.xmpp['xep_0030'].del_feature(feature=Request.namespace) def session_bind(self, jid): @@ -73,9 +94,14 @@ class XEP_0363(BasePlugin): def _handle_request(self, iq): self.xmpp.event('http_upload_request', iq) - async def find_upload_service(self, domain=None, timeout=None): + async def find_upload_service(self, domain: Optional[JID] = None, **iqkwargs) -> Optional[Iq]: + """Find an upload service on a domain (our own by default). + + :param domain: Domain to disco to find a service. + """ results = await self.xmpp['xep_0030'].get_info_from_domain( - domain=domain, timeout=timeout) + domain=domain, **iqkwargs + ) candidates = [] for info in results: @@ -87,26 +113,49 @@ class XEP_0363(BasePlugin): if feature == Request.namespace: return info - def request_slot(self, jid, filename, size, content_type=None, ifrom=None, - timeout=None, callback=None, timeout_callback=None): - iq = self.xmpp.Iq() - iq['to'] = jid - iq['from'] = ifrom - iq['type'] = 'get' + def request_slot(self, jid: JID, filename: str, size: int, + content_type: Optional[str] = None, *, + ifrom: Optional[JID] = None, **iqkwargs) -> Future: + """Request an HTTP upload slot from a service. + + :param jid: Service to request the slot from. + :param filename: Name of the file that will be uploaded. + :param size: size of the file in bytes. + :param content_type: Type of the file that will be uploaded. + """ + iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom) request = iq['http_upload_request'] request['filename'] = filename request['size'] = str(size) request['content-type'] = content_type or self.default_content_type - return iq.send(timeout=timeout, callback=callback, - timeout_callback=timeout_callback) + return iq.send(**iqkwargs) - async def upload_file(self, filename, size=None, content_type=None, *, - input_file=None, ifrom=None, domain=None, timeout=None, - callback=None, timeout_callback=None): - ''' Helper function which does all of the uploading process. ''' + async def upload_file(self, filename: str, size: Optional[int] = None, + content_type: Optional[str] = None, *, + input_file: Optional[IO[bytes]]=None, + domain: Optional[JID] = None, + **iqkwargs) -> str: + '''Helper function which does all of the uploading discovery and + process. + + :param filename: Path to the file to upload (or only the name if + ``input_file`` is provided. + :param size: size of the file in bytes. + :param content_type: Type of the file that will be uploaded. + :param input_file: Binary file stream on the file. + :param domain: Domain to query to find an HTTP upload service. + :raises .UploadServiceNotFound: If slixmpp is unable to find an + an available upload service. + :raises .FileTooBig: If the filesize is above what is accepted by + the service. + :raises .HTTPError: If there is an error in the HTTP operation. + :returns: The URL of the uploaded file. + ''' + timeout = iqkwargs.get('timeout', None) if self.upload_service is None: info_iq = await self.find_upload_service( - domain=domain, timeout=timeout) + domain=domain, **iqkwargs + ) if info_iq is None: raise UploadServiceNotFound() self.upload_service = info_iq['from'] @@ -137,9 +186,7 @@ class XEP_0363(BasePlugin): basename = os.path.basename(filename) slot_iq = await self.request_slot(self.upload_service, basename, size, - content_type, ifrom, timeout, - callback=callback, - timeout_callback=timeout_callback) + content_type, **iqkwargs) slot = slot_iq['http_upload_slot'] headers = { diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py index f7276c0f..53225df5 100644 --- a/tests/test_stream_xep_0047.py +++ b/tests/test_stream_xep_0047.py @@ -14,7 +14,7 @@ class TestInBandByteStreams(SlixTest): def tearDown(self): self.stream_close() - def testOpenStream(self): + async def testOpenStream(self): """Test requesting a stream, successfully""" events = [] @@ -25,8 +25,8 @@ class TestInBandByteStreams(SlixTest): self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) - self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', - sid='testing') + await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', + sid='testing') self.send(""" @@ -45,7 +45,7 @@ class TestInBandByteStreams(SlixTest): self.assertEqual(events, ['ibb_stream_start']) - def testAysncOpenStream(self): + async def testAysncOpenStream(self): """Test requesting a stream, aysnc""" events = set() @@ -58,9 +58,9 @@ class TestInBandByteStreams(SlixTest): self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) - self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', - sid='testing', - callback=stream_callback) + await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', + sid='testing', + callback=stream_callback) self.send("""