slixmpp-omemo/examples/echo_client.py
Maxime “pep” Buquet e00c646d95
echo_client: Handle decrypt_message's Optional-ness
Also comment about it

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2021-12-15 00:56:19 +01:00

356 lines
14 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Slixmpp OMEMO plugin
Copyright (C) 2010 Nathanael C. Fritz
Copyright (C) 2019 Maxime “pep” Buquet <pep@bouah.net>
This file is part of slixmpp-omemo.
See the file LICENSE for copying permission.
"""
import os
import re
import sys
import asyncio
import logging
from getpass import getpass
from argparse import ArgumentParser
from slixmpp import ClientXMPP, JID
from slixmpp.exceptions import IqTimeout, IqError
from slixmpp.stanza import Message
import slixmpp_omemo
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, EncryptionPrepareException
from slixmpp_omemo import UndecidedException, UntrustedException, NoAvailableSession
from omemo.exceptions import MissingBundleException
log = logging.getLogger(__name__)
# Used by the EchoBot
LEVEL_DEBUG = 0
LEVEL_ERROR = 1
class EchoBot(ClientXMPP):
"""
A simple Slixmpp bot that will echo encrypted messages it receives, along
with a short thank you message.
For details on how to build a client with slixmpp, look at examples in the
slixmpp repository.
"""
eme_ns = 'eu.siacs.conversations.axolotl'
cmd_prefix = '!'
debug_level: int = LEVEL_DEBUG # or LEVEL_ERROR
def __init__(self, jid, password):
ClientXMPP.__init__(self, jid, password)
self.prefix_re: re.Pattern = re.compile('^%s' % self.cmd_prefix)
self.cmd_re: re.Pattern = re.compile('^%s(?P<command>\w+)(?:\s+(?P<args>.*))?' % self.cmd_prefix)
self.add_event_handler("session_start", self.start)
self.add_event_handler("message", self.message_handler)
def start(self, _event) -> None:
"""
Process the session_start event.
Typical actions for the session_start event are
requesting the roster and broadcasting an initial
presence stanza.
Arguments:
event -- An empty dictionary. The session_start
event does not provide any additional
data.
"""
self.send_presence()
self.get_roster()
def is_command(self, body: str) -> bool:
return self.prefix_re.match(body) is not None
async def handle_command(self, mto: JID, mtype: str, body: str) -> None:
match = self.cmd_re.match(body)
if match is None:
return None
groups = match.groupdict()
cmd = groups['command']
# args = groups['args']
if cmd == 'help':
await self.cmd_help(mto, mtype)
elif cmd == 'verbose':
await self.cmd_verbose(mto, mtype)
elif cmd == 'error':
await self.cmd_error(mto, mtype)
elif cmd == 'chain_length':
await self.cmd_chain_length(mto, mtype)
return None
async def cmd_help(self, mto: JID, mtype: str) -> None:
body = (
'I\'m the slixmpp-omemo echo bot! '
'The following commands are available:\n'
'{prefix}verbose Send message or reply with log messages\n'
'{prefix}error Send message or reply only on error\n'
).format(prefix=self.cmd_prefix)
return await self.encrypted_reply(mto, mtype, body)
async def cmd_verbose(self, mto: JID, mtype: str) -> None:
self.debug_level = LEVEL_DEBUG
body = '''Debug level set to 'verbose'.'''
return await self.encrypted_reply(mto, mtype, body)
async def cmd_error(self, mto: JID, mtype: str) -> None:
self.debug_level = LEVEL_ERROR
body = '''Debug level set to 'error'.'''
return await self.encrypted_reply(mto, mtype, body)
async def cmd_chain_length(self, mto: JID, mtype: str) -> None:
body = (
'lengths: %r\n' % self['xep_0384']._chain_lengths(mto) +
'should heartbeat: %r' % self['xep_0384'].should_heartbeat(mto)
)
return await self.encrypted_reply(mto, mtype, body)
def message_handler(self, msg: Message) -> None:
asyncio.ensure_future(self.message(msg))
async def message(self, msg: Message, allow_untrusted: bool = False) -> None:
"""
Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually
a good idea to check the messages's type before processing
or sending replies.
Arguments:
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
"""
mfrom = mto = msg['from']
mtype = msg['type']
if mtype not in ('chat', 'normal'):
return None
if not self['xep_0384'].is_encrypted(msg):
if self.debug_level == LEVEL_DEBUG:
await self.plain_reply(mto, mtype, 'Echo unencrypted message:%(body)s' % msg)
return None
try:
encrypted = msg['omemo_encrypted']
body = self['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted)
# decrypt_message returns Optional[str]. It is possible to get
# body-less OMEMO message (see KeyTransportMessages), currently
# used for example to send heartbeats to other devices.
if body is None:
return None
decoded = body.decode('utf8')
if self.is_command(decoded):
await self.handle_command(mto, mtype, decoded)
elif self.debug_level == LEVEL_DEBUG:
await self.encrypted_reply(mto, mtype, 'Echo: %s' % decoded)
return None
except (MissingOwnKey,):
# The message is missing our own key, it was not encrypted for
# us, and we can't decrypt it.
await self.plain_reply(
mto, mtype,
'Error: Message not encrypted for me.',
)
return None
except (NoAvailableSession,) as exn:
# We received a message from that contained a session that we
# don't know about (deleted session storage, etc.). We can't
# decrypt the message, and it's going to be lost.
# Here, as we need to initiate a new encrypted session, it is
# best if we send an encrypted message directly. XXX: Is it
# where we talk about self-healing messages?
await self.encrypted_reply(
mto, mtype,
'Error: Message uses an encrypted '
'session I don\'t know about.',
)
return None
except (UndecidedException, UntrustedException) as exn:
# We received a message from an untrusted device. We can
# choose to decrypt the message nonetheless, with the
# `allow_untrusted` flag on the `decrypt_message` call, which
# we will do here. This is only possible for decryption,
# encryption will require us to decide if we trust the device
# or not. Clients _should_ indicate that the message was not
# trusted, or in undecided state, if they decide to decrypt it
# anyway.
await self.plain_reply(
mto, mtype,
"Error: Your device '%s' is not in my trusted devices." % exn.device,
)
# We resend, setting the `allow_untrusted` parameter to True.
await self.message(msg, allow_untrusted=True)
return None
except (EncryptionPrepareException,):
# Slixmpp tried its best, but there were errors it couldn't
# resolve. At this point you should have seen other exceptions
# and given a chance to resolve them already.
await self.plain_reply(mto, mtype, 'Error: I was not able to decrypt the message.')
return None
except (Exception,) as exn:
await self.plain_reply(mto, mtype, 'Error: Exception occured while attempting decryption.\n%r' % exn)
raise
return None
async def plain_reply(self, mto: JID, mtype: str, body):
"""
Helper to reply to messages
"""
msg = self.make_message(mto=mto, mtype=mtype)
msg['body'] = body
return msg.send()
async def encrypted_reply(self, mto: JID, mtype: str, body):
"""Helper to reply with encrypted messages"""
msg = self.make_message(mto=mto, mtype=mtype)
msg['eme']['namespace'] = self.eme_ns
msg['eme']['name'] = self['xep_0380'].mechanisms[self.eme_ns]
expect_problems = {} # type: Optional[Dict[JID, List[int]]]
while True:
try:
# `encrypt_message` excepts the plaintext to be sent, a list of
# bare JIDs to encrypt to, and optionally a dict of problems to
# expect per bare JID.
#
# Note that this function returns an `<encrypted/>` object,
# and not a full Message stanza. This combined with the
# `recipients` parameter that requires for a list of JIDs,
# allows you to encrypt for 1:1 as well as groupchats (MUC).
#
# `expect_problems`: See EncryptionPrepareException handling.
recipients = [mto]
encrypt = await self['xep_0384'].encrypt_message(body, recipients, expect_problems)
msg.append(encrypt)
return msg.send()
except UndecidedException as exn:
# The library prevents us from sending a message to an
# untrusted/undecided barejid, so we need to make a decision here.
# This is where you prompt your user to ask what to do. In
# this bot we will automatically trust undecided recipients.
self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
# TODO: catch NoEligibleDevicesException
except EncryptionPrepareException as exn:
# This exception is being raised when the library has tried
# all it could and doesn't know what to do anymore. It
# contains a list of exceptions that the user must resolve, or
# explicitely ignore via `expect_problems`.
# TODO: We might need to bail out here if errors are the same?
for error in exn.errors:
if isinstance(error, MissingBundleException):
# We choose to ignore MissingBundleException. It seems
# to be somewhat accepted that it's better not to
# encrypt for a device if it has problems and encrypt
# for the rest, rather than error out. The "faulty"
# device won't be able to decrypt and should display a
# generic message. The receiving end-user at this
# point can bring up the issue if it happens.
self.plain_reply(
mto, mtype,
'Could not find keys for device "%d" of recipient "%s". Skipping.' %
(error.device, error.bare_jid),
)
jid = JID(error.bare_jid)
device_list = expect_problems.setdefault(jid, [])
device_list.append(error.device)
except (IqError, IqTimeout) as exn:
self.plain_reply(
mto, mtype,
'An error occured while fetching information on a recipient.\n%r' % exn,
)
return None
except Exception as exn:
await self.plain_reply(
mto, mtype,
'An error occured while attempting to encrypt.\n%r' % exn,
)
raise
return None
if __name__ == '__main__':
# Setup the command line arguments.
parser = ArgumentParser(description=EchoBot.__doc__)
# Output verbosity options.
parser.add_argument("-q", "--quiet", help="set logging to ERROR",
action="store_const", dest="loglevel",
const=logging.ERROR, default=logging.INFO)
parser.add_argument("-d", "--debug", help="set logging to DEBUG",
action="store_const", dest="loglevel",
const=logging.DEBUG, default=logging.INFO)
# JID and password options.
parser.add_argument("-j", "--jid", dest="jid",
help="JID to use")
parser.add_argument("-p", "--password", dest="password",
help="password to use")
# Data dir for omemo plugin
DATA_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'omemo',
)
parser.add_argument("--data-dir", dest="data_dir",
help="data directory", default=DATA_DIR)
args = parser.parse_args()
# Setup logging.
logging.basicConfig(level=args.loglevel,
format='%(levelname)-8s %(message)s')
if args.jid is None:
args.jid = input("Username: ")
if args.password is None:
args.password = getpass("Password: ")
# Setup the EchoBot and register plugins. Note that while plugins may
# have interdependencies, the order in which you register them does
# not matter.
# Ensure OMEMO data dir is created
os.makedirs(args.data_dir, exist_ok=True)
xmpp = EchoBot(args.jid, args.password)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0199') # XMPP Ping
xmpp.register_plugin('xep_0380') # Explicit Message Encryption
try:
xmpp.register_plugin(
'xep_0384',
{
'data_dir': args.data_dir,
},
module=slixmpp_omemo,
) # OMEMO
except (PluginCouldNotLoad,):
log.exception('And error occured when loading the omemo plugin.')
sys.exit(1)
# Connect to the XMPP server and start processing XMPP stanzas.
xmpp.connect()
xmpp.process()