Merge branch 'fix-slow-tasks' into 'master'
xmlstream: Fix slow tasks See merge request poezio/slixmpp!162
This commit is contained in:
commit
6f4ac7e7ce
2 changed files with 57 additions and 4 deletions
49
itests/test_slow_filters.py
Normal file
49
itests/test_slow_filters.py
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
import asyncio
|
||||||
|
import unittest
|
||||||
|
from slixmpp.test.integration import SlixIntegration
|
||||||
|
from slixmpp import Message
|
||||||
|
|
||||||
|
|
||||||
|
class TestSlowFilter(SlixIntegration):
|
||||||
|
async def asyncSetUp(self):
|
||||||
|
await super().asyncSetUp()
|
||||||
|
self.add_client(
|
||||||
|
self.envjid('CI_ACCOUNT1'),
|
||||||
|
self.envstr('CI_ACCOUNT1_PASSWORD'),
|
||||||
|
)
|
||||||
|
self.add_client(
|
||||||
|
self.envjid('CI_ACCOUNT2'),
|
||||||
|
self.envstr('CI_ACCOUNT2_PASSWORD'),
|
||||||
|
)
|
||||||
|
await self.connect_clients()
|
||||||
|
|
||||||
|
async def test_filters(self):
|
||||||
|
"""Make sure filters work"""
|
||||||
|
def add_a(stanza):
|
||||||
|
if isinstance(stanza, Message):
|
||||||
|
stanza['body'] = stanza['body'] + ' a'
|
||||||
|
return stanza
|
||||||
|
|
||||||
|
async def add_b(stanza):
|
||||||
|
if isinstance(stanza, Message):
|
||||||
|
stanza['body'] = stanza['body'] + ' b'
|
||||||
|
return stanza
|
||||||
|
|
||||||
|
async def add_c_wait(stanza):
|
||||||
|
if isinstance(stanza, Message):
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
stanza['body'] = stanza['body'] + ' c'
|
||||||
|
return stanza
|
||||||
|
self.clients[0].add_filter('out', add_a)
|
||||||
|
self.clients[0].add_filter('out', add_b)
|
||||||
|
self.clients[0].add_filter('out', add_c_wait)
|
||||||
|
body = 'Msg body'
|
||||||
|
msg = self.clients[0].make_message(
|
||||||
|
mto=self.clients[1].boundjid, mbody=body,
|
||||||
|
)
|
||||||
|
msg.send()
|
||||||
|
message = await self.clients[1].wait_until('message')
|
||||||
|
self.assertEqual(message['body'], body + ' a b c')
|
||||||
|
|
||||||
|
|
||||||
|
suite = unittest.TestLoader().loadTestsFromTestCase(TestSlowFilter)
|
|
@ -1053,11 +1053,13 @@ class XMLStream(asyncio.BaseProtocol):
|
||||||
"""
|
"""
|
||||||
data = await task
|
data = await task
|
||||||
self.__slow_tasks.remove(task)
|
self.__slow_tasks.remove(task)
|
||||||
for filter in self.__filters['out']:
|
if data is None:
|
||||||
|
return
|
||||||
|
for filter in self.__filters['out'][:]:
|
||||||
if filter in already_used:
|
if filter in already_used:
|
||||||
continue
|
continue
|
||||||
if iscoroutinefunction(filter):
|
if iscoroutinefunction(filter):
|
||||||
data = await task
|
data = await filter(data)
|
||||||
else:
|
else:
|
||||||
data = filter(data)
|
data = filter(data)
|
||||||
if data is None:
|
if data is None:
|
||||||
|
@ -1093,7 +1095,7 @@ class XMLStream(asyncio.BaseProtocol):
|
||||||
timeout=1,
|
timeout=1,
|
||||||
)
|
)
|
||||||
if pending:
|
if pending:
|
||||||
self.slow_tasks.append(task)
|
self.__slow_tasks.append(task)
|
||||||
asyncio.ensure_future(
|
asyncio.ensure_future(
|
||||||
self._continue_slow_send(
|
self._continue_slow_send(
|
||||||
task,
|
task,
|
||||||
|
@ -1101,7 +1103,9 @@ class XMLStream(asyncio.BaseProtocol):
|
||||||
),
|
),
|
||||||
loop=self.loop,
|
loop=self.loop,
|
||||||
)
|
)
|
||||||
raise Exception("Slow coro, rescheduling")
|
raise ContinueQueue(
|
||||||
|
"Slow coroutine, rescheduling filters"
|
||||||
|
)
|
||||||
data = task.result()
|
data = task.result()
|
||||||
else:
|
else:
|
||||||
data = filter(data)
|
data = filter(data)
|
||||||
|
|
Loading…
Reference in a new issue