230 lines
8.8 KiB
Python
230 lines
8.8 KiB
Python
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# vim:fenc=utf-8
|
|
#
|
|
# Copyright © 2021 Maxime “pep” Buquet <pep@bouah.net>
|
|
#
|
|
# Distributed under terms of the GPLv3 license.
|
|
|
|
"""
|
|
OMEMO Plugin.
|
|
"""
|
|
|
|
import os
|
|
import asyncio
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
|
|
from poezio.plugin_e2ee import E2EEPlugin
|
|
from poezio.xdg import DATA_HOME
|
|
from poezio.tabs import ChatTab, DynamicConversationTab, StaticConversationTab, MucTab
|
|
|
|
from omemo.exceptions import MissingBundleException
|
|
from slixmpp import JID
|
|
from slixmpp.stanza import Message
|
|
from slixmpp.exceptions import IqError, IqTimeout
|
|
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, NoAvailableSession
|
|
from slixmpp_omemo import UndecidedException, UntrustedException, EncryptionPrepareException
|
|
import slixmpp_omemo
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Plugin(E2EEPlugin):
|
|
"""OMEMO (XEP-0384) Plugin"""
|
|
|
|
encryption_name = 'omemo'
|
|
eme_ns = slixmpp_omemo.OMEMO_BASE_NS
|
|
replace_body_with_eme = True
|
|
stanza_encryption = False
|
|
|
|
encrypted_tags = [
|
|
(slixmpp_omemo.OMEMO_BASE_NS, 'encrypted'),
|
|
]
|
|
|
|
self_muc_messages: Dict[str, str] = {}
|
|
|
|
# TODO: Look into blind trust stuff.
|
|
# https://gist.github.com/mar-v-in/b683220a55bc65dcdafc809be9c5d0e4
|
|
trust_states = {
|
|
'accepted': {
|
|
'verified',
|
|
'accepted',
|
|
}, 'rejected': {
|
|
'undecided',
|
|
'distrusted',
|
|
},
|
|
}
|
|
supported_tab_types = (DynamicConversationTab, StaticConversationTab, MucTab)
|
|
|
|
default_config = {
|
|
# Some MUC services may not reflect the message ids properly, in which
|
|
# case it is better to set this option to false.
|
|
'enable_muc': True,
|
|
}
|
|
|
|
def init(self) -> None:
|
|
super().init()
|
|
|
|
self.info = lambda i: self.api.information(i, 'Info')
|
|
|
|
# Ensure folder names don't contain illegal chars for the FS
|
|
jid_str = self.core.xmpp.boundjid.bare.encode('utf-8')
|
|
digest = hashlib.sha256(jid_str).digest()
|
|
hashed_jid = base64.b32encode(digest).decode('US-ASCII')
|
|
|
|
data_dir = os.path.join(DATA_HOME, 'omemo', hashed_jid)
|
|
|
|
try:
|
|
# Raise exception if folder exists so that we don't chmod again.
|
|
os.makedirs(data_dir, mode=0o700, exist_ok=False)
|
|
except OSError: # Folder already exists
|
|
pass
|
|
|
|
try:
|
|
self.core.xmpp.register_plugin(
|
|
'xep_0384', {
|
|
'data_dir': data_dir,
|
|
},
|
|
module=slixmpp_omemo,
|
|
) # OMEMO
|
|
except (PluginCouldNotLoad,):
|
|
log.exception('And error occured when loading the omemo plugin.')
|
|
|
|
asyncio.ensure_future(
|
|
self.core.xmpp['xep_0384'].session_start(self.core.xmpp.boundjid)
|
|
)
|
|
|
|
def display_error(self, txt) -> None:
|
|
"""Poezio logger Helper"""
|
|
self.api.information(txt, 'Error')
|
|
|
|
def get_fingerprints(self, jid: JID) -> List[str]:
|
|
"""Return fingerprints for the provided JID"""
|
|
devices = self.core.xmpp['xep_0384'].get_trust_for_jid(jid)
|
|
|
|
# XXX: What to do with did -> None entries?
|
|
# XXX: What to do with the active/inactive devices differenciation?
|
|
# For now I'll merge both. We should probably display them separately
|
|
# later on.
|
|
devices['active'].update(devices['inactive'])
|
|
return [
|
|
slixmpp_omemo.fp_from_ik(trust['key'])
|
|
for trust in devices['active'].values()
|
|
if trust is not None
|
|
]
|
|
|
|
async def decrypt(self, message: Message, jid: Optional[JID], tab: ChatTab) -> None:
|
|
if jid is None:
|
|
self.display_error('Unable to decrypt the message.')
|
|
return None
|
|
|
|
# XXX: This is only needed to workaround a bug in poezio (fixed in
|
|
# 00a91774) that makes it not give us realjids. Remove when there is a
|
|
# poezio release including it.
|
|
# The realjid of the participant needs to be retrieved in a MUC.
|
|
if isinstance(tab, MucTab):
|
|
user = tab.get_user_by_name(jid.resource)
|
|
if user is not None and user.jid != JID(''):
|
|
jid = user.jid
|
|
|
|
body = None
|
|
try:
|
|
encrypted = message['omemo_encrypted']
|
|
body = await self.core.xmpp['xep_0384'].decrypt_message(
|
|
encrypted,
|
|
jid,
|
|
# Always decrypt. Let us handle how we then warn the user.
|
|
allow_untrusted=True,
|
|
)
|
|
# Heartbeats will return a None body.
|
|
if body is not None:
|
|
message['body'] = body.decode('utf-8')
|
|
except (MissingOwnKey,):
|
|
# The message is missing our own key, it was not encrypted for
|
|
# us, and we can't decrypt it.
|
|
if (message['type'] == 'groupchat' and
|
|
message['id'] in self.self_muc_messages):
|
|
body = self.self_muc_messages.pop(message['id'])
|
|
else:
|
|
self.display_error(
|
|
'I can\'t decrypt this message as it '
|
|
'is not encrypted for me.'
|
|
)
|
|
except (NoAvailableSession,) as exn:
|
|
# We received a message from that contained a session that we
|
|
# don't know about (deleted session storage, etc.). We can't
|
|
# decrypt the message, and it's going to be lost.
|
|
# Here, as we need to initiate a new encrypted session, it is
|
|
# best if we send an encrypted message directly. XXX: Is it
|
|
# where we talk about self-healing messages?
|
|
self.display_error(
|
|
'I can\'t decrypt this message as it uses an encrypted '
|
|
'session I don\'t know about.',
|
|
)
|
|
except (EncryptionPrepareException,):
|
|
# Slixmpp tried its best, but there were errors it couldn't
|
|
# resolve. At this point you should have seen other exceptions
|
|
# and given a chance to resolve them already.
|
|
self.display_error('I was not able to decrypt the message.')
|
|
except (Exception,) as exn:
|
|
self.display_error('An error occured while attempting decryption.\n%r' % exn)
|
|
raise
|
|
|
|
return None
|
|
|
|
async def encrypt(self, message: Message, jids: Optional[List[JID]], _tab) -> None:
|
|
if jids is None:
|
|
self.display_error('Unable to encrypt the message.')
|
|
return None
|
|
|
|
body = message['body']
|
|
if self.config.get('enable_muc', True) and message['type'] == 'groupchat':
|
|
self.self_muc_messages[message['id']] = body
|
|
expect_problems = {} # type: Dict[JID, List[int]]
|
|
|
|
while True:
|
|
try:
|
|
# `encrypt_message` excepts the plaintext to be sent, a list of
|
|
# bare JIDs to encrypt to, and optionally a dict of problems to
|
|
# expect per bare JID.
|
|
#
|
|
# Note that this function returns an `<encrypted/>` object,
|
|
# and not a full Message stanza. This combined with the
|
|
# `recipients` parameter that requires for a list of JIDs,
|
|
# allows you to encrypt for 1:1 as well as groupchats (MUC).
|
|
recipients = jids
|
|
encrypt = await self.core.xmpp['xep_0384'].encrypt_message(
|
|
body,
|
|
recipients,
|
|
expect_problems,
|
|
)
|
|
message.append(encrypt)
|
|
return None
|
|
except UndecidedException as exn:
|
|
# The library prevents us from sending a message to an
|
|
# untrusted/undecided barejid, so we need to make a decision here.
|
|
# This is where you prompt your user to ask what to do. In
|
|
# this bot we will automatically trust undecided recipients.
|
|
await self.core.xmpp['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
|
|
# TODO: catch NoEligibleDevicesException
|
|
except EncryptionPrepareException as exn:
|
|
log.debug('FOO: EncryptionPrepareException: %r', exn.errors)
|
|
for error in exn.errors:
|
|
if isinstance(error, MissingBundleException):
|
|
self.display_error(
|
|
'Could not find keys for device "%d" of recipient "%s". Skipping.' %
|
|
(error.device, error.bare_jid),
|
|
)
|
|
jid = JID(error.bare_jid)
|
|
device_list = expect_problems.setdefault(jid, [])
|
|
device_list.append(error.device)
|
|
except (IqError, IqTimeout) as exn:
|
|
self.display_error(
|
|
'An error occured while fetching information on a recipient.\n%r' % exn,
|
|
)
|
|
return None
|
|
|
|
return None
|