XEP-0045: update methods for asyncio & stanza, typing

- This rewrites most of the stuff in the plugin by using the newly added
  elements instead of raw ElementTree stuff with hardcoded namespaces.
- Adds methods for affiliation/roles
- Adds some type hints
- Fix many cases where the call would simply not work since slixmpp
  exists (and break the API but it was already broken ¯\_(ツ)_/¯)
This commit is contained in:
mathieui 2020-11-22 18:07:03 +01:00
parent fe68d19f91
commit a9a7bdc6c3

View file

@ -9,6 +9,11 @@
from __future__ import with_statement
import logging
from typing import (
List,
Tuple,
Optional,
)
from slixmpp import (
Presence,
@ -38,6 +43,9 @@ from slixmpp.plugins.xep_0045.stanza import (
log = logging.getLogger(__name__)
AFFILIATIONS = ('outcast', 'member', 'admin', 'owner', 'none')
ROLES = ('moderator', 'participant', 'visitor', 'none')
class XEP_0045(BasePlugin):
@ -168,7 +176,6 @@ class XEP_0045(BasePlugin):
self.xmpp.event("muc::%s::message_error" % msg['from'].bare, msg)
def handle_groupchat_subject(self, msg: Message) -> None:
""" Handle a message coming from a muc indicating
a change of subject (or announcing it when joining the room)
@ -179,143 +186,96 @@ class XEP_0045(BasePlugin):
return None
self.xmpp.event('groupchat_subject', msg)
def jid_in_room(self, room, jid):
def jid_in_room(self, room: JID, jid: JID) -> bool:
for nick in self.rooms[room]:
entry = self.rooms[room][nick]
if entry is not None and entry['jid'].full == jid:
return True
return False
def get_nick(self, room, jid):
def get_nick(self, room: JID, jid: JID) -> Optional[str]:
for nick in self.rooms[room]:
entry = self.rooms[room][nick]
if entry is not None and entry['jid'].full == jid:
return nick
def configure_room(self, room, form=None, ifrom=None):
if form is None:
form = self.get_room_config(room, ifrom=ifrom)
iq = self.xmpp.make_iq_set()
iq['to'] = room
if ifrom is not None:
iq['from'] = ifrom
query = ET.Element('{http://jabber.org/protocol/muc#owner}query')
form['type'] = 'submit'
query.append(form)
iq.append(query)
# For now, swallow errors to preserve existing API
try:
result = iq.send()
except IqError:
return False
except IqTimeout:
return False
return True
def join_muc(self, room, nick, maxhistory="0", password='', wait=False, pstatus=None, pshow=None, pfrom=None):
def join_muc(self, room: JID, nick: str, maxhistory="0", password='',
pstatus='', pshow='', pfrom=''):
""" Join the specified room, requesting 'maxhistory' lines of history.
"""
stanza = self.xmpp.make_presence(pto="%s/%s" % (room, nick), pstatus=pstatus, pshow=pshow, pfrom=pfrom)
x = ET.Element('{http://jabber.org/protocol/muc}x')
stanza = self.xmpp.make_presence(
pto="%s/%s" % (room, nick), pstatus=pstatus,
pshow=pshow, pfrom=pfrom
)
stanza.enable('muc_join')
if password:
passelement = ET.Element('{http://jabber.org/protocol/muc}password')
passelement.text = password
x.append(passelement)
stanza['muc_join']['password'] = password
if maxhistory:
history = ET.Element('{http://jabber.org/protocol/muc}history')
if maxhistory == "0":
history.attrib['maxchars'] = maxhistory
if maxhistory == "0":
stanza['muc_join']['history']['maxchars'] = '0'
else:
history.attrib['maxstanzas'] = maxhistory
x.append(history)
stanza.append(x)
if not wait:
self.xmpp.send(stanza)
else:
#wait for our own room presence back
expect = ET.Element("{%s}presence" % self.xmpp.default_ns, {'from':"%s/%s" % (room, nick)})
self.xmpp.send(stanza, expect)
stanza['muc_join']['history']['maxstanzas'] = str(maxhistory)
self.xmpp.send(stanza)
self.rooms[room] = {}
self.our_nicks[room] = nick
def destroy(self, room, reason='', altroom = '', ifrom=None):
iq = self.xmpp.make_iq_set()
if ifrom is not None:
iq['from'] = ifrom
iq['to'] = room
query = ET.Element('{http://jabber.org/protocol/muc#owner}query')
destroy = ET.Element('{http://jabber.org/protocol/muc#owner}destroy')
async def destroy(self, room: JID, reason='', altroom='', *,
ifrom: Optional[JID] = None, **iqkwargs) -> Iq:
iq = self.xmpp.make_iq_set(ifrom=ifrom, ito=room)
iq.enable('mucowner_query')
iq['mucowner_query'].enable('destroy')
if altroom:
destroy.attrib['jid'] = altroom
xreason = ET.Element('{http://jabber.org/protocol/muc#owner}reason')
xreason.text = reason
destroy.append(xreason)
query.append(destroy)
iq.append(query)
# For now, swallow errors to preserve existing API
try:
r = iq.send()
except IqError:
return False
except IqTimeout:
return False
return True
iq['mucowner_query']['destroy']['jid'] = altroom
if reason:
iq['mucowner_query']['destroy']['reason'] = reason
await iq.send(**iqkwargs)
def set_affiliation(self, room, jid=None, nick=None, affiliation='member', ifrom=None):
async def set_affiliation(self, room: JID, jid: Optional[JID] = None, nick: Optional[str] = None, *, affiliation: str,
ifrom: Optional[JID] = None, **iqkwargs):
""" Change room affiliation."""
if affiliation not in ('outcast', 'member', 'admin', 'owner', 'none'):
raise TypeError
query = ET.Element('{http://jabber.org/protocol/muc#admin}query')
if nick is not None:
item = ET.Element('{http://jabber.org/protocol/muc#admin}item', {'affiliation':affiliation, 'nick':nick})
else:
item = ET.Element('{http://jabber.org/protocol/muc#admin}item', {'affiliation':affiliation, 'jid':jid})
query.append(item)
iq = self.xmpp.make_iq_set(query)
iq['to'] = room
iq['from'] = ifrom
# For now, swallow errors to preserve existing API
try:
result = iq.send()
except IqError:
return False
except IqTimeout:
return False
return True
if affiliation not in AFFILIATIONS:
raise ValueError('%s is not a valid affiliation' % affiliation)
if not any((jid, nick)):
raise ValueError('One of jid or nick must be set')
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
iq.enable('mucadmin_query')
item = MUCAdminItem()
item['affiliation'] = affiliation
if nick:
item['nick'] = nick
if jid:
item['jid'] = jid
iq['mucadmin_query'].append(item)
await iq.send(**iqkwargs)
def set_role(self, room, nick, role):
async def set_role(self, room: JID, nick: str, role: str, *,
ifrom: Optional[JID] = None, **iqkwargs) -> Iq:
""" Change role property of a nick in a room.
Typically, roles are temporary (they last only as long as you are in the
room), whereas affiliations are permanent (they last across groupchat
sessions).
"""
if role not in ('moderator', 'participant', 'visitor', 'none'):
raise TypeError
query = ET.Element('{http://jabber.org/protocol/muc#admin}query')
item = ET.Element('item', {'role':role, 'nick':nick})
query.append(item)
iq = self.xmpp.make_iq_set(query)
iq['to'] = room
result = iq.send()
if result is False or result['type'] != 'result':
raise ValueError
return True
if role not in ROLES:
raise ValueError("Role %s does not exist" % role)
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
iq.enable('mucadmin_query')
item = MUCAdminItem()
item['role'] = role
item['nick'] = nick
iq['mucadmin_query'].append(item)
await iq.send(**iqkwargs)
def invite(self, room, jid, reason='', mfrom=''):
def invite(self, room: JID, jid: JID, reason='', *,
mfrom: Optional[JID] = None):
""" Invite a jid to a room."""
msg = self.xmpp.make_message(room)
msg['from'] = mfrom
x = ET.Element('{http://jabber.org/protocol/muc#user}x')
invite = ET.Element('{http://jabber.org/protocol/muc#user}invite', {'to': jid})
msg = self.xmpp.make_message(room, mfrom=mfrom)
msg.enable('muc')
msg['muc']['invite'] = jid
if reason:
rxml = ET.Element('{http://jabber.org/protocol/muc#user}reason')
rxml.text = reason
invite.append(rxml)
x.append(invite)
msg.append(x)
msg['muc']['invite']['reason'] = reason
self.xmpp.send(msg)
def leave_muc(self, room, nick, msg='', pfrom=None):
def leave_muc(self, room: JID, nick: str, msg='', pfrom=None):
""" Leave the specified room.
"""
if msg:
@ -324,44 +284,77 @@ class XEP_0045(BasePlugin):
self.xmpp.send_presence(pshow='unavailable', pto="%s/%s" % (room, nick), pfrom=pfrom)
del self.rooms[room]
def get_room_config(self, room, ifrom=''):
iq = self.xmpp.make_iq_get('http://jabber.org/protocol/muc#owner')
iq['to'] = room
iq['from'] = ifrom
async def get_room_config(self, room: JID, ifrom=''):
"""Get the room config form in 0004 plugin format """
iq = self.xmpp.make_iq_get(stanza.NS_OWNER, ito=room, ifrom=ifrom)
# For now, swallow errors to preserve existing API
try:
result = iq.send()
except IqError:
raise ValueError
except IqTimeout:
raise ValueError
result = await iq.send()
form = result.xml.find('{http://jabber.org/protocol/muc#owner}query/{jabber:x:data}x')
if form is None:
raise ValueError
raise ValueError("Configuration form not found")
return self.xmpp.plugin['xep_0004'].build_form(form)
def cancel_config(self, room, ifrom=None):
query = ET.Element('{http://jabber.org/protocol/muc#owner}query')
async def cancel_config(self, room: JID, *,
ifrom: Optional[JID] = None, **iqkwargs) -> Iq:
"""Cancel a requested config form"""
query = MUCOwnerQuery()
x = ET.Element('{jabber:x:data}x', type='cancel')
query.append(x)
iq = self.xmpp.make_iq_set(query)
iq['to'] = room
iq['from'] = ifrom
iq.send()
iq = self.xmpp.make_iq_set(query, ito=room, ifrom=ifrom)
return await iq.send(**iqkwargs)
def set_room_config(self, room, config, ifrom=''):
query = ET.Element('{http://jabber.org/protocol/muc#owner}query')
async def set_room_config(self, room: JID, config, *,
ifrom: Optional[JID] = None, **iqkwargs) -> Iq:
"""Send a room config form"""
query = MUCOwnerQuery()
config['type'] = 'submit'
query.append(config)
iq = self.xmpp.make_iq_set(query)
iq['to'] = room
iq['from'] = ifrom
iq.send()
iq = self.xmpp.make_iq_set(query, ito=room, ifrom=ifrom)
return await iq.send(**iqkwargs)
def get_joined_rooms(self):
async def get_affiliation_list(self, room: JID, affiliation: str, *,
ifrom: Optional[JID] = None, **iqkwargs) -> List[JID]:
""""Get a list of JIDs with the specified affiliation"""
iq = self.xmpp.make_iq_get(stanza.NS_ADMIN, ito=room, ifrom=ifrom)
iq['mucadmin_query']['item']['affiliation'] = affiliation
result = await iq.send(**iqkwargs)
return [item['jid'] for item in result['mucadmin_query']]
async def get_roles_list(self, room: JID, role: str, *,
ifrom: Optional[JID] = None, **iqkwargs) -> List[str]:
""""Get a list of JIDs with the specified role"""
iq = self.xmpp.make_iq_get(stanza.NS_ADMIN, ito=room, ifrom=ifrom)
iq['mucadmin_query']['item']['role'] = role
result = await iq.send(**iqkwargs)
return [item['nick'] for item in result['mucadmin_query']]
async def send_affiliation_list(self, room: JID, affiliations: List[Tuple[JID, str]], *,
ifrom: Optional[JID] = None, **iqkwargs) -> Iq:
"""Send an affiliation delta list"""
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
for jid, affiliation in affiliations:
item = MUCAdminItem()
item['jid'] = jid
item['affiliation'] = affiliation
iq['mucadmin_query'].append(item)
return await iq.send(**iqkwargs)
async def send_role_list(self, room: JID, roles: List[Tuple[str, str]], *,
ifrom: Optional[JID], **iqkwargs) -> Iq:
"""Send a role delta list"""
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
for nick, affiliation in roles:
item = MUCAdminItem()
item['nick'] = nick
item['affiliation'] = affiliation
iq['mucadmin_query'].append(item)
return await iq.send(**iqkwargs)
def get_joined_rooms(self) -> List[JID]:
return self.rooms.keys()
def get_our_jid_in_room(self, room_jid):
def get_our_jid_in_room(self, room_jid: JID) -> str:
""" Return the jid we're using in a room.
"""
return "%s/%s" % (room_jid, self.our_nicks[room_jid])
@ -375,19 +368,15 @@ class XEP_0045(BasePlugin):
else:
return None
def get_roster(self, room):
def get_roster(self, room: JID) -> List[str]:
""" Get the list of nicks in a room.
"""
if room not in self.rooms.keys():
return None
return self.rooms[room].keys()
def get_users_by_affiliation(cls, room, affiliation='member', ifrom=None):
if affiliation not in ('outcast', 'member', 'admin', 'owner', 'none'):
def get_users_by_affiliation(self, room: JID, affiliation='member', *, ifrom: Optional[JID] = None):
# Preserve old API
if affiliation not in AFFILIATIONS:
raise TypeError
query = ET.Element('{http://jabber.org/protocol/muc#admin}query')
item = ET.Element('{http://jabber.org/protocol/muc#admin}item', {'affiliation': affiliation})
query.append(item)
iq = cls.xmpp.Iq(sto=room, sfrom=ifrom, stype='get')
iq.append(query)
return iq.send()
return self.get_affiliation_list(room, affiliation, ifrom=ifrom)