""" Slixmpp: The Slick XMPP Library Copyright (C) 2018 Maxime “pep” Buquet This file is part of Slixmpp. See the file LICENSE for copying permission. """ import logging from typing import Any, Dict, List, Union import os import json import base64 import asyncio from slixmpp.plugins.xep_0384.stanza import OMEMO_BASE_NS from slixmpp.plugins.xep_0384.stanza import OMEMO_DEVICES_NS, OMEMO_BUNDLES_NS from slixmpp.plugins.xep_0384.stanza import Bundle, Devices, Device, Encrypted, Key, PreKeyPublic from slixmpp.plugins.base import BasePlugin, register_plugin from slixmpp.exceptions import IqError from slixmpp.stanza import Message, Iq from slixmpp.jid import JID log = logging.getLogger(__name__) HAS_OMEMO = True try: from omemo.exceptions import MissingBundleException, NoEligibleDevicesException from omemo import SessionManager, ExtendedPublicBundle from omemo.util import generateDeviceID from omemo.backends import Backend from omemo_backend_signal import BACKEND as SignalBackend from slixmpp.plugins.xep_0384.storage import SyncFileStorage from slixmpp.plugins.xep_0384.otpkpolicy import KeepingOTPKPolicy except (ImportError,): HAS_OMEMO = False TRUE_VALUES = {True, 'true', '1'} def b64enc(data: bytes) -> str: return base64.b64encode(bytes(bytearray(data))).decode('ASCII') def b64dec(data: str) -> bytes: return base64.b64decode(data) def _load_device_id(cache_dir: str) -> int: filepath = os.path.join(cache_dir, 'device_id.json') # Try reading file first, decoding, and if file was empty generate # new DeviceID try: with open(filepath, 'r') as f: did = json.load(f) except (FileNotFoundError, json.JSONDecodeError): did = generateDeviceID() with open(filepath, 'w') as f: json.dump(did, f) return did # XXX: This should probably be moved in plugins/base.py? class PluginCouldNotLoad(Exception): pass # Generic exception class XEP0384(Exception): pass class MissingOwnKey(XEP0384): pass class NoEligibleDevices(XEP0384): pass class XEP_0384(BasePlugin): """ XEP-0384: OMEMO """ name = 'xep_0384' description = 'XEP-0384 OMEMO' dependencies = {'xep_0163'} default_config = { 'cache_dir': None, } backend_loaded = HAS_OMEMO def plugin_init(self): if not self.backend_loaded: log.info("xep_0384 cannot be loaded as the backend omemo library " "is not available") return storage = SyncFileStorage(self.cache_dir) otpkpolicy = KeepingOTPKPolicy() self._omemo_backend = SignalBackend bare_jid = self.xmpp.boundjid.bare self._device_id = _load_device_id(self.cache_dir) try: self._omemo = SessionManager.create( storage, otpkpolicy, self._omemo_backend, bare_jid, self._device_id, ) except: log.error("Couldn't load the OMEMO object; ¯\\_(ツ)_/¯") raise PluginCouldNotLoad self.xmpp.add_event_handler('pubsub_publish', self._receive_device_list) asyncio.ensure_future(self._set_device_list()) asyncio.ensure_future(self._publish_bundle()) def plugin_end(self): if not self.backend_loaded: return self.xmpp.del_event_handler('pubsub_publish', self._receive_device_list) self.xmpp['xep_0163'].remove_interest(OMEMO_DEVICES_NS) def session_bind(self, _jid): self.xmpp['xep_0163'].add_interest(OMEMO_DEVICES_NS) def my_device_id(self) -> int: return self._device_id def _generate_bundle_iq(self) -> Iq: bundle = self._omemo.public_bundle.serialize(self._omemo_backend) iq = self.xmpp.Iq(stype='set') publish = iq['pubsub']['publish'] publish['node'] = '%s:%d' % (OMEMO_BUNDLES_NS, self._device_id) payload = publish['item']['bundle'] signedPreKeyPublic = b64enc(bundle['spk']['key']) payload['signedPreKeyPublic']['value'] = signedPreKeyPublic payload['signedPreKeyPublic']['signedPreKeyId'] = str(bundle['spk']['id']) payload['signedPreKeySignature']['value'] = b64enc( bundle['spk_signature'] ) identityKey = b64enc(bundle['ik']) payload['identityKey']['value'] = identityKey prekeys = [] for otpk in bundle['otpks']: prekey = PreKeyPublic() prekey['preKeyId'] = str(otpk['id']) prekey['value'] = b64enc(otpk['key']) prekeys.append(prekey) payload['prekeys'] = prekeys return iq async def _publish_bundle(self) -> None: if self._omemo.republish_bundle: iq = self._generate_bundle_iq() await iq.send() async def _fetch_bundle(self, jid: str, device_id: int) -> Union[None, ExtendedPublicBundle]: node = '%s:%d' % (OMEMO_BUNDLES_NS, device_id) iq = await self.xmpp['xep_0060'].get_items(jid, node) bundle = iq['pubsub']['items']['item']['bundle'] return self._parse_bundle(self._omemo_backend, bundle) def _parse_bundle(self, backend: Backend, bundle: Bundle) -> ExtendedPublicBundle: ik = b64dec(bundle['identityKey']['value'].strip()) spk = { 'id': int(bundle['signedPreKeyPublic']['signedPreKeyId']), 'key': b64dec(bundle['signedPreKeyPublic']['value'].strip()), } spk_signature = b64dec(bundle['signedPreKeySignature']['value'].strip()) otpks = [] for prekey in bundle['prekeys']: otpks.append({ 'id': int(prekey['preKeyId']), 'key': b64dec(prekey['value'].strip()), }) return ExtendedPublicBundle.parse(backend, ik, spk, spk_signature, otpks) def _store_device_ids(self, jid: str, items) -> None: device_ids = [] # type: List[int] for item in items: device_ids = [int(d['id']) for d in item['devices']] # XXX: There should only be one item so this is fine, but slixmpp # loops forever otherwise. ??? break return self._omemo.newDeviceList(device_ids, str(jid)) def _receive_device_list(self, msg: Message) -> None: if msg['pubsub_event']['items']['node'] != OMEMO_DEVICES_NS: return jid = msg['from'].bare items = msg['pubsub_event']['items'] self._store_device_ids(jid, items) device_ids = self.get_device_list(jid) active_devices = device_ids['active'] if jid == self.xmpp.boundjid.bare and \ self._device_id not in active_devices: asyncio.ensure_future(self._set_device_list()) async def _set_device_list(self) -> None: jid = self.xmpp.boundjid.bare try: iq = await self.xmpp['xep_0060'].get_items( self.xmpp.boundjid.bare, OMEMO_DEVICES_NS, ) items = iq['pubsub']['items'] self._store_device_ids(jid, items) except IqError as iq_err: if iq_err.condition == "item-not-found": self._store_device_ids(jid, []) else: return # XXX: Handle this! device_ids = self.get_device_list(jid) # Verify that this device in the list and set it if necessary if self._device_id in device_ids: return device_ids['active'].add(self._device_id) devices = [] for i in device_ids['active']: d = Device() d['id'] = str(i) devices.append(d) payload = Devices() payload['devices'] = devices await self.xmpp['xep_0060'].publish( jid, OMEMO_DEVICES_NS, payload=payload, ) def get_device_list(self, jid: str) -> List[str]: # XXX: Maybe someday worry about inactive devices somehow return self._omemo.getDevices(jid) def is_encrypted(self, msg: Message) -> bool: return msg.xml.find('{%s}encrypted' % OMEMO_BASE_NS) is not None def decrypt_message(self, msg: Message) -> Union[None, str]: header = msg['omemo_encrypted']['header'] payload = b64dec(msg['omemo_encrypted']['payload']['value']) jid = msg['from'].bare sid = header['sid'] key = header.xml.find("{%s}key[@rid='%s']" % ( OMEMO_BASE_NS, self._device_id)) if key is None: raise MissingOwnKey("Encrypted message is not for us") key = Key(key) isPrekeyMessage = key['prekey'] in TRUE_VALUES message = b64dec(key['value']) iv = b64dec(header['iv']['value']) # XXX: 'cipher' is part of KeyTransportMessages and is used when no payload # is passed. We do not implement this yet. _cipher, body = self._omemo.decryptMessage( jid, sid, iv, message, isPrekeyMessage, payload, ) return body def _fetching_bundle(self, jid: str, exn: Exception, key: str, _val: Any) -> bool: return isinstance(exn, MissingBundleException) and key == jid async def encrypt_message(self, plaintext: str, recipients: List[JID]) -> Encrypted: """ Returns an encrypted payload to be placed into a message. The API for getting an encrypted payload consists of trying first and fixing errors progressively. The actual sending happens once the application (us) thinks we're good to go. """ recipients = [jid.bare for jid in recipients] bundles = {} # type: Dict[str, Dict[int, ExtendedPublicBundle]] while True: errors = [] self._omemo.encryptMessage( recipients, plaintext.encode('utf-8'), bundles, callback=lambda *args: errors.append(args), always_trust=True, dry_run=True, ) if not errors: break no_eligible_devices = [] for (exn, key, val) in errors: if isinstance(exn, MissingBundleException): bundle = await self._fetch_bundle(key, val) if bundle is not None: devices = bundles.setdefault(key, {}) devices[val] = bundle elif isinstance(exn, NoEligibleDevicesException): # This error is apparently returned every time the omemo # lib couldn't find a device to encrypt to for a # particular JID. # In case there is also an MissingBundleException in the # returned errors, ignore this and retry later, assuming # the fetching of the bundle succeeded. TODO: Ensure that it # did. # This exception is mostly useful when a contact does not # do OMEMO, or hasn't published any device list for any # other reason. if any(self._fetching_bundle(key, *err) for err in errors): continue no_eligible_devices.append(key) if no_eligible_devices: raise NoEligibleDevices(no_eligible_devices) break # Attempt encryption payload = Encrypted() payload['omemo_encrypted'] = self._omemo.encryptMessage( recipients, plaintext.encode('utf-8'), bundles, always_trust=True, ) return payload register_plugin(XEP_0384)