diff --git a/slixmpp/plugins/xep_0446/__init__.py b/slixmpp/plugins/xep_0446/__init__.py new file mode 100644 index 00000000..6bd714a7 --- /dev/null +++ b/slixmpp/plugins/xep_0446/__init__.py @@ -0,0 +1,6 @@ +from slixmpp.plugins.base import register_plugin + +from . import stanza +from .file_metadata import XEP_0446 + +register_plugin(XEP_0446) diff --git a/slixmpp/plugins/xep_0446/file_metadata.py b/slixmpp/plugins/xep_0446/file_metadata.py new file mode 100644 index 00000000..e917f6b4 --- /dev/null +++ b/slixmpp/plugins/xep_0446/file_metadata.py @@ -0,0 +1,20 @@ +import logging + +from slixmpp.plugins import BasePlugin + +from . import stanza + +log = logging.getLogger(__name__) + + +class XEP_0446(BasePlugin): + + """ + XEP-0446: File metadata element + + Minimum needed for xep 0447 (Stateless file sharing) + """ + + name = "xep_0446" + description = "XEP-0446: File metadata element" + stanza = stanza diff --git a/slixmpp/plugins/xep_0446/stanza.py b/slixmpp/plugins/xep_0446/stanza.py new file mode 100644 index 00000000..f887272d --- /dev/null +++ b/slixmpp/plugins/xep_0446/stanza.py @@ -0,0 +1,38 @@ +from datetime import datetime + +from slixmpp.plugins.xep_0082 import format_datetime, parse +from slixmpp.xmlstream import ElementBase + +NS = "urn:xmpp:file:metadata:0" + + +class File(ElementBase): + name = "file" + namespace = NS + plugin_attrib = "file" + interfaces = sub_interfaces = {"media-type", "name", "date", "size", "hash", "desc"} + + def set_size(self, size: int): + self._set_sub_text("size", str(size)) + + def get_size(self): + return _int_or_none(self._get_sub_text("size")) + + def get_date(self): + try: + return parse(self._get_sub_text("date")) + except ValueError: + return + + def set_date(self, stamp: datetime): + try: + self._set_sub_text("date", format_datetime(stamp)) + except ValueError: + pass + + +def _int_or_none(v): + try: + return int(v) + except ValueError: + return None diff --git a/slixmpp/plugins/xep_0447/__init__.py b/slixmpp/plugins/xep_0447/__init__.py new file mode 100644 index 00000000..88840635 --- /dev/null +++ b/slixmpp/plugins/xep_0447/__init__.py @@ -0,0 +1,11 @@ + +# Slixmpp: The Slick XMPP Library +# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout +# This file is part of Slixmpp. +# See the file LICENSE for copying permission +from slixmpp.plugins.base import register_plugin + +from . import stanza +from .sfs import XEP_0447 + +register_plugin(XEP_0447) diff --git a/slixmpp/plugins/xep_0447/sfs.py b/slixmpp/plugins/xep_0447/sfs.py new file mode 100644 index 00000000..b6731258 --- /dev/null +++ b/slixmpp/plugins/xep_0447/sfs.py @@ -0,0 +1,64 @@ +import logging +from datetime import datetime +from pathlib import Path +from typing import Iterable, Optional + +from slixmpp.plugins import BasePlugin +from slixmpp.stanza import Message +from slixmpp.xmlstream import register_stanza_plugin + +from . import stanza + +log = logging.getLogger(__name__) + + +class XEP_0447(BasePlugin): + + """ + XEP-0447: Stateless File Sharing + + Only support outgoing SFS, incoming is not handled at all. + """ + + name = "xep_0447" + description = "XEP-0447: Stateless File Sharing" + dependencies = {"xep_0300", "xep_0446"} + stanza = stanza + + def plugin_init(self): + register_stanza_plugin(Message, stanza.StatelessFileSharing) + + register_stanza_plugin(stanza.StatelessFileSharing, stanza.Sources) + register_stanza_plugin( + stanza.StatelessFileSharing, self.xmpp["xep_0446"].stanza.File + ) + register_stanza_plugin(stanza.Sources, stanza.UrlData, iterable=True) + + def get_sfs( + self, + path: Path, + uris: Iterable[str], + media_type: Optional[str], + desc: Optional[str], + ): + sfs = stanza.StatelessFileSharing() + sfs["disposition"] = "inline" + for uri in uris: + ref = stanza.UrlData() + ref["target"] = uri + sfs["sources"].append(ref) + if media_type: + sfs["file"]["media-type"] = media_type + if desc: + sfs["file"]["desc"] = desc + sfs["file"]["name"] = path.name + + stat = path.stat() + sfs["file"]["size"] = stat.st_size + sfs["file"]["date"] = datetime.fromtimestamp(stat.st_mtime) + + h = self.xmpp.plugin["xep_0300"].compute_hash(path) + h["value"] = h["value"].decode() + sfs["file"].append(h) + + return sfs diff --git a/slixmpp/plugins/xep_0447/stanza.py b/slixmpp/plugins/xep_0447/stanza.py new file mode 100644 index 00000000..618ec537 --- /dev/null +++ b/slixmpp/plugins/xep_0447/stanza.py @@ -0,0 +1,21 @@ +from slixmpp.xmlstream import ElementBase + +NAMESPACE = "urn:xmpp:sfs:0" + + +class StatelessFileSharing(ElementBase): + name = "file-sharing" + plugin_attrib = "sfs" + namespace = NAMESPACE + interfaces = {"disposition"} + + +class Sources(ElementBase): + name = plugin_attrib = "sources" + namespace = NAMESPACE + + +class UrlData(ElementBase): + name = plugin_attrib = "url-data" + namespace = "http://jabber.org/protocol/url-data" + interfaces = {"target"} diff --git a/tests/test_stream_xep_0447.py b/tests/test_stream_xep_0447.py new file mode 100644 index 00000000..bff2b3e9 --- /dev/null +++ b/tests/test_stream_xep_0447.py @@ -0,0 +1,59 @@ +import unittest +from datetime import datetime +from base64 import b64encode +from pathlib import Path +from tempfile import NamedTemporaryFile +from hashlib import sha256 + +from slixmpp.test import SlixTest +from slixmpp.plugins.xep_0082 import format_datetime + + +class TestStatelessFileSharing(SlixTest): + def setUp(self): + self.stream_start( + mode="component", jid="whatevs.shakespeare.lit", plugins={"xep_0447"} + ) + + def tearDown(self): + self.stream_close() + + def test_set_file(self): + with NamedTemporaryFile("wb+") as f: + n = 10 + size = 0 + for i in range(n): + size += len(bytes(i)) + f.write(bytes(i)) + + f.seek(0) + h = b64encode(sha256(f.read()).digest()).decode() + sfs = self.xmpp["xep_0447"].get_sfs( + Path(f.name), + ["https://xxx.com"], + media_type="MEDIA", + desc="DESCRIPTION", + ) + + self.check( + sfs, + f""" + + + MEDIA + {Path(f.name).name} + {size} + {h} + DESCRIPTION + {format_datetime(datetime.fromtimestamp(Path(f.name).stat().st_mtime))} + + + + + + """, + use_values=False, + ) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestStatelessFileSharing)