61 lines
2 KiB
Python
61 lines
2 KiB
Python
import asyncio
|
|
import unittest
|
|
from slixmpp import JID
|
|
from slixmpp.test.integration import SlixIntegration
|
|
|
|
|
|
class TestUserAvatar(SlixIntegration):
|
|
async def asyncSetUp(self):
|
|
await super().asyncSetUp()
|
|
self.add_client(
|
|
self.envjid('CI_ACCOUNT1'),
|
|
self.envstr('CI_ACCOUNT1_PASSWORD'),
|
|
)
|
|
self.register_plugins(['xep_0084'])
|
|
self.data = b'coucou coucou'
|
|
await self.connect_clients()
|
|
|
|
async def _clear_avatar(self):
|
|
"""Utility for purging remote state"""
|
|
await self.clients[0]['xep_0084'].stop()
|
|
await self.clients[0]['xep_0084'].publish_avatar(b'')
|
|
|
|
async def test_set_avatar(self):
|
|
"""Check we can set and get a PEP avatar and metadata"""
|
|
await self._clear_avatar()
|
|
|
|
await self.clients[0]['xep_0084'].publish_avatar(
|
|
self.data
|
|
)
|
|
metadata = {
|
|
'id': self.clients[0]['xep_0084'].generate_id(self.data),
|
|
'bytes': 13,
|
|
'type': 'image/jpeg',
|
|
}
|
|
# Wait for metadata publish event
|
|
event = self.clients[0].wait_until('avatar_metadata_publish')
|
|
publish = self.clients[0]['xep_0084'].publish_avatar_metadata(
|
|
metadata,
|
|
)
|
|
res = await asyncio.gather(
|
|
event,
|
|
publish,
|
|
)
|
|
message = res[0]
|
|
recv_meta = message['pubsub_event']['items']['item']['avatar_metadata']
|
|
info = recv_meta['info']
|
|
self.assertEqual(info['bytes'], metadata['bytes'])
|
|
self.assertEqual(info['type'], metadata['type'])
|
|
self.assertEqual(info['id'], metadata['id'])
|
|
|
|
recv = await self.clients[0]['xep_0084'].retrieve_avatar(
|
|
JID(self.clients[0].boundjid.bare),
|
|
info['id']
|
|
)
|
|
avatar = recv['pubsub']['items']['item']['avatar_data']['value']
|
|
self.assertEqual(avatar, self.data)
|
|
|
|
await self._clear_avatar()
|
|
|
|
|
|
suite = unittest.TestLoader().loadTestsFromTestCase(TestUserAvatar)
|