#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    Slixmpp OMEMO plugin
    Copyright (C) 2010  Nathanael C. Fritz
    Copyright (C) 2019 Maxime “pep” Buquet
    This file is part of slixmpp-omemo.

    See the file LICENSE for copying permission.
"""

import os
import re
import sys
import asyncio
import logging
from getpass import getpass
from argparse import ArgumentParser
from typing import Dict, List

from slixmpp import ClientXMPP, JID
from slixmpp.exceptions import IqTimeout, IqError
from slixmpp.stanza import Message
import slixmpp_omemo
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, EncryptionPrepareException
from slixmpp_omemo import UndecidedException, UntrustedException, NoAvailableSession
from omemo.exceptions import MissingBundleException

log = logging.getLogger(__name__)

# Used by the EchoBot
LEVEL_DEBUG = 0
LEVEL_ERROR = 1


class EchoBot(ClientXMPP):

    """
    A simple Slixmpp bot that will echo encrypted messages it receives, along
    with a short thank you message.

    For details on how to build a client with slixmpp, look at examples in the
    slixmpp repository.
    """

    eme_ns = 'eu.siacs.conversations.axolotl'
    cmd_prefix = '!'
    debug_level: int = LEVEL_DEBUG  # or LEVEL_ERROR

    def __init__(self, jid, password):
        """
        Create the bot, compile the command regexes and register handlers.

        Arguments:
            jid      -- The JID to log in with.
            password -- The password for that JID.
        """
        ClientXMPP.__init__(self, jid, password)

        # Escape the prefix so that a prefix containing regex
        # metacharacters (e.g. '.' or '$') is matched literally.
        prefix = re.escape(self.cmd_prefix)
        self.prefix_re: re.Pattern = re.compile(r'^%s' % prefix)
        # `command` is read by handle_command(); `args` captures whatever
        # follows the command name and is currently unused.
        self.cmd_re: re.Pattern = re.compile(
            r'^%s(?P<command>\w+)(?:\s+(?P<args>.*))?' % prefix
        )

        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.message_handler)

    def start(self, _event) -> None:
        """
        Process the session_start event.

        Typical actions for the session_start event are
        requesting the roster and broadcasting an initial
        presence stanza.

        Arguments:
            event -- An empty dictionary. The session_start
                     event does not provide any additional
                     data.
        """
        self.send_presence()
        self.get_roster()

    def is_command(self, body: str) -> bool:
        """Return True if `body` starts with the command prefix."""
        return self.prefix_re.match(body) is not None

    async def handle_command(self, mto: JID, mtype: str, body: str) -> None:
        """
        Parse `body` as a command and run the matching `cmd_*` coroutine.

        Bodies that do not match the command syntax, and unknown command
        names, are silently ignored.

        Arguments:
            mto   -- JID the reply is sent to.
            mtype -- Message type used for the reply.
            body  -- Decrypted message text, including the prefix.
        """
        match = self.cmd_re.match(body)
        if match is None:
            return None

        handlers = {
            'help': self.cmd_help,
            'verbose': self.cmd_verbose,
            'error': self.cmd_error,
            'chain_length': self.cmd_chain_length,
        }
        handler = handlers.get(match.group('command'))
        if handler is not None:
            await handler(mto, mtype)

        return None

    async def cmd_help(self, mto: JID, mtype: str) -> None:
        """Reply (encrypted) with the list of available commands."""
        body = (
            'I\'m the slixmpp-omemo echo bot! '
            'The following commands are available:\n'
            '{prefix}verbose Send message or reply with log messages\n'
            '{prefix}error Send message or reply only on error\n'
        ).format(prefix=self.cmd_prefix)
        return await self.encrypted_reply(mto, mtype, body)

    async def cmd_verbose(self, mto: JID, mtype: str) -> None:
        """Switch the bot to LEVEL_DEBUG and confirm with an encrypted reply."""
        self.debug_level = LEVEL_DEBUG
        body = '''Debug level set to 'verbose'.'''
        return await self.encrypted_reply(mto, mtype, body)

    async def cmd_error(self, mto: JID, mtype: str) -> None:
        """Switch the bot to LEVEL_ERROR and confirm with an encrypted reply."""
        self.debug_level = LEVEL_ERROR
        body = '''Debug level set to 'error'.'''
        return await self.encrypted_reply(mto, mtype, body)

    async def cmd_chain_length(self, mto: JID, mtype: str) -> None:
        """Reply (encrypted) with the plugin's chain lengths and heartbeat flag for `mto`."""
        body = (
            'lengths: %r\n' % self['xep_0384']._chain_lengths(mto) +
            'should heartbeat: %r' % self['xep_0384'].should_heartbeat(mto)
        )
        return await self.encrypted_reply(mto, mtype, body)

    def message_handler(self, msg: Message) -> None:
        """Synchronous event-handler shim that schedules `message()` on the loop."""
        asyncio.ensure_future(self.message(msg))

    async def message(self, msg: Message, allow_untrusted: bool = False) -> None:
        """
        Process incoming message stanzas. Be aware that this also
        includes MUC messages and error messages. It is usually
        a good idea to check the messages's type before processing
        or sending replies.

        Arguments:
            msg -- The received message stanza. See the documentation
                   for stanza objects and the Message stanza to see
                   how it may be used.
            allow_untrusted -- Passed through to `decrypt_message`; set to
                   True when retrying a message from an untrusted or
                   undecided device.
        """
        mfrom = mto = msg['from']
        mtype = msg['type']

        if mtype not in ('chat', 'normal'):
            return None

        if not self['xep_0384'].is_encrypted(msg):
            if self.debug_level == LEVEL_DEBUG:
                await self.plain_reply(mto, mtype, 'Echo unencrypted message:%(body)s' % msg)
            return None

        try:
            encrypted = msg['omemo_encrypted']
            body = self['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted)
            # NOTE(review): presumably a body-less OMEMO message (e.g. a
            # heartbeat) decrypts to None -- confirm against the plugin
            # documentation. There is nothing to echo in that case.
            if body is None:
                return None
            decoded = body.decode('utf8')
            if self.is_command(decoded):
                await self.handle_command(mto, mtype, decoded)
            elif self.debug_level == LEVEL_DEBUG:
                await self.encrypted_reply(mto, mtype, 'Echo: %s' % decoded)
            return None
        except MissingOwnKey:
            # The message is missing our own key, it was not encrypted for
            # us, and we can't decrypt it.
            await self.plain_reply(
                mto, mtype,
                'Error: Message not encrypted for me.',
            )
            return None
        except NoAvailableSession:
            # We received a message that contained a session that we
            # don't know about (deleted session storage, etc.). We can't
            # decrypt the message, and it's going to be lost.
            # Here, as we need to initiate a new encrypted session, it is
            # best if we send an encrypted message directly. XXX: Is it
            # where we talk about self-healing messages?
            await self.encrypted_reply(
                mto, mtype,
                'Error: Message uses an encrypted '
                'session I don\'t know about.',
            )
            return None
        except (UndecidedException, UntrustedException) as exn:
            # We received a message from an untrusted device. We can
            # choose to decrypt the message nonetheless, with the
            # `allow_untrusted` flag on the `decrypt_message` call, which
            # we will do here. This is only possible for decryption,
            # encryption will require us to decide if we trust the device
            # or not. Clients _should_ indicate that the message was not
            # trusted, or in undecided state, if they decide to decrypt it
            # anyway.
            await self.plain_reply(
                mto, mtype,
                "Error: Your device '%s' is not in my trusted devices." % exn.device,
            )
            # We resend, setting the `allow_untrusted` parameter to True.
            await self.message(msg, allow_untrusted=True)
            return None
        except EncryptionPrepareException:
            # Slixmpp tried its best, but there were errors it couldn't
            # resolve. At this point you should have seen other exceptions
            # and given a chance to resolve them already.
            await self.plain_reply(mto, mtype, 'Error: I was not able to decrypt the message.')
            return None
        except Exception as exn:
            # Tell the sender something went wrong, then let the error
            # propagate so that it is not silently swallowed.
            await self.plain_reply(mto, mtype, 'Error: Exception occured while attempting decryption.\n%r' % exn)
            raise

    async def plain_reply(self, mto: JID, mtype: str, body):
        """
        Helper to reply to messages

        Builds an unencrypted message to `mto` of type `mtype` carrying
        `body` and sends it.
        """
        msg = self.make_message(mto=mto, mtype=mtype)
        msg['body'] = body
        return msg.send()

    async def encrypted_reply(self, mto: JID, mtype: str, body):
        """
        Helper to reply with encrypted messages

        Encrypts `body` for `mto` and sends it, retrying after each
        recoverable problem (undecided trust, missing bundles).

        Arguments:
            mto   -- JID the reply is sent to (also the only recipient).
            mtype -- Message type used for the reply.
            body  -- Plaintext to encrypt.
        """
        msg = self.make_message(mto=mto, mtype=mtype)
        msg['eme']['namespace'] = self.eme_ns
        msg['eme']['name'] = self['xep_0380'].mechanisms[self.eme_ns]

        # Devices we already know are problematic, per bare JID. Handed
        # back to `encrypt_message` on retry so they get skipped.
        expect_problems: Dict[JID, List[int]] = {}

        while True:
            try:
                # `encrypt_message` expects the plaintext to be sent, a list of
                # bare JIDs to encrypt to, and optionally a dict of problems to
                # expect per bare JID.
                #
                # Note that this function returns an encrypted payload
                # element, and not a full Message stanza. This combined with
                # the `recipients` parameter that requires a list of JIDs,
                # allows you to encrypt for 1:1 as well as groupchats (MUC).
                #
                # `expect_problems`: See EncryptionPrepareException handling.
                recipients = [mto]
                encrypt = await self['xep_0384'].encrypt_message(body, recipients, expect_problems)
                msg.append(encrypt)
                return msg.send()
            except UndecidedException as exn:
                # The library prevents us from sending a message to an
                # untrusted/undecided barejid, so we need to make a decision here.
                # This is where you prompt your user to ask what to do. In
                # this bot we will automatically trust undecided recipients.
                self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
            # TODO: catch NoEligibleDevicesException
            except EncryptionPrepareException as exn:
                # This exception is being raised when the library has tried
                # all it could and doesn't know what to do anymore. It
                # contains a list of exceptions that the user must resolve, or
                # explicitly ignore via `expect_problems`.
                # TODO: We might need to bail out here if errors are the same?
                # (If none of the errors is a MissingBundleException nothing
                # changes between iterations and the loop may never end.)
                for error in exn.errors:
                    if isinstance(error, MissingBundleException):
                        # We choose to ignore MissingBundleException. It seems
                        # to be somewhat accepted that it's better not to
                        # encrypt for a device if it has problems and encrypt
                        # for the rest, rather than error out. The "faulty"
                        # device won't be able to decrypt and should display a
                        # generic message. The receiving end-user at this
                        # point can bring up the issue if it happens.
                        #
                        # `plain_reply` is a coroutine function: it must be
                        # awaited, otherwise the notice is never sent.
                        await self.plain_reply(
                            mto, mtype,
                            'Could not find keys for device "%d" of recipient "%s". Skipping.' %
                            (error.device, error.bare_jid),
                        )
                        jid = JID(error.bare_jid)
                        device_list = expect_problems.setdefault(jid, [])
                        device_list.append(error.device)
            except (IqError, IqTimeout) as exn:
                # Awaited for the same reason as above.
                await self.plain_reply(
                    mto, mtype,
                    'An error occured while fetching information on a recipient.\n%r' % exn,
                )
                return None
            except Exception as exn:
                # Report the failure to the recipient, then propagate it.
                await self.plain_reply(
                    mto, mtype,
                    'An error occured while attempting to encrypt.\n%r' % exn,
                )
                raise


if __name__ == '__main__':
    # Setup the command line arguments.
    parser = ArgumentParser(description=EchoBot.__doc__)

    # Output verbosity options.
    parser.add_argument("-q", "--quiet", help="set logging to ERROR",
                        action="store_const", dest="loglevel",
                        const=logging.ERROR, default=logging.INFO)
    parser.add_argument("-d", "--debug", help="set logging to DEBUG",
                        action="store_const", dest="loglevel",
                        const=logging.DEBUG, default=logging.INFO)

    # JID and password options.
    parser.add_argument("-j", "--jid", dest="jid",
                        help="JID to use")
    parser.add_argument("-p", "--password", dest="password",
                        help="password to use")

    # Data dir for omemo plugin
    DATA_DIR = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'omemo',
    )
    parser.add_argument("--data-dir", dest="data_dir",
                        help="data directory", default=DATA_DIR)

    args = parser.parse_args()

    # Setup logging.
    logging.basicConfig(level=args.loglevel,
                        format='%(levelname)-8s %(message)s')

    # Prompt for whatever credentials were not given on the command line.
    if args.jid is None:
        args.jid = input("Username: ")
    if args.password is None:
        args.password = getpass("Password: ")

    # Setup the EchoBot and register plugins. Note that while plugins may
    # have interdependencies, the order in which you register them does
    # not matter.

    # Ensure OMEMO data dir is created
    os.makedirs(args.data_dir, exist_ok=True)

    xmpp = EchoBot(args.jid, args.password)
    xmpp.register_plugin('xep_0030')  # Service Discovery
    xmpp.register_plugin('xep_0199')  # XMPP Ping
    xmpp.register_plugin('xep_0380')  # Explicit Message Encryption

    try:
        xmpp.register_plugin(
            'xep_0384',
            {
                'data_dir': args.data_dir,
            },
            module=slixmpp_omemo,
        )  # OMEMO
    except (PluginCouldNotLoad,):
        log.exception('And error occured when loading the omemo plugin.')
        sys.exit(1)

    # Connect to the XMPP server and start processing XMPP stanzas.
    xmpp.connect()
    xmpp.process()