358 lines
14 KiB
Python
358 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Slixmpp OMEMO plugin
|
|
Copyright (C) 2010 Nathanael C. Fritz
|
|
Copyright (C) 2019 Maxime “pep” Buquet <pep@bouah.net>
|
|
This file is part of slixmpp-omemo.
|
|
|
|
See the file LICENSE for copying permission.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import asyncio
|
|
import logging
|
|
from getpass import getpass
|
|
from argparse import ArgumentParser
|
|
|
|
from slixmpp import ClientXMPP, JID
|
|
from slixmpp.exceptions import IqTimeout, IqError
|
|
from slixmpp.stanza import Message
|
|
import slixmpp_omemo
|
|
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, EncryptionPrepareException
|
|
from slixmpp_omemo import UndecidedException, UntrustedException, NoAvailableSession
|
|
from omemo.exceptions import MissingBundleException
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Used by the EchoBot
|
|
LEVEL_DEBUG = 0
|
|
LEVEL_ERROR = 1
|
|
|
|
|
|
class EchoBot(ClientXMPP):
|
|
|
|
"""
|
|
A simple Slixmpp bot that will echo encrypted messages it receives, along
|
|
with a short thank you message.
|
|
|
|
For details on how to build a client with slixmpp, look at examples in the
|
|
slixmpp repository.
|
|
"""
|
|
|
|
eme_ns = 'eu.siacs.conversations.axolotl'
|
|
cmd_prefix = '!'
|
|
debug_level: int = LEVEL_DEBUG # or LEVEL_ERROR
|
|
|
|
def __init__(self, jid, password):
|
|
ClientXMPP.__init__(self, jid, password)
|
|
|
|
self.prefix_re: re.Pattern = re.compile('^%s' % self.cmd_prefix)
|
|
self.cmd_re: re.Pattern = re.compile('^%s(?P<command>\w+)(?:\s+(?P<args>.*))?' % self.cmd_prefix)
|
|
|
|
self.add_event_handler("session_start", self.start)
|
|
self.add_event_handler("message", self.message_handler)
|
|
|
|
def start(self, _event) -> None:
|
|
"""
|
|
Process the session_start event.
|
|
|
|
Typical actions for the session_start event are
|
|
requesting the roster and broadcasting an initial
|
|
presence stanza.
|
|
|
|
Arguments:
|
|
event -- An empty dictionary. The session_start
|
|
event does not provide any additional
|
|
data.
|
|
"""
|
|
self.send_presence()
|
|
self.get_roster()
|
|
|
|
def is_command(self, body: str) -> bool:
|
|
return self.prefix_re.match(body) is not None
|
|
|
|
async def handle_command(self, mto: JID, mtype: str, body: str) -> None:
|
|
match = self.cmd_re.match(body)
|
|
if match is None:
|
|
return None
|
|
|
|
groups = match.groupdict()
|
|
cmd = groups['command']
|
|
# args = groups['args']
|
|
|
|
if cmd == 'help':
|
|
await self.cmd_help(mto, mtype)
|
|
elif cmd == 'verbose':
|
|
await self.cmd_verbose(mto, mtype)
|
|
elif cmd == 'error':
|
|
await self.cmd_error(mto, mtype)
|
|
elif cmd == 'chain_length':
|
|
await self.cmd_chain_length(mto, mtype)
|
|
|
|
return None
|
|
|
|
async def cmd_help(self, mto: JID, mtype: str) -> None:
|
|
body = (
|
|
'I\'m the slixmpp-omemo echo bot! '
|
|
'The following commands are available:\n'
|
|
'{prefix}verbose Send message or reply with log messages\n'
|
|
'{prefix}error Send message or reply only on error\n'
|
|
).format(prefix=self.cmd_prefix)
|
|
return await self.encrypted_reply(mto, mtype, body)
|
|
|
|
async def cmd_verbose(self, mto: JID, mtype: str) -> None:
|
|
self.debug_level = LEVEL_DEBUG
|
|
body = '''Debug level set to 'verbose'.'''
|
|
return await self.encrypted_reply(mto, mtype, body)
|
|
|
|
async def cmd_error(self, mto: JID, mtype: str) -> None:
|
|
self.debug_level = LEVEL_ERROR
|
|
body = '''Debug level set to 'error'.'''
|
|
return await self.encrypted_reply(mto, mtype, body)
|
|
|
|
async def cmd_chain_length(self, mto: JID, mtype: str) -> None:
|
|
chain_length = await self['xep_0384']._chain_lengths(mto)
|
|
should_heartbeat = await self['xep_0384'].should_heartbeat(mto)
|
|
body = (
|
|
'lengths: %r\n' % chain_length +
|
|
'should heartbeat: %r' % should_heartbeat
|
|
)
|
|
return await self.encrypted_reply(mto, mtype, body)
|
|
|
|
def message_handler(self, msg: Message) -> None:
|
|
asyncio.ensure_future(self.message(msg))
|
|
|
|
async def message(self, msg: Message, allow_untrusted: bool = False) -> None:
|
|
"""
|
|
Process incoming message stanzas. Be aware that this also
|
|
includes MUC messages and error messages. It is usually
|
|
a good idea to check the messages's type before processing
|
|
or sending replies.
|
|
|
|
Arguments:
|
|
msg -- The received message stanza. See the documentation
|
|
for stanza objects and the Message stanza to see
|
|
how it may be used.
|
|
"""
|
|
mfrom = mto = msg['from']
|
|
mtype = msg['type']
|
|
|
|
if mtype not in ('chat', 'normal'):
|
|
return None
|
|
|
|
if not self['xep_0384'].is_encrypted(msg):
|
|
if self.debug_level == LEVEL_DEBUG:
|
|
await self.plain_reply(mto, mtype, 'Echo unencrypted message:%(body)s' % msg)
|
|
return None
|
|
|
|
try:
|
|
encrypted = msg['omemo_encrypted']
|
|
body = await self['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted)
|
|
# decrypt_message returns Optional[str]. It is possible to get
|
|
# body-less OMEMO message (see KeyTransportMessages), currently
|
|
# used for example to send heartbeats to other devices.
|
|
if body is None:
|
|
return None
|
|
decoded = body.decode('utf8')
|
|
if self.is_command(decoded):
|
|
await self.handle_command(mto, mtype, decoded)
|
|
elif self.debug_level == LEVEL_DEBUG:
|
|
await self.encrypted_reply(mto, mtype, 'Echo: %s' % decoded)
|
|
return None
|
|
except (MissingOwnKey,):
|
|
# The message is missing our own key, it was not encrypted for
|
|
# us, and we can't decrypt it.
|
|
await self.plain_reply(
|
|
mto, mtype,
|
|
'Error: Message not encrypted for me.',
|
|
)
|
|
return None
|
|
except (NoAvailableSession,) as exn:
|
|
# We received a message from that contained a session that we
|
|
# don't know about (deleted session storage, etc.). We can't
|
|
# decrypt the message, and it's going to be lost.
|
|
# Here, as we need to initiate a new encrypted session, it is
|
|
# best if we send an encrypted message directly. XXX: Is it
|
|
# where we talk about self-healing messages?
|
|
await self.encrypted_reply(
|
|
mto, mtype,
|
|
'Error: Message uses an encrypted '
|
|
'session I don\'t know about.',
|
|
)
|
|
return None
|
|
except (UndecidedException, UntrustedException) as exn:
|
|
# We received a message from an untrusted device. We can
|
|
# choose to decrypt the message nonetheless, with the
|
|
# `allow_untrusted` flag on the `decrypt_message` call, which
|
|
# we will do here. This is only possible for decryption,
|
|
# encryption will require us to decide if we trust the device
|
|
# or not. Clients _should_ indicate that the message was not
|
|
# trusted, or in undecided state, if they decide to decrypt it
|
|
# anyway.
|
|
await self.plain_reply(
|
|
mto, mtype,
|
|
"Error: Your device '%s' is not in my trusted devices." % exn.device,
|
|
)
|
|
# We resend, setting the `allow_untrusted` parameter to True.
|
|
await self.message(msg, allow_untrusted=True)
|
|
return None
|
|
except (EncryptionPrepareException,):
|
|
# Slixmpp tried its best, but there were errors it couldn't
|
|
# resolve. At this point you should have seen other exceptions
|
|
# and given a chance to resolve them already.
|
|
await self.plain_reply(mto, mtype, 'Error: I was not able to decrypt the message.')
|
|
return None
|
|
except (Exception,) as exn:
|
|
await self.plain_reply(mto, mtype, 'Error: Exception occured while attempting decryption.\n%r' % exn)
|
|
raise
|
|
|
|
return None
|
|
|
|
async def plain_reply(self, mto: JID, mtype: str, body):
|
|
"""
|
|
Helper to reply to messages
|
|
"""
|
|
|
|
msg = self.make_message(mto=mto, mtype=mtype)
|
|
msg['body'] = body
|
|
return msg.send()
|
|
|
|
async def encrypted_reply(self, mto: JID, mtype: str, body):
|
|
"""Helper to reply with encrypted messages"""
|
|
|
|
msg = self.make_message(mto=mto, mtype=mtype)
|
|
msg['eme']['namespace'] = self.eme_ns
|
|
msg['eme']['name'] = self['xep_0380'].mechanisms[self.eme_ns]
|
|
|
|
expect_problems = {} # type: Optional[Dict[JID, List[int]]]
|
|
|
|
while True:
|
|
try:
|
|
# `encrypt_message` excepts the plaintext to be sent, a list of
|
|
# bare JIDs to encrypt to, and optionally a dict of problems to
|
|
# expect per bare JID.
|
|
#
|
|
# Note that this function returns an `<encrypted/>` object,
|
|
# and not a full Message stanza. This combined with the
|
|
# `recipients` parameter that requires for a list of JIDs,
|
|
# allows you to encrypt for 1:1 as well as groupchats (MUC).
|
|
#
|
|
# `expect_problems`: See EncryptionPrepareException handling.
|
|
recipients = [mto]
|
|
encrypt = await self['xep_0384'].encrypt_message(body, recipients, expect_problems)
|
|
msg.append(encrypt)
|
|
return msg.send()
|
|
except UndecidedException as exn:
|
|
# The library prevents us from sending a message to an
|
|
# untrusted/undecided barejid, so we need to make a decision here.
|
|
# This is where you prompt your user to ask what to do. In
|
|
# this bot we will automatically trust undecided recipients.
|
|
await self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
|
|
# TODO: catch NoEligibleDevicesException
|
|
except EncryptionPrepareException as exn:
|
|
# This exception is being raised when the library has tried
|
|
# all it could and doesn't know what to do anymore. It
|
|
# contains a list of exceptions that the user must resolve, or
|
|
# explicitely ignore via `expect_problems`.
|
|
# TODO: We might need to bail out here if errors are the same?
|
|
for error in exn.errors:
|
|
if isinstance(error, MissingBundleException):
|
|
# We choose to ignore MissingBundleException. It seems
|
|
# to be somewhat accepted that it's better not to
|
|
# encrypt for a device if it has problems and encrypt
|
|
# for the rest, rather than error out. The "faulty"
|
|
# device won't be able to decrypt and should display a
|
|
# generic message. The receiving end-user at this
|
|
# point can bring up the issue if it happens.
|
|
self.plain_reply(
|
|
mto, mtype,
|
|
'Could not find keys for device "%d" of recipient "%s". Skipping.' %
|
|
(error.device, error.bare_jid),
|
|
)
|
|
jid = JID(error.bare_jid)
|
|
device_list = expect_problems.setdefault(jid, [])
|
|
device_list.append(error.device)
|
|
except (IqError, IqTimeout) as exn:
|
|
self.plain_reply(
|
|
mto, mtype,
|
|
'An error occured while fetching information on a recipient.\n%r' % exn,
|
|
)
|
|
return None
|
|
except Exception as exn:
|
|
await self.plain_reply(
|
|
mto, mtype,
|
|
'An error occured while attempting to encrypt.\n%r' % exn,
|
|
)
|
|
raise
|
|
|
|
return None
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Setup the command line arguments.
|
|
parser = ArgumentParser(description=EchoBot.__doc__)
|
|
|
|
# Output verbosity options.
|
|
parser.add_argument("-q", "--quiet", help="set logging to ERROR",
|
|
action="store_const", dest="loglevel",
|
|
const=logging.ERROR, default=logging.INFO)
|
|
parser.add_argument("-d", "--debug", help="set logging to DEBUG",
|
|
action="store_const", dest="loglevel",
|
|
const=logging.DEBUG, default=logging.INFO)
|
|
|
|
# JID and password options.
|
|
parser.add_argument("-j", "--jid", dest="jid",
|
|
help="JID to use")
|
|
parser.add_argument("-p", "--password", dest="password",
|
|
help="password to use")
|
|
|
|
# Data dir for omemo plugin
|
|
DATA_DIR = os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
'omemo',
|
|
)
|
|
parser.add_argument("--data-dir", dest="data_dir",
|
|
help="data directory", default=DATA_DIR)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Setup logging.
|
|
logging.basicConfig(level=args.loglevel,
|
|
format='%(levelname)-8s %(message)s')
|
|
|
|
if args.jid is None:
|
|
args.jid = input("Username: ")
|
|
if args.password is None:
|
|
args.password = getpass("Password: ")
|
|
|
|
# Setup the EchoBot and register plugins. Note that while plugins may
|
|
# have interdependencies, the order in which you register them does
|
|
# not matter.
|
|
|
|
# Ensure OMEMO data dir is created
|
|
os.makedirs(args.data_dir, exist_ok=True)
|
|
|
|
xmpp = EchoBot(args.jid, args.password)
|
|
xmpp.register_plugin('xep_0030') # Service Discovery
|
|
xmpp.register_plugin('xep_0199') # XMPP Ping
|
|
xmpp.register_plugin('xep_0380') # Explicit Message Encryption
|
|
|
|
try:
|
|
xmpp.register_plugin(
|
|
'xep_0384',
|
|
{
|
|
'data_dir': args.data_dir,
|
|
},
|
|
module=slixmpp_omemo,
|
|
) # OMEMO
|
|
except (PluginCouldNotLoad,):
|
|
log.exception('And error occured when loading the omemo plugin.')
|
|
sys.exit(1)
|
|
|
|
# Connect to the XMPP server and start processing XMPP stanzas.
|
|
xmpp.connect()
|
|
xmpp.process()
|