diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py index eb0b9e8c..bcb43cfa 100644 --- a/sleekxmpp/plugins/xep_0065/proxy.py +++ b/sleekxmpp/plugins/xep_0065/proxy.py @@ -27,6 +27,10 @@ class xep_0065(base_plugin): dependencies = set(['xep_0030', ]) xep = '0065' + # A dict contains for each SID, the proxy thread currently + # running. + proxy_threads = {} + def plugin_init(self): """ Initializes the xep_0065 plugin and all event callbacks. """ @@ -127,6 +131,9 @@ class xep_0065(base_plugin): self.proxy_port, self.on_recv) self.proxy_thread.start() + # Registers the new thread in the proxy_thread dict. + self.proxy_threads[sid] = self.proxy_thread + # Wait until the proxy is connected self.proxy_thread.connected.wait() @@ -142,17 +149,20 @@ class xep_0065(base_plugin): """ Handles all streamhost-used stanzas. """ - # Sets the requester and the target. + # Sets the SID, the requester and the target. + sid = iq['q']['sid'] requester = '%s' % self.xmpp.boundjid target = '%s' % iq['from'] # The Requester will establish a connection to the SOCKS5 # proxy in the same way the Target did. - self.proxy_thread = Proxy(iq['q']['sid'], requester, target, - self.proxy_host, self.proxy_port, - self.on_recv) + self.proxy_thread = Proxy(sid, requester, target, self.proxy_host, + self.proxy_port, self.on_recv) self.proxy_thread.start() + # Registers the new thread in the proxy_thread dict. + self.proxy_threads[sid] = self.proxy_thread + # Wait until the proxy is connected self.proxy_thread.connected.wait() @@ -174,21 +184,34 @@ class xep_0065(base_plugin): # Send the IQ. act_iq.send() - def send(self, msg): + def send(self, sid, msg): """ Sends the msg to the socket. + sid : The SID to retrieve the good proxy stored in the + proxy_threads dict msg : The message data. """ - self.proxy_thread.send(msg) + proxy = self.proxy_threads.get(sid) + if proxy: + proxy.send(msg) + else: + # TODO: raise an exception. + pass - def on_recv(self, data): - """ A default behavior when socket are receiving data. - - This method should be overriden. + def on_recv(self, sid, data): + """ Called when receiving data """ - log.debug('Received data: %s' % data) + if not data: + try: + del self.proxy_threads[sid] + except KeyError: + # TODO: internal error, raise an exception ? + pass + else: + log.debug('Received data: %s' % data) + self.xmpp.event('socks_recv', data) class Proxy(Thread): @@ -212,9 +235,6 @@ class Proxy(Thread): # Initializes the thread. Thread.__init__(self) - # This thread is a daemon thread. - self.daemon = True - # Because the xep_0065 plugin uses the proxy_port as string, # the Proxy class accepts the proxy_port argument as a string # or an integer. Here, we force to use the port as an integer. @@ -252,26 +272,33 @@ class Proxy(Thread): # The port MUST be 0. self.s.connect((dest, 0)) - log.info('Connected') + log.info('Socket connected.') self.connected.set() # Listen for data on the socket self.listen() + # Listen returns when the socket must be closed. + self.s.close() + log.info('Socket closed.') + def listen(self): """ Listen for data on the socket. When receiving data, call the callback on_recv callable. """ - while True: + socket_open = True + while socket_open: ins, out, err = select([self.s, ], [], []) for s in ins: data = self.recv_size(self.s) - self.on_recv(data) + if not data: + socket_open = False + + self.on_recv(self.sid, data) def recv_size(self, the_socket): - # Data length is packed into 4 bytes. total_len = 0 total_data = [] size = sys.maxint @@ -280,6 +307,9 @@ class Proxy(Thread): while total_len < size: sock_data = the_socket.recv(recv_size) + if not sock_data: + return ''.join(total_data) + if not total_data: if len(sock_data) > 4: size_data += sock_data