omemo: add supported_tab_types and trust_states plugin attributes
- supported tab types is a list of tabs this plugin should be active in (only chattabs) - trust_states is a dict[str → set] containing only two keys: accepted and rejected, whose values are the internal plugin states that should allow encryption and the ones that should not
This commit is contained in:
parent
4586765793
commit
25dae11eb3
1 changed files with 74 additions and 3 deletions
|
@ -10,7 +10,7 @@
|
|||
Interface for E2EE (End-to-end Encryption) plugins.
|
||||
"""
|
||||
|
||||
from typing import Callable, Dict, List, Optional, Union, Tuple
|
||||
from typing import Callable, Dict, List, Optional, Union, Tuple, Set
|
||||
|
||||
from slixmpp import InvalidJID, JID, Message
|
||||
from slixmpp.xmlstream import StanzaBase
|
||||
|
@ -24,6 +24,8 @@ from poezio.tabs import (
|
|||
)
|
||||
from poezio.plugin import BasePlugin
|
||||
from poezio.theming import get_theme, dump_tuple
|
||||
from poezio.config import config
|
||||
from poezio.decorators import command_args_parser
|
||||
|
||||
from asyncio import iscoroutinefunction
|
||||
|
||||
|
@ -114,7 +116,16 @@ class E2EEPlugin(BasePlugin):
|
|||
# time
|
||||
_enabled_tabs = {} # type: Dict[JID, Callable]
|
||||
|
||||
# Tabs that support this encryption mechanism
|
||||
supported_tab_types = [] # type: List[ChatTabs]
|
||||
|
||||
# States for each remote entity
|
||||
trust_states = {'accepted': set(), 'rejected': set()} # type: Dict[str, Set[str]]
|
||||
|
||||
def init(self):
|
||||
self._all_trust_states = self.trust_states['accepted'].union(
|
||||
self.trust_states['rejected']
|
||||
)
|
||||
if self.encryption_name is None and self.encryption_short_name is None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
@ -138,7 +149,7 @@ class E2EEPlugin(BasePlugin):
|
|||
# sure poezio is not sneaking anything past us.
|
||||
self.core.xmpp.add_filter('out', self._encrypt_wrapper)
|
||||
|
||||
for tab_t in (DynamicConversationTab, StaticConversationTab, PrivateTab, MucTab):
|
||||
for tab_t in self.supported_tab_types:
|
||||
self.api.add_tab_command(
|
||||
tab_t,
|
||||
self.encryption_short_name,
|
||||
|
@ -148,6 +159,25 @@ class E2EEPlugin(BasePlugin):
|
|||
help='Toggle automatic {} encryption for tab.'.format(self.encryption_name),
|
||||
)
|
||||
|
||||
trust_msg = 'Set {name} state to {state} for this fingerprint on this JID.'
|
||||
for state in self._all_trust_states:
|
||||
for tab_t in self.supported_tab_types:
|
||||
self.api.add_tab_command(
|
||||
tab_t,
|
||||
self.encryption_short_name + '_' + state,
|
||||
lambda args: self.__command_set_state_local(args, state),
|
||||
usage='<fingerprint>',
|
||||
short=trust_msg.format(name=self.encryption_short_name, state=state),
|
||||
help=trust_msg.format(name=self.encryption_short_name, state=state),
|
||||
)
|
||||
self.api.add_command(
|
||||
self.encryption_short_name + '_' + state,
|
||||
lambda args: self.__command_set_state_global(args, state),
|
||||
usage='<JID> <fingerprint>',
|
||||
short=trust_msg.format(name=self.encryption_short_name, state=state),
|
||||
help=trust_msg.format(name=self.encryption_short_name, state=state),
|
||||
)
|
||||
|
||||
ConversationTab.add_information_element(
|
||||
self.encryption_short_name,
|
||||
self._display_encryption_status,
|
||||
|
@ -161,6 +191,15 @@ class E2EEPlugin(BasePlugin):
|
|||
self._display_encryption_status,
|
||||
)
|
||||
|
||||
self.__load_encrypted_states()
|
||||
|
||||
def __load_encrypted_states(self) -> None:
|
||||
"""Load previously stored encryption states for jids."""
|
||||
for section in config.sections():
|
||||
value = config.get('encryption', section=section)
|
||||
if value and value == self.encryption_short_name:
|
||||
self._enabled_tabs[section] = self.encrypt
|
||||
|
||||
def cleanup(self):
|
||||
ConversationTab.remove_information_element(self.encryption_short_name)
|
||||
MucTab.remove_information_element(self.encryption_short_name)
|
||||
|
@ -184,19 +223,40 @@ class E2EEPlugin(BasePlugin):
|
|||
def _toggle_tab(self, _input: str) -> None:
|
||||
jid = self.api.current_tab().jid # type: JID
|
||||
|
||||
if self._encryption_enabled(jid):
|
||||
if self._enabled_tabs.get(jid) == self.encrypt:
|
||||
del self._enabled_tabs[jid]
|
||||
config.remove_and_save('encryption', section=jid)
|
||||
self.api.information(
|
||||
'{} encryption disabled for {}'.format(self.encryption_name, jid),
|
||||
'Info',
|
||||
)
|
||||
else:
|
||||
self._enabled_tabs[jid] = self.encrypt
|
||||
config.set_and_save('encryption', self.encryption_short_name, section=jid)
|
||||
self.api.information(
|
||||
'{} encryption enabled for {}'.format(self.encryption_name, jid),
|
||||
'Info',
|
||||
)
|
||||
|
||||
@command_args_parser.quoted(2)
|
||||
def __command_set_state_global(self, args, state='') -> None:
|
||||
jid, fpr = args
|
||||
if state not in self._all_trust_states:
|
||||
self.api.information('Unknown state for plugin %s: %s' % (self.encryption_short_name, state), 'Error')
|
||||
return
|
||||
self.store_trust(jid, state, fpr)
|
||||
|
||||
@command_args_parser.quoted(1)
|
||||
def __command_set_state_local(self, args, state='') -> None:
|
||||
if isinstance(self.api.current_tab(), MucTab):
|
||||
return
|
||||
jid = self.api.get_current_tab().name
|
||||
fpr = args[0]
|
||||
if state not in self._all_trust_states:
|
||||
self.api.information('Unknown state for plugin %s: %s' % (self.encryption_short_name, state), 'Error')
|
||||
return
|
||||
self.store_trust(jid, state, fpr)
|
||||
|
||||
def _encryption_enabled(self, jid: JID) -> bool:
|
||||
return jid in self._enabled_tabs and self._enabled_tabs[jid] == self.encrypt
|
||||
|
||||
|
@ -306,6 +366,17 @@ class E2EEPlugin(BasePlugin):
|
|||
log.debug('Encrypted %s message: %r', self.encryption_name, message)
|
||||
return message
|
||||
|
||||
def store_trust(self, jid: JID, state: str, fingerprint: str) -> None:
|
||||
"""Store trust for a fingerprint and a jid."""
|
||||
option_name = '%s:%s' % (self.encryption_short_name, fingerprint)
|
||||
config.silent_set(option=option_name, value=state, section=jid)
|
||||
|
||||
def fetch_trust(self, jid: JID, fingerprint: str) -> str:
|
||||
"""Fetch trust of a fingerprint and a jid.
|
||||
"""
|
||||
option_name = '%s:%s' % (self.encryption_short_name, fingerprint)
|
||||
return config.get(option=option_name, section=jid)
|
||||
|
||||
async def decrypt(self, _message: Message, tab: ChatTabs):
|
||||
"""Decryption method
|
||||
|
||||
|
|
Loading…
Reference in a new issue