diff --git a/examples/ibb_transfer/ibb_receiver.py b/examples/ibb_transfer/ibb_receiver.py
index 46dec047..e934f295 100755
--- a/examples/ibb_transfer/ibb_receiver.py
+++ b/examples/ibb_transfer/ibb_receiver.py
@@ -22,13 +22,10 @@ class IBBReceiver(slixmpp.ClientXMPP):
     A basic example of creating and using an in-band bytestream.
     """
 
-    def __init__(self, jid, password):
+    def __init__(self, jid, password, filename):
         slixmpp.ClientXMPP.__init__(self, jid, password)
 
-        self.register_plugin('xep_0030') # Service Discovery
-        self.register_plugin('xep_0047', {
-            'auto_accept': True
-        }) # In-band Bytestreams
+        self.file = open(filename, 'wb')
 
         # The session_start event will be triggered when
         # the bot establishes its connection with the server
@@ -39,6 +36,7 @@ class IBBReceiver(slixmpp.ClientXMPP):
 
         self.add_event_handler("ibb_stream_start", self.stream_opened)
         self.add_event_handler("ibb_stream_data", self.stream_data)
+        self.add_event_handler("ibb_stream_end", self.stream_closed)
 
     def start(self, event):
         """
@@ -56,29 +54,16 @@ class IBBReceiver(slixmpp.ClientXMPP):
         self.send_presence()
         self.get_roster()
 
-    def accept_stream(self, iq):
-        """
-        Check that it is ok to accept a stream request.
-
-        Controlling stream acceptance can be done via either:
-            - setting 'auto_accept' to False in the plugin
-              configuration. The default is True.
-            - setting 'accept_stream' to a function which accepts
-              an Iq stanza as its argument, like this one.
-
-        The accept_stream function will be used if it exists, and the
-        auto_accept value will be used otherwise.
-        """
-        return True
-
     def stream_opened(self, stream):
         print('Stream opened: %s from %s' % (stream.sid, stream.peer_jid))
 
-        # You could run a loop reading from the stream using stream.recv(),
-        # or use the ibb_stream_data event.
+    def stream_data(self, stream):
+        self.file.write(stream.read())
 
-    def stream_data(self, event):
-        print(event['data'])
+    def stream_closed(self, stream):
+        print('Stream closed: %s from %s' % (stream.sid, stream.peer_jid))
+        self.file.close()
+        self.disconnect()
 
 if __name__ == '__main__':
     # Setup the command line arguments.
@@ -97,6 +82,8 @@ if __name__ == '__main__':
                         help="JID to use")
     parser.add_argument("-p", "--password", dest="password",
                         help="password to use")
+    parser.add_argument("-o", "--out", dest="filename",
+                        help="file to save to")
 
     args = parser.parse_args()
 
@@ -108,9 +95,18 @@ if __name__ == '__main__':
         args.jid = input("Username: ")
     if args.password is None:
         args.password = getpass("Password: ")
+    if args.filename is None:
+        args.filename = input("File path: ")
 
-    xmpp = IBBReceiver(args.jid, args.password)
+    # Setup the IBBReceiver and register plugins. Note that while plugins may
+    # have interdependencies, the order in which you register them does
+    # not matter.
+    xmpp = IBBReceiver(args.jid, args.password, args.filename)
+    xmpp.register_plugin('xep_0030') # Service Discovery
+    xmpp.register_plugin('xep_0047', {
+        'auto_accept': True
+    }) # In-band Bytestreams
 
     # Connect to the XMPP server and start processing XMPP stanzas.
     xmpp.connect()
-    xmpp.process()
+    xmpp.process(forever=False)
diff --git a/examples/ibb_transfer/ibb_sender.py b/examples/ibb_transfer/ibb_sender.py
index c7e87bb4..f1c0cab2 100755
--- a/examples/ibb_transfer/ibb_sender.py
+++ b/examples/ibb_transfer/ibb_sender.py
@@ -9,11 +9,13 @@
     See the file LICENSE for copying permission.
 """
 
+import asyncio
 import logging
 from getpass import getpass
 from argparse import ArgumentParser
 
 import slixmpp
+from slixmpp.exceptions import IqError, IqTimeout
 
 
 class IBBSender(slixmpp.ClientXMPP):
@@ -22,11 +24,13 @@ class IBBSender(slixmpp.ClientXMPP):
     A basic example of creating and using an in-band bytestream.
     """
 
-    def __init__(self, jid, password, receiver, filename):
+    def __init__(self, jid, password, receiver, filename, use_messages=False):
         slixmpp.ClientXMPP.__init__(self, jid, password)
 
         self.receiver = receiver
-        self.filename = filename
+
+        self.file = open(filename, 'rb')
+        self.use_messages = use_messages
 
         # The session_start event will be triggered when
         # the bot establishes its connection with the server
@@ -35,6 +39,7 @@ class IBBSender(slixmpp.ClientXMPP):
         # our roster.
         self.add_event_handler("session_start", self.start)
 
+    @asyncio.coroutine
     def start(self, event):
         """
         Process the session_start event.
@@ -51,15 +56,22 @@ class IBBSender(slixmpp.ClientXMPP):
         self.send_presence()
         self.get_roster()
 
-        # For the purpose of demonstration, we'll set a very small block
-        # size. The default block size is 4096. We'll also use a window
-        # allowing sending multiple blocks at a time; in this case, three
-        # block transfers may be in progress at any time.
-        stream = self['xep_0047'].open_stream(self.receiver)
+        try:
+            # Open the IBB stream in which to write to.
+            stream = yield from self['xep_0047'].open_stream(self.receiver, use_messages=self.use_messages)
 
-        with open(self.filename) as f:
-            data = f.read()
-            stream.sendall(data)
+            # If you want to send in-memory bytes, use stream.sendall() instead.
+            yield from stream.sendfile(self.file, timeout=10)
+
+            # And finally close the stream.
+            yield from stream.close(timeout=10)
+        except (IqError, IqTimeout):
+            print('File transfer errored')
+        else:
+            print('File transfer finished')
+        finally:
+            self.file.close()
+            self.disconnect()
 
 
 if __name__ == '__main__':
@@ -80,9 +92,11 @@ if __name__ == '__main__':
     parser.add_argument("-p", "--password", dest="password",
                         help="password to use")
     parser.add_argument("-r", "--receiver", dest="receiver",
-                        help="JID to use")
+                        help="JID of the receiver")
     parser.add_argument("-f", "--file", dest="filename",
-                        help="JID to use")
+                        help="file to send")
+    parser.add_argument("-m", "--use-messages", action="store_true",
+                        help="use messages instead of iqs for file transfer")
 
     args = parser.parse_args()
 
@@ -99,16 +113,13 @@ if __name__ == '__main__':
     if args.filename is None:
         args.filename = input("File path: ")
 
-    # Setup the EchoBot and register plugins. Note that while plugins may
+    # Setup the IBBSender and register plugins. Note that while plugins may
     # have interdependencies, the order in which you register them does
     # not matter.
-    xmpp = IBBSender(args.jid, args.password, args.receiver, args.filename)
+    xmpp = IBBSender(args.jid, args.password, args.receiver, args.filename, args.use_messages)
     xmpp.register_plugin('xep_0030') # Service Discovery
-    xmpp.register_plugin('xep_0004') # Data Forms
     xmpp.register_plugin('xep_0047') # In-band Bytestreams
-    xmpp.register_plugin('xep_0060') # PubSub
-    xmpp.register_plugin('xep_0199') # XMPP Ping
 
     # Connect to the XMPP server and start processing XMPP stanzas.
     xmpp.connect()
-    xmpp.process()
+    xmpp.process(forever=False)
diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py
index 87cd1f51..52d7fbe5 100644
--- a/slixmpp/plugins/xep_0047/ibb.py
+++ b/slixmpp/plugins/xep_0047/ibb.py
@@ -1,6 +1,6 @@
+import asyncio
 import uuid
 import logging
-import threading
 
 from slixmpp import Message, Iq
 from slixmpp.exceptions import XMPPError
@@ -23,17 +23,11 @@ class XEP_0047(BasePlugin):
     default_config = {
         'block_size': 4096,
         'max_block_size': 8192,
-        'window_size': 1,
         'auto_accept': False,
     }
 
     def plugin_init(self):
         self._streams = {}
-        self._pending_streams = {}
-        self._pending_lock = threading.Lock()
-        self._stream_lock = threading.Lock()
-
-        self._preauthed_sids_lock = threading.Lock()
         self._preauthed_sids = {}
 
         register_stanza_plugin(Iq, Open)
@@ -85,9 +79,8 @@ class XEP_0047(BasePlugin):
         self._streams[(jid, sid, peer_jid)] = stream
 
     def _del_stream(self, jid, sid, peer_jid, data):
-        with self._stream_lock:
-            if (jid, sid, peer_jid) in self._streams:
-                del self._streams[(jid, sid, peer_jid)]
+        if (jid, sid, peer_jid) in self._streams:
+            del self._streams[(jid, sid, peer_jid)]
 
     def _accept_stream(self, iq):
         receiver = iq['to']
@@ -100,22 +93,19 @@ class XEP_0047(BasePlugin):
 
     def _authorized(self, jid, sid, ifrom, iq):
         if self.auto_accept:
-            if iq['ibb_open']['block_size'] <= self.max_block_size:
-                return True
+            return True
         return False
 
     def _authorized_sid(self, jid, sid, ifrom, iq):
-        with self._preauthed_sids_lock:
-            if (jid, sid, ifrom) in self._preauthed_sids:
-                del self._preauthed_sids[(jid, sid, ifrom)]
-                return True
-            return False
+        if (jid, sid, ifrom) in self._preauthed_sids:
+            del self._preauthed_sids[(jid, sid, ifrom)]
+            return True
+        return False
 
     def _preauthorize_sid(self, jid, sid, ifrom, data):
-        with self._preauthed_sids_lock:
-            self._preauthed_sids[(jid, sid, ifrom)] = True
+        self._preauthed_sids[(jid, sid, ifrom)] = True
 
-    def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False,
+    def open_stream(self, jid, block_size=None, sid=None, use_messages=False,
                     ifrom=None, timeout=None, callback=None):
         if sid is None:
             sid = str(uuid.uuid4())
@@ -128,43 +118,28 @@ class XEP_0047(BasePlugin):
         iq['from'] = ifrom
         iq['ibb_open']['block_size'] = block_size
         iq['ibb_open']['sid'] = sid
-        iq['ibb_open']['stanza'] = 'iq'
+        iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq'
 
         stream = IBBytestream(self.xmpp, sid, block_size,
-                              iq['from'], iq['to'], window,
-                              use_messages)
+                              iq['from'], iq['to'], use_messages)
 
-        with self._stream_lock:
-            self._pending_streams[iq['id']] = stream
+        stream_future = asyncio.Future()
 
-        self._pending_streams[iq['id']] = stream
+        def _handle_opened_stream(iq):
+            log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from'])
+            stream.self_jid = iq['to']
+            stream.peer_jid = iq['from']
+            stream.stream_started = True
+            self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
+            stream_future.set_result(stream)
+            if callback is not None:
+                callback(stream)
+            self.xmpp.event('ibb_stream_start', stream)
+            self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
 
-        cb = None
-        if callback is not None:
-            def chained(resp):
-                self._handle_opened_stream(resp)
-                callback(resp)
-            cb = chained
-        else:
-            cb = self._handle_opened_stream
-        return iq.send(timeout=timeout, callback=cb)
+        iq.send(timeout=timeout, callback=_handle_opened_stream)
 
-    def _handle_opened_stream(self, iq):
-        if iq['type'] == 'result':
-            with self._stream_lock:
-                stream = self._pending_streams.get(iq['id'], None)
-            if stream is not None:
-                log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from'])
-                stream.self_jid = iq['to']
-                stream.peer_jid = iq['from']
-                stream.stream_started.set()
-                self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
-                self.xmpp.event('ibb_stream_start', stream)
-                self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
-
-        with self._stream_lock:
-            if iq['id'] in self._pending_streams:
-                del self._pending_streams[iq['id']]
+        return stream_future
 
     def _handle_open_request(self, iq):
         sid = iq['ibb_open']['sid']
@@ -176,15 +151,14 @@ class XEP_0047(BasePlugin):
             raise XMPPError(etype='modify', condition='bad-request')
 
         if not self._accept_stream(iq):
-            raise XMPPError(etype='modify', condition='not-acceptable')
+            raise XMPPError(etype='cancel', condition='not-acceptable')
 
         if size > self.max_block_size:
             raise XMPPError('resource-constraint')
 
         stream = IBBytestream(self.xmpp, sid, size,
-                              iq['to'], iq['from'],
-                              self.window_size)
-        stream.stream_started.set()
+                              iq['to'], iq['from'])
+        stream.stream_started = True
         self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
         iq.reply().send()
 
diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py
index 62199077..7f8ff0ba 100644
--- a/slixmpp/plugins/xep_0047/stanza.py
+++ b/slixmpp/plugins/xep_0047/stanza.py
@@ -24,7 +24,7 @@ class Open(ElementBase):
     interfaces = set(('block_size', 'sid', 'stanza'))
 
     def get_block_size(self):
-        return int(self._get_attr('block-size'))
+        return int(self._get_attr('block-size', '0'))
 
     def set_block_size(self, value):
         self._set_attr('block-size', str(value))
@@ -47,7 +47,10 @@ class Data(ElementBase):
         self._set_attr('seq', str(value))
 
     def get_data(self):
-        b64_data = self.xml.text.strip()
+        text = self.xml.text
+        if not text:
+            raise XMPPError('not-acceptable', 'IBB data element is empty.')
+        b64_data = text.strip()
         if VALID_B64.match(b64_data).group() == b64_data:
             return from_b64(b64_data)
         else:
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py
index 817f96a1..3be894eb 100644
--- a/slixmpp/plugins/xep_0047/stream.py
+++ b/slixmpp/plugins/xep_0047/stream.py
@@ -1,7 +1,6 @@
+import asyncio
 import socket
-import threading
 import logging
-from queue import Queue
 
 from slixmpp.stanza import Iq
 from slixmpp.exceptions import XMPPError
@@ -12,11 +11,10 @@ log = logging.getLogger(__name__)
 
 class IBBytestream(object):
 
-    def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False):
+    def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False):
         self.xmpp = xmpp
         self.sid = sid
         self.block_size = block_size
-        self.window_size = window_size
         self.use_messages = use_messages
 
         if jid is None:
@@ -27,29 +25,20 @@ class IBBytestream(object):
         self.send_seq = -1
         self.recv_seq = -1
 
-        self._send_seq_lock = threading.Lock()
-        self._recv_seq_lock = threading.Lock()
+        self.stream_started = False
+        self.stream_in_closed = False
+        self.stream_out_closed = False
 
-        self.stream_started = threading.Event()
-        self.stream_in_closed = threading.Event()
-        self.stream_out_closed = threading.Event()
+        self.recv_queue = asyncio.Queue()
 
-        self.recv_queue = Queue()
-
-        self.send_window = threading.BoundedSemaphore(value=self.window_size)
-        self.window_ids = set()
-        self.window_empty = threading.Event()
-        self.window_empty.set()
-
-    def send(self, data):
-        if not self.stream_started.is_set() or \
-               self.stream_out_closed.is_set():
+    @asyncio.coroutine
+    def send(self, data, timeout=None):
+        if not self.stream_started or self.stream_out_closed:
             raise socket.error
-        data = data[0:self.block_size]
-        self.send_window.acquire()
-        with self._send_seq_lock:
-            self.send_seq = (self.send_seq + 1) % 65535
-            seq = self.send_seq
+        if len(data) > self.block_size:
+            data = data[:self.block_size]
+        self.send_seq = (self.send_seq + 1) % 65535
+        seq = self.send_seq
         if self.use_messages:
             msg = self.xmpp.Message()
             msg['to'] = self.peer_jid
@@ -59,7 +48,6 @@ class IBBytestream(object):
             msg['ibb_data']['seq'] = seq
             msg['ibb_data']['data'] = data
             msg.send()
-            self.send_window.release()
         else:
             iq = self.xmpp.Iq()
             iq['type'] = 'set'
@@ -68,71 +56,65 @@ class IBBytestream(object):
             iq['ibb_data']['sid'] = self.sid
             iq['ibb_data']['seq'] = seq
             iq['ibb_data']['data'] = data
-            self.window_empty.clear()
-            self.window_ids.add(iq['id'])
-            iq.send(callback=self._recv_ack)
+            yield from iq.send(timeout=timeout)
         return len(data)
 
-    def sendall(self, data):
+    @asyncio.coroutine
+    def sendall(self, data, timeout=None):
         sent_len = 0
         while sent_len < len(data):
-            sent_len += self.send(data[sent_len:])
+            sent_len += yield from self.send(data[sent_len:self.block_size], timeout=timeout)
 
-    def _recv_ack(self, iq):
-        self.window_ids.remove(iq['id'])
-        if not self.window_ids:
-            self.window_empty.set()
-        self.send_window.release()
-        if iq['type'] == 'error':
-            self.close()
+    @asyncio.coroutine
+    def sendfile(self, file, timeout=None):
+        while True:
+            data = file.read(self.block_size)
+            if not data:
+                break
+            yield from self.send(data, timeout=timeout)
 
     def _recv_data(self, stanza):
-        with self._recv_seq_lock:
-            new_seq = stanza['ibb_data']['seq']
-            if new_seq != (self.recv_seq + 1) % 65535:
-                self.close()
-                raise XMPPError('unexpected-request')
-            self.recv_seq = new_seq
+        new_seq = stanza['ibb_data']['seq']
+        if new_seq != (self.recv_seq + 1) % 65535:
+            self.close()
+            raise XMPPError('unexpected-request')
+        self.recv_seq = new_seq
 
         data = stanza['ibb_data']['data']
         if len(data) > self.block_size:
             self.close()
             raise XMPPError('not-acceptable')
 
-        self.recv_queue.put(data)
-        self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data})
+        self.recv_queue.put_nowait(data)
+        self.xmpp.event('ibb_stream_data', self)
 
         if isinstance(stanza, Iq):
             stanza.reply().send()
 
     def recv(self, *args, **kwargs):
-        return self.read(block=True)
+        return self.read()
 
-    def read(self, block=True, timeout=None, **kwargs):
-        if not self.stream_started.is_set() or \
-               self.stream_in_closed.is_set():
+    def read(self):
+        if not self.stream_started or self.stream_in_closed:
             raise socket.error
-        if timeout is not None:
-            block = True
-        try:
-            return self.recv_queue.get(block, timeout)
-        except:
-            return None
+        return self.recv_queue.get_nowait()
 
-    def close(self):
+    def close(self, timeout=None):
         iq = self.xmpp.Iq()
         iq['type'] = 'set'
         iq['to'] = self.peer_jid
         iq['from'] = self.self_jid
         iq['ibb_close']['sid'] = self.sid
-        self.stream_out_closed.set()
-        iq.send(block=False,
-                callback=lambda x: self.stream_in_closed.set())
+        self.stream_out_closed = True
+        def _close_stream(_):
+            self.stream_in_closed = True
+        future = iq.send(timeout=timeout, callback=_close_stream)
         self.xmpp.event('ibb_stream_end', self)
+        return future
 
     def _closed(self, iq):
-        self.stream_in_closed.set()
-        self.stream_out_closed.set()
+        self.stream_in_closed = True
+        self.stream_out_closed = True
         iq.reply().send()
         self.xmpp.event('ibb_stream_end', self)
 
diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py
index 2cc43823..ecba2445 100644
--- a/tests/test_stream_xep_0047.py
+++ b/tests/test_stream_xep_0047.py
@@ -1,3 +1,4 @@
+import asyncio
 import threading
 import time
 
@@ -78,6 +79,7 @@ class TestInBandByteStreams(SlixTest):
 
         self.assertEqual(events, set(['ibb_stream_start', 'callback']))
 
+    @asyncio.coroutine
     def testSendData(self):
         """Test sending data over an in-band bytestream."""
 
@@ -115,7 +117,7 @@ class TestInBandByteStreams(SlixTest):
 
 
         # Test sending data out
-        stream.send("Testing")
+        yield from stream.send("Testing")
 
         self.send("""
           <iq type="set" id="2"