From 331c1c1e21f6af2e23009d68bbf620aa64ff17a9 Mon Sep 17 00:00:00 2001 From: nicoco Date: Wed, 19 Jul 2023 07:26:35 +0200 Subject: [PATCH] XEP-0428: add fallback body and subject elements + tests + helpers to strip the fallback content --- slixmpp/plugins/xep_0428/stanza.py | 84 +++++++++++++++- tests/test_stanza_xep_0428.py | 149 +++++++++++++++++++++++++++++ 2 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 tests/test_stanza_xep_0428.py diff --git a/slixmpp/plugins/xep_0428/stanza.py b/slixmpp/plugins/xep_0428/stanza.py index eb3c3dae..404ea5f3 100644 --- a/slixmpp/plugins/xep_0428/stanza.py +++ b/slixmpp/plugins/xep_0428/stanza.py @@ -1,8 +1,13 @@ - # Slixmpp: The Slick XMPP Library # Copyright (C) 2020 Mathieu Pasquet # This file is part of Slixmpp. # See the file LICENSE for copying permissio +from abc import ABC +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + from slixmpp.stanza import Message from slixmpp.xmlstream import ( ElementBase, @@ -10,14 +15,83 @@ from slixmpp.xmlstream import ( ) -NS = 'urn:xmpp:fallback:0' +NS = "urn:xmpp:fallback:0" class Fallback(ElementBase): namespace = NS - name = 'fallback' - plugin_attrib = 'fallback' + name = "fallback" + plugin_attrib = "fallback" + plugin_multi_attrib = "fallbacks" + interfaces = {"for"} + + def _find_fallback(self, fallback_for: str) -> "Fallback": + if self["for"] == fallback_for: + return self + for fallback in self.parent()["fallbacks"]: + if fallback["for"] == fallback_for: + return fallback + raise AttributeError("No fallback for this namespace", fallback_for) + + def get_stripped_body( + self, fallback_for: str, element: Literal["body", "subject"] = "body" + ) -> str: + """ + Get the body of a message, with the fallback part stripped + + :param fallback_for: namespace of the fallback to strip + :param element: set this to "subject" get the stripped subject instead + of body + + :return: body (or subject) content minus the fallback part + """ + fallback = self._find_fallback(fallback_for) + start = fallback[element]["start"] + end = fallback[element]["end"] + body = self.parent()[element] + if start == end == 0: + return "" + if start <= end < len(body): + return body[:start] + body[end:] + else: + return body + + +class FallbackMixin(ABC): + namespace = NS + name = NotImplemented + plugin_attrib = NotImplemented + interfaces = {"start", "end"} + + def set_start(self, v: int): + self._set_attr("start", str(v)) + + def get_start(self): + return _int_or_zero(self._get_attr("start")) + + def set_end(self, v: int): + self._set_attr("end", str(v)) + + def get_end(self): + return _int_or_zero(self._get_attr("end")) + + +class FallbackBody(FallbackMixin, ElementBase): + name = plugin_attrib = "body" + + +class FallbackSubject(FallbackMixin, ElementBase): + name = plugin_attrib = "subject" + + +def _int_or_zero(v: str): + try: + return int(v) + except ValueError: + return 0 def register_plugins(): - register_stanza_plugin(Message, Fallback) + register_stanza_plugin(Message, Fallback, iterable=True) + register_stanza_plugin(Fallback, FallbackBody) + register_stanza_plugin(Fallback, FallbackSubject) diff --git a/tests/test_stanza_xep_0428.py b/tests/test_stanza_xep_0428.py new file mode 100644 index 00000000..9cf2455f --- /dev/null +++ b/tests/test_stanza_xep_0428.py @@ -0,0 +1,149 @@ +import unittest + +from slixmpp import Message +from slixmpp.test import SlixTest +from slixmpp.plugins.xep_0428 import stanza + +from slixmpp.plugins import xep_0461 +from slixmpp.plugins import xep_0444 + + +class TestFallback(SlixTest): + def setUp(self): + stanza.register_plugins() + + def testSingleFallbackBody(self): + message = Message() + message["fallback"]["for"] = "ns" + message["fallback"]["body"]["start"] = 0 + message["fallback"]["body"]["end"] = 8 + + self.check( + message, # language=XML + """ + + + + + + """, + ) + + def testSingleFallbackSubject(self): + message = Message() + message["fallback"]["for"] = "ns" + message["fallback"]["subject"]["start"] = 0 + message["fallback"]["subject"]["end"] = 8 + + self.check( + message, # language=XML + """ + + + + + + """, + ) + + def testSingleFallbackWholeBody(self): + message = Message() + message["fallback"]["for"] = "ns" + message["fallback"].enable("body") + self.check( + message, # language=XML + """ + + + + + + """, + ) + + def testMultiFallback(self): + message = Message() + + f1 = stanza.Fallback() + f1["for"] = "ns1" + + f2 = stanza.Fallback() + f2["for"] = "ns2" + + message.append(f1) + message.append(f2) + + self.check( + message, # language=XML + """ + + + + + """, + ) + + for i, fallback in enumerate(message["fallbacks"], start=1): + self.assertEqual(fallback["for"], f"ns{i}") + + def testStripFallbackPartOfBody(self): + message = Message() + message["body"] = "> quoted\nsome-body" + message["fallback"]["for"] = xep_0461.stanza.NS + message["fallback"]["body"]["start"] = 0 + message["fallback"]["body"]["end"] = 9 + + self.check( + message, # language=XML + """ + + > quoted\nsome-body + + + + + """, + ) + + self.assertEqual( + message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body" + ) + + def testStripWholeBody(self): + message = Message() + message["body"] = "> quoted\nsome-body" + message["fallback"]["for"] = "ns" + message["fallback"].enable("body") + + self.check( + message, # language=XML + """ + + > quoted\nsome-body + + + + + """, + ) + + self.assertEqual(message["fallback"].get_stripped_body("ns"), "") + + def testStripMultiFallback(self): + message = Message() + message["body"] = "> huuuuu\nšŸ‘" + + message["fallback"]["for"] = xep_0461.stanza.NS + message["fallback"]["body"]["start"] = 0 + message["fallback"]["body"]["end"] = 9 + + reaction_fallback = stanza.Fallback() + reaction_fallback["for"] = xep_0444.stanza.NS + reaction_fallback.enable("body") + message.append(reaction_fallback) + + self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "šŸ‘") + self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "") + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)