XEP-0428: add fallback body and subject elements
+ tests + helpers to strip the fallback content
This commit is contained in:
parent
28a60c22e2
commit
331c1c1e21
2 changed files with 228 additions and 5 deletions
|
@ -1,8 +1,13 @@
|
|||
|
||||
# Slixmpp: The Slick XMPP Library
|
||||
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
|
||||
# This file is part of Slixmpp.
|
||||
# See the file LICENSE for copying permissio
|
||||
from abc import ABC
|
||||
try:
|
||||
from typing import Literal
|
||||
except ImportError:
|
||||
from typing_extensions import Literal
|
||||
|
||||
from slixmpp.stanza import Message
|
||||
from slixmpp.xmlstream import (
|
||||
ElementBase,
|
||||
|
@ -10,14 +15,83 @@ from slixmpp.xmlstream import (
|
|||
)
|
||||
|
||||
|
||||
NS = 'urn:xmpp:fallback:0'
|
||||
NS = "urn:xmpp:fallback:0"
|
||||
|
||||
|
||||
class Fallback(ElementBase):
|
||||
namespace = NS
|
||||
name = 'fallback'
|
||||
plugin_attrib = 'fallback'
|
||||
name = "fallback"
|
||||
plugin_attrib = "fallback"
|
||||
plugin_multi_attrib = "fallbacks"
|
||||
interfaces = {"for"}
|
||||
|
||||
def _find_fallback(self, fallback_for: str) -> "Fallback":
|
||||
if self["for"] == fallback_for:
|
||||
return self
|
||||
for fallback in self.parent()["fallbacks"]:
|
||||
if fallback["for"] == fallback_for:
|
||||
return fallback
|
||||
raise AttributeError("No fallback for this namespace", fallback_for)
|
||||
|
||||
def get_stripped_body(
|
||||
self, fallback_for: str, element: Literal["body", "subject"] = "body"
|
||||
) -> str:
|
||||
"""
|
||||
Get the body of a message, with the fallback part stripped
|
||||
|
||||
:param fallback_for: namespace of the fallback to strip
|
||||
:param element: set this to "subject" get the stripped subject instead
|
||||
of body
|
||||
|
||||
:return: body (or subject) content minus the fallback part
|
||||
"""
|
||||
fallback = self._find_fallback(fallback_for)
|
||||
start = fallback[element]["start"]
|
||||
end = fallback[element]["end"]
|
||||
body = self.parent()[element]
|
||||
if start == end == 0:
|
||||
return ""
|
||||
if start <= end < len(body):
|
||||
return body[:start] + body[end:]
|
||||
else:
|
||||
return body
|
||||
|
||||
|
||||
class FallbackMixin(ABC):
|
||||
namespace = NS
|
||||
name = NotImplemented
|
||||
plugin_attrib = NotImplemented
|
||||
interfaces = {"start", "end"}
|
||||
|
||||
def set_start(self, v: int):
|
||||
self._set_attr("start", str(v))
|
||||
|
||||
def get_start(self):
|
||||
return _int_or_zero(self._get_attr("start"))
|
||||
|
||||
def set_end(self, v: int):
|
||||
self._set_attr("end", str(v))
|
||||
|
||||
def get_end(self):
|
||||
return _int_or_zero(self._get_attr("end"))
|
||||
|
||||
|
||||
class FallbackBody(FallbackMixin, ElementBase):
|
||||
name = plugin_attrib = "body"
|
||||
|
||||
|
||||
class FallbackSubject(FallbackMixin, ElementBase):
|
||||
name = plugin_attrib = "subject"
|
||||
|
||||
|
||||
def _int_or_zero(v: str):
|
||||
try:
|
||||
return int(v)
|
||||
except ValueError:
|
||||
return 0
|
||||
|
||||
|
||||
def register_plugins():
|
||||
register_stanza_plugin(Message, Fallback)
|
||||
register_stanza_plugin(Message, Fallback, iterable=True)
|
||||
register_stanza_plugin(Fallback, FallbackBody)
|
||||
register_stanza_plugin(Fallback, FallbackSubject)
|
||||
|
|
149
tests/test_stanza_xep_0428.py
Normal file
149
tests/test_stanza_xep_0428.py
Normal file
|
@ -0,0 +1,149 @@
|
|||
import unittest
|
||||
|
||||
from slixmpp import Message
|
||||
from slixmpp.test import SlixTest
|
||||
from slixmpp.plugins.xep_0428 import stanza
|
||||
|
||||
from slixmpp.plugins import xep_0461
|
||||
from slixmpp.plugins import xep_0444
|
||||
|
||||
|
||||
class TestFallback(SlixTest):
|
||||
def setUp(self):
|
||||
stanza.register_plugins()
|
||||
|
||||
def testSingleFallbackBody(self):
|
||||
message = Message()
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 8
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<body start="0" end="8" />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
def testSingleFallbackSubject(self):
|
||||
message = Message()
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"]["subject"]["start"] = 0
|
||||
message["fallback"]["subject"]["end"] = 8
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<subject start="0" end="8" />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
def testSingleFallbackWholeBody(self):
|
||||
message = Message()
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"].enable("body")
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<body />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
def testMultiFallback(self):
|
||||
message = Message()
|
||||
|
||||
f1 = stanza.Fallback()
|
||||
f1["for"] = "ns1"
|
||||
|
||||
f2 = stanza.Fallback()
|
||||
f2["for"] = "ns2"
|
||||
|
||||
message.append(f1)
|
||||
message.append(f2)
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
for i, fallback in enumerate(message["fallbacks"], start=1):
|
||||
self.assertEqual(fallback["for"], f"ns{i}")
|
||||
|
||||
def testStripFallbackPartOfBody(self):
|
||||
message = Message()
|
||||
message["body"] = "> quoted\nsome-body"
|
||||
message["fallback"]["for"] = xep_0461.stanza.NS
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 9
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<body>> quoted\nsome-body</body>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
|
||||
<body start="0" end="9" />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
|
||||
)
|
||||
|
||||
def testStripWholeBody(self):
|
||||
message = Message()
|
||||
message["body"] = "> quoted\nsome-body"
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"].enable("body")
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<body>> quoted\nsome-body</body>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<body />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
|
||||
|
||||
def testStripMultiFallback(self):
|
||||
message = Message()
|
||||
message["body"] = "> huuuuu\n👍"
|
||||
|
||||
message["fallback"]["for"] = xep_0461.stanza.NS
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 9
|
||||
|
||||
reaction_fallback = stanza.Fallback()
|
||||
reaction_fallback["for"] = xep_0444.stanza.NS
|
||||
reaction_fallback.enable("body")
|
||||
message.append(reaction_fallback)
|
||||
|
||||
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
|
||||
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
|
||||
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)
|
Loading…
Reference in a new issue