Initial work on XEP_0070, plugin and examples
This commit is contained in:
parent
9208bf5bf1
commit
9019e2bc71
5 changed files with 333 additions and 0 deletions
85
examples/confirm_answer.py
Executable file
85
examples/confirm_answer.py
Executable file
|
@ -0,0 +1,85 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Slixmpp: The Slick XMPP Library
|
||||
Copyright (C) 2015 Emmanuel Gil Peyrot
|
||||
This file is part of Slixmpp.
|
||||
|
||||
See the file LICENSE for copying permission.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from getpass import getpass
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import slixmpp
|
||||
from slixmpp.exceptions import XMPPError
|
||||
from slixmpp import asyncio
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AnswerConfirm(slixmpp.ClientXMPP):
|
||||
|
||||
"""
|
||||
A basic client demonstrating how to confirm or deny an HTTP request.
|
||||
"""
|
||||
|
||||
def __init__(self, jid, password, trusted):
|
||||
slixmpp.ClientXMPP.__init__(self, jid, password)
|
||||
|
||||
self.trusted = trusted
|
||||
self.api.register(self.confirm, 'xep_0070', 'get_confirm')
|
||||
|
||||
def confirm(self, jid, id, url, method):
|
||||
log.info('Received confirm request %s from %s to access %s using '
|
||||
'method %s' % (id, jid, url, method))
|
||||
if jid not in self.trusted:
|
||||
log.info('Denied')
|
||||
return False
|
||||
log.info('Confirmed')
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Setup the command line arguments.
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument("-q","--quiet", help="set logging to ERROR",
|
||||
action="store_const",
|
||||
dest="loglevel",
|
||||
const=logging.ERROR,
|
||||
default=logging.INFO)
|
||||
parser.add_argument("-d","--debug", help="set logging to DEBUG",
|
||||
action="store_const",
|
||||
dest="loglevel",
|
||||
const=logging.DEBUG,
|
||||
default=logging.INFO)
|
||||
|
||||
# JID and password options.
|
||||
parser.add_argument("-j", "--jid", dest="jid",
|
||||
help="JID to use")
|
||||
parser.add_argument("-p", "--password", dest="password",
|
||||
help="password to use")
|
||||
|
||||
# Other options.
|
||||
parser.add_argument("-t", "--trusted", nargs='*',
|
||||
help="List of trusted JIDs")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Setup logging.
|
||||
logging.basicConfig(level=args.loglevel,
|
||||
format='%(levelname)-8s %(message)s')
|
||||
|
||||
if args.jid is None:
|
||||
args.jid = input("Username: ")
|
||||
if args.password is None:
|
||||
args.password = getpass("Password: ")
|
||||
|
||||
xmpp = AnswerConfirm(args.jid, args.password, args.trusted)
|
||||
xmpp.register_plugin('xep_0070')
|
||||
|
||||
# Connect to the XMPP server and start processing XMPP stanzas.
|
||||
xmpp.connect()
|
||||
xmpp.process()
|
109
examples/confirm_ask.py
Executable file
109
examples/confirm_ask.py
Executable file
|
@ -0,0 +1,109 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Slixmpp: The Slick XMPP Library
|
||||
Copyright (C) 2015 Emmanuel Gil Peyrot
|
||||
This file is part of Slixmpp.
|
||||
|
||||
See the file LICENSE for copying permission.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
import logging
|
||||
from getpass import getpass
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import slixmpp
|
||||
from slixmpp.exceptions import XMPPError
|
||||
from slixmpp import asyncio
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AskConfirm(slixmpp.ClientXMPP):
|
||||
|
||||
"""
|
||||
A basic client asking an entity if they confirm the access to an HTTP URL.
|
||||
"""
|
||||
|
||||
def __init__(self, jid, password, recipient, id, url, method):
|
||||
slixmpp.ClientXMPP.__init__(self, jid, password)
|
||||
|
||||
self.recipient = recipient
|
||||
self.id = id
|
||||
self.url = url
|
||||
self.method = method
|
||||
|
||||
# Will be used to set the proper exit code.
|
||||
self.confirmed = None
|
||||
|
||||
self.add_event_handler("session_start", self.start)
|
||||
|
||||
@asyncio.coroutine
|
||||
def start(self, event):
|
||||
log.info('Sending confirm request %s to %s who wants to access %s using '
|
||||
'method %s...' % (self.id, self.recipient, self.url, self.method))
|
||||
confirmed = yield from self['xep_0070'].ask_confirm(self.recipient,
|
||||
id=self.id,
|
||||
url=self.url,
|
||||
method=self.method,
|
||||
message='Plz say yes or no for {method} {url} ({id}).')
|
||||
if confirmed:
|
||||
print('Confirmed')
|
||||
else:
|
||||
print('Denied')
|
||||
self.confirmed = confirmed
|
||||
self.disconnect()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Setup the command line arguments.
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument("-q","--quiet", help="set logging to ERROR",
|
||||
action="store_const",
|
||||
dest="loglevel",
|
||||
const=logging.ERROR,
|
||||
default=logging.INFO)
|
||||
parser.add_argument("-d","--debug", help="set logging to DEBUG",
|
||||
action="store_const",
|
||||
dest="loglevel",
|
||||
const=logging.DEBUG,
|
||||
default=logging.INFO)
|
||||
|
||||
# JID and password options.
|
||||
parser.add_argument("-j", "--jid", dest="jid",
|
||||
help="JID to use")
|
||||
parser.add_argument("-p", "--password", dest="password",
|
||||
help="password to use")
|
||||
|
||||
# Other options.
|
||||
parser.add_argument("-r", "--recipient", required=True,
|
||||
help="Recipient JID")
|
||||
parser.add_argument("-i", "--id", required=True,
|
||||
help="id TODO")
|
||||
parser.add_argument("-u", "--url", required=True,
|
||||
help="URL the user tried to access")
|
||||
parser.add_argument("-m", "--method", required=True,
|
||||
help="HTTP method used")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Setup logging.
|
||||
logging.basicConfig(level=args.loglevel,
|
||||
format='%(levelname)-8s %(message)s')
|
||||
|
||||
if args.jid is None:
|
||||
args.jid = input("Username: ")
|
||||
if args.password is None:
|
||||
args.password = getpass("Password: ")
|
||||
|
||||
xmpp = AskConfirm(args.jid, args.password, args.recipient, args.id,
|
||||
args.url, args.method)
|
||||
xmpp.register_plugin('xep_0070')
|
||||
|
||||
# Connect to the XMPP server and start processing XMPP stanzas.
|
||||
xmpp.connect()
|
||||
xmpp.process(forever=False)
|
||||
sys.exit(0 if xmpp.confirmed else 1)
|
15
slixmpp/plugins/xep_0070/__init__.py
Normal file
15
slixmpp/plugins/xep_0070/__init__.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
"""
|
||||
Slixmpp: The Slick XMPP Library
|
||||
Copyright (C) 2015 Emmanuel Gil Peyrot
|
||||
This file is part of Slixmpp.
|
||||
|
||||
See the file LICENSE for copying permission.
|
||||
"""
|
||||
|
||||
from slixmpp.plugins.base import register_plugin
|
||||
|
||||
from slixmpp.plugins.xep_0070.stanza import Confirm
|
||||
from slixmpp.plugins.xep_0070.confirm import XEP_0070
|
||||
|
||||
|
||||
register_plugin(XEP_0070)
|
107
slixmpp/plugins/xep_0070/confirm.py
Normal file
107
slixmpp/plugins/xep_0070/confirm.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
"""
|
||||
Slixmpp: The Slick XMPP Library
|
||||
Copyright (C) 2015 Emmanuel Gil Peyrot
|
||||
This file is part of Slixmpp.
|
||||
|
||||
See the file LICENSE for copying permission.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from slixmpp.plugins import BasePlugin, register_plugin
|
||||
from slixmpp import future_wrapper, Iq, Message
|
||||
from slixmpp.exceptions import XMPPError, IqError, IqTimeout
|
||||
from slixmpp.xmlstream import JID, register_stanza_plugin
|
||||
from slixmpp.xmlstream.handler import Callback
|
||||
from slixmpp.xmlstream.matcher import StanzaPath
|
||||
from slixmpp.plugins.xep_0070 import stanza, Confirm
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class XEP_0070(BasePlugin):
|
||||
|
||||
"""
|
||||
XEP-0070 Verifying HTTP Requests via XMPP
|
||||
"""
|
||||
|
||||
name = 'xep_0070'
|
||||
description = 'XEP-0070: Verifying HTTP Requests via XMPP'
|
||||
dependencies = {'xep_0030'}
|
||||
stanza = stanza
|
||||
|
||||
def plugin_init(self):
|
||||
register_stanza_plugin(Iq, Confirm)
|
||||
register_stanza_plugin(Message, Confirm)
|
||||
|
||||
self.xmpp.register_handler(
|
||||
Callback('Confirm',
|
||||
StanzaPath('iq@type=get/confirm'),
|
||||
self._handle_iq_confirm))
|
||||
|
||||
self.xmpp.register_handler(
|
||||
Callback('Confirm',
|
||||
StanzaPath('message/confirm'),
|
||||
self._handle_message_confirm))
|
||||
|
||||
#self.api.register(self._default_get_confirm,
|
||||
# 'get_confirm',
|
||||
# default=True)
|
||||
|
||||
def plugin_end(self):
|
||||
self.xmpp.remove_handler('Confirm')
|
||||
self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/http-auth')
|
||||
|
||||
def session_bind(self, jid):
|
||||
self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/http-auth')
|
||||
|
||||
def ask_confirm(self, jid, id, url, method, *, ifrom=None, message=None):
|
||||
if message is None:
|
||||
stanza = self.xmpp.Iq()
|
||||
stanza['type'] = 'get'
|
||||
else:
|
||||
stanza = self.xmpp.Message()
|
||||
stanza['from'] = ifrom
|
||||
stanza['to'] = jid
|
||||
stanza['confirm']['id'] = id
|
||||
stanza['confirm']['url'] = url
|
||||
stanza['confirm']['method'] = method
|
||||
if message is not None:
|
||||
stanza['body'] = message.format(id=id, url=url, method=method)
|
||||
stanza.send()
|
||||
else:
|
||||
try:
|
||||
yield from stanza.send()
|
||||
except IqError:
|
||||
return False
|
||||
except IqTimeout:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def _handle_iq_confirm(self, iq):
|
||||
emitter = iq['from']
|
||||
id = iq['confirm']['id']
|
||||
url = iq['confirm']['url']
|
||||
method = iq['confirm']['method']
|
||||
accept = self.api['get_confirm'](emitter, id, url, method)
|
||||
if not accept:
|
||||
raise XMPPError(etype='auth', condition='not-authorized')
|
||||
|
||||
iq.reply().send()
|
||||
|
||||
def _handle_message_confirm(self, message):
|
||||
emitter = message['from']
|
||||
id = message['confirm']['id']
|
||||
url = message['confirm']['url']
|
||||
method = message['confirm']['method']
|
||||
accept = self.api['get_confirm'](emitter, id, url, method)
|
||||
if not accept:
|
||||
raise XMPPError(etype='auth', condition='not-authorized')
|
||||
|
||||
message.reply().send()
|
||||
|
||||
#def _default_get_confirm(self, jid, id, url, method):
|
||||
# return False
|
17
slixmpp/plugins/xep_0070/stanza.py
Normal file
17
slixmpp/plugins/xep_0070/stanza.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
"""
|
||||
Slixmpp: The Slick XMPP Library
|
||||
Copyright (C) 2015 Emmanuel Gil Peyrot
|
||||
This file is part of Slixmpp.
|
||||
|
||||
See the file LICENSE for copying permission.
|
||||
"""
|
||||
|
||||
from slixmpp.xmlstream import ElementBase
|
||||
|
||||
|
||||
class Confirm(ElementBase):
|
||||
|
||||
name = 'confirm'
|
||||
namespace = 'http://jabber.org/protocol/http-auth'
|
||||
plugin_attrib = 'confirm'
|
||||
interfaces = {'id', 'url', 'method'}
|
Loading…
Reference in a new issue