From ae31f78b638e7d6376a645ac1da697a0348144d1 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 7 Mar 2021 19:45:01 +0100 Subject: [PATCH 1/3] docs: add resultiterator docs for XEP-0059 --- docs/api/plugins/xep_0059.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/api/plugins/xep_0059.rst b/docs/api/plugins/xep_0059.rst index e9f7db28..0d5ba32e 100644 --- a/docs/api/plugins/xep_0059.rst +++ b/docs/api/plugins/xep_0059.rst @@ -8,6 +8,9 @@ XEP-0059: Result Set Management :members: :exclude-members: session_bind, plugin_init, plugin_end +.. autoclass:: ResultIterator + :members: + :member-order: bysource Stanza elements --------------- From 27cf97458b6876b462ac08c24ff0f69107004136 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 7 Mar 2021 19:47:00 +0100 Subject: [PATCH 2/3] XEP-0059: docs and typing Also: - fix a typo in the plugin description (wrong number) - add iq_options to make retrieval more flexible --- slixmpp/plugins/xep_0059/rsm.py | 170 ++++++++++++++++++++------------ 1 file changed, 107 insertions(+), 63 deletions(-) diff --git a/slixmpp/plugins/xep_0059/rsm.py b/slixmpp/plugins/xep_0059/rsm.py index 0fd6b2f9..00615ee4 100644 --- a/slixmpp/plugins/xep_0059/rsm.py +++ b/slixmpp/plugins/xep_0059/rsm.py @@ -5,9 +5,16 @@ # See the file LICENSE for copying permission. import logging -import slixmpp -from slixmpp import Iq -from slixmpp.plugins import BasePlugin, register_plugin +from collections.abc import AsyncIterator +from typing import ( + Any, + Callable, + Dict, + Optional, +) + +from slixmpp.stanza import Iq +from slixmpp.plugins import BasePlugin from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins.xep_0059 import stanza, Set from slixmpp.exceptions import XMPPError @@ -16,41 +23,73 @@ from slixmpp.exceptions import XMPPError log = logging.getLogger(__name__) -class ResultIterator: +class ResultIterator(AsyncIterator): """ An iterator for Result Set Management + + Example: + + .. code-block:: python + + q = Iq() + q['to'] = 'pubsub.example.com' + q['disco_items']['node'] = 'blog' + async for i in ResultIterator(q, 'disco_items', '10'): + print(i['disco_items']['items']) + """ + #: Template for the RSM query + query: Iq + #: Substanza of the query to send, e.g. "disco_items" + interface: str + #: Stanza interface on the query results providing the retrieved + #: elements (used to count them) + results: str + #: From which item id to start + start: Optional[str] + #: Amount of elements to retrieve for each page + amount: int + #: If True, page backwards through the results + reverse: bool + #: Callback to run before sending the stanza + pre_cb: Optional[Callable[[Iq], None]] + #: Callback to run after receiving the reply + post_cb: Optional[Callable[[Iq], None]] + #: Optional dict of Iq options (timeout, etc…) for Iq.send() + iq_options: Dict[str, Any] - def __init__(self, query, interface, results='substanzas', amount=10, - start=None, reverse=False, recv_interface=None, - pre_cb=None, post_cb=None): + def __init__(self, query: Iq, interface: str, results: str = 'substanzas', + amount: int = 10, + start: Optional[str] = None, reverse: bool = False, + recv_interface: Optional[str] = None, + pre_cb: Optional[Callable[[Iq], None]] = None, + post_cb: Optional[Callable[[Iq], None]] = None, + iq_options: Optional[Dict[str, Any]] = None): """ - Arguments: - query -- The template query - interface -- The substanza of the query to send, for example disco_items - recv_interface -- The substanza of the query to receive, for example disco_items - results -- The query stanza's interface which provides a + :param query: The template query + :param interface: The substanza of the query to send, for example + disco_items + :param recv_interface: The substanza of the query to receive, for + example disco_items + :param results: The query stanza's interface which provides a countable list of query results. - amount -- The max amounts of items to request per iteration - start -- From which item id to start - reverse -- If True, page backwards through the results - pre_cb -- Callback to run before sending the stanza - post_cb -- Callback to run after receiving the reply - - Example: - q = Iq() - q['to'] = 'pubsub.example.com' - q['disco_items']['node'] = 'blog' - for i in ResultIterator(q, 'disco_items', '10'): - print i['disco_items']['items'] - + :param amount: The max amounts of items to request per iteration + :param start: From which item id to start + :param reverse: If True, page backwards through the results + :param pre_cb: Callback to run before sending the stanza + :param post_cb: Callback to run after receiving the reply + :param iq_options: Optional dict of parameters for Iq.send """ self.query = query self.amount = amount self.start = start + if iq_options is None: + self.iq_options = {} + else: + self.iq_options = iq_options self.interface = interface - if recv_interface: + if recv_interface is not None: self.recv_interface = recv_interface else: self.recv_interface = interface @@ -63,10 +102,10 @@ class ResultIterator: def __aiter__(self): return self - async def __anext__(self): + async def __anext__(self) -> Iq: return await self.next() - async def next(self): + async def next(self) -> Iq: """ Return the next page of results from a query. @@ -76,20 +115,19 @@ class ResultIterator: """ if self._stop: raise StopAsyncIteration - if self.query[self.interface]['rsm']['before'] is None: - self.query[self.interface]['rsm']['before'] = self.reverse self.query['id'] = self.query.stream.new_id() self.query[self.interface]['rsm']['max'] = str(self.amount) - if self.start and self.reverse: - self.query[self.interface]['rsm']['before'] = self.start - elif self.start: - self.query[self.interface]['rsm']['after'] = self.start + if self.start: + if self.reverse: + self.query[self.interface]['rsm']['before'] = self.start + else: + self.query[self.interface]['rsm']['after'] = self.start try: if self.pre_cb: self.pre_cb(self.query) - r = await self.query.send() + r = await self.query.send(**self.iq_options) if not r[self.recv_interface]['rsm']['first'] and \ not r[self.recv_interface]['rsm']['last']: @@ -118,7 +156,7 @@ class ResultIterator: class XEP_0059(BasePlugin): """ - XEP-0050: Result Set Management + XEP-0059: Result Set Management """ name = 'xep_0059' @@ -139,34 +177,40 @@ class XEP_0059(BasePlugin): def session_bind(self, jid): self.xmpp['xep_0030'].add_feature(Set.namespace) - def iterate(self, stanza, interface, results='substanzas', amount=10, reverse=False, - recv_interface=None, pre_cb=None, post_cb=None): + def iterate(self, stanza: Iq, interface: str, results: str = 'substanzas', + amount: int = 10, reverse: bool = False, + recv_interface: Optional[str] = None, + pre_cb: Optional[Callable[[Iq], None]] = None, + post_cb: Optional[Callable[[Iq], None]] = None, + iq_options: Optional[Dict[str, Any]] = None + ) -> ResultIterator: """ Create a new result set iterator for a given stanza query. - Arguments: - stanza -- A stanza object to serve as a template for - queries made each iteration. For example, a - basic disco#items query. - interface -- The name of the substanza to which the - result set management stanza should be - appended in the query stanza. For example, - for disco#items queries the interface - 'disco_items' should be used. - recv_interface -- The name of the substanza from which the - result set management stanza should be - read in the result stanza. If unspecified, - it will be set to the same value as the - ``interface`` parameter. - pre_cb -- Callback to run before sending each stanza e.g. - setting the MAM queryid and starting a stanza - collector. - post_cb -- Callback to run after receiving each stanza e.g. - stopping a MAM stanza collector in order to - gather results. - results -- The name of the interface containing the - query results (typically just 'substanzas'). + :param stanza: A stanza object to serve as a template for + queries made each iteration. For example, a + basic disco#items query. + :param interface: The name of the substanza to which the + result set management stanza should be + appended in the query stanza. For example, + for disco#items queries the interface + 'disco_items' should be used. + :param recv_interface: The name of the substanza from which the + result set management stanza should be + read in the result stanza. If unspecified, + it will be set to the same value as the + ``interface`` parameter. + :param pre_cb: Callback to run before sending each stanza e.g. + setting the MAM queryid and starting a stanza + collector. + :param post_cb: Callback to run after receiving each stanza e.g. + stopping a MAM stanza collector in order to + gather results. + :param results: The name of the interface containing the + query results (typically just 'substanzas'). + :param iq_options: Optional dict of parameters for Iq.send """ - return ResultIterator(stanza, interface, results, amount, reverse=reverse, - recv_interface=recv_interface, pre_cb=pre_cb, - post_cb=post_cb) + return ResultIterator(stanza, interface, results, amount, + reverse=reverse, recv_interface=recv_interface, + pre_cb=pre_cb, post_cb=post_cb, + iq_options=iq_options) From 5a3ab2c5c13aea6761680a9ccc6cf53c133c9ac7 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 7 Mar 2021 19:48:07 +0100 Subject: [PATCH 3/3] tests: enable and fix RSM test for XEP-0030 --- slixmpp/plugins/xep_0059/rsm.py | 2 + tests/test_stream_xep_0030.py | 80 +++++++++++++++++++++++---------- 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/slixmpp/plugins/xep_0059/rsm.py b/slixmpp/plugins/xep_0059/rsm.py index 00615ee4..61752af4 100644 --- a/slixmpp/plugins/xep_0059/rsm.py +++ b/slixmpp/plugins/xep_0059/rsm.py @@ -123,6 +123,8 @@ class ResultIterator(AsyncIterator): self.query[self.interface]['rsm']['before'] = self.start else: self.query[self.interface]['rsm']['after'] = self.start + elif self.reverse: + self.query[self.interface]['rsm']['before'] = True try: if self.pre_cb: diff --git a/tests/test_stream_xep_0030.py b/tests/test_stream_xep_0030.py index 8cba8280..4cabfe38 100644 --- a/tests/test_stream_xep_0030.py +++ b/tests/test_stream_xep_0030.py @@ -512,30 +512,28 @@ class TestStreamDisco(SlixTest): self.assertEqual(results, items, "Unexpected items: %s" % results) - ''' - def testGetItemsIterator(self): + def testGetItemsIterators(self): """Test interaction between XEP-0030 and XEP-0059 plugins.""" - - raised_exceptions = [] + iteration_finished = [] + jids_found = set() self.stream_start(mode='client', plugins=['xep_0030', 'xep_0059']) - results = self.xmpp['xep_0030'].get_items(jid='foo@localhost', - node='bar', - iterator=True) - results.amount = 10 - - def run_test(): - try: - results.next() - except StopIteration: - raised_exceptions.append(True) - - t = threading.Thread(name="get_items_iterator", - target=run_test) - t.start() + async def run_test(): + iterator = await self.xmpp['xep_0030'].get_items( + jid='foo@localhost', + node='bar', + iterator=True + ) + iterator.amount = 10 + async for page in iterator: + for item in page['disco_items']['items']: + jids_found.add(item[0]) + iteration_finished.append(True) + test_run = self.xmpp.wrap(run_test()) + self.wait_() self.send(""" + + + + + + a@b + e@b + 10 """) - - t.join() - - self.assertEqual(raised_exceptions, [True], - "StopIteration was not raised: %s" % raised_exceptions) - ''' + self.wait_() + self.send(""" + + + + 10 + e@b + + + + """) + self.recv(""" + + + + + + + + + f@b + j@b + 10 + + + + """) + expected_jids = {'%s@b' % i for i in 'abcdefghij'} + self.run_coro(test_run) + self.assertEqual(expected_jids, jids_found) + self.assertEqual(iteration_finished, [True]) suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)