Merge branch 'xep-0045-fixes-misc' into 'master'

Misc fixes for XEP-0045

See merge request poezio/slixmpp!97
This commit is contained in:
Link Mauve 2021-01-10 15:12:20 +01:00
commit ca465032e7
3 changed files with 140 additions and 41 deletions

View file

@ -42,6 +42,8 @@ from slixmpp.plugins.xep_0045.stanza import (
MUCOwnerQuery, MUCOwnerQuery,
MUCOwnerDestroy, MUCOwnerDestroy,
MUCStatus, MUCStatus,
MUCActor,
MUCUserItem,
) )
@ -66,6 +68,9 @@ class XEP_0045(BasePlugin):
self.rooms = {} self.rooms = {}
self.our_nicks = {} self.our_nicks = {}
# load MUC support in presence stanzas # load MUC support in presence stanzas
register_stanza_plugin(MUCMessage, MUCUserItem)
register_stanza_plugin(MUCPresence, MUCUserItem)
register_stanza_plugin(MUCUserItem, MUCActor)
register_stanza_plugin(MUCMessage, MUCInvite) register_stanza_plugin(MUCMessage, MUCInvite)
register_stanza_plugin(MUCMessage, MUCDecline) register_stanza_plugin(MUCMessage, MUCDecline)
register_stanza_plugin(MUCMessage, MUCStatus) register_stanza_plugin(MUCMessage, MUCStatus)
@ -83,7 +88,7 @@ class XEP_0045(BasePlugin):
self.xmpp.register_handler( self.xmpp.register_handler(
Callback( Callback(
'MUCPresence', 'MUCPresence',
MatchXMLMask("<presence xmlns='%s' />" % self.xmpp.default_ns), StanzaPath("presence/muc"),
self.handle_groupchat_presence, self.handle_groupchat_presence,
)) ))
self.xmpp.register_handler( self.xmpp.register_handler(
@ -131,22 +136,27 @@ class XEP_0045(BasePlugin):
def handle_groupchat_invite(self, inv): def handle_groupchat_invite(self, inv):
""" Handle an invite into a muc. """ """ Handle an invite into a muc. """
if inv['from'] not in self.rooms.keys(): if self.xmpp.is_component:
self.xmpp.event("groupchat_invite", inv) self.xmpp.event('groupchat_invite', inv)
else:
if inv['from'] not in self.rooms.keys():
self.xmpp.event("groupchat_invite", inv)
def handle_groupchat_decline(self, decl): def handle_groupchat_decline(self, decl):
"""Handle an invitation decline.""" """Handle an invitation decline."""
if decl['from'] in self.room.keys(): if self.xmpp.is_component:
self.xmpp.event('groupchat_decline', decl) self.xmpp.event('groupchat_invite', decl)
else:
if decl['from'] in self.room.keys():
self.xmpp.event('groupchat_decline', decl)
def handle_config_change(self, msg): def handle_config_change(self, msg):
"""Handle a MUC configuration change (with status code).""" """Handle a MUC configuration change (with status code)."""
self.xmpp.event('groupchat_config_status', msg) self.xmpp.event('groupchat_config_status', msg)
self.xmpp.event('muc::%s::config_status' % msg['from'].bare , msg) self.xmpp.event('muc::%s::config_status' % msg['from'].bare , msg)
def handle_groupchat_presence(self, pr): def client_handle_presence(self, pr: Presence):
""" Handle a presence in a muc. """As a client, handle a presence stanza"""
"""
got_offline = False got_offline = False
got_online = False got_online = False
if pr['muc']['room'] not in self.rooms.keys(): if pr['muc']['room'] not in self.rooms.keys():
@ -172,6 +182,13 @@ class XEP_0045(BasePlugin):
if got_online: if got_online:
self.xmpp.event("muc::%s::got_online" % entry['room'], pr) self.xmpp.event("muc::%s::got_online" % entry['room'], pr)
def handle_groupchat_presence(self, pr: Presence):
""" Handle a presence in a muc."""
if self.xmpp.is_component:
self.xmpp.event('groupchat_presence', pr)
else:
self.client_handle_presence(pr)
def handle_groupchat_message(self, msg: Message) -> None: def handle_groupchat_message(self, msg: Message) -> None:
""" Handle a message event in a muc. """ Handle a message event in a muc.
""" """
@ -248,24 +265,22 @@ class XEP_0045(BasePlugin):
iq['mucowner_query']['destroy']['reason'] = reason iq['mucowner_query']['destroy']['reason'] = reason
await iq.send(**iqkwargs) await iq.send(**iqkwargs)
async def set_affiliation(self, room: JID, jid: Optional[JID] = None, nick: Optional[str] = None, *, affiliation: str, async def set_affiliation(self, room: JID, affiliation: str, *, jid: Optional[JID] = None,
reason: str = '', ifrom: Optional[JID] = None, **iqkwargs): nick: Optional[str] = None, reason: str = '',
ifrom: Optional[JID] = None, **iqkwargs):
""" Change room affiliation.""" """ Change room affiliation."""
if affiliation not in AFFILIATIONS: if affiliation not in AFFILIATIONS:
raise ValueError('%s is not a valid affiliation' % affiliation) raise ValueError('%s is not a valid affiliation' % affiliation)
if not any((jid, nick)): if not any((jid, nick)):
raise ValueError('One of jid or nick must be set') raise ValueError('One of jid or nick must be set')
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom) iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
iq.enable('mucadmin_query') iq['mucadmin_query']['item']['affiliation'] = affiliation
item = MUCAdminItem()
item['affiliation'] = affiliation
if nick: if nick:
item['nick'] = nick iq['mucadmin_query']['item']['nick'] = nick
if jid: if jid:
item['jid'] = jid iq['mucadmin_query']['item']['jid'] = jid
if reason: if reason:
item['reason'] = reason iq['mucadmin_query']['item']['reason'] = reason
iq['mucadmin_query'].append(item)
await iq.send(**iqkwargs) await iq.send(**iqkwargs)
async def set_role(self, room: JID, nick: str, role: str, *, async def set_role(self, room: JID, nick: str, role: str, *,
@ -278,13 +293,10 @@ class XEP_0045(BasePlugin):
if role not in ROLES: if role not in ROLES:
raise ValueError("Role %s does not exist" % role) raise ValueError("Role %s does not exist" % role)
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom) iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
iq.enable('mucadmin_query') iq['mucadmin_query']['item']['role'] = role
item = MUCAdminItem() iq['mucadmin_query']['item']['nick'] = nick
item['role'] = role
item['nick'] = nick
if reason: if reason:
item['reason'] = reason iq['mucadmin_query']['item']['reason'] = reason
iq['mucadmin_query'].append(item)
await iq.send(**iqkwargs) await iq.send(**iqkwargs)
def invite(self, room: JID, jid: JID, reason: str = '', *, def invite(self, room: JID, jid: JID, reason: str = '', *,

View file

@ -7,7 +7,12 @@
See the file LICENSE for copying permission. See the file LICENSE for copying permission.
""" """
from typing import Iterable, Set from typing import (
Iterable,
Set,
Optional,
Union,
)
import logging import logging
from slixmpp.xmlstream import ElementBase, ET, JID from slixmpp.xmlstream import ElementBase, ET, JID
@ -45,24 +50,21 @@ class MUCBase(ElementBase):
status['code'] = code status['code'] = code
self.append(status) self.append(status)
def get_item_attr(self, attr, default: str): def get_item_attr(self, attr: str, default):
item = self.xml.find(f'{{{NS_USER}}}item') item = self.xml.find(f'{{{NS_USER}}}item')
if item is None: if item is None:
return default return default
return item.get(attr) return self['item'][attr]
def set_item_attr(self, attr, value: str): def set_item_attr(self, attr: str, value: str):
item = self.xml.find(f'{{{NS_USER}}}item') item = self['item']
if item is None: item[attr] = value
item = ET.Element(f'{{{NS_USER}}}item')
self.xml.append(item)
item.attrib[attr] = value
return item return item
def del_item_attr(self, attr): def del_item_attr(self, attr):
item = self.xml.find(f'{{{NS_USER}}}item') item = self.xml.find(f'{{{NS_USER}}}item')
if item is not None and attr in item.attrib: if item is not None:
del item.attrib[attr] del self['item'][attr]
def get_affiliation(self): def get_affiliation(self):
return self.get_item_attr('affiliation', '') return self.get_item_attr('affiliation', '')
@ -71,13 +73,12 @@ class MUCBase(ElementBase):
self.set_item_attr('affiliation', value) self.set_item_attr('affiliation', value)
def del_affiliation(self): def del_affiliation(self):
# TODO: set default affiliation
self.del_item_attr('affiliation') self.del_item_attr('affiliation')
def get_jid(self): def get_jid(self) -> JID:
return JID(self.get_item_attr('jid', '')) return JID(self.get_item_attr('jid', ''))
def set_jid(self, value): def set_jid(self, value: Union[JID, str]):
if not isinstance(value, str): if not isinstance(value, str):
value = str(value) value = str(value)
self.set_item_attr('jid', value) self.set_item_attr('jid', value)
@ -85,10 +86,10 @@ class MUCBase(ElementBase):
def del_jid(self): def del_jid(self):
self.del_item_attr('jid') self.del_item_attr('jid')
def get_role(self): def get_role(self) -> str:
return self.get_item_attr('role', '') return self.get_item_attr('role', '')
def set_role(self, value): def set_role(self, value: str):
# TODO: check for valid role # TODO: check for valid role
self.set_item_attr('role', value) self.set_item_attr('role', value)
@ -96,10 +97,10 @@ class MUCBase(ElementBase):
# TODO: set default role # TODO: set default role
self.del_item_attr('role') self.del_item_attr('role')
def get_nick(self): def get_nick(self) -> str:
return self.parent()['from'].resource return self.parent()['from'].resource
def get_room(self): def get_room(self) -> str:
return self.parent()['from'].bare return self.parent()['from'].bare
def set_nick(self, value): def set_nick(self, value):
@ -232,3 +233,30 @@ class MUCStatus(ElementBase):
def set_code(self, code: int): def set_code(self, code: int):
self.xml.attrib['code'] = str(code) self.xml.attrib['code'] = str(code)
class MUCUserItem(ElementBase):
namespace = NS_USER
name = 'item'
plugin_attrib = 'item'
interfaces = {'role', 'affiliation', 'jid', 'reason', 'nick'}
sub_interfaces = {'reason'}
def get_jid(self) -> Optional[JID]:
jid = self.xml.attrib.get('jid', None)
if jid:
return JID(jid)
return jid
class MUCActor(ElementBase):
namespace = NS_USER
name = 'actor'
plugin_attrib = 'actor'
interfaces = {'jid', 'nick'}
def get_jid(self) -> Optional[JID]:
jid = self.xml.attrib.get('jid', None)
if jid:
return JID(jid)
return jid

View file

@ -13,6 +13,8 @@ from slixmpp.plugins.xep_0045.stanza import (
MUCStatus, MUCStatus,
MUCInvite, MUCInvite,
MUCDecline, MUCDecline,
MUCUserItem,
MUCActor,
) )
from slixmpp.xmlstream import register_stanza_plugin, ET from slixmpp.xmlstream import register_stanza_plugin, ET
@ -20,6 +22,9 @@ from slixmpp.xmlstream import register_stanza_plugin, ET
class TestMUC(SlixTest): class TestMUC(SlixTest):
def setUp(self): def setUp(self):
register_stanza_plugin(MUCPresence, MUCUserItem)
register_stanza_plugin(MUCMessage, MUCUserItem)
register_stanza_plugin(MUCUserItem, MUCActor)
register_stanza_plugin(MUCMessage, MUCInvite) register_stanza_plugin(MUCMessage, MUCInvite)
register_stanza_plugin(MUCMessage, MUCDecline) register_stanza_plugin(MUCMessage, MUCDecline)
register_stanza_plugin(MUCMessage, MUCStatus) register_stanza_plugin(MUCMessage, MUCStatus)
@ -33,7 +38,46 @@ class TestMUC(SlixTest):
register_stanza_plugin(MUCOwnerQuery, MUCOwnerDestroy) register_stanza_plugin(MUCOwnerQuery, MUCOwnerDestroy)
register_stanza_plugin(MUCAdminQuery, MUCAdminItem, iterable=True) register_stanza_plugin(MUCAdminQuery, MUCAdminItem, iterable=True)
def testPresence(self): def testPresence(self):
presence = Presence()
presence['from'] = JID('muc@service/nick')
presence['muc']['item']['affiliation'] = 'member'
presence['muc']['item']['role'] = 'participant'
presence['muc']['status_codes'] = (100, 110, 210)
self.check(presence, """
<presence from='muc@service/nick'>
<x xmlns='http://jabber.org/protocol/muc#user'>
<item affiliation='member' role='participant'/>
<status code='100'/>
<status code='110'/>
<status code='210'/>
</x>
</presence>
""", use_values=False)
def testPresenceReason(self):
presence = Presence()
presence['from'] = JID('muc@service/nick')
presence['muc']['item']['affiliation'] = 'member'
presence['muc']['item']['role'] = 'participant'
presence['muc']['item']['reason'] = 'coucou'
presence['muc']['item']['actor']['nick'] = 'JPR'
self.check(presence, """
<presence from='muc@service/nick'>
<x xmlns='http://jabber.org/protocol/muc#user'>
<item affiliation='member' role='participant'>
<actor nick="JPR"/>
<reason>coucou</reason>
</item>
</x>
</presence>
""", use_values=False)
def testPresenceLegacy(self):
presence = Presence() presence = Presence()
presence['from'] = JID('muc@service/nick') presence['from'] = JID('muc@service/nick')
presence['muc']['affiliation'] = 'member' presence['muc']['affiliation'] = 'member'
@ -96,5 +140,20 @@ class TestMUC(SlixTest):
</iq> </iq>
""", use_values=False) """, use_values=False)
def testSetAffiliation(self):
iq = Iq()
iq['type'] = 'set'
iq['id'] = '1'
iq['mucadmin_query']['item']['jid'] = JID('test@example.com')
iq['mucadmin_query']['item']['affiliation'] = 'owner'
self.check(iq, """
<iq type='set' id='1'>
<query xmlns='http://jabber.org/protocol/muc#admin'>
<item jid='test@example.com'
affiliation='owner'/>
</query>
</iq>
""", use_values=False)
suite = unittest.TestLoader().loadTestsFromTestCase(TestMUC) suite = unittest.TestLoader().loadTestsFromTestCase(TestMUC)