d676c2ee7b
(use automodule directive & docstrings)
326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""
|
||
This plugin implements the `XEP-0027`_ “Current Jabber OpenPGP Usage”.
|
||
|
||
This is a plugin used to encrypt one-to-one conversation using the PGP
|
||
encryption method. You can use it if you want really good privacy. Without this
|
||
encryption, your messages are encrypted **at least** from your client (poezio) to
|
||
your server. The message is decrypted by your server and you cannot control the
|
||
encryption method of your messages from your server to your contact’s server
|
||
(unless you are your own server’s administrator), nor from your contact’s
|
||
server to your contact’s client.
|
||
|
||
This plugin does end-to-end encryption. This means that **only** your contact can
|
||
decrypt your messages, and it is fully encrypted during **all** its travel
|
||
through the internet.
|
||
|
||
Note that if you are having an encrypted conversation with a contact, you can
|
||
**not** send XHTML-IM messages to him. They will be removed and be replaced by
|
||
plain text messages.
|
||
|
||
Installation and configuration
|
||
------------------------------
|
||
|
||
You should autoload this plugin, as it will send your signed presence directly
|
||
on login, making it easier for your contact’s clients to know that you are
|
||
supporting GPG encryption. To do that, use the :term:`plugins_autoload` configuration
|
||
option.
|
||
|
||
You need to create a plugin configuration file. Create a file named :file:`gpg.cfg`
|
||
into your plugins configuration directory (:file:`~/.config/poezio/plugins` by
|
||
default), and fill it like this:
|
||
|
||
.. code-block:: ini
|
||
|
||
[gpg]
|
||
keyid = 091F9C78
|
||
passphrase = your OPTIONAL passphrase
|
||
|
||
[keys]
|
||
example@jabber.org = E3CFCDE2
|
||
juliet@xmpp.org = EF27ABCD
|
||
|
||
The ``gpg`` section is about your key. You need to specify the keyid, for the
|
||
key you want to use. You can as well provide a passphrase. If you don’t, you
|
||
should use a gpg agent or something like that that will ask your passphrase
|
||
whenever you need it.
|
||
|
||
The ``keys`` section contains your contact’s id keys. For each contact you want
|
||
to have encrypted conversations with, add her/his JID associated with the keyid
|
||
of his/her key.
|
||
|
||
And that’s it, now you need to talk directly to the **full** jid of your
|
||
contacts. Poezio doesn’t let you encrypt messages whom recipients is a bare
|
||
JID.
|
||
|
||
Additionnal information on GnuPG
|
||
--------------------------------
|
||
|
||
Create a key
|
||
~~~~~~~~~~~~
|
||
|
||
To create a personal key, use
|
||
|
||
.. code-block:: bash
|
||
|
||
gpg --gen-key
|
||
|
||
and follow the instructions.
|
||
|
||
Keyid
|
||
~~~~~
|
||
The keyid (required in the gpg.cfg configuration file) is a 8 character-long
|
||
key. You can get the ones you created or imported by using the command
|
||
|
||
.. code-block:: bash
|
||
|
||
gpg --list-keys
|
||
|
||
You will get something like
|
||
|
||
.. code-block:: none
|
||
|
||
pub 4096R/01234567 2011-11-11
|
||
uid Your Name Here (comment) <email@example.org>
|
||
sub 4096R/AAFFBBCC 2011-11-11
|
||
|
||
pub 2048R/12345678 2011-11-12 [expire: 2011-11-22]
|
||
uid A contact’s name (comment) <fake@fake.fr>
|
||
sub 2048R/FFBBAACC 2011-11-12 [expire: 2011-11-22]
|
||
|
||
In this example, the keyids are ``01234567`` and ``12345678``.
|
||
|
||
Share your key
|
||
~~~~~~~~~~~~~~
|
||
Use:
|
||
|
||
.. code-block:: bash
|
||
|
||
gpg --send-keys --keyserver pgp.mit.edu <keyid>
|
||
|
||
to upload you public key on a public server.
|
||
|
||
.. _XEP-0027: http://xmpp.org/extensions/xep-0027.html
|
||
|
||
"""
|
||
from gpg import gnupg
|
||
from sleekxmpp.xmlstream.stanzabase import JID
|
||
|
||
from xml.etree import cElementTree as ET
|
||
import xml.sax.saxutils
|
||
|
||
import logging
|
||
log = logging.getLogger(__name__)
|
||
|
||
from plugin import BasePlugin
|
||
|
||
from tabs import ConversationTab
|
||
|
||
NS_SIGNED = "jabber:x:signed"
|
||
NS_ENCRYPTED = "jabber:x:encrypted"
|
||
|
||
|
||
SIGNED_ATTACHED_MESSAGE = """-----BEGIN PGP SIGNED MESSAGE-----
|
||
Hash: %(hash)s
|
||
|
||
%(clear)s
|
||
-----BEGIN PGP SIGNATURE-----
|
||
Version: GnuPG
|
||
|
||
%(data)s
|
||
-----END PGP SIGNATURE-----
|
||
"""
|
||
|
||
|
||
ENCRYPTED_MESSAGE = """-----BEGIN PGP MESSAGE-----
|
||
Version: GnuPG
|
||
|
||
%(data)s
|
||
-----END PGP MESSAGE-----"""
|
||
|
||
|
||
class Plugin(BasePlugin):
|
||
def init(self):
|
||
self.contacts = {}
|
||
# a dict of {full-JID: 'signed'/'valid'/'invalid'/'disabled'}
|
||
# Whenever we receive a signed presence from a JID, we add it to this
|
||
# dict, this way we know if we can encrypt the messages we will send to
|
||
# this JID.
|
||
# If that resource sends a non-signed presence, then we remove it
|
||
# from that dict and stop encrypting our messages.
|
||
# 'disabled' means that the user do NOT want to encrypt its messages
|
||
# even if the key is valid.
|
||
self.gpg = gnupg.GPG()
|
||
self.keyid = self.config.get('keyid', '') or None
|
||
self.passphrase = self.config.get('passphrase', '') or None
|
||
if not self.keyid:
|
||
self.core.information('No GPG keyid provided in the configuration', 'Warning')
|
||
|
||
self.add_event_handler('send_normal_presence', self.sign_presence)
|
||
self.add_event_handler('normal_presence', self.on_normal_presence)
|
||
self.add_event_handler('conversation_say_after', self.on_conversation_say)
|
||
self.add_event_handler('conversation_msg', self.on_conversation_msg)
|
||
|
||
self.add_tab_command(ConversationTab, 'gpg', self.command_gpg,
|
||
usage='<force|disable|setkey> [jid] [keyid]',
|
||
help='Force or disable gpg encryption with the fulljid of the current conversation. The setkey argument lets you associate a keyid with the given bare JID.',
|
||
short='Manage the GPG status',
|
||
completion=self.gpg_completion)
|
||
ConversationTab.add_information_element('gpg', self.display_encryption_status)
|
||
|
||
def cleanup(self):
|
||
self.send_unsigned_presence()
|
||
ConversationTab.remove_information_element('gpg')
|
||
self.del_tab_command(ConversationTab, 'gpg')
|
||
|
||
def sign_presence(self, presence):
|
||
"""
|
||
Sign every normal presence we send
|
||
"""
|
||
signed_element = ET.Element('{%s}x' % (NS_SIGNED,))
|
||
t = self.gpg.sign(presence['status'], keyid=self.keyid, passphrase=self.passphrase, detach=True)
|
||
if not t:
|
||
self.core.information('Could not sign presence. Disabling GPG module', 'Info')
|
||
self.core.plugin_manager.unload('gpg')
|
||
return
|
||
text = xml.sax.saxutils.escape(str(t))
|
||
signed_element.text = self.remove_gpg_headers(text)
|
||
presence.append(signed_element)
|
||
|
||
def send_unsigned_presence(self):
|
||
"""
|
||
Send our current presence, to everyone, but unsigned, to indicate
|
||
that we cannot/do not want to encrypt/decrypt messages.
|
||
"""
|
||
current_presence = self.core.get_status()
|
||
self.core.command_status('%s %s' % (current_presence.show or 'available', current_presence.message,))
|
||
|
||
def on_normal_presence(self, presence, resource):
|
||
"""
|
||
Check if it’s signed, if it is and we can verify the signature,
|
||
add 'valid' or 'invalid' into the dict. If it cannot be verified, just add
|
||
'signed'. Otherwise, do nothing.
|
||
"""
|
||
signed = presence.find('{%s}x' % (NS_SIGNED,))
|
||
bare = presence['from'].bare
|
||
full = presence['from'].full
|
||
if signed is None:
|
||
if bare in self.contacts.keys():
|
||
del self.contacts[bare]
|
||
return
|
||
if self.config.has_section('keys') and bare in self.config.options('keys'):
|
||
self.contacts[full] = 'invalid'
|
||
for hash_ in ('SHA1', 'SHA256'):
|
||
to_verify = SIGNED_ATTACHED_MESSAGE % {'clear': presence['status'],
|
||
'data': signed.text,
|
||
'hash': hash_}
|
||
verify = self.gpg.verify(to_verify)
|
||
if verify:
|
||
self.contacts[full] = 'valid'
|
||
break
|
||
else:
|
||
self.contacts[full] = 'signed'
|
||
|
||
def on_conversation_say(self, message, tab):
|
||
"""
|
||
Check if the contact has a signed AND veryfied signature.
|
||
If yes, encrypt the message with her key.
|
||
"""
|
||
to = message['to']
|
||
if not message['body']:
|
||
# there’s nothing to encrypt if this is a chatstate, for example
|
||
return
|
||
signed = to.full in self.contacts.keys()
|
||
if signed:
|
||
veryfied = self.contacts[to.full] in ('valid', 'forced')
|
||
else:
|
||
veryfied = False
|
||
if veryfied:
|
||
# remove the xhtm_im body if present, because that
|
||
# cannot be encrypted.
|
||
del message['xhtml_im']
|
||
encrypted_element = ET.Element('{%s}x' % (NS_ENCRYPTED,))
|
||
text = self.gpg.encrypt(message['body'], self.config.get(to.bare, '', section='keys'), always_trust=True)
|
||
if not text:
|
||
self.core.information('Could not encrypt message to %s' % (to.full),)
|
||
# If we could not encrypt the message, don't send anything
|
||
message['body'] = ''
|
||
return
|
||
encrypted_element.text = self.remove_gpg_headers(xml.sax.saxutils.escape(str(text)))
|
||
message.append(encrypted_element)
|
||
message['body'] = 'This message has been encrypted using the GPG key with id: %s' % self.keyid
|
||
|
||
def on_conversation_msg(self, message, tab):
|
||
"""
|
||
Check if the message is encrypted, and decrypt it if we can.
|
||
"""
|
||
encrypted = message.find('{%s}x' % (NS_ENCRYPTED,))
|
||
fro = message['from']
|
||
if encrypted is not None:
|
||
if self.config.has_section('keys') and fro.bare in self.config.options('keys'):
|
||
keyid = self.config.get(fro.bare, '', 'keys')
|
||
decrypted = self.gpg.decrypt(ENCRYPTED_MESSAGE % {'data': str(encrypted.text)}, passphrase=self.passphrase)
|
||
if not decrypted:
|
||
self.core.information('Could not decrypt message from %s' % (fro.full),)
|
||
return
|
||
message['body'] = str(decrypted)
|
||
|
||
def display_encryption_status(self, jid):
|
||
"""
|
||
Returns the status of encryption for the associated jid. This is to be used
|
||
in the ConversationTab’s InfoWin.
|
||
"""
|
||
if jid.full not in self.contacts.keys():
|
||
return ''
|
||
status = self.contacts[jid.full]
|
||
if status in ('valid', 'invalid', 'signed'):
|
||
return ' GPG Key: %s (%s)' % (status, 'encrypted' if status == 'valid' else 'NOT encrypted',)
|
||
else:
|
||
return ' GPG: Encryption %s' % (status,)
|
||
|
||
def command_gpg(self, args):
|
||
"""
|
||
A command to force or disable the encryption, or to assign a keyid to a JID
|
||
"""
|
||
args = args.split()
|
||
if not args:
|
||
return self.core.command_help("gpg")
|
||
if len(args) >= 2:
|
||
jid = JID(args[1])
|
||
else:
|
||
if isinstance(self.core.current_tab(), ConversationTab):
|
||
jid = JID(self.core.current_tab().get_name())
|
||
else:
|
||
return
|
||
command = args[0]
|
||
if command == 'force' or command == 'enable':
|
||
# we can force encryption only with contact having an associated
|
||
# key, otherwise we cannot encrypt at all
|
||
if self.config.has_section('keys') and jid.bare in self.config.options('keys'):
|
||
self.contacts[JID(jid).full] = 'forced'
|
||
else:
|
||
self.core.information('Cannot force encryption: no key associated with %s' % (jid.bare), 'Info')
|
||
elif command == 'disable':
|
||
self.contacts[JID(jid).full] = 'disabled'
|
||
elif command == 'setkey':
|
||
if len(args) != 3:
|
||
return self.core.command_help("gpg")
|
||
if not self.config.has_section('keys'):
|
||
self.config.add_section('keys')
|
||
self.config.set(jid.bare, args[2], 'keys')
|
||
self.config.write()
|
||
self.core.refresh_window()
|
||
|
||
def gpg_completion(self, the_input):
|
||
return the_input.auto_completion(['force', 'disable', 'setkey'], ' ')
|
||
|
||
def remove_gpg_headers(self, text):
|
||
lines = text.splitlines()
|
||
while lines[0].strip() != '':
|
||
lines.pop(0)
|
||
while lines[0].strip() == '':
|
||
lines.pop(0)
|
||
res = []
|
||
for line in lines:
|
||
if not line.startswith('---'):
|
||
res.append(line)
|
||
return '\n'.join(res)
|