233 lines
8.5 KiB
Python
233 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Slixmpp OMEMO plugin
|
|
Copyright (C) 2010 Nathanael C. Fritz
|
|
Copyright (C) 2019 Maxime “pep” Buquet <pep@bouah.net>
|
|
This file is part of slixmpp-omemo.
|
|
|
|
See the file LICENSE for copying permission.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import logging
|
|
from getpass import getpass
|
|
from argparse import ArgumentParser
|
|
|
|
from slixmpp import ClientXMPP, JID
|
|
from slixmpp.stanza import Message
|
|
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, EncryptionPrepareException
|
|
from slixmpp_omemo import UndecidedException, UntrustedException, NoAvailableSession
|
|
import slixmpp_omemo
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class EchoBot(ClientXMPP):

    """
    A simple Slixmpp bot that will echo encrypted messages it receives, along
    with a short thank you message.

    For details on how to build a client with slixmpp, look at examples in the
    slixmpp repository.
    """
    # NOTE: the docstring above is also used as the command-line description
    # (`ArgumentParser(description=EchoBot.__doc__)`), so keep it user-facing.

    # Namespace advertised in the Explicit Message Encryption (`eme`) element
    # of every encrypted reply; also used to look up the mechanism name in the
    # xep_0380 plugin.
    eme_ns = 'eu.siacs.conversations.axolotl'

    def __init__(self, jid, password):
        """
        Create the client and register the event handlers.

        Arguments:
            jid      -- The JID to connect with.
            password -- The password for that JID.
        """
        ClientXMPP.__init__(self, jid, password)

        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.message_handler)

    def start(self, _event) -> None:
        """
        Process the session_start event.

        Typical actions for the session_start event are
        requesting the roster and broadcasting an initial
        presence stanza.

        Arguments:
            event -- An empty dictionary. The session_start
                     event does not provide any additional
                     data.
        """
        self.send_presence()
        self.get_roster()

    def message_handler(self, msg: Message) -> None:
        """
        Synchronous entry point for the "message" event.

        Schedules the `message` coroutine on the event loop and returns
        immediately.

        Arguments:
            msg -- The received message stanza.
        """
        asyncio.ensure_future(self.message(msg))

    async def message(self, msg: Message, allow_untrusted: bool = False) -> None:
        """
        Process incoming message stanzas. Be aware that this also
        includes MUC messages and error messages. It is usually
        a good idea to check the message's type before processing
        or sending replies.

        Arguments:
            msg             -- The received message stanza. See the
                               documentation for stanza objects and the
                               Message stanza to see how it may be used.
            allow_untrusted -- Passed through to `decrypt_message`; when True
                               the message is decrypted even if the sending
                               device is not trusted. Set by the single retry
                               this method performs after a trust exception.
        """

        if msg['type'] not in ('chat', 'normal'):
            return None

        if not self['xep_0384'].is_encrypted(msg):
            await self.plain_reply(msg, 'This message was not encrypted.\n%(body)s' % msg)
            return None

        try:
            body = self['xep_0384'].decrypt_message(msg, allow_untrusted)
            await self.encrypted_reply(msg, 'Thanks for sending\n%s' % body.decode("utf8"))
        except (MissingOwnKey,):
            # The message is missing our own key, it was not encrypted for
            # us, and we can't decrypt it.
            await self.plain_reply(
                msg,
                'I can\'t decrypt this message as it is not encrypted for me.',
            )
        except (NoAvailableSession,):
            # We received a message that contained a session that we
            # don't know about (deleted session storage, etc.). We can't
            # decrypt the message, and it's going to be lost.
            # Here, as we need to initiate a new encrypted session, it is
            # best if we send an encrypted message directly. XXX: Is it
            # where we talk about self-healing messages?
            await self.encrypted_reply(
                msg,
                'I can\'t decrypt this message as it uses an encrypted '
                'session I don\'t know about.',
            )
        except (UndecidedException, UntrustedException) as exn:
            if allow_untrusted:
                # We already retried with `allow_untrusted=True` and the
                # library still refuses. Retrying again would recurse without
                # bound (and send a reply on every round), so give up here.
                await self.plain_reply(msg, 'I was not able to decrypt the message.')
                return None

            # We received a message from an untrusted device. We can
            # choose to decrypt the message nonetheless, with the
            # `allow_untrusted` flag on the `decrypt_message` call, which
            # we will do here. This is only possible for decryption,
            # encryption will require us to decide if we trust the device
            # or not. Clients _should_ indicate that the message was not
            # trusted, or in undecided state, if they decide to decrypt it
            # anyway.
            await self.plain_reply(
                msg,
                "Your device '%s' is not in my trusted devices." % exn.device,
            )
            # We process the message again, this time setting the
            # `allow_untrusted` parameter to True.
            await self.message(msg, allow_untrusted=True)
        except (EncryptionPrepareException,):
            # Slixmpp tried its best, but there were errors it couldn't
            # resolve. At this point you should have seen other exceptions
            # and given a chance to resolve them already.
            await self.plain_reply(msg, 'I was not able to decrypt the message.')

        return None

    async def plain_reply(self, original_msg, body) -> None:
        """
        Helper to reply to messages with an unencrypted body.

        The reply is addressed to the sender of `original_msg` and reuses its
        message type.

        Arguments:
            original_msg -- The stanza being replied to.
            body         -- The plaintext body of the reply.
        """

        mto = original_msg['from']
        mtype = original_msg['type']
        msg = self.make_message(mto=mto, mtype=mtype)
        msg['body'] = body
        return msg.send()

    async def encrypted_reply(self, original_msg, body) -> None:
        """
        Helper to reply with encrypted messages.

        The reply is addressed to the sender of `original_msg`, reuses its
        message type and carries an `eme` element for `eme_ns`. Devices
        reported as undecided are trusted on the spot and encryption is
        retried; any other failure is logged and the reply is dropped.

        Arguments:
            original_msg -- The stanza being replied to.
            body         -- The plaintext to encrypt and send.
        """

        mto = original_msg['from']
        mtype = original_msg['type']
        msg = self.make_message(mto=mto, mtype=mtype)
        msg['eme']['namespace'] = self.eme_ns
        msg['eme']['name'] = self['xep_0380'].mechanisms[self.eme_ns]

        # `encrypt_message` expects the plaintext to be sent, a list of
        # bare JIDs to encrypt to, and optionally a dict of problems to
        # expect per bare JID.
        recipients = [mto]

        while True:
            try:
                encrypt = await self['xep_0384'].encrypt_message(body, recipients)
                msg.append(encrypt)
                return msg.send()
            except UndecidedException as exn:
                # The library prevents us from sending a message to an
                # untrusted/undecided barejid, so we need to make a decision here.
                # This is where you prompt your user to ask what to do.
                # This example simply trusts the device and tries again.
                self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
            except Exception:
                # Best effort: the reply is dropped, but leave a trace of the
                # reason instead of swallowing the error silently.
                log.exception('An error occurred while encrypting the reply; giving up.')
                break

        return None
|
|
|
|
if __name__ == '__main__':
    # Script entry point: parse the command line, configure logging, build the
    # bot, register its plugins and run it.

    # Setup the command line arguments.
    parser = ArgumentParser(description=EchoBot.__doc__)

    # Output verbosity options. Both flags write to the same `loglevel`
    # destination, which stays at INFO when neither is given.
    parser.add_argument("-q", "--quiet", help="set logging to ERROR",
                        action="store_const", dest="loglevel",
                        const=logging.ERROR, default=logging.INFO)
    parser.add_argument("-d", "--debug", help="set logging to DEBUG",
                        action="store_const", dest="loglevel",
                        const=logging.DEBUG, default=logging.INFO)

    # JID and password options.
    parser.add_argument("-j", "--jid", dest="jid",
                        help="JID to use")
    parser.add_argument("-p", "--password", dest="password",
                        help="password to use")

    # Data dir for omemo plugin; defaults to an `omemo` directory next to
    # this script.
    DATA_DIR = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'omemo',
    )
    parser.add_argument("--data-dir", dest="data_dir",
                        help="data directory", default=DATA_DIR)

    args = parser.parse_args()

    # Setup logging.
    logging.basicConfig(level=args.loglevel,
                        format='%(levelname)-8s %(message)s')

    # Prompt interactively for credentials not given on the command line.
    if args.jid is None:
        args.jid = input("Username: ")
    if args.password is None:
        args.password = getpass("Password: ")

    # Setup the EchoBot and register plugins. Note that while plugins may
    # have interdependencies, the order in which you register them does
    # not matter.

    # Ensure OMEMO data dir is created
    os.makedirs(args.data_dir, exist_ok=True)

    xmpp = EchoBot(args.jid, args.password)
    xmpp.register_plugin('xep_0030')  # Service Discovery
    xmpp.register_plugin('xep_0199')  # XMPP Ping
    xmpp.register_plugin('xep_0380')  # Explicit Message Encryption

    try:
        xmpp.register_plugin(
            'xep_0384',
            {
                'data_dir': args.data_dir,
            },
            module=slixmpp_omemo,
        )  # OMEMO
    except (PluginCouldNotLoad,):
        # Without the OMEMO plugin the bot cannot do anything useful: log the
        # traceback and exit with a non-zero status.
        log.exception('An error occurred when loading the omemo plugin.')
        sys.exit(1)

    # Connect to the XMPP server and start processing XMPP stanzas.
    xmpp.connect()
    xmpp.process()
|