diff --git a/poezio/plugin_e2ee.py b/poezio/plugin_e2ee.py index 94084d41..8206dc4a 100644 --- a/poezio/plugin_e2ee.py +++ b/poezio/plugin_e2ee.py @@ -10,7 +10,7 @@ Interface for E2EE (End-to-end Encryption) plugins. """ -from typing import Callable, Dict, List, Optional, Union, Tuple +from typing import Callable, Dict, List, Optional, Union, Tuple, Set from slixmpp import InvalidJID, JID, Message from slixmpp.xmlstream import StanzaBase @@ -24,6 +24,8 @@ from poezio.tabs import ( ) from poezio.plugin import BasePlugin from poezio.theming import get_theme, dump_tuple +from poezio.config import config +from poezio.decorators import command_args_parser from asyncio import iscoroutinefunction @@ -114,7 +116,16 @@ class E2EEPlugin(BasePlugin): # time _enabled_tabs = {} # type: Dict[JID, Callable] + # Tabs that support this encryption mechanism + supported_tab_types = [] # type: List[ChatTabs] + + # States for each remote entity + trust_states = {'accepted': set(), 'rejected': set()} # type: Dict[str, Set[str]] + def init(self): + self._all_trust_states = self.trust_states['accepted'].union( + self.trust_states['rejected'] + ) if self.encryption_name is None and self.encryption_short_name is None: raise NotImplementedError @@ -138,7 +149,7 @@ class E2EEPlugin(BasePlugin): # sure poezio is not sneaking anything past us. self.core.xmpp.add_filter('out', self._encrypt_wrapper) - for tab_t in (DynamicConversationTab, StaticConversationTab, PrivateTab, MucTab): + for tab_t in self.supported_tab_types: self.api.add_tab_command( tab_t, self.encryption_short_name, @@ -148,6 +159,25 @@ class E2EEPlugin(BasePlugin): help='Toggle automatic {} encryption for tab.'.format(self.encryption_name), ) + trust_msg = 'Set {name} state to {state} for this fingerprint on this JID.' + for state in self._all_trust_states: + for tab_t in self.supported_tab_types: + self.api.add_tab_command( + tab_t, + self.encryption_short_name + '_' + state, + lambda args: self.__command_set_state_local(args, state), + usage='', + short=trust_msg.format(name=self.encryption_short_name, state=state), + help=trust_msg.format(name=self.encryption_short_name, state=state), + ) + self.api.add_command( + self.encryption_short_name + '_' + state, + lambda args: self.__command_set_state_global(args, state), + usage=' ', + short=trust_msg.format(name=self.encryption_short_name, state=state), + help=trust_msg.format(name=self.encryption_short_name, state=state), + ) + ConversationTab.add_information_element( self.encryption_short_name, self._display_encryption_status, @@ -161,6 +191,15 @@ class E2EEPlugin(BasePlugin): self._display_encryption_status, ) + self.__load_encrypted_states() + + def __load_encrypted_states(self) -> None: + """Load previously stored encryption states for jids.""" + for section in config.sections(): + value = config.get('encryption', section=section) + if value and value == self.encryption_short_name: + self._enabled_tabs[section] = self.encrypt + def cleanup(self): ConversationTab.remove_information_element(self.encryption_short_name) MucTab.remove_information_element(self.encryption_short_name) @@ -184,19 +223,40 @@ class E2EEPlugin(BasePlugin): def _toggle_tab(self, _input: str) -> None: jid = self.api.current_tab().jid # type: JID - if self._encryption_enabled(jid): + if self._enabled_tabs.get(jid) == self.encrypt: del self._enabled_tabs[jid] + config.remove_and_save('encryption', section=jid) self.api.information( '{} encryption disabled for {}'.format(self.encryption_name, jid), 'Info', ) else: self._enabled_tabs[jid] = self.encrypt + config.set_and_save('encryption', self.encryption_short_name, section=jid) self.api.information( '{} encryption enabled for {}'.format(self.encryption_name, jid), 'Info', ) + @command_args_parser.quoted(2) + def __command_set_state_global(self, args, state='') -> None: + jid, fpr = args + if state not in self._all_trust_states: + self.api.information('Unknown state for plugin %s: %s' % (self.encryption_short_name, state), 'Error') + return + self.store_trust(jid, state, fpr) + + @command_args_parser.quoted(1) + def __command_set_state_local(self, args, state='') -> None: + if isinstance(self.api.current_tab(), MucTab): + return + jid = self.api.get_current_tab().name + fpr = args[0] + if state not in self._all_trust_states: + self.api.information('Unknown state for plugin %s: %s' % (self.encryption_short_name, state), 'Error') + return + self.store_trust(jid, state, fpr) + def _encryption_enabled(self, jid: JID) -> bool: return jid in self._enabled_tabs and self._enabled_tabs[jid] == self.encrypt @@ -306,6 +366,17 @@ class E2EEPlugin(BasePlugin): log.debug('Encrypted %s message: %r', self.encryption_name, message) return message + def store_trust(self, jid: JID, state: str, fingerprint: str) -> None: + """Store trust for a fingerprint and a jid.""" + option_name = '%s:%s' % (self.encryption_short_name, fingerprint) + config.silent_set(option=option_name, value=state, section=jid) + + def fetch_trust(self, jid: JID, fingerprint: str) -> str: + """Fetch trust of a fingerprint and a jid. + """ + option_name = '%s:%s' % (self.encryption_short_name, fingerprint) + return config.get(option=option_name, section=jid) + async def decrypt(self, _message: Message, tab: ChatTabs): """Decryption method