Adapted the xep_0065 plugin to be compatible with all kind of others XMPP

client.
Sent a 'socks_connected' xmpp event when the streamer is connected.
This commit is contained in:
Sandro Munda 2012-11-04 11:38:57 +01:00
parent 3a7569e3ea
commit 032d41dbb8

View file

@ -1,12 +1,6 @@
import sys
import logging
import struct
import pickle
import socket
from threading import Thread, Event
from hashlib import sha1
from select import select
from uuid import uuid4
from sleekxmpp.plugins.xep_0065 import stanza
@ -32,7 +26,7 @@ class XEP_0065(base_plugin):
# A dict contains for each SID, the proxy thread currently
# running.
proxy_threads = {}
proxies = {}
def plugin_init(self):
""" Initializes the xep_0065 plugin and all event callbacks.
@ -57,9 +51,9 @@ class XEP_0065(base_plugin):
""" Returns the socket associated to the SID.
"""
proxy = self.proxy_threads.get(sid)
proxy = self.proxies.get(sid)
if proxy:
return proxy.s
return proxy
def handshake(self, to, streamer=None):
""" Starts the handshake to establish the socks5 bytestreams
@ -138,15 +132,11 @@ class XEP_0065(base_plugin):
# Next the Target attempts to open a standard TCP socket on
# the network address of the Proxy.
self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
self.proxy_port, self.on_recv)
self.proxy_thread.start()
self.proxy = self._connect_proxy(sid, requester, target,
self.proxy_host, self.proxy_port)
# Registers the new thread in the proxy_thread dict.
self.proxy_threads[sid] = self.proxy_thread
# Wait until the proxy is connected
self.proxy_thread.connected.wait()
# Registers the new proxy to the proxies dict.
self.proxies[sid] = self.proxy
# Replies to the incoming iq with a streamhost-used stanza.
res_iq = iq.reply()
@ -163,23 +153,18 @@ class XEP_0065(base_plugin):
# Sets the SID, the requester and the target.
sid = iq['socks']['sid']
requester = '%s' % self.xmpp.boundjid
target = '%s' % iq['from']
target = '%s' % iq['from']
# The Requester will establish a connection to the SOCKS5
# proxy in the same way the Target did.
self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
self.proxy_port, self.on_recv)
self.proxy_thread.start()
self.proxy = self._connect_proxy(sid, requester, target,
self.proxy_host, self.proxy_port)
# Registers the new thread in the proxy_thread dict.
self.proxy_threads[sid] = self.proxy_thread
self.proxies[sid] = self.proxy
# Wait until the proxy is connected
self.proxy_thread.connected.wait()
# Requester sends IQ-set to StreamHost requesting that
# StreamHost activate the bytestream associated with the
# StreamID.
# Requester sends IQ-set to StreamHost requesting that StreamHost
# activate the bytestream associated with the StreamID.
self.activate(iq['socks']['sid'], target)
def activate(self, sid, to):
@ -199,197 +184,63 @@ class XEP_0065(base_plugin):
""" Closes the Proxy thread associated to this SID.
"""
proxy = self.proxy_threads.get(sid)
proxy = self.proxies.get(sid)
if proxy:
proxy.s.close()
del self.proxy_threads[sid]
del self.proxies[sid]
def close(self):
""" Closes all Proxy threads.
"""
for sid, proxy in self.proxy_threads.items():
proxy.s.close()
del self.proxy_threads[sid]
for sid, proxy in self.proxies.items():
proxy.close()
del self.proxies[sid]
def send(self, sid, data):
""" Sends the data over the Proxy socket associated to the
SID.
"""
proxy = self.proxy_threads.get(sid)
proxy = self.get_socket(sid)
if proxy:
proxy.send(data)
proxy.sendall(data)
def on_recv(self, sid, data):
""" Calls when data is recv from the Proxy socket associated
to the SID.
Triggers a socks_closed event if the socket is closed. The sid
is passed to this event.
Triggers a socks_recv event if there's available data. A dict
that contains the sid and the data is passed to this event.
"""
proxy = self.proxy_threads.get(sid)
if proxy:
if not data:
self.xmpp.event('socks_closed', sid)
else:
self.xmpp.event('socks_recv', {'sid': sid, 'data': data})
class Proxy(Thread):
""" Establishes in a thread a connection between the client and
the server-side Socks5 proxy.
"""
def __init__(self, sid, requester, target, proxy, proxy_port,
on_recv):
""" Initializes the proxy thread.
def _connect_proxy(self, sid, requester, target, proxy, proxy_port):
""" Establishes a connection between the client and the server-side
Socks5 proxy.
sid : The StreamID. <str>
requester : The JID of the requester. <str>
target : The JID of the target. <str>
proxy_host : The hostname or the IP of the proxy. <str>
proxy_port : The port of the proxy. <str> or <int>
on_recv : A callback called when data are received from the
socket. <Callable>
"""
# Initializes the thread.
Thread.__init__(self)
# Because the xep_0065 plugin uses the proxy_port as string,
# the Proxy class accepts the proxy_port argument as a string
# or an integer. Here, we force to use the port as an integer.
proxy_port = int(proxy_port)
# Creates a connected event to warn when to proxy is
# connected.
self.connected = Event()
# Registers the arguments.
self.sid = sid
self.requester = requester
self.target = target
self.proxy = proxy
self.proxy_port = proxy_port
self.on_recv = on_recv
def run(self):
""" Starts the thread.
"""
# Creates the socks5 proxy socket
self.s = socksocket()
self.s.setproxy(PROXY_TYPE_SOCKS5, self.proxy, port=self.proxy_port)
sock = socksocket()
sock.setproxy(PROXY_TYPE_SOCKS5, proxy, port=proxy_port)
# The hostname MUST be SHA1(SID + Requester JID + Target JID)
# where the output is hexadecimal-encoded (not binary).
digest = sha1()
digest.update(self.sid) # SID
digest.update(self.requester) # Requester JID
digest.update(self.target) # Target JID
digest.update(sid) # SID
digest.update(requester) # Requester JID
digest.update(target) # Target JID
# Computes the digest in hex.
dest = '%s' % digest.hexdigest()
# The port MUST be 0.
self.s.connect((dest, 0))
sock.connect((dest, 0))
log.info('Socket connected.')
self.connected.set()
# Blocks until the socket need to be closed.
self.listen()
# Send the XMPP event.
self.xmpp.event('socks_connected', sid)
# Closes the socket.
self.s.close()
log.info('Socket closed.')
def send(self, data):
""" Send data through the socket.
"""
try:
packed_data = self._pack(data)
self.s.sendall(packed_data)
except pickle.PickleError as err:
log.error(err)
def _pack(self, data):
""" Packs the data.
"""
# The data format is: `len_data`+`data`. Useful to receive all the data
# at once (avoid splitted data) thanks to the recv_size method.
data = pickle.dumps(data)
return struct.pack('>i', len(data)) + data
def _unpack(self, data):
""" Unpacks the data. On error, log an error message and returns None.
"""
try:
return pickle.loads(data)
except Exception as err:
log.error(err)
def listen(self):
""" Listen for data on the socket. When receiving data, call
the callback on_recv callable.
"""
socket_open = True
while socket_open:
ins = []
try:
# Wait any read available data on socket. Timeout
# after 5 secs.
ins, out, err = select([self.s, ], [], [], 5)
except socket.error as (errno, err):
# 9 means the socket is closed. It can be normal. Otherwise,
# log the error.
if errno != 9:
log.debug('Socket error: %s' % err)
break
except Exception as e:
log.debug(e)
break
for s in ins:
data = self.recv_size(self.s)
if not data:
socket_open = False
else:
unpacked_data = self._unpack(data)
if unpacked_data:
self.on_recv(self.sid, unpacked_data)
def recv_size(self, the_socket):
total_len = 0
total_data = []
size = sys.maxint
size_data = sock_data = ''
recv_size = 8192
while total_len < size:
sock_data = the_socket.recv(recv_size)
if not sock_data:
return ''.join(total_data)
if not total_data:
if len(sock_data) > 4:
size_data += sock_data
size = struct.unpack('>i', size_data[:4])[0]
recv_size = size
if recv_size > 524288:
recv_size = 524288
total_data.append(size_data[4:])
else:
size_data += sock_data
else:
total_data.append(sock_data)
total_len = sum([len(i) for i in total_data])
return ''.join(total_data)
return sock