slixtest: add more shortcuts for asyncio workflow

This commit is contained in:
mathieui 2021-02-14 11:35:03 +01:00
parent 94700de7a3
commit 7e3e056e6b

View file

@ -434,7 +434,9 @@ class SlixTest(unittest.TestCase):
timeout -- Time to wait in seconds for data to be received by
a live connection.
"""
self.wait_()
self.xmpp.data_received(data)
self.wait_()
def recv_header(self, sto='',
sfrom='',
@ -623,10 +625,20 @@ class SlixTest(unittest.TestCase):
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(self.xmpp.run_filters(), loop=loop)
queue = self.xmpp.waiting_queue
print(queue)
loop.run_until_complete(queue.join())
future.cancel()
def wait_(self):
async def yield_some():
for i in range(100):
await asyncio.sleep(0)
loop = asyncio.get_event_loop()
loop.run_until_complete(yield_some())
def run_coro(self, coro):
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
def stream_close(self):
"""
Disconnect the dummy XMPP client.