Fix #2448 (SMP in the OTR plugin)

Add a /otrsmp <abort|ask|answer> command.

also improve usability a bit, and mention the trust status in the info
bar.
This commit is contained in:
mathieui 2014-12-30 22:00:02 +01:00
parent 2a376cf419
commit 96442e93e3
No known key found for this signature in database
GPG key ID: C59F84CEEFD616E3

View file

@ -71,6 +71,14 @@ Command added to Conversation Tabs and Private Tabs:
*NOT* with multiple rewrites in a secure manner, you should do that
yourself if you want to be sure.
/otrsmp
**Usage:** ``/otrsmp <ask|answer|abort> [question] [secret]``
Verify the identify of your contact by using a pre-defined secret.
- The ``abort`` command aborts an ongoing verification
- The ``ask`` command start a verification, with a question or not
- The ``answer`` command answers a verification and ends the smp session
To use OTR, make sure the plugin is loaded (if not, then do ``/load otr``).
@ -198,12 +206,14 @@ import curses
from potr.context import NotEncryptedError, UnencryptedMessage, ErrorReceived, NotOTRMessage,\
STATE_ENCRYPTED, STATE_PLAINTEXT, STATE_FINISHED, Context, Account, crypt
import common
import xhtml
from common import safeJID
from config import config
from plugin import BasePlugin
from tabs import ConversationTab, DynamicConversationTab, PrivateTab
from theming import get_theme, dump_tuple
from decorators import command_args_parser
OTR_DIR = os.path.join(os.getenv('XDG_DATA_HOME') or
'~/.local/share', 'poezio', 'otr')
@ -220,6 +230,22 @@ POLICY_FLAGS = {
log = logging.getLogger(__name__)
OTR_TUTORIAL = _(
"""%(info)sThis contact has not yet been verified.
You have several methods of authentication available:
1) Verify each other's fingerprints using a secure (and different) channel:
Your fingerprint: %(normal)s%(our_fpr)s%(info)s
%(jid_c)s%(jid)s%(info)s's fingerprint: %(normal)s%(remote_fpr)s%(info)s
Then use the command: /otr trust
2) SMP pre-shared secret you both know:
/otrsmp ask <secret>
3) SMP pre-shared secret you both know with a question:
/otrsmp ask <question> <secret>
""")
def hl(tab):
if tab.state != 'current':
tab.state = 'private'
@ -236,6 +262,8 @@ class PoezioContext(Context):
self.core = core
self.flags = {}
self.trustName = safeJID(peer).bare
self.in_smp = False
self.smp_own = False
def getPolicy(self, key):
if key in self.flags:
@ -243,6 +271,10 @@ class PoezioContext(Context):
else:
return False
def reset_smp(self):
self.in_smp = False
self.smp_own = False
def inject(self, msg, appdata=None):
message = self.xmpp.make_message(mto=self.peer,
mbody=msg.decode('ascii'),
@ -253,6 +285,7 @@ class PoezioContext(Context):
def setState(self, newstate):
color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT)
tab = self.core.get_tab_by_name(self.peer)
if not tab:
@ -261,28 +294,27 @@ class PoezioContext(Context):
if tab and not tab.locked_resource == safeJID(self.peer).resource:
tab = None
if self.state == STATE_ENCRYPTED:
if newstate == STATE_ENCRYPTED:
if newstate == STATE_ENCRYPTED and tab:
log.debug('OTR conversation with %s refreshed', self.peer)
if tab:
if self.getCurrentTrust():
msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
' OTR conversation with %(jid_c)s%(jid)s') % {
'info': color_info,
'jid_c': color_jid,
'jid': self.peer
}
tab.add_message(msg, typ=self.log)
else:
msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
' OTR conversation with %(jid_c)s%(jid)s'
'%(info)s, key: \x19o%(key)s') % {
'jid': self.peer,
'key': self.getCurrentKey(),
'info': color_info,
'jid_c': color_jid}
if self.getCurrentTrust():
msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
' OTR conversation with %(jid_c)s%(jid)s') % {
'info': color_info,
'jid_c': color_jid,
'jid': self.peer
}
tab.add_message(msg, typ=self.log)
else:
msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
' OTR conversation with %(jid_c)s%(jid)s'
'%(info)s, key: \x19o%(key)s') % {
'jid': self.peer,
'key': self.getCurrentKey(),
'info': color_info,
'jid_c': color_jid}
tab.add_message(msg, typ=self.log)
hl(tab)
tab.add_message(msg, typ=self.log)
hl(tab)
elif newstate == STATE_FINISHED or newstate == STATE_PLAINTEXT:
log.debug('OTR conversation with %s finished', self.peer)
if tab:
@ -290,26 +322,32 @@ class PoezioContext(Context):
color_info, color_jid, self.peer),
typ=self.log)
hl(tab)
else:
if newstate == STATE_ENCRYPTED:
if tab:
if self.getCurrentTrust():
msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
'OTR conversation with %(jid_c)s%(jid)s') % {
'jid': self.peer,
'info': color_info,
'jid_c': color_jid}
tab.add_message(msg, typ=self.log)
else:
msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
' OTR conversation with %(jid_c)s%(jid)s'
'%(info)s, key: \x19o%(key)s') % {
'jid': self.peer,
'key': self.getCurrentKey(),
'info': color_info,
'jid_c': color_jid}
tab.add_message(msg, typ=self.log)
hl(tab)
elif newstate == STATE_ENCRYPTED and tab:
if self.getCurrentTrust():
msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
'OTR conversation with %(jid_c)s%(jid)s') % {
'jid': self.peer,
'info': color_info,
'jid_c': color_jid}
tab.add_message(msg, typ=self.log)
else:
tab.add_message(OTR_TUTORIAL % {
'jid': safeJID(self.peer).bare,
'remote_fpr': self.getCurrentKey(),
'our_fpr': self.user.getPrivkey(),
'info': color_info,
'normal': color_normal,
'jid_c': color_jid},
typ=0)
msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
' OTR conversation with %(jid_c)s%(jid)s'
'%(info)s, key: \x19o%(key)s') % {
'jid': self.peer,
'key': self.getCurrentKey(),
'info': color_info,
'jid_c': color_jid}
tab.add_message(msg, typ=self.log)
hl(tab)
log.debug('Set encryption state of %s to %s', self.peer, states[newstate])
super(PoezioContext, self).setState(newstate)
@ -414,7 +452,7 @@ class Plugin(BasePlugin):
self.account = PoezioAccount(self.core.xmpp.boundjid.bare, OTR_DIR)
self.account.load_trusts()
self.contexts = {}
usage = '[start|refresh|end|fpr|ourfpr|drop|trust|untrust]'
usage = '<start|refresh|end|fpr|ourfpr|drop|trust|untrust>'
shortdesc = 'Manage an OTR conversation'
desc = ('Manage an OTR conversation.\n'
'start/refresh: Start or refresh a conversation\n'
@ -424,6 +462,24 @@ class Plugin(BasePlugin):
'drop: Remove the current key (FOREVER)\n'
'trust: Set this key for this contact as trusted\n'
'untrust: Remove the trust for the key of this contact\n')
smp_usage = '<abort|ask|answer> [question] [answer]'
smp_short = 'Identify a contact'
smp_desc = ('Verify the identify of your contact by using a pre-defined secret.\n'
'abort: Abort an ongoing verification\n'
'ask: Start a verification, with a question or not\n'
'answer: Finish a verification\n')
self.api.add_tab_command(ConversationTab, 'otrsmp', self.command_smp,
help=smp_desc,
usage=smp_usage,
short=smp_short,
completion=self.completion_smp)
self.api.add_tab_command(PrivateTab, 'otrsmp', self.command_smp,
help=smp_desc,
usage=smp_usage,
short=smp_short,
completion=self.completion_smp)
self.api.add_tab_command(ConversationTab, 'otr', self.command_otr,
help=desc,
usage=usage,
@ -463,6 +519,63 @@ class Plugin(BasePlugin):
try:
ctx = self.get_context(msg['from'])
txt, tlvs = ctx.receiveMessage(msg["body"].encode('utf-8'))
if tlvs:
smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV)
smp1 = get_tlv(tlvs, potr.proto.SMP1TLV)
smp2 = get_tlv(tlvs, potr.proto.SMP2TLV)
smp3 = get_tlv(tlvs, potr.proto.SMP3TLV)
smp4 = get_tlv(tlvs, potr.proto.SMP4TLV)
abort = get_tlv(tlvs, potr.proto.SMPABORTTLV)
if abort:
ctx.reset_smp()
tab.add_message('%sSMP aborted by peer.' % color_info, typ=0)
elif ctx.in_smp and not ctx.smpIsValid():
ctx.reset_smp()
tab.add_message('%sSMP aborted.' % color_info, typ=0)
elif smp1 or smp1q:
if smp1q:
try:
question = ' with question: \x19o' + smp1q.msg.decode('utf-8')
except UnicodeDecodeError:
self.api.information('The peer sent a question but it had a wrong encoding', 'Error')
question = ''
else:
question = ''
ctx.in_smp = True
ctx.smp_own = False
tab.add_message('%(info)sPeer %(jid_c)s%(jid)s%(info)s has '
'requested SMP verification%(q)s%(info)s.\n'
'Answer with: /otrsmp answer <secret>' % {
'q': question,
'info': color_info,
'jid': tab.name,
'jid_c': color_jid}, typ=0)
elif smp2:
if not ctx.in_smp:
ctx.reset_smp()
else:
tab.add_message('%sSMP progressing.' % color_info, typ=0)
elif smp3 or smp4:
if ctx.smpIsSuccess():
if not ctx.getCurrentTrust():
tab.add_message('%sYou may want to authenticate '
'your peer by asking your own '
'question: /otrsmp ask [question]'
' <secret>' % color_info,
typ=0)
ctx.reset_smp()
tab.add_message('%sSMP Verification \x19bsucceeded' % color_info,
typ=0)
#self.smp_finish('SMP verification succeeded.', 'success')
else:
ctx.reset_smp()
tab.add_message('%sSMP Verification \x19bfailed' % color_info,
typ=0)
#self.smp_finish('SMP verification failed.', 'error')
self.core.refresh_window()
except UnencryptedMessage as err:
# received an unencrypted message inside an OTR session
text = _('%(info)sThe following message from %(jid_c)s%(jid)s'
@ -632,13 +745,18 @@ class Plugin(BasePlugin):
if ctx:
context = ctx
state = states[context.state]
return ' OTR: %s' % state
trust = 'trusted' if context.getCurrentTrust() else 'untrusted'
return ' OTR: %s (%s)' % (state, trust)
def command_otr(self, arg):
"""
/otr [start|refresh|end|fpr|ourfpr]
"""
arg = arg.strip()
args = common.shell_split(arg)
if not args:
return self.api.core.command_help('otr')
action = args.pop(0)
tab = self.api.current_tab()
name = tab.name
color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
@ -648,14 +766,13 @@ class Plugin(BasePlugin):
name = safeJID(tab.name)
name.resource = tab.locked_resource
name = name.full
if arg == 'end': # close the session
if action == 'end': # close the session
context = self.get_context(name)
context.disconnect()
if isinstance(tab, DynamicConversationTab) and not tab.locked_resource:
ctx = self.find_encrypted_context_with_matching(safeJID(name).bare)
ctx.disconnect()
elif arg == 'start' or arg == 'refresh':
elif action == 'start' or action == 'refresh':
otr = self.get_context(name)
secs = self.config.get('timeout', 3)
if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
@ -689,7 +806,7 @@ class Plugin(BasePlugin):
'info': color_info,
'jid_c': color_jid}
tab.add_message(text, typ=0)
elif arg == 'ourfpr':
elif action == 'ourfpr':
fpr = self.account.getPrivkey()
text = _('%(info)sYour OTR key fingerprint is %(norm)s%(fpr)s.') % {
'jid': tab.name,
@ -697,7 +814,7 @@ class Plugin(BasePlugin):
'norm': color_normal,
'fpr': fpr}
tab.add_message(text, typ=0)
elif arg == 'fpr':
elif action == 'fpr':
if name in self.contexts:
ctx = self.contexts[name]
if ctx.getCurrentKey() is not None:
@ -716,14 +833,14 @@ class Plugin(BasePlugin):
'info': color_info,
'jid_c': color_jid}
tab.add_message(text, typ=0)
elif arg == 'drop':
elif action == 'drop':
# drop the privkey (and obviously, end the current conversations before that)
for context in self.contexts.values():
if context.state not in (STATE_FINISHED, STATE_PLAINTEXT):
context.disconnect()
self.account.drop_privkey()
tab.add_message('%sPrivate key dropped.' % color_info, typ=0)
elif arg == 'trust':
elif action == 'trust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
if key:
@ -740,7 +857,7 @@ class Plugin(BasePlugin):
'info': color_info,
'jid_c': color_jid}
tab.add_message(text, typ=0)
elif arg == 'untrust':
elif action == 'untrust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
if key:
@ -764,3 +881,62 @@ class Plugin(BasePlugin):
comp = ['start', 'fpr', 'ourfpr', 'refresh', 'end', 'trust', 'untrust']
return the_input.new_completion(comp, 1, quotify=False)
@command_args_parser.quoted(1, 2)
def command_smp(self, args):
"""
/otrsmp <ask|answer|abort> [question] [secret]
"""
if args is None or not args:
return self.core.command_help('otrsmp')
color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
length = len(args)
action = args.pop(0)
if length == 2:
question = None
secret = args.pop(0).encode('utf-8')
elif length == 3:
question = args.pop(0).encode('utf-8')
secret = args.pop(0).encode('utf-8')
else:
question = secret = None
tab = self.api.current_tab()
name = tab.name
if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
name = safeJID(tab.name)
name.resource = tab.locked_resource
name = name.full
ctx = self.get_context(name)
if ctx.state != STATE_ENCRYPTED:
return self.api.information('The current conversation is not encrypted', 'Error')
if action == 'ask':
ctx.in_smp = True
ctx.smp_own = True
if question:
ctx.smpInit(secret, question)
else:
ctx.smpInit(secret)
tab.add_message('%(info)sInitiated SMP request with %(jid_c)s'
'%(jid)s%(info)s.' % {
'info': color_info,
'jid': name,
'jid_c': color_jid}, typ=0)
elif action == 'answer':
ctx.smpGotSecret(secret)
elif action == 'abort':
if ctx.in_smp:
ctx.smpAbort()
tab.add_message('%sSMP aborted.' % color_info, typ=0)
self.core.refresh_window()
def completion_smp(self, the_input):
if the_input.get_argument_position() == 1:
return the_input.new_completion(['ask', 'answer', 'abort'], 1, quotify=False)
def get_tlv(tlvs, cls):
for tlv in tlvs:
if isinstance(tlv, cls):
return tlv