Improve XEP-0070 and examples

This commit is contained in:
mathieui 2016-08-19 23:45:24 +02:00
parent 9019e2bc71
commit 39ee833c29
3 changed files with 64 additions and 54 deletions

View file

@ -29,18 +29,33 @@ class AnswerConfirm(slixmpp.ClientXMPP):
def __init__(self, jid, password, trusted):
slixmpp.ClientXMPP.__init__(self, jid, password)
self.trusted = trusted
self.api.register(self.confirm, 'xep_0070', 'get_confirm')
self.add_event_handler("http_confirm", self.confirm)
self.add_event_handler("session_start", self.start)
def confirm(self, jid, id, url, method):
log.info('Received confirm request %s from %s to access %s using '
'method %s' % (id, jid, url, method))
if jid not in self.trusted:
log.info('Denied')
return False
log.info('Confirmed')
return True
def start(self, *args):
self.make_presence().send()
def prompt(self, stanza):
confirm = stanza['confirm']
print('Received confirm request %s from %s to access %s using '
'method %s' % (
confirm['id'], stanza['from'], confirm['url'],
confirm['method'])
)
result = input("Do you accept (y/N)? ")
return 'y' == result.lower()
def confirm(self, stanza):
if self.prompt(stanza):
reply = stanza.reply()
else:
reply = stanza.reply()
reply.enable('error')
reply['error']['type'] = 'auth'
reply['error']['code'] = '401'
reply['error']['condition'] = 'not-authorized'
reply.append(stanza['confirm'])
reply.send()
if __name__ == '__main__':
# Setup the command line arguments.

View file

@ -16,7 +16,7 @@ from getpass import getpass
from argparse import ArgumentParser
import slixmpp
from slixmpp.exceptions import XMPPError
from slixmpp.exceptions import XMPPError, IqError
from slixmpp import asyncio
log = logging.getLogger(__name__)
@ -37,24 +37,40 @@ class AskConfirm(slixmpp.ClientXMPP):
self.method = method
# Will be used to set the proper exit code.
self.confirmed = None
self.confirmed = asyncio.Future()
self.add_event_handler("session_start", self.start)
self.add_event_handler("message", self.start)
self.add_event_handler("http_confirm_message", self.confirm)
def confirm(self, message):
print(message)
if message['confirm']['id'] == self.id:
if message['type'] == 'error':
self.confirmed.set_result(False)
else:
self.confirmed.set_result(True)
@asyncio.coroutine
def start(self, event):
log.info('Sending confirm request %s to %s who wants to access %s using '
'method %s...' % (self.id, self.recipient, self.url, self.method))
try:
confirmed = yield from self['xep_0070'].ask_confirm(self.recipient,
id=self.id,
url=self.url,
method=self.method,
message='Plz say yes or no for {method} {url} ({id}).')
if isinstance(confirmed, slixmpp.Message):
confirmed = yield from self.confirmed
else:
confirmed = True
except IqError:
confirmed = False
if confirmed:
print('Confirmed')
else:
print('Denied')
self.confirmed = confirmed
self.disconnect()

View file

@ -8,10 +8,12 @@
import asyncio
import logging
from uuid import uuid4
from slixmpp.plugins import BasePlugin, register_plugin
from slixmpp import future_wrapper, Iq, Message
from slixmpp.exceptions import XMPPError, IqError, IqTimeout
from slixmpp.jid import JID
from slixmpp.xmlstream import JID, register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@ -46,10 +48,6 @@ class XEP_0070(BasePlugin):
StanzaPath('message/confirm'),
self._handle_message_confirm))
#self.api.register(self._default_get_confirm,
# 'get_confirm',
# default=True)
def plugin_end(self):
self.xmpp.remove_handler('Confirm')
self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/http-auth')
@ -57,51 +55,32 @@ class XEP_0070(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/http-auth')
@future_wrapper
def ask_confirm(self, jid, id, url, method, *, ifrom=None, message=None):
if message is None:
jid = JID(jid)
if jid.resource:
stanza = self.xmpp.Iq()
stanza['type'] = 'get'
else:
stanza = self.xmpp.Message()
stanza['thread'] = uuid4().hex
stanza['from'] = ifrom
stanza['to'] = jid
stanza['confirm']['id'] = id
stanza['confirm']['url'] = url
stanza['confirm']['method'] = method
if not jid.resource:
if message is not None:
stanza['body'] = message.format(id=id, url=url, method=method)
stanza.send()
return stanza
else:
try:
yield from stanza.send()
except IqError:
return False
except IqTimeout:
return False
else:
return True
return stanza.send()
def _handle_iq_confirm(self, iq):
emitter = iq['from']
id = iq['confirm']['id']
url = iq['confirm']['url']
method = iq['confirm']['method']
accept = self.api['get_confirm'](emitter, id, url, method)
if not accept:
raise XMPPError(etype='auth', condition='not-authorized')
iq.reply().send()
self.xmpp.event('http_confirm_iq', iq)
self.xmpp.event('http_confirm', iq)
def _handle_message_confirm(self, message):
emitter = message['from']
id = message['confirm']['id']
url = message['confirm']['url']
method = message['confirm']['method']
accept = self.api['get_confirm'](emitter, id, url, method)
if not accept:
raise XMPPError(etype='auth', condition='not-authorized')
message.reply().send()
#def _default_get_confirm(self, jid, id, url, method):
# return False
self.xmpp.event('http_confirm_message', message)
self.xmpp.event('http_confirm', message)