poezio-omemo/poezio_omemo/__init__.py

223 lines
8.5 KiB
Python
Raw Normal View History

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2019 Maxime “pep” Buquet <pep@bouah.net>
#
# Distributed under terms of the GPLv3 license.
"""
OMEMO Plugin.
"""
import os
import asyncio
import base64
import hashlib
import logging
from typing import Dict, List, Optional
from poezio.plugin_e2ee import E2EEPlugin
from poezio.xdg import DATA_HOME
from poezio.tabs import ChatTab, DynamicConversationTab, StaticConversationTab, MucTab
from omemo.exceptions import MissingBundleException
from slixmpp import JID
from slixmpp.stanza import Message
from slixmpp.exceptions import IqError, IqTimeout
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, NoAvailableSession
from slixmpp_omemo import UndecidedException, UntrustedException, EncryptionPrepareException
import slixmpp_omemo
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class Plugin(E2EEPlugin):
    """OMEMO (XEP-0384) Plugin.

    Encrypts outgoing messages and decrypts incoming ones through the
    slixmpp ``xep_0384`` plugin provided by ``slixmpp_omemo``.
    """

    encryption_name = 'omemo'
    eme_ns = slixmpp_omemo.OMEMO_BASE_NS
    replace_body_with_eme = True
    stanza_encryption = False

    encrypted_tags = [
        (slixmpp_omemo.OMEMO_BASE_NS, 'encrypted'),
    ]

    # Plaintext bodies of our own outgoing groupchat messages, keyed by
    # message id. `encrypt` stores them and `decrypt` pops them when one of
    # these messages comes back without a key for us (MissingOwnKey).
    # `init` rebinds this to a fresh dict on each instance.
    self_muc_messages: Dict[str, str] = {}

    # TODO: Look into blind trust stuff.
    # https://gist.github.com/mar-v-in/b683220a55bc65dcdafc809be9c5d0e4
    trust_states = {
        'accepted': {
            'verified',
            'accepted',
        }, 'rejected': {
            'undecided',
            'distrusted',
        },
    }

    supported_tab_types = (DynamicConversationTab, StaticConversationTab, MucTab)

    default_config = {
        # Some MUC services may not reflect the message ids properly, in which
        # case it is better to set this option to false.
        'enable_muc': True,
    }

    def init(self) -> None:
        """Prepare the OMEMO data directory and register the slixmpp plugin.

        The data directory is ``DATA_HOME/omemo/<name>`` where ``<name>`` is
        the base32 encoding of the SHA-256 digest of the bound bare JID.

        If the slixmpp plugin cannot be loaded, the failure is logged and
        reported to the user, and no OMEMO session is started.
        """
        # Give each instance its own store instead of sharing the mutable
        # class-level dict between instances.
        self.self_muc_messages = {}

        super().init()

        self.info = lambda i: self.api.information(i, 'Info')

        jid_str = self.core.xmpp.boundjid.bare.encode('utf-8')
        digest = hashlib.sha256(jid_str).digest()
        hashed_jid = base64.b32encode(digest).decode('US-ASCII')
        data_dir = os.path.join(DATA_HOME, 'omemo', hashed_jid)
        os.makedirs(data_dir, exist_ok=True)

        try:
            self.core.xmpp.register_plugin(
                'xep_0384', {
                    'data_dir': data_dir,
                },
                module=slixmpp_omemo,
            )  # OMEMO
        except (PluginCouldNotLoad,):
            log.exception('And error occured when loading the omemo plugin.')
            self.display_error('Unable to load the omemo plugin.')
            # The plugin did not register: there is nothing to start a
            # session on, so stop here instead of failing on the lookup below.
            return

        asyncio.ensure_future(
            self.core.xmpp['xep_0384'].session_start(self.core.xmpp.boundjid)
        )

    def display_error(self, txt) -> None:
        """Show `txt` to the user as an 'Error' information message."""
        self.api.information(txt, 'Error')

    def get_fingerprints(self, jid: JID) -> List[str]:
        """Return fingerprints for the provided JID.

        Fingerprints of both active and inactive devices are returned;
        devices whose trust entry is None are skipped.
        """
        devices = self.core.xmpp['xep_0384'].get_trust_for_jid(jid)

        # XXX: What to do with did -> None entries?
        # XXX: What to do with the active/inactive devices differentiation?
        # For now I'll merge both. We should probably display them separately
        # later on.
        # Merge into a local copy so the mapping handed out by the library is
        # left untouched.
        merged = dict(devices['active'])
        merged.update(devices['inactive'])

        return [
            slixmpp_omemo.fp_from_ik(trust['key'])
            for trust in merged.values()
            if trust is not None
        ]

    def decrypt(self, message: Message, jid: Optional[JID], tab: ChatTab) -> None:
        """Decrypt `message` in place.

        On success the decrypted text is stored in ``message['body']``. When
        decryption is not possible an error is displayed to the user and the
        body is left untouched. Unexpected exceptions are reported to the
        user and then re-raised.

        :param message: Stanza carrying the ``omemo_encrypted`` payload.
        :param jid: JID of the sender, or None when it is not known.
        :param tab: Tab the message belongs to.
        """
        if jid is None:
            self.display_error('Unable to decrypt the message.')
            return None

        # XXX: This is only needed to workaround a bug in poezio (fixed in
        # 00a91774) that makes it not give us realjids. Remove when there is a
        # poezio release including it.
        # The realjid of the participant needs to be retrieved in a MUC.
        if isinstance(tab, MucTab):
            user = tab.get_user_by_name(jid.resource)
            if user is not None and user.jid != JID(''):
                jid = user.jid

        body = None
        try:
            encrypted = message['omemo_encrypted']
            decrypted = self.core.xmpp['xep_0384'].decrypt_message(
                encrypted,
                jid,
                # Always decrypt. Let us handle how we then warn the user.
                allow_untrusted=True,
            )
            # NOTE(review): decrypt_message presumably returns None for
            # messages that carry no payload -- confirm against the
            # slixmpp-omemo API. Guard so that case leaves the body untouched
            # instead of failing on `.decode`.
            if decrypted is not None:
                body = decrypted.decode('utf-8')
        except (MissingOwnKey,):
            # The message is missing our own key, it was not encrypted for
            # us, and we can't decrypt it.
            if (message['type'] == 'groupchat' and
                    message['id'] in self.self_muc_messages):
                # One of our own groupchat messages coming back: reuse the
                # plaintext stored by `encrypt`.
                body = self.self_muc_messages.pop(message['id'])
            else:
                self.display_error(
                    'I can\'t decrypt this message as it '
                    'is not encrypted for me.'
                )
        except (NoAvailableSession,):
            # We received a message that contained a session that we
            # don't know about (deleted session storage, etc.). We can't
            # decrypt the message, and it's going to be lost.
            # Here, as we need to initiate a new encrypted session, it is
            # best if we send an encrypted message directly. XXX: Is it
            # where we talk about self-healing messages?
            self.display_error(
                'I can\'t decrypt this message as it uses an encrypted '
                'session I don\'t know about.',
            )
        except (EncryptionPrepareException,):
            # Slixmpp tried its best, but there were errors it couldn't
            # resolve. At this point you should have seen other exceptions
            # and given a chance to resolve them already.
            self.display_error('I was not able to decrypt the message.')
        except (Exception,) as exn:
            self.display_error('An error occured while attempting decryption.\n%r' % exn)
            raise

        if body is not None:
            message['body'] = body

    async def encrypt(self, message: Message, jids: Optional[List[JID]], _tab) -> None:
        """Encrypt the body of `message` for `jids`.

        On success the ``<encrypted/>`` element returned by the library is
        appended to `message`. Undecided devices are trusted automatically
        and devices whose bundle is missing are skipped, after which
        encryption is retried. On any other failure an error is displayed to
        the user and `message` is left without an encrypted payload.

        :param message: Outgoing stanza whose ``body`` is to be encrypted.
        :param jids: Recipients, or None when they could not be determined.
        :param _tab: Unused.
        """
        if jids is None:
            self.display_error('Unable to encrypt the message.')
            return None

        body = message['body']
        if self.config.get('enable_muc', True) and message['type'] == 'groupchat':
            # Keep the plaintext so `decrypt` can restore it when this
            # message comes back to us.
            self.self_muc_messages[message['id']] = body

        # Devices to skip, per bare JID.
        expect_problems: Dict[JID, List[int]] = {}

        while True:
            try:
                # `encrypt_message` expects the plaintext to be sent, a list of
                # bare JIDs to encrypt to, and optionally a dict of problems to
                # expect per bare JID.
                #
                # Note that this function returns an `<encrypted/>` object,
                # and not a full Message stanza. This combined with the
                # `recipients` parameter that requires for a list of JIDs,
                # allows you to encrypt for 1:1 as well as groupchats (MUC).
                recipients = jids
                encrypt = await self.core.xmpp['xep_0384'].encrypt_message(
                    body,
                    recipients,
                    expect_problems,
                )
                message.append(encrypt)
                return None
            except UndecidedException as exn:
                # The library prevents us from sending a message to an
                # untrusted/undecided barejid, so we need to make a decision here.
                # This is where the user should be prompted about what to do.
                # For now this plugin automatically trusts undecided
                # recipients, then retries.
                self.core.xmpp['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
            # TODO: catch NoEligibleDevicesException
            except EncryptionPrepareException as exn:
                log.debug('FOO: EncryptionPrepareException: %r', exn.errors)
                made_progress = False
                for error in exn.errors:
                    if isinstance(error, MissingBundleException):
                        self.display_error(
                            'Could not find keys for device "%d" of recipient "%s". Skipping.' %
                            (error.device, error.bare_jid),
                        )
                        jid = JID(error.bare_jid)
                        device_list = expect_problems.setdefault(jid, [])
                        if error.device not in device_list:
                            device_list.append(error.device)
                            made_progress = True
                if not made_progress:
                    # Nothing new was added to `expect_problems`, so retrying
                    # would fail in the same way forever. Give up.
                    self.display_error(
                        'Unable to encrypt the message.\n%r' % (exn.errors,),
                    )
                    return None
            except (IqError, IqTimeout) as exn:
                self.display_error(
                    'An error occured while fetching information on a recipient.\n%r' % exn,
                )
                return None