xep_0384: Make storage synchronous

The OMEMO lib provides a way to do async operations, but slixmpp and
poezio are not entirely ready for this:
- The plugin_init method is not awaited, in slixmpp
- Event handlers are not awaited, in poezio

This would need to be fixed before being able to do what I am trying to
do asynchronously.

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
Maxime “pep” Buquet 2018-11-19 01:37:07 +01:00
parent f20a1a8224
commit 11cbbdcda4
3 changed files with 54 additions and 119 deletions

View file

@ -25,10 +25,10 @@ log = logging.getLogger(__name__)
HAS_OMEMO = True
try:
from omemo import SessionManager
from omemo.util import generateDeviceID
from omemo_backend_signal import BACKEND as SignalBackend
from slixmpp.plugins.xep_0384.session import WrappedSessionManager as SessionManager
from slixmpp.plugins.xep_0384.storage import AsyncInMemoryStorage
from slixmpp.plugins.xep_0384.storage import SyncFileStorage
from slixmpp.plugins.xep_0384.otpkpolicy import KeepingOTPKPolicy
except ImportError as e:
HAS_OMEMO = False
@ -74,29 +74,20 @@ class XEP_0384(BasePlugin):
"is not available")
return
storage = AsyncInMemoryStorage(self.cache_dir)
storage = SyncFileStorage(self.cache_dir)
otpkpolicy = KeepingOTPKPolicy()
backend = SignalBackend
bare_jid = self.xmpp.boundjid.bare
self._device_id = self._load_device_id(self.cache_dir)
future = SessionManager.create(
storage,
otpkpolicy,
backend,
bare_jid,
self._device_id,
)
asyncio.ensure_future(future)
# XXX: This is crap. Rewrite slixmpp plugin system to use async.
# The issue here is that I can't declare plugin_init as async because
# it's not awaited on.
while not future.done():
time.sleep(0.1)
try:
self._omemo = future.result()
self._omemo = SessionManager.create(
storage,
otpkpolicy,
backend,
bare_jid,
self._device_id,
)
except:
log.error("Couldn't load the OMEMO object; ¯\\_(ツ)_/¯")
raise PluginCouldNotLoad
@ -163,7 +154,7 @@ class XEP_0384(BasePlugin):
iq = self._generate_bundle_iq()
await iq.send()
async def _store_device_ids(self, jid, items):
def _store_device_ids(self, jid, items):
device_ids = []
for item in items:
device_ids = [int(d['id']) for d in item['devices']]
@ -171,17 +162,17 @@ class XEP_0384(BasePlugin):
# XXX: There should only be one item so this is fine, but slixmpp
# loops forever otherwise. ???
break
return await self._omemo.newDeviceList(device_ids, str(jid))
return self._omemo.newDeviceList(device_ids, str(jid))
async def _receive_device_list(self, msg):
def _receive_device_list(self, msg):
if msg['pubsub_event']['items']['node'] != OMEMO_DEVICES_NS:
return
jid = msg['from'].bare
items = msg['pubsub_event']['items']
await self._store_device_ids(jid, items)
self._store_device_ids(jid, items)
device_ids = await self.get_device_list(jid)
device_ids = self.get_device_list(jid)
active_devices = device_ids['active']
if jid == self.xmpp.boundjid.bare and \
@ -196,14 +187,14 @@ class XEP_0384(BasePlugin):
self.xmpp.boundjid.bare, OMEMO_DEVICES_NS,
)
items = iq['pubsub']['items']
await self._store_device_ids(jid, items)
self._store_device_ids(jid, items)
except IqError as iq_err:
if iq_err.condition == "item-not-found":
await self._store_device_ids(jid, [])
self._store_device_ids(jid, [])
else:
return # XXX: Handle this!
device_ids = await self.get_device_list(jid)
device_ids = self.get_device_list(jid)
# Verify that this device in the list and set it if necessary
if self._device_id in device_ids:
@ -223,9 +214,9 @@ class XEP_0384(BasePlugin):
jid, OMEMO_DEVICES_NS, payload=payload,
)
async def get_device_list(self, jid) -> List[str]:
def get_device_list(self, jid) -> List[str]:
# XXX: Maybe someday worry about inactive devices somehow
return await self._omemo.getDevices(jid)
return self._omemo.getDevices(jid)
def is_encrypted(self, msg):
return msg.xml.find('{%s}encrypted' % OMEMO_BASE_NS) is not None

View file

@ -1,32 +0,0 @@
"""
Wrap omemo.SessionManager object to return futures
"""
from omemo import SessionManager
from asyncio import Future
def wrap(method, *args, **kwargs):
future = Future()
promise = method(*args, **kwargs)
promise.then(future.set_result, future.set_exception)
return future
class WrappedSessionManager(SessionManager):
@classmethod
def create(cls, *args, **kwargs) -> Future:
return wrap(super().create, *args, **kwargs)
def encryptMessage(self, *args, **kwargs) -> Future:
return wrap(super().encryptMessage, *args, **kwargs)
def decryptMessage(self, *args, **kwargs) -> Future:
return wrap(super().decryptMessage, *args, **kwargs)
def newDeviceList(self, *args, **kwargs) -> Future:
return wrap(super().newDeviceList, *args, **kwargs)
def getDevices(self, *args, **kwargs) -> Future:
return wrap(super().getDevices, *args, **kwargs)

View file

@ -11,7 +11,7 @@ import copy
import json
class AsyncInMemoryStorage(omemo.Storage):
class SyncFileStorage(omemo.Storage):
def __init__(self, storage_dir):
self.storage_dir = storage_dir
self.__state = None
@ -30,32 +30,28 @@ class AsyncInMemoryStorage(omemo.Storage):
def trust(self, trusted):
self.__trusted = trusted
def loadOwnData(self, callback):
def loadOwnData(self, _callback):
if self.__own_data is None:
try:
filepath = os.path.join(self.storage_dir, 'own_data.json')
with open(filepath, 'r') as f:
self.__own_data = json.load(f)
except OSError:
return callback(True, None)
except json.JSONDecodeError as e:
return callback(False, e)
return None
return callback(True, self.__own_data)
return self.__own_data
def storeOwnData(self, callback, own_bare_jid, own_device_id):
def storeOwnData(self, _callback, own_bare_jid, own_device_id):
self.__own_data = {
'own_bare_jid': own_bare_jid,
'own_device_id': own_device_id,
}
try:
filepath = os.path.join(self.storage_dir, 'own_data.json')
with open(filepath, 'w') as f:
json.dump(self.__own_data, f)
return callback(True, None)
except Exception as e:
return callback(False, e)
filepath = os.path.join(self.storage_dir, 'own_data.json')
with open(filepath, 'w') as f:
json.dump(self.__own_data, f)
return None
def loadState(self, callback):
if self.__state is None:
@ -64,80 +60,60 @@ class AsyncInMemoryStorage(omemo.Storage):
with open(filepath, 'r') as f:
self.__state = json.load(f)
except OSError:
return callback(True, None)
except json.JSONDecodeError as e:
return callback(False, e)
return None
return callback(True, self.__state)
return self.__state
def storeState(self, callback, state):
def storeState(self, _callback, state):
self.__state = state
try:
filepath = os.path.join(self.storage_dir, 'omemo.json')
with open(filepath, 'w') as f:
json.dump(self.__state, f)
return callback(True, None)
except Exception as e:
return callback(False, e)
filepath = os.path.join(self.storage_dir, 'omemo.json')
with open(filepath, 'w') as f:
json.dump(self.__state, f)
def loadSession(self, callback, bare_jid, device_id):
callback(True, self.__sessions.get(bare_jid, {}).get(device_id, None))
def loadSession(self, _callback, bare_jid, device_id):
return self.__sessions.get(bare_jid, {}).get(device_id, None)
def storeSession(self, callback, bare_jid, device_id, session):
self.__sessions[bare_jid] = self.__sessions.get(bare_jid, {})
self.__sessions[bare_jid][device_id] = session
callback(True, None)
def loadActiveDevices(self, callback, bare_jid):
def loadActiveDevices(self, _callback, bare_jid):
if self.__devices is None:
try:
filepath = os.path.join(self.storage_dir, 'devices.json')
with open(filepath, 'r') as f:
self.__devices = json.load(f)
except OSError:
return callback(True, None)
except json.JSONDecodeError as e:
return callback(False, e)
return None
return callback(True, self.__devices.get(bare_jid, {}).get("active", []))
return self.__devices.get(bare_jid, {}).get("active", [])
def storeActiveDevices(self, callback, bare_jid, devices):
def storeActiveDevices(self, _callback, bare_jid, devices):
self.__devices[bare_jid] = self.__devices.get(bare_jid, {})
self.__devices[bare_jid]["active"] = list(devices)
try:
filepath = os.path.join(self.storage_dir, 'devices.json')
with open(filepath, 'w') as f:
json.dump(self.__devices, f)
return callback(True, None)
except Exception as e:
return callback(False, e)
filepath = os.path.join(self.storage_dir, 'devices.json')
with open(filepath, 'w') as f:
json.dump(self.__devices, f)
def loadInactiveDevices(self, callback, bare_jid):
def loadInactiveDevices(self, _callback, bare_jid):
if self.__devices is None:
try:
filepath = os.path.join(self.storage_dir, 'devices.json')
with open(filepath, 'r') as f:
self.__devices = json.load(f)
except OSError:
return callback(True, None)
except json.JSONDecodeError as e:
return callback(False, e)
return None
return callback(True, self.__devices.get(bare_jid, {}).get("inactive", []))
return self.__devices.get(bare_jid, {}).get("inactive", [])
def storeInactiveDevices(self, callback, bare_jid, devices):
def storeInactiveDevices(self, _callback, bare_jid, devices):
self.__devices[bare_jid] = self.__devices.get(bare_jid, {})
self.__devices[bare_jid]["inactive"] = list(devices)
try:
filepath = os.path.join(self.storage_dir, 'devices.json')
with open(filepath, 'w') as f:
json.dump(self.__devices, f)
return callback(True, None)
except Exception as e:
return callback(False, e)
filepath = os.path.join(self.storage_dir, 'devices.json')
with open(filepath, 'w') as f:
json.dump(self.__devices, f)
def isTrusted(self, callback, bare_jid, device):
result = False
@ -147,8 +123,8 @@ class AsyncInMemoryStorage(omemo.Storage):
else:
result = bare_jid in self.__trusted and device in self.__trusted[bare_jid]
callback(True, result)
return result
@property
def is_async(self):
return True
return False