Compare commits

..

No commits in common. "main" and "initial-publish" have entirely different histories.

9 changed files with 211 additions and 382 deletions

View file

@ -1,21 +1,17 @@
---
stages:
- lint
- lint
.python-3.9:
image: python:3.9
.python-3.7:
image: python:3.7
.python-3.10:
image: python:3.10
.python-3.8:
image: python:3.8
.pylint:
stage: lint
script:
- apt update && apt install -y libidn11-dev build-essential cmake
- pip3 install pylint pyasn1-modules cffi --upgrade
- pip3 install -e git+https://github.com/syndace/python-omemo#egg=omemo
- pip3 install -e git+https://github.com/syndace/python-omemo-backend-signal#egg=omemo-backend-signal
- pip3 install -e git+https://lab.louiz.org/poezio/slixmpp.git#egg=slixmpp
- python3 setup.py install
- pylint -E slixmpp_omemo
@ -28,22 +24,22 @@ stages:
- mypyc --ignore-missing-imports ./slixmpp_omemo
allow_failure: true
lint-3.9-pylint:
lint-3.7-pylint:
extends:
- .python-3.9
- .python-3.7
- .pylint
lint-3.9-mypy:
lint-3.8-pylint:
extends:
- .python-3.9
- .mypy
lint-3.10-pylint:
extends:
- .python-3.10
- .python-3.8
- .pylint
lint-3.10-mypy:
lint-3.7-mypy:
extends:
- .python-3.10
- .python-3.7
- .mypy
lint-3.8-mypy:
extends:
- .python-3.8
- .mypy

View file

@ -1,42 +1,3 @@
Version 0.9.0:
2022-10-19 Maxime “pep” Buquet <pep@bouah.net>
* Added:
- Coroutines in asyncio.wait is now deprecated. Added create_task calls
- Replaced all ensure_future calls by create_task
Version 0.8.0:
2022-08-23 Maxime “pep” Buquet <pep@bouah.net>
* Breaking:
- get_devices and get_active_devices now return Iterable[int] instead of Iterable[str]
* Changes:
- fetch_bundle and fetch_device methods are now public
- my_fingerprint doesn't traceback anymore on normal operation
* Added:
- New fetch_bundles method to fetch all bundles at once
- Add upper bound on OMEMO lib version requirements as it'll become significant
Version 0.7.0:
2022-04-03 Maxime “pep” Buquet <pep@bouah.net>
* Breaking:
- Removed get_device_list method in favor of newly added get_devices and
get_active_devices methods.
- Renamed make_heartbeat to send_heartbeat and make it send the message as
well.
* Improvements:
- Added py.typed to the repository for static type checking tools
- New delete_session method
Version 0.6.1:
2022-03-14 Maxime “pep” Buquet <pep@bouah.net>
* Improvements:
- Add minimal version requirements in requirements.txt and setup.py
Version 0.6.0:
2022-03-12 Maxime “pep” Buquet <pep@bouah.net>
* Improvements:
- Ensure device list is published even if we're already connected (#10)
- Ensure bundles are republished on decrypt
* Added:
- Heartbeat messages. Signal to other devices we're still active after
some amount of messages. Stop raising exceptions when there is no payload.
- Ensure heartbeats are stored in the archive.
- Commands to echo_bot. (verbose, error)
Version 0.5.0:
2021-07-12 Maxime “pep” Buquet <pep@bouah.net>
* Added:

View file

@ -27,19 +27,6 @@ Installation
- PIP: `slixmpp-omemo`
- Manual: `python3 setup.py install`
Examples
--------
The repository contains an example bot that contains many comments, and
can be used to test against other setups. To use it:
```
python examples/echo_bot.py --debug -j foo@bar -p passwd --data-dir /foo/bar
```
It also contains commands. Feel free to open merge requests or issues to
add new useful ones.
Credits
-------

View file

@ -13,6 +13,7 @@
import os
import re
import sys
import asyncio
import logging
from getpass import getpass
from argparse import ArgumentParser
@ -20,8 +21,6 @@ from argparse import ArgumentParser
from slixmpp import ClientXMPP, JID
from slixmpp.exceptions import IqTimeout, IqError
from slixmpp.stanza import Message
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import MatchXPath
import slixmpp_omemo
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, EncryptionPrepareException
from slixmpp_omemo import UndecidedException, UntrustedException, NoAvailableSession
@ -55,10 +54,7 @@ class EchoBot(ClientXMPP):
self.cmd_re: re.Pattern = re.compile('^%s(?P<command>\w+)(?:\s+(?P<args>.*))?' % self.cmd_prefix)
self.add_event_handler("session_start", self.start)
self.register_handler(CoroutineCallback('Messages',
MatchXPath(f'{{{self.default_ns}}}message'),
self.message_handler,
))
self.add_event_handler("message", self.message_handler)
def start(self, _event) -> None:
"""
@ -94,6 +90,8 @@ class EchoBot(ClientXMPP):
await self.cmd_verbose(mto, mtype)
elif cmd == 'error':
await self.cmd_error(mto, mtype)
elif cmd == 'chain_length':
await self.cmd_chain_length(mto, mtype)
return None
@ -101,9 +99,9 @@ class EchoBot(ClientXMPP):
body = (
'I\'m the slixmpp-omemo echo bot! '
'The following commands are available:\n'
f'{self.cmd_prefix}verbose Send message or reply with log messages\n'
f'{self.cmd_prefix}error Send message or reply only on error\n'
)
'{prefix}verbose Send message or reply with log messages\n'
'{prefix}error Send message or reply only on error\n'
).format(prefix=self.cmd_prefix)
return await self.encrypted_reply(mto, mtype, body)
async def cmd_verbose(self, mto: JID, mtype: str) -> None:
@ -116,7 +114,17 @@ class EchoBot(ClientXMPP):
body = '''Debug level set to 'error'.'''
return await self.encrypted_reply(mto, mtype, body)
async def message_handler(self, msg: Message, allow_untrusted: bool = False) -> None:
async def cmd_chain_length(self, mto: JID, mtype: str) -> None:
body = (
'lengths: %r\n' % self['xep_0384']._chain_lengths(mto) +
'should heartbeat: %r' % self['xep_0384'].should_heartbeat(mto)
)
return await self.encrypted_reply(mto, mtype, body)
def message_handler(self, msg: Message) -> None:
asyncio.ensure_future(self.message(msg))
async def message(self, msg: Message, allow_untrusted: bool = False) -> None:
"""
Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually
@ -136,21 +144,18 @@ class EchoBot(ClientXMPP):
if not self['xep_0384'].is_encrypted(msg):
if self.debug_level == LEVEL_DEBUG:
await self.plain_reply(mto, mtype, f"Echo unencrypted message: {msg['body']}")
await self.plain_reply(mto, mtype, 'Echo unencrypted message:%(body)s' % msg)
return None
try:
encrypted = msg['omemo_encrypted']
body = await self['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted)
# decrypt_message returns Optional[str]. It is possible to get
# body-less OMEMO message (see KeyTransportMessages), currently
# used for example to send heartbeats to other devices.
if body is not None:
decoded = body.decode('utf8')
if self.is_command(decoded):
await self.handle_command(mto, mtype, decoded)
elif self.debug_level == LEVEL_DEBUG:
await self.encrypted_reply(mto, mtype, f'Echo: {decoded}')
body = self['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted)
decoded = body.decode('utf8')
if self.is_command(decoded):
await self.handle_command(mto, mtype, decoded)
elif self.debug_level == LEVEL_DEBUG:
await self.encrypted_reply(mto, mtype, 'Echo: %s' % decoded)
return None
except (MissingOwnKey,):
# The message is missing our own key, it was not encrypted for
# us, and we can't decrypt it.
@ -158,6 +163,7 @@ class EchoBot(ClientXMPP):
mto, mtype,
'Error: Message not encrypted for me.',
)
return None
except (NoAvailableSession,) as exn:
# We received a message from that contained a session that we
# don't know about (deleted session storage, etc.). We can't
@ -170,6 +176,7 @@ class EchoBot(ClientXMPP):
'Error: Message uses an encrypted '
'session I don\'t know about.',
)
return None
except (UndecidedException, UntrustedException) as exn:
# We received a message from an untrusted device. We can
# choose to decrypt the message nonetheless, with the
@ -181,15 +188,17 @@ class EchoBot(ClientXMPP):
# anyway.
await self.plain_reply(
mto, mtype,
f"Error: Your device '{exn.device}' is not in my trusted devices.",
"Error: Your device '%s' is not in my trusted devices." % exn.device,
)
# We resend, setting the `allow_untrusted` parameter to True.
await self.message_handler(msg, allow_untrusted=True)
await self.message(msg, allow_untrusted=True)
return None
except (EncryptionPrepareException,):
# Slixmpp tried its best, but there were errors it couldn't
# resolve. At this point you should have seen other exceptions
# and given a chance to resolve them already.
await self.plain_reply(mto, mtype, 'Error: I was not able to decrypt the message.')
return None
except (Exception,) as exn:
await self.plain_reply(mto, mtype, 'Error: Exception occured while attempting decryption.\n%r' % exn)
raise
@ -235,7 +244,7 @@ class EchoBot(ClientXMPP):
# untrusted/undecided barejid, so we need to make a decision here.
# This is where you prompt your user to ask what to do. In
# this bot we will automatically trust undecided recipients.
await self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
# TODO: catch NoEligibleDevicesException
except EncryptionPrepareException as exn:
# This exception is being raised when the library has tried
@ -254,8 +263,8 @@ class EchoBot(ClientXMPP):
# point can bring up the issue if it happens.
self.plain_reply(
mto, mtype,
f'Could not find keys for device "{error.device}"'
f' of recipient "{error.bare_jid}". Skipping.',
'Could not find keys for device "%d" of recipient "%s". Skipping.' %
(error.device, error.bare_jid),
)
jid = JID(error.bare_jid)
device_list = expect_problems.setdefault(jid, [])

View file

@ -1,3 +1,3 @@
slixmpp>=1.8.0
omemo-backend-signal>=0.3.0
omemo>=0.14.0,<0.15
slixmpp
omemo
omemo-backend-signal

View file

@ -40,9 +40,8 @@ CLASSIFIERS = [
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Programming Language :: Python',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Internet :: XMPP',
'Topic :: Security :: Cryptography',
'Topic :: Software Development :: Libraries :: Python Modules',
@ -59,12 +58,7 @@ setup(
url='https://lab.louiz.org/poezio/slixmpp-omemo',
license='GPLv3',
platforms=['any'],
package_data={'slixmpp_omemo': ['py.typed']},
packages=['slixmpp_omemo'],
install_requires=[
'slixmpp>=1.8.0',
'omemo-backend-signal>=0.3.0',
'omemo>=0.14.0,<0.15',
],
install_requires=['slixmpp', 'omemo', 'omemo-backend-signal'],
classifiers=CLASSIFIERS,
)

View file

@ -11,18 +11,16 @@
import logging
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
import os
import json
import base64
import asyncio
from functools import reduce
from typing import Any, Dict, List, Optional, Set, Tuple, Union
# Not available in Python 3.7, and slixmpp already imports the right things
# for me
from slixmpp.types import TypedDict
import os
import json
import base64
import asyncio
from slixmpp.plugins.xep_0060.stanza import Items, EventItems
from slixmpp.plugins.xep_0004 import Form
from slixmpp.plugins.base import BasePlugin, register_plugin
@ -63,6 +61,11 @@ PUBLISH_OPTIONS_NODE = 'http://jabber.org/protocol/pubsub#publish-options'
PUBSUB_ERRORS = 'http://jabber.org/protocol/pubsub#errors'
class ChainLengths(TypedDict):
receiving: List[Tuple[int, int]]
sending: List[Tuple[int, int]]
def b64enc(data: bytes) -> str:
return base64.b64encode(bytes(bytearray(data))).decode('ASCII')
@ -117,7 +120,7 @@ def _generate_encrypted_payload(encrypted) -> Encrypted:
if 'payload' in encrypted:
tag['payload']['value'] = b64enc(encrypted['payload'])
for devices in encrypted['keys'].values():
for bare_jid, devices in encrypted['keys'].items():
for rid, device in devices.items():
key = Key()
key['value'] = b64enc(device['data'])
@ -158,9 +161,6 @@ class MissingOwnKey(XEP0384): pass
class NoAvailableSession(XEP0384): pass
class UninitializedOMEMOSession(XEP0384): pass
class EncryptionPrepareException(XEP0384):
def __init__(self, errors):
self.errors = errors
@ -184,11 +184,6 @@ class ErroneousPayload(XEP0384):
"""To be raised when the payload is not of the form we expect"""
class ErroneousParameter(XEP0384):
"""To be raised when parameters to the `encrypt_message` method aren't
used as expected."""
class XEP_0384(BasePlugin):
"""
@ -197,7 +192,7 @@ class XEP_0384(BasePlugin):
name = 'xep_0384'
description = 'XEP-0384 OMEMO'
dependencies = {'xep_0004', 'xep_0030', 'xep_0060', 'xep_0163', 'xep_0334'}
dependencies = {'xep_0004', 'xep_0060', 'xep_0163'}
default_config = {
'data_dir': None,
'storage_backend': None,
@ -216,9 +211,6 @@ class XEP_0384(BasePlugin):
# Used at startup to prevent publishing device list and bundles multiple times
_initial_publish_done = False
# Initiated once the OMEMO session is created.
__omemo_session: Optional[SessionManager] = None
def plugin_init(self) -> None:
if not self.backend_loaded:
log_str = ("xep_0384 cannot be loaded as the backend omemo library "
@ -235,17 +227,34 @@ class XEP_0384(BasePlugin):
raise PluginCouldNotLoad("xep_0384 cannot be loaded as there is "
"no data directory specified.")
storage = self.storage_backend
if self.storage_backend is None:
storage = JSONFileStorage(self.data_dir)
otpkpolicy = self.otpk_policy
bare_jid = self.xmpp.boundjid.bare
self._device_id = _load_device_id(self.data_dir)
asyncio.create_task(self.session_start_omemo())
try:
self._omemo = SessionManager.create(
storage,
otpkpolicy,
self.omemo_backend,
bare_jid,
self._device_id,
)
except:
log.error("Couldn't load the OMEMO object; ¯\\_(ツ)_/¯")
raise PluginCouldNotLoad
self.xmpp.add_event_handler('session_start', self.session_start)
self.xmpp['xep_0060'].map_node_event(OMEMO_DEVICES_NS, 'omemo_device_list')
self.xmpp.add_event_handler('omemo_device_list_publish', self._receive_device_list)
# If this plugin is loaded after 'session_start' has fired, we still
# need to publish bundles
if self.xmpp.is_connected and not self._initial_publish_done:
asyncio.create_task(self._initial_publish())
asyncio.ensure_future(self._initial_publish())
return None
def plugin_end(self):
if not self.backend_loaded:
@ -255,34 +264,6 @@ class XEP_0384(BasePlugin):
self.xmpp.remove_event_handler('omemo_device_list_publish', self._receive_device_list)
self.xmpp['xep_0163'].remove_interest(OMEMO_DEVICES_NS)
async def session_start_omemo(self):
"""Creates the OMEMO session object"""
storage = self.storage_backend
if self.storage_backend is None:
storage = JSONFileStorage(self.data_dir)
otpkpolicy = self.otpk_policy
bare_jid = self.xmpp.boundjid.bare
try:
self.__omemo_session = await SessionManager.create(
storage,
otpkpolicy,
self.omemo_backend,
bare_jid,
self._device_id,
)
except Exception as exn:
log.error("Couldn't load the OMEMO object; ¯\\_(ツ)_/¯")
raise PluginCouldNotLoad from exn
def _omemo(self) -> SessionManager:
"""Helper method to unguard potentially uninitialized SessionManager"""
if self.__omemo_session is None:
raise UninitializedOMEMOSession
return self.__omemo_session
async def session_start(self, _jid):
await self._initial_publish()
@ -290,16 +271,16 @@ class XEP_0384(BasePlugin):
if self.backend_loaded:
self.xmpp['xep_0163'].add_interest(OMEMO_DEVICES_NS)
await asyncio.wait([
asyncio.create_task(self._set_device_list()),
asyncio.create_task(self._publish_bundle()),
self._set_device_list(),
self._publish_bundle(),
])
self._initial_publish_done = True
def my_device_id(self) -> int:
return self._device_id
async def my_fingerprint(self) -> str:
bundle = self._omemo().public_bundle.serialize(self.omemo_backend)
def my_fingerprint(self) -> str:
bundle = self._omemo.public_bundle.serialize(self.omemo_backend)
return fp_from_ik(bundle['ik'])
def _set_node_config(
@ -349,10 +330,10 @@ class XEP_0384(BasePlugin):
)
async def _generate_bundle_iq(self, publish_options: bool = True) -> Iq:
bundle = self._omemo().public_bundle.serialize(self.omemo_backend)
bundle = self._omemo.public_bundle.serialize(self.omemo_backend)
jid = self.xmpp.boundjid
disco = await self.xmpp['xep_0030'].get_info(jid=jid.bare, local=False)
disco = await self.xmpp['xep_0030'].get_info(jid.bare)
publish_options = PUBLISH_OPTIONS_NODE in disco['disco_info'].get_features()
iq = self.xmpp.Iq(stype='set')
@ -387,17 +368,15 @@ class XEP_0384(BasePlugin):
return iq
async def _publish_bundle(self) -> None:
log.debug('Publishing our own bundle. Do we need to?')
if self._omemo().republish_bundle:
log.debug('Publishing.')
if self._omemo.republish_bundle:
iq = await self._generate_bundle_iq()
try:
await iq.send()
except IqError as exn:
except IqError as e:
# TODO: Slixmpp should handle pubsub#errors so we don't have to
# fish the element ourselves
precondition = exn.iq['error'].xml.find(
f'{{{PUBSUB_ERRORS}}}precondition-not-met'
precondition = e.iq['error'].xml.find(
'{%s}%s' % (PUBSUB_ERRORS, 'precondition-not-met'),
)
if precondition is not None:
log.debug('The node we tried to publish was already '
@ -412,71 +391,40 @@ class XEP_0384(BasePlugin):
raise
iq = await self._generate_bundle_iq(publish_options=False)
await iq.send()
else:
log.debug('Not publishing.')
async def fetch_bundle(self, jid: JID, device_id: int) -> None:
"""
Fetch bundle for specified jid / device_id pair.
"""
log.debug('Fetching bundle for JID: %r, device: %r', jid, device_id)
node = f'{OMEMO_BUNDLES_NS}:{device_id}'
async def _fetch_bundle(self, jid: str, device_id: int) -> Optional[ExtendedPublicBundle]:
node = '%s:%d' % (OMEMO_BUNDLES_NS, device_id)
try:
iq = await self.xmpp['xep_0060'].get_items(jid, node)
except (IqError, IqTimeout):
return None
bundle = iq['pubsub']['items']['item']['bundle']
bundle = _parse_bundle(self.omemo_backend, bundle)
if bundle is not None:
log.debug('Encryption: Bundle %r found!', device_id)
devices = self.bundles.setdefault(jid.bare, {})
devices[device_id] = bundle
else:
log.debug('Encryption: Bundle %r not found!', device_id)
return _parse_bundle(self.omemo_backend, bundle)
async def fetch_bundles(self, jid: JID) -> None:
"""
Fetch bundles of active devices for specified JID.
This is a helper function to allow the user to request a store
update. Failed bundles are not retried.
"""
# Ignore failures
await asyncio.gather(
*map(
lambda did: self.fetch_bundle(jid, did),
await self.get_active_devices(jid)
),
return_exceptions=True,
)
async def fetch_devices(self, jid: JID) -> None:
"""
Manually query PEP OMEMO_DEVICES_NS nodes
"""
log.debug('Fetching device list for JID: %r', jid)
async def _fetch_device_list(self, jid: JID) -> None:
"""Manually query PEP OMEMO_DEVICES_NS nodes"""
iq = await self.xmpp['xep_0060'].get_items(jid.full, OMEMO_DEVICES_NS)
return await self._read_device_list(jid, iq['pubsub']['items'])
async def _store_device_ids(self, jid: str, items: Union[Items, EventItems]) -> None:
def _store_device_ids(self, jid: str, items: Union[Items, EventItems]) -> None:
"""Store Device list"""
device_ids = [] # type: List[int]
items = list(items)
if items:
device_ids = [int(d['id']) for d in items[0]['devices']]
return await self._omemo().newDeviceList(str(jid), device_ids)
return self._omemo.newDeviceList(str(jid), device_ids)
def _receive_device_list(self, msg: Message) -> None:
"""Handler for received PEP OMEMO_DEVICES_NS payloads"""
asyncio.create_task(
asyncio.ensure_future(
self._read_device_list(msg['from'], msg['pubsub_event']['items']),
)
async def _read_device_list(self, jid: JID, items: Union[Items, EventItems]) -> None:
"""Read items and devices if we need to set the device list again or not"""
bare_jid = jid.bare
await self._store_device_ids(bare_jid, items)
self._store_device_ids(bare_jid, items)
items = list(items)
device_ids = []
@ -489,7 +437,7 @@ class XEP_0384(BasePlugin):
return None
async def _set_device_list(self, device_ids: Optional[Iterable[int]] = None) -> None:
async def _set_device_list(self, device_ids: Optional[Set[int]] = None) -> None:
own_jid = self.xmpp.boundjid
try:
@ -497,15 +445,15 @@ class XEP_0384(BasePlugin):
own_jid.bare, OMEMO_DEVICES_NS,
)
items = iq['pubsub']['items']
await self._store_device_ids(own_jid.bare, items)
self._store_device_ids(own_jid.bare, items)
except IqError as iq_err:
if iq_err.condition == "item-not-found":
await self._store_device_ids(own_jid.bare, [])
self._store_device_ids(own_jid.bare, [])
else:
return # XXX: Handle this!
if device_ids is None:
device_ids = await self.get_active_devices(own_jid)
device_ids = self.get_device_list(own_jid)
devices = []
for i in device_ids:
@ -516,7 +464,7 @@ class XEP_0384(BasePlugin):
payload['devices'] = devices
jid = self.xmpp.boundjid
disco = await self.xmpp['xep_0030'].get_info(jid=jid.bare, local=False)
disco = await self.xmpp['xep_0030'].get_info(jid.bare)
publish_options = PUBLISH_OPTIONS_NODE in disco['disco_info'].get_features()
options = None
@ -531,7 +479,6 @@ class XEP_0384(BasePlugin):
})
try:
log.debug('Setting own device list to %r', device_ids)
await self.xmpp['xep_0060'].publish(
own_jid.bare, OMEMO_DEVICES_NS, payload=payload, options=options,
)
@ -554,80 +501,55 @@ class XEP_0384(BasePlugin):
own_jid.bare, OMEMO_DEVICES_NS, payload=payload,
)
async def get_devices(self, jid: JID) -> Iterable[int]:
def get_device_list(self, jid: JID) -> List[str]:
"""Return active device ids. Always contains our own device id."""
return self._omemo.getDevices(jid.bare).get('active', [])
def _chain_lengths(self, jid: JID) -> ChainLengths:
"""
Get all devices for a JID.
Gather receiving and sending chain lengths for all devices (active
/ inactive) of a JID.
Receiving chain length is used to know when to send a heartbeat to
signal recipients our device is still active and listening. See:
https://xmpp.org/extensions/xep-0384.html#rules
Sending chain length is used on the other side when a device
hasn't been sending us messages and seems inactive.
# XXX: Only the receiving part is used in this library for the
# moment.
"""
devices = await self._omemo().getDevices(jid.bare)
return map(int, set(devices.get('active', []) + devices.get('inactive', [])))
# XXX: This method uses APIs that haven't been made public yet in the
# OMEMO library as of 0.12 (9fd7123).
async def get_active_devices(self, jid: JID) -> Iterable[int]:
bare = jid.bare
devices = self._omemo.getDevices(bare)
active = devices.get('active', set())
inactive = devices.get('inactive', set())
devices = active.union(inactive)
lengths: ChainLengths = {'sending': [], 'receiving': []}
for did in devices:
session = self._omemo._SessionManager__loadSession(bare, did)
skr = session._DoubleRatchet__skr
lengths['sending'].append((did, skr.sending_chain_length))
lengths['receiving'].append((did, skr.receiving_chain_length))
return lengths
def should_heartbeat(self, jid: JID) -> bool:
"""
Return active device ids. Always contains our own device id.
"""
devices = await self._omemo().getDevices(jid.bare)
return map(int, set(devices.get('active', [])))
async def _should_heartbeat(self, jid: JID, device_id: int, prekey: bool) -> bool:
"""
Internal helper for :py:func:`XEP_0384.should_heartbeat`.
Returns whether we should send a heartbeat message for (JID,
device_id).
We check if the message is a prekey message, in which case we
assume it's a new session and we want to ACK relatively early.
Otherwise we look at the number of messages since we have last
replied and if above a certain threshold we notify them that we're
still active.
"""
length = await self._omemo().receiving_chain_length(jid.bare, device_id)
inactive_session = (length or 0) > self.heartbeat_after
log.debug(
'Chain length for %r / %d: %d -> inactive_session? %r',
jid, device_id, length, inactive_session,
)
log.debug('Is this a prekey message: %r', prekey)
res = prekey or inactive_session
log.debug('Should heartbeat? %r', res)
return res
async def should_heartbeat(self, jid: JID, msg: Union[Message, Encrypted]) -> bool:
"""
Returns whether we should send a heartbeat message to the sender
device. See notes about heartbeat in
Returns whether we should send a heartbeat message for JID.
See notes about heartbeat in
https://xmpp.org/extensions/xep-0384.html#rules.
This method will return True if this session (to the sender
device) is not yet confirmed, or if it hasn't been answered in a
while.
"""
prekey: bool = False
receiving_chain_lengths = self._chain_lengths(jid).get('receiving', [])
lengths = map(lambda d_l: d_l[1], receiving_chain_lengths)
return max(lengths, default=0) > self.heartbeat_after
# Get prekey information from message
encrypted = msg
if isinstance(msg, Message):
encrypted = msg['omemo_encrypted']
header = encrypted['header']
sid = header['sid']
key = header.xml.find("{%s}key[@rid='%s']" % (
OMEMO_BASE_NS, self._device_id))
# Don't error out. If it's not encrypted to us we don't need to send a
# heartbeat.
prekey = False
if key is not None:
key = Key(key)
prekey = key['prekey'] in TRUE_VALUES
return await self._should_heartbeat(jid, sid, prekey)
async def send_heartbeat(self, jid: JID, device_id: int) -> None:
async def make_heartbeat(self, jid: JID) -> Message:
"""
Returns a heartbeat message.
@ -637,30 +559,21 @@ class XEP_0384(BasePlugin):
"""
msg = self.xmpp.make_message(mto=jid)
encrypted = await self.encrypt_message(
encrypted = await self.encrypt_key_transport_message(
plaintext=None,
recipients=[jid],
expect_problems=None,
_ignore_trust=True,
_device_id=device_id,
)
msg.append(encrypted)
msg.enable('store')
msg.send()
return msg
async def delete_session(self, jid: JID, device_id: int) -> None:
"""
Delete the session for the provided jid/device_id pair.
"""
await self._omemo().deleteSession(jid.bare, device_id)
def trust(self, jid: JID, device_id: int, ik: bytes) -> None:
self._omemo.setTrust(jid.bare, device_id, ik, True)
async def trust(self, jid: JID, device_id: int, ik: bytes) -> None:
await self._omemo().setTrust(jid.bare, device_id, ik, True)
def distrust(self, jid: JID, device_id: int, ik: bytes) -> None:
self._omemo.setTrust(jid.bare, device_id, ik, False)
async def distrust(self, jid: JID, device_id: int, ik: bytes) -> None:
await self._omemo().setTrust(jid.bare, device_id, ik, False)
async def get_trust_for_jid(self, jid: JID) -> Dict[str, List[Optional[Dict[str, Any]]]]:
def get_trust_for_jid(self, jid: JID) -> Dict[str, List[Optional[Dict[str, Any]]]]:
"""
Fetches trust for JID. The returned dictionary will contain active
and inactive devices. Each of these dict will contain device ids
@ -681,23 +594,21 @@ class XEP_0384(BasePlugin):
}
"""
return await self._omemo().getTrustForJID(jid.bare)
return self._omemo.getTrustForJID(jid.bare)
@staticmethod
def is_encrypted(msg: Message) -> bool:
def is_encrypted(self, msg: Message) -> bool:
return msg.xml.find('{%s}encrypted' % OMEMO_BASE_NS) is not None
async def decrypt_message(
def decrypt_message(
self,
encrypted: Encrypted,
sender: JID,
allow_untrusted: bool = False,
) -> Optional[str]:
header = encrypted['header']
payload = None
if encrypted['payload']['value'] is not None:
payload = b64dec(encrypted['payload']['value'])
if encrypted['payload']['value'] is None:
raise ErroneousPayload('The payload element was empty')
payload = b64dec(encrypted['payload']['value'])
jid = sender.bare
sid = int(header['sid'])
@ -719,27 +630,15 @@ class XEP_0384(BasePlugin):
# XXX: 'cipher' is part of KeyTransportMessages and is used when no payload
# is passed. We do not implement this yet.
try:
log.debug('Decryption: Attempt to decrypt message from JID: %r', sender)
if payload is None:
await self._omemo().decryptRatchetForwardingMessage(
jid,
sid,
iv,
message,
isPrekeyMessage,
allow_untrusted=allow_untrusted,
)
body = None
else:
body = await self._omemo().decryptMessage(
jid,
sid,
iv,
message,
isPrekeyMessage,
payload,
allow_untrusted=allow_untrusted,
)
body = self._omemo.decryptMessage(
jid,
sid,
iv,
message,
isPrekeyMessage,
payload,
allow_untrusted=allow_untrusted,
)
except (omemo.exceptions.NoSessionException,):
# This might happen when the sender is sending using a session
# that we don't know about (deleted session storage, etc.). In
@ -748,31 +647,38 @@ class XEP_0384(BasePlugin):
raise NoAvailableSession(jid, sid)
except (omemo.exceptions.TrustException,) as exn:
if exn.problem == 'undecided':
log.debug('Decryption: trust state for JID: %r, device: %r, is undecided', exn.bare_jid, exn.device)
raise UndecidedException(exn.bare_jid, exn.device, exn.ik)
if exn.problem == 'untrusted':
log.debug('Decryption: trust state for JID: %r, device: %r, set to untrusted', exn.bare_jid, exn.device)
raise UntrustedException(exn.bare_jid, exn.device, exn.ik)
raise
finally:
asyncio.create_task(self._publish_bundle())
asyncio.ensure_future(self._publish_bundle())
if self.auto_heartbeat:
log.debug('Checking if heartbeat is required. auto_hearbeat enabled.')
should_heartbeat = await self._should_heartbeat(sender, sid, isPrekeyMessage)
if should_heartbeat:
log.debug('Decryption: Sending hearbeat to %s / %d', jid, sid)
await self.send_heartbeat(JID(jid), sid)
if self.auto_heartbeat and self.should_heartbeat():
async def send_heartbeat():
msg = await self.make_heartbeat(JID(jid))
msg.send()
asyncio.ensure_future(send_heartbeat())
return body
async def encrypt_message(
self,
plaintext: Optional[str],
plaintext: str,
recipients: List[JID],
expect_problems: Optional[Dict[JID, List[int]]] = None,
) -> Encrypted:
return await self.encrypt_key_transport_message(
plaintext.encode('utf-8'),
recipients,
expect_problems,
)
async def encrypt_key_transport_message(
self,
plaintext: Optional[bytes],
recipients: List[JID],
expect_problems: Optional[Dict[JID, List[int]]] = None,
_ignore_trust: bool = False,
_device_id: Optional[int] = None,
) -> Encrypted:
"""
Returns an encrypted payload to be placed into a message.
@ -780,22 +686,9 @@ class XEP_0384(BasePlugin):
The API for getting an encrypted payload consists of trying first
and fixing errors progressively. The actual sending happens once the
application (us) thinks we're good to go.
If `plaintext` is specified, this will generate a full OMEMO payload. If
not, if `_ignore_trust` is True, this will generate a ratchet forwarding
message, and otherwise it will generate a key transport message.
These are rather technical details to the user and fiddling with
parameters else than `plaintext` and `recipients` should be rarely
needed.
The `_device_id` parameter is required in the case of a ratchet
forwarding message. That is, `plaintext` to None, and `_ignore_trust`
to True. If specified, a single recipient JID is required. If not all
these conditions are met, ErroneousParameter will be raised.
"""
barejids: List[str] = [jid.bare for jid in recipients]
recipients = [jid.bare for jid in recipients]
old_errors = None # type: Optional[List[Tuple[Exception, Any, Any]]]
while True:
@ -809,27 +702,17 @@ class XEP_0384(BasePlugin):
expect_problems = {jid.bare: did for (jid, did) in expect_problems.items()}
try:
log.debug('Encryption: attempt to encrypt for JIDs: %r', barejids)
if plaintext is not None:
encrypted = await self._omemo().encryptMessage(
barejids,
plaintext.encode('utf-8'),
bundles=self.bundles,
encrypted = self._omemo.encryptMessage(
recipients,
plaintext,
self.bundles,
expect_problems=expect_problems,
)
elif _ignore_trust:
if not _device_id or len(barejids) != 1:
raise ErroneousParameter
bundle = self.bundles.get(barejids[0], {}).get(_device_id, None)
encrypted = await self._omemo().encryptRatchetForwardingMessage(
bare_jid=barejids[0],
device_id=_device_id,
bundle=bundle,
)
else:
encrypted = await self._omemo().encryptKeyTransportMessage(
barejids,
bundles=self.bundles,
encrypted = self._omemo.encryptKeyTransportMessage(
recipients,
self.bundles,
expect_problems=expect_problems,
)
return _generate_encrypted_payload(encrypted)
@ -837,18 +720,18 @@ class XEP_0384(BasePlugin):
errors = exception.problems
if errors == old_errors:
log.debug('Encryption: Still not possible after another iteration.')
raise EncryptionPrepareException(errors)
old_errors = errors
for exn in errors:
if isinstance(exn, omemo.exceptions.NoDevicesException):
log.debug('Encryption: Missing device list for JID: %r', exn.bare_jid)
await self.fetch_devices(JID(exn.bare_jid))
await self._fetch_device_list(JID(exn.bare_jid))
elif isinstance(exn, omemo.exceptions.MissingBundleException):
log.debug('Encryption: Missing bundle for JID: %r, device: %r', exn.bare_jid, exn.device)
await self.fetch_bundle(JID(exn.bare_jid), exn.device)
bundle = await self._fetch_bundle(exn.bare_jid, exn.device)
if bundle is not None:
devices = self.bundles.setdefault(exn.bare_jid, {})
devices[exn.device] = bundle
elif isinstance(exn, omemo.exceptions.TrustException):
# On TrustException, there are two possibilities.
# Either trust has not been explicitely set yet, and is
@ -857,7 +740,6 @@ class XEP_0384(BasePlugin):
# a choice. If untrusted, then we can safely tell the
# OMEMO lib to not encrypt to this device
if exn.problem == 'undecided':
log.debug('Encryption: Trust state not set for JID: %r, device: %r', exn.bare_jid, exn.device)
raise UndecidedException(exn.bare_jid, exn.device, exn.ik)
distrusted_jid = JID(exn.bare_jid)
expect_problems.setdefault(distrusted_jid, []).append(exn.device)

View file

View file

@ -9,5 +9,5 @@
See the file LICENSE for copying permission.
"""
__version__ = "0.9.0"
__version_info__ = (0, 9, 0)
__version__ = "0.5.0"
__version_info__ = (0, 5, 0)