Update RSM for asyncio
- Use an async iterator - Add a "recv_interface" parameter in order to differenciate the stanza we send from the stanza we receive (required for MAM) - Add a pre_cb to run before sending the query stanza - Add a post_cb to run after receiving the result stanza
This commit is contained in:
parent
9a563f1425
commit
b38e229359
1 changed files with 53 additions and 24 deletions
|
@ -19,23 +19,27 @@ from slixmpp.exceptions import XMPPError
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResultIterator():
|
||||
class ResultIterator:
|
||||
|
||||
"""
|
||||
An iterator for Result Set Managment
|
||||
"""
|
||||
|
||||
def __init__(self, query, interface, results='substanzas', amount=10,
|
||||
start=None, reverse=False):
|
||||
start=None, reverse=False, recv_interface=None,
|
||||
pre_cb=None, post_cb=None):
|
||||
"""
|
||||
Arguments:
|
||||
query -- The template query
|
||||
interface -- The substanza of the query, for example disco_items
|
||||
interface -- The substanza of the query to send, for example disco_items
|
||||
recv_interface -- The substanza of the query to receive, for example disco_items
|
||||
results -- The query stanza's interface which provides a
|
||||
countable list of query results.
|
||||
amount -- The max amounts of items to request per iteration
|
||||
start -- From which item id to start
|
||||
reverse -- If True, page backwards through the results
|
||||
pre_cb -- Callback to run before sending the stanza
|
||||
post_cb -- Callback to run after receiving the reply
|
||||
|
||||
Example:
|
||||
q = Iq()
|
||||
|
@ -49,17 +53,23 @@ class ResultIterator():
|
|||
self.amount = amount
|
||||
self.start = start
|
||||
self.interface = interface
|
||||
if recv_interface:
|
||||
self.recv_interface = recv_interface
|
||||
else:
|
||||
self.recv_interface = interface
|
||||
self.pre_cb = pre_cb
|
||||
self.post_cb = post_cb
|
||||
self.results = results
|
||||
self.reverse = reverse
|
||||
self._stop = False
|
||||
|
||||
def __iter__(self):
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
return self.next()
|
||||
async def __anext__(self):
|
||||
return await self.next()
|
||||
|
||||
def next(self):
|
||||
async def next(self):
|
||||
"""
|
||||
Return the next page of results from a query.
|
||||
|
||||
|
@ -68,7 +78,7 @@ class ResultIterator():
|
|||
of items.
|
||||
"""
|
||||
if self._stop:
|
||||
raise StopIteration
|
||||
raise StopAsyncIteration
|
||||
self.query[self.interface]['rsm']['before'] = self.reverse
|
||||
self.query['id'] = self.query.stream.new_id()
|
||||
self.query[self.interface]['rsm']['max'] = str(self.amount)
|
||||
|
@ -79,28 +89,32 @@ class ResultIterator():
|
|||
self.query[self.interface]['rsm']['after'] = self.start
|
||||
|
||||
try:
|
||||
r = self.query.send(block=True)
|
||||
if self.pre_cb:
|
||||
self.pre_cb(self.query)
|
||||
r = await self.query.send()
|
||||
|
||||
if not r[self.interface]['rsm']['first'] and \
|
||||
not r[self.interface]['rsm']['last']:
|
||||
raise StopIteration
|
||||
if not r[self.recv_interface]['rsm']['first'] and \
|
||||
not r[self.recv_interface]['rsm']['last']:
|
||||
raise StopAsyncIteration
|
||||
|
||||
if r[self.interface]['rsm']['count'] and \
|
||||
r[self.interface]['rsm']['first_index']:
|
||||
count = int(r[self.interface]['rsm']['count'])
|
||||
first = int(r[self.interface]['rsm']['first_index'])
|
||||
num_items = len(r[self.interface][self.results])
|
||||
if r[self.recv_interface]['rsm']['count'] and \
|
||||
r[self.recv_interface]['rsm']['first_index']:
|
||||
count = int(r[self.recv_interface]['rsm']['count'])
|
||||
first = int(r[self.recv_interface]['rsm']['first_index'])
|
||||
num_items = len(r[self.recv_interface][self.results])
|
||||
if first + num_items == count:
|
||||
self._stop = True
|
||||
|
||||
if self.reverse:
|
||||
self.start = r[self.interface]['rsm']['first']
|
||||
self.start = r[self.recv_interface]['rsm']['first']
|
||||
else:
|
||||
self.start = r[self.interface]['rsm']['last']
|
||||
self.start = r[self.recv_interface]['rsm']['last']
|
||||
|
||||
if self.post_cb:
|
||||
self.post_cb(r)
|
||||
return r
|
||||
except XMPPError:
|
||||
raise StopIteration
|
||||
raise StopAsyncIteration
|
||||
|
||||
|
||||
class XEP_0059(BasePlugin):
|
||||
|
@ -127,7 +141,8 @@ class XEP_0059(BasePlugin):
|
|||
def session_bind(self, jid):
|
||||
self.xmpp['xep_0030'].add_feature(Set.namespace)
|
||||
|
||||
def iterate(self, stanza, interface, results='substanzas'):
|
||||
def iterate(self, stanza, interface, results='substanzas',
|
||||
recv_interface=None, pre_cb=None, post_cb=None):
|
||||
"""
|
||||
Create a new result set iterator for a given stanza query.
|
||||
|
||||
|
@ -137,9 +152,23 @@ class XEP_0059(BasePlugin):
|
|||
basic disco#items query.
|
||||
interface -- The name of the substanza to which the
|
||||
result set management stanza should be
|
||||
appended. For example, for disco#items queries
|
||||
the interface 'disco_items' should be used.
|
||||
appended in the query stanza. For example,
|
||||
for disco#items queries the interface
|
||||
'disco_items' should be used.
|
||||
recv_interface -- The name of the substanza from which the
|
||||
result set management stanza should be
|
||||
read in the result stanza. If unspecified,
|
||||
it will be set to the same value as the
|
||||
``interface`` parameter.
|
||||
pre_cb -- Callback to run before sending each stanza e.g.
|
||||
setting the MAM queryid and starting a stanza
|
||||
collector.
|
||||
post_cb -- Callback to run after receiving each stanza e.g.
|
||||
stopping a MAM stanza collector in order to
|
||||
gather results.
|
||||
results -- The name of the interface containing the
|
||||
query results (typically just 'substanzas').
|
||||
"""
|
||||
return ResultIterator(stanza, interface, results)
|
||||
return ResultIterator(stanza, interface, results,
|
||||
recv_interface=recv_interface, pre_cb=pre_cb,
|
||||
post_cb=post_cb)
|
||||
|
|
Loading…
Reference in a new issue