#! /usr/bin/env python3 # -*- coding: utf-8 -*- # vim:fenc=utf-8 # # Copyright © 2021 Maxime “pep” Buquet # # Distributed under terms of the GPLv3 license. """ OMEMO Plugin. """ import os import asyncio import base64 import hashlib import logging from typing import Dict, List, Optional from poezio.plugin_e2ee import E2EEPlugin from poezio.xdg import DATA_HOME from poezio.tabs import ChatTab, DynamicConversationTab, StaticConversationTab, MucTab from omemo.exceptions import MissingBundleException from slixmpp import JID from slixmpp.stanza import Message from slixmpp.exceptions import IqError, IqTimeout from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, NoAvailableSession from slixmpp_omemo import UndecidedException, UntrustedException, EncryptionPrepareException import slixmpp_omemo from pathlib import Path log = logging.getLogger(__name__) def jid_as_path(jid: JID) -> Path: """Ensure JID in folder names don't contain illegal chars for the FS""" jid_str = jid.bare.encode('utf-8') digest = hashlib.sha256(jid_str).digest() return Path(base64.b32encode(digest).decode('US-ASCII')) class Plugin(E2EEPlugin): """OMEMO (XEP-0384) Plugin""" encryption_name = 'omemo' eme_ns = slixmpp_omemo.OMEMO_BASE_NS replace_body_with_eme = True stanza_encryption = False encrypted_tags = [ (slixmpp_omemo.OMEMO_BASE_NS, 'encrypted'), ] self_muc_messages: Dict[str, str] = {} # TODO: Look into blind trust stuff. # https://gist.github.com/mar-v-in/b683220a55bc65dcdafc809be9c5d0e4 trust_states = { 'accepted': { 'verified', 'accepted', }, 'rejected': { 'undecided', 'distrusted', }, } supported_tab_types = (DynamicConversationTab, StaticConversationTab, MucTab) default_config = { # Some MUC services may not reflect the message ids properly, in which # case it is better to set this option to false. 'enable_muc': True, } def init(self) -> None: super().init() self.info = lambda i: self.api.information(i, 'Info') data_dir = os.path.join( DATA_HOME, 'omemo', jid_as_path(self.core.xmpp.boundjid), ) try: # Raise exception if folder exists so that we don't chmod again. os.makedirs(data_dir, mode=0o700, exist_ok=False) except OSError: # Folder already exists pass try: self.core.xmpp.register_plugin( 'xep_0384', { 'data_dir': data_dir, }, module=slixmpp_omemo, ) # OMEMO except (PluginCouldNotLoad,): log.exception('And error occured when loading the omemo plugin.') asyncio.ensure_future( self.core.xmpp['xep_0384'].session_start(self.core.xmpp.boundjid) ) def display_error(self, txt) -> None: """Poezio logger Helper""" self.api.information(txt, 'Error') def get_fingerprints(self, jid: JID) -> List[str]: """Return fingerprints for the provided JID""" devices = self.core.xmpp['xep_0384'].get_trust_for_jid(jid) # XXX: What to do with did -> None entries? # XXX: What to do with the active/inactive devices differenciation? # For now I'll merge both. We should probably display them separately # later on. devices['active'].update(devices['inactive']) return [ slixmpp_omemo.fp_from_ik(trust['key']) for trust in devices['active'].values() if trust is not None ] async def decrypt(self, message: Message, jid: Optional[JID], tab: ChatTab) -> None: if jid is None: self.display_error('Unable to decrypt the message.') return None # XXX: This is only needed to workaround a bug in poezio (fixed in # 00a91774) that makes it not give us realjids. Remove when there is a # poezio release including it. # The realjid of the participant needs to be retrieved in a MUC. if isinstance(tab, MucTab): user = tab.get_user_by_name(jid.resource) if user is not None and user.jid != JID(''): jid = user.jid body = None try: encrypted = message['omemo_encrypted'] body = await self.core.xmpp['xep_0384'].decrypt_message( encrypted, jid, # Always decrypt. Let us handle how we then warn the user. allow_untrusted=True, ) # Heartbeats will return a None body. if body is not None: message['body'] = body.decode('utf-8') except (MissingOwnKey,): # The message is missing our own key, it was not encrypted for # us, and we can't decrypt it. if (message['type'] == 'groupchat' and message['id'] in self.self_muc_messages): body = self.self_muc_messages.pop(message['id']) else: self.display_error( 'I can\'t decrypt this message as it ' 'is not encrypted for me.' ) except (NoAvailableSession,): # We received a message from that contained a session that we # don't know about (deleted session storage, etc.). We can't # decrypt the message, and it's going to be lost. # Here, as we need to initiate a new encrypted session, it is # best if we send an encrypted message directly. XXX: Is it # where we talk about self-healing messages? self.display_error( 'I can\'t decrypt this message as it uses an encrypted ' 'session I don\'t know about.', ) except (EncryptionPrepareException,): # Slixmpp tried its best, but there were errors it couldn't # resolve. At this point you should have seen other exceptions # and given a chance to resolve them already. self.display_error('I was not able to decrypt the message.') except (Exception,) as exn: self.display_error(f'An error occured while attempting decryption.\n{exn}') raise return None async def encrypt(self, message: Message, jids: Optional[List[JID]], _tab) -> None: if jids is None: self.display_error('Unable to encrypt the message.') return None body = message['body'] if self.config.get('enable_muc', True) and message['type'] == 'groupchat': self.self_muc_messages[message['id']] = body expect_problems = {} # type: Dict[JID, List[int]] while True: try: # `encrypt_message` excepts the plaintext to be sent, a list of # bare JIDs to encrypt to, and optionally a dict of problems to # expect per bare JID. # # Note that this function returns an `` object, # and not a full Message stanza. This combined with the # `recipients` parameter that requires for a list of JIDs, # allows you to encrypt for 1:1 as well as groupchats (MUC). recipients = jids encrypt = await self.core.xmpp['xep_0384'].encrypt_message( body, recipients, expect_problems, ) message.append(encrypt) return None except UndecidedException as exn: # The library prevents us from sending a message to an # untrusted/undecided barejid, so we need to make a decision here. # This is where you prompt your user to ask what to do. In # this bot we will automatically trust undecided recipients. await self.core.xmpp['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik) # TODO: catch NoEligibleDevicesException except EncryptionPrepareException as exn: log.debug('FOO: EncryptionPrepareException: %r', exn.errors) for error in exn.errors: if isinstance(error, MissingBundleException): self.display_error( f'Could not find keys for device "{error.device}" ' f'of recipient "{error.bare_jid}". Skipping.' ) jid = JID(error.bare_jid) device_list = expect_problems.setdefault(jid, []) device_list.append(error.device) except (IqError, IqTimeout) as exn: self.display_error( 'An error occured while fetching information on a recipient.\n{exn}' ) return None return None if __name__ == '__main__': jid = JID(input('JID: ')) print(jid_as_path(jid))