diff --git a/.gitignore b/.gitignore
index c15723d1..6d046004 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,5 @@ slixmpp.egg-info/
.baboon/
.DS_STORE
.idea/
+.vscode/
+venv/
\ No newline at end of file
diff --git a/docs/api/plugins/index.rst b/docs/api/plugins/index.rst
index 60a6360d..e6aa6913 100644
--- a/docs/api/plugins/index.rst
+++ b/docs/api/plugins/index.rst
@@ -75,6 +75,7 @@ Plugin index
xep_0335
xep_0352
xep_0353
+ xep_0356
xep_0359
xep_0363
xep_0369
diff --git a/docs/api/plugins/xep_0356.rst b/docs/api/plugins/xep_0356.rst
new file mode 100644
index 00000000..694c7c01
--- /dev/null
+++ b/docs/api/plugins/xep_0356.rst
@@ -0,0 +1,17 @@
+
+XEP-0356: Privileged Entity
+===========================
+
+.. module:: slixmpp.plugins.xep_0356
+
+.. autoclass:: XEP_0356
+ :members:
+ :exclude-members: session_bind, plugin_init, plugin_end
+
+
+Stanza elements
+---------------
+
+.. automodule:: slixmpp.plugins.xep_0356.stanza
+ :members:
+ :undoc-members:
diff --git a/slixmpp/plugins/xep_0356/__init__.py b/slixmpp/plugins/xep_0356/__init__.py
new file mode 100644
index 00000000..d457a06b
--- /dev/null
+++ b/slixmpp/plugins/xep_0356/__init__.py
@@ -0,0 +1,7 @@
+from slixmpp.plugins.base import register_plugin
+
+from slixmpp.plugins.xep_0356 import stanza
+from slixmpp.plugins.xep_0356.stanza import Perm, Privilege
+from slixmpp.plugins.xep_0356.privilege import XEP_0356
+
+register_plugin(XEP_0356)
diff --git a/slixmpp/plugins/xep_0356/privilege.py b/slixmpp/plugins/xep_0356/privilege.py
new file mode 100644
index 00000000..a0bdb6a7
--- /dev/null
+++ b/slixmpp/plugins/xep_0356/privilege.py
@@ -0,0 +1,144 @@
+import logging
+import typing
+
+from slixmpp import Message, JID, Iq
+from slixmpp.plugins.base import BasePlugin
+from slixmpp.xmlstream.matcher import StanzaPath
+from slixmpp.xmlstream.handler import Callback
+from slixmpp.xmlstream import register_stanza_plugin
+
+from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm
+
+
+log = logging.getLogger(__name__)
+
+
+class XEP_0356(BasePlugin):
+ """
+ XEP-0356: Privileged Entity
+
+ Events:
+
+ ::
+
+ privileges_advertised -- Received message/privilege from the server
+ """
+
+ name = "xep_0356"
+ description = "XEP-0356: Privileged Entity"
+ dependencies = {"xep_0297"}
+ stanza = stanza
+
+ granted_privileges = {"roster": "none", "message": "none", "presence": "none"}
+
+ def plugin_init(self):
+ if not self.xmpp.is_component:
+ log.error("XEP 0356 is only available for components")
+ return
+
+ stanza.register()
+
+ self.xmpp.register_handler(
+ Callback(
+ "Privileges",
+ StanzaPath("message/privilege"),
+ self._handle_privilege,
+ )
+ )
+
+ def plugin_end(self):
+ self.xmpp.remove_handler("Privileges")
+
+ def _handle_privilege(self, msg: Message):
+ """
+ Called when the XMPP server advertise the component's privileges.
+
+ Stores the privileges in this instance's granted_privileges attribute (a dict)
+ and raises the privileges_advertised event
+ """
+ for perm in msg["privilege"]["perms"]:
+ self.granted_privileges[perm["access"]] = perm["type"]
+ log.debug(f"Privileges: {self.granted_privileges}")
+ self.xmpp.event("privileges_advertised")
+
+ def send_privileged_message(self, msg: Message):
+ if self.granted_privileges["message"] == "outgoing":
+ self._make_privileged_message(msg).send()
+ else:
+ log.error(
+ "The server hasn't authorized us to send messages on behalf of other users"
+ )
+
+ def _make_privileged_message(self, msg: Message):
+ stanza = self.xmpp.make_message(
+ mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare
+ )
+ stanza["privilege"]["forwarded"].append(msg)
+ return stanza
+
+ def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
+ return self.xmpp.make_iq_get(
+ queryxmlns="jabber:iq:roster",
+ ifrom=self.xmpp.boundjid.bare,
+ ito=jid,
+ **iq_kwargs,
+ )
+
+ def _make_set_roster(
+ self,
+ jid: typing.Union[JID, str],
+ roster_items: dict,
+ **iq_kwargs,
+ ):
+ iq = self.xmpp.make_iq_set(
+ ifrom=self.xmpp.boundjid.bare,
+ ito=jid,
+ **iq_kwargs,
+ )
+ iq["roster"]["items"] = roster_items
+ return iq
+
+ async def get_roster(self, jid: typing.Union[JID, str], **send_kwargs) -> Iq:
+ """
+ Return the roster of user on the server the component has privileged access to.
+
+ Raises ValueError if the server did not advertise the corresponding privileges
+
+ :param jid: user we want to fetch the roster from
+ """
+ if self.granted_privileges["roster"] not in ("get", "both"):
+ log.error("The server did not grant us privileges to get rosters")
+ raise ValueError
+ else:
+ return await self._make_get_roster(jid).send(**send_kwargs)
+
+ async def set_roster(
+ self, jid: typing.Union[JID, str], roster_items: dict, **send_kwargs
+ ) -> Iq:
+ """
+ Return the roster of user on the server the component has privileged access to.
+
+ Raises ValueError if the server did not advertise the corresponding privileges
+
+ :param jid: user we want to add or modify roster items
+ :param roster_items: a dict containing the roster items' JIDs as keys and
+ nested dicts containing names, subscriptions and groups.
+ Example:
+ {
+ "friend1@example.com": {
+ "name": "Friend 1",
+ "subscription": "both",
+ "groups": ["group1", "group2"],
+ },
+ "friend2@example.com": {
+ "name": "Friend 2",
+ "subscription": "from",
+ "groups": ["group3"],
+ },
+ }
+ """
+ if self.granted_privileges["roster"] not in ("set", "both"):
+ log.error("The server did not grant us privileges to set rosters")
+ raise ValueError
+ else:
+ return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
diff --git a/slixmpp/plugins/xep_0356/stanza.py b/slixmpp/plugins/xep_0356/stanza.py
new file mode 100644
index 00000000..ef01ee3e
--- /dev/null
+++ b/slixmpp/plugins/xep_0356/stanza.py
@@ -0,0 +1,47 @@
+from slixmpp.stanza import Message
+from slixmpp.xmlstream import (
+ ElementBase,
+ register_stanza_plugin,
+)
+from slixmpp.plugins.xep_0297 import Forwarded
+
+
+class Privilege(ElementBase):
+ namespace = "urn:xmpp:privilege:1"
+ name = "privilege"
+ plugin_attrib = "privilege"
+
+ def permission(self, access):
+ for perm in self["perms"]:
+ if perm["access"] == access:
+ return perm["type"]
+
+ def roster(self):
+ return self.permission("roster")
+
+ def message(self):
+ return self.permission("message")
+
+ def presence(self):
+ return self.permission("presence")
+
+ def add_perm(self, access, type):
+ # This should only be needed for servers, so maybe out of scope for slixmpp
+ perm = Perm()
+ perm["type"] = type
+ perm["access"] = access
+ self.append(perm)
+
+
+class Perm(ElementBase):
+ namespace = "urn:xmpp:privilege:1"
+ name = "perm"
+ plugin_attrib = "perm"
+ plugin_multi_attrib = "perms"
+ interfaces = {"type", "access"}
+
+
+def register():
+ register_stanza_plugin(Message, Privilege)
+ register_stanza_plugin(Privilege, Forwarded)
+ register_stanza_plugin(Privilege, Perm, iterable=True)
\ No newline at end of file
diff --git a/tests/test_stanza_xep_0356.py b/tests/test_stanza_xep_0356.py
new file mode 100644
index 00000000..ef116db2
--- /dev/null
+++ b/tests/test_stanza_xep_0356.py
@@ -0,0 +1,41 @@
+import unittest
+from slixmpp import Message
+from slixmpp.test import SlixTest
+from slixmpp.xmlstream import register_stanza_plugin
+
+from slixmpp.plugins.xep_0356 import stanza
+
+
+class TestPermissions(SlixTest):
+ def setUp(self):
+ stanza.register()
+
+ def testAdvertisePermission(self):
+ xmlstring = """
+
+
+
+
+
+
+
+ """
+ msg = self.Message()
+ msg["from"] = "capulet.net"
+ msg["to"] = "pubub.capulet.lit"
+ # This raises AttributeError: 'NoneType' object has no attribute 'use_origin_id'
+ # msg["id"] = "id"
+
+ for access, type_ in [
+ ("roster", "both"),
+ ("message", "outgoing"),
+ ("presence", "managed_entity"),
+ ]:
+ msg["privilege"].add_perm(access, type_)
+
+ self.check(msg, xmlstring)
+ # Should this one work? → # AttributeError: 'Message' object has no attribute 'permission'
+ # self.assertEqual(msg.permission["roster"], "both")
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)
diff --git a/tests/test_stream_xep_0356.py b/tests/test_stream_xep_0356.py
new file mode 100644
index 00000000..2949daad
--- /dev/null
+++ b/tests/test_stream_xep_0356.py
@@ -0,0 +1,116 @@
+import unittest
+
+from slixmpp import ComponentXMPP, Iq, Message
+from slixmpp.roster import RosterItem
+from slixmpp.test import SlixTest
+
+
+class TestPermissions(SlixTest):
+ def setUp(self):
+ self.stream_start(
+ mode="component",
+ plugins=["xep_0356"],
+ jid="pubsub.capulet.lit",
+ server="capulet.net",
+ )
+
+ def testPluginEnd(self):
+ exc = False
+ try:
+ self.xmpp.plugin.disable("xep_0356")
+ except Exception as e:
+ exc = True
+ self.assertFalse(exc)
+
+ def testGrantedPrivileges(self):
+ # https://xmpp.org/extensions/xep-0356.html#example-4
+ results = {"event": False}
+ self.xmpp.add_event_handler(
+ "privileges_advertised", lambda msg: results.__setitem__("event", True)
+ )
+ self.recv(
+ """
+
+
+
+
+
+
+ """
+ )
+ self.assertEqual(self.xmpp["xep_0356"].granted_privileges["roster"], "both")
+ self.assertEqual(
+ self.xmpp["xep_0356"].granted_privileges["message"], "outgoing"
+ )
+ self.assertEqual(self.xmpp["xep_0356"].granted_privileges["presence"], "none")
+ self.assertTrue(results["event"])
+
+ def testGetRosterIq(self):
+ iq = self.xmpp["xep_0356"]._make_get_roster("juliet@example.com")
+ xmlstring = """
+
+
+
+ """
+ self.check(iq, xmlstring, use_values=False)
+
+ def testSetRosterIq(self):
+ jid = "juliet@example.com"
+ items = {
+ "friend1@example.com": {
+ "name": "Friend 1",
+ "subscription": "both",
+ "groups": ["group1", "group2"],
+ },
+ "friend2@example.com": {
+ "name": "Friend 2",
+ "subscription": "from",
+ "groups": ["group3"],
+ },
+ }
+ iq = self.xmpp["xep_0356"]._make_set_roster(jid, items)
+ xmlstring = f"""
+
+
+ -
+ group1
+ group2
+
+ -
+ group3
+
+
+
+ """
+ self.check(iq, xmlstring, use_values=False)
+
+ def testMakeOutgoingMessage(self):
+ xmlstring = """
+
+
+
+
+ I do not hate you
+
+
+
+
+ """
+ msg = Message()
+ msg["from"] = "juliet@capulet.lit"
+ msg["to"] = "romeo@montague.lit"
+ msg["body"] = "I do not hate you"
+
+ priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
+ self.check(priv_msg, xmlstring, use_values=False)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)