diff --git a/plugin.py b/plugin.py index 510dfcb..5a13190 100644 --- a/plugin.py +++ b/plugin.py @@ -12,7 +12,6 @@ from typing import List, Union import os import json -import time import base64 import asyncio from slixmpp.plugins.xep_0384.stanza import OMEMO_BASE_NS @@ -20,6 +19,7 @@ from slixmpp.plugins.xep_0384.stanza import OMEMO_DEVICES_NS, OMEMO_BUNDLES_NS from slixmpp.plugins.xep_0384.stanza import Devices, Device, Key, PreKeyPublic from slixmpp.plugins.base import BasePlugin, register_plugin from slixmpp.exceptions import IqError +from slixmpp.stanza import Message, Iq log = logging.getLogger(__name__) @@ -30,7 +30,7 @@ try: from omemo_backend_signal import BACKEND as SignalBackend from slixmpp.plugins.xep_0384.storage import SyncFileStorage from slixmpp.plugins.xep_0384.otpkpolicy import KeepingOTPKPolicy -except ImportError as e: +except (ImportError,): HAS_OMEMO = False TRUE_VALUES = {True, 'true', '1'} @@ -41,14 +41,30 @@ TRUE_VALUES = {True, 'true', '1'} def encode_public_key(key: bytes) -> bytes: return b'\x05' + key -def b64enc(data): + +def b64enc(data: bytes) -> str: return base64.b64encode(bytes(bytearray(data))).decode('ASCII') -def b64dec(data): +def b64dec(data: str) -> bytes: return base64.b64decode(data) +def _load_device_id(cache_dir: str) -> int: + filepath = os.path.join(cache_dir, 'device_id.json') + # Try reading file first, decoding, and if file was empty generate + # new DeviceID + try: + with open(filepath, 'r') as f: + did = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + did = generateDeviceID() + with open(filepath, 'w') as f: + json.dump(did, f) + + return did + + # XXX: This should probably be moved in plugins/base.py? class PluginCouldNotLoad(Exception): pass @@ -68,19 +84,17 @@ class XEP_0384(BasePlugin): backend_loaded = HAS_OMEMO - device_ids = {} - def plugin_init(self): if not self.backend_loaded: log.info("xep_0384 cannot be loaded as the backend omemo library " - "is not available") + "is not available") return storage = SyncFileStorage(self.cache_dir) otpkpolicy = KeepingOTPKPolicy() backend = SignalBackend bare_jid = self.xmpp.boundjid.bare - self._device_id = self._load_device_id(self.cache_dir) + self._device_id = _load_device_id(self.cache_dir) try: self._omemo = SessionManager.create( @@ -105,27 +119,13 @@ class XEP_0384(BasePlugin): self.xmpp.del_event_handler('pubsub_publish', self._receive_device_list) self.xmpp['xep_0163'].remove_interest(OMEMO_DEVICES_NS) - def _load_device_id(self, cache_dir): - filepath = os.path.join(cache_dir, 'device_id.json') - # Try reading file first, decoding, and if file was empty generate - # new DeviceID - try: - with open(filepath, 'r') as f: - did = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - did = generateDeviceID() - with open(filepath, 'w') as f: - json.dump(did, f) - - return did - def session_bind(self, _jid): self.xmpp['xep_0163'].add_interest(OMEMO_DEVICES_NS) - def my_device_id(self): + def my_device_id(self) -> int: return self._device_id - def _generate_bundle_iq(self): + def _generate_bundle_iq(self) -> Iq: bundle = self._omemo.public_bundle iq = self.xmpp.Iq(stype='set') @@ -151,13 +151,13 @@ class XEP_0384(BasePlugin): return iq - async def _publish_bundle(self): + async def _publish_bundle(self) -> None: if self._omemo.republish_bundle: iq = self._generate_bundle_iq() await iq.send() - def _store_device_ids(self, jid, items): - device_ids = [] + def _store_device_ids(self, jid: str, items) -> None: + device_ids = [] # type: List[int] for item in items: device_ids = [int(d['id']) for d in item['devices']] @@ -166,7 +166,7 @@ class XEP_0384(BasePlugin): break return self._omemo.newDeviceList(device_ids, str(jid)) - def _receive_device_list(self, msg): + def _receive_device_list(self, msg: Message) -> None: if msg['pubsub_event']['items']['node'] != OMEMO_DEVICES_NS: return @@ -181,7 +181,7 @@ class XEP_0384(BasePlugin): self._device_id not in active_devices: asyncio.ensure_future(self._set_device_list()) - async def _set_device_list(self): + async def _set_device_list(self) -> None: jid = self.xmpp.boundjid.bare try: @@ -216,14 +216,14 @@ class XEP_0384(BasePlugin): jid, OMEMO_DEVICES_NS, payload=payload, ) - def get_device_list(self, jid) -> List[str]: + def get_device_list(self, jid: str) -> List[str]: # XXX: Maybe someday worry about inactive devices somehow return self._omemo.getDevices(jid) - def is_encrypted(self, msg): + def is_encrypted(self, msg: Message) -> bool: return msg.xml.find('{%s}encrypted' % OMEMO_BASE_NS) is not None - def decrypt_message(self, msg) -> Union[None, str]: + def decrypt_message(self, msg: Message) -> Union[None, str]: header = msg['omemo_encrypted']['header'] payload = b64dec(msg['omemo_encrypted']['payload']['value'])