diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 2cb09a91..462f7fe8 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -10,7 +10,7 @@ use std::env::args; use std::process::exit; use try_from::TryFrom; use tokio_core::reactor::Core; -use futures::{Future, Stream, Sink, future}; +use futures::{Stream, Sink, future}; use tokio_xmpp::Client; use minidom::Element; use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; @@ -33,18 +33,11 @@ fn main() { // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. - let (sink, stream) = client.split(); + let (mut sink, stream) = client.split(); // Wrap sink in Option so that we can take() it for the send(self) // to consume and return it back when ready. - let mut sink = Some(sink); let mut send = move |stanza| { - sink = Some( - sink.take(). - expect("sink") - .send(stanza) - .wait() - .expect("sink.send") - ); + sink.start_send(stanza).expect("start_send"); }; // Main loop, processes events let done = stream.for_each(|event| { diff --git a/examples/echo_component.rs b/examples/echo_component.rs index ab385732..745e0771 100644 --- a/examples/echo_component.rs +++ b/examples/echo_component.rs @@ -11,7 +11,7 @@ use std::process::exit; use std::str::FromStr; use try_from::TryFrom; use tokio_core::reactor::Core; -use futures::{Future, Stream, Sink, future}; +use futures::{Stream, Sink, future}; use tokio_xmpp::Component; use minidom::Element; use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; @@ -38,18 +38,11 @@ fn main() { // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. println!("Got it: {}", component.jid); - let (sink, stream) = component.split(); + let (mut sink, stream) = component.split(); // Wrap sink in Option so that we can take() it for the send(self) // to consume and return it back when ready. - let mut sink = Some(sink); let mut send = move |stanza| { - sink = Some( - sink.take(). - expect("sink") - .send(stanza) - .wait() - .expect("sink.send") - ); + sink.start_send(stanza).expect("start_send"); }; // Main loop, processes events let done = stream.for_each(|event| { diff --git a/src/client/mod.rs b/src/client/mod.rs index e814f9b6..dc948fa5 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -137,6 +137,15 @@ impl Stream for Client { } }, ClientState::Connected(mut stream) => { + // Poll sink + match stream.poll_complete() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(())) => (), + Err(e) => + return Err(e.description().to_owned()), + }; + + // Poll stream match stream.poll() { Ok(Async::Ready(None)) => { // EOF diff --git a/src/component/mod.rs b/src/component/mod.rs index 2964700f..04613c50 100644 --- a/src/component/mod.rs +++ b/src/component/mod.rs @@ -92,6 +92,15 @@ impl Stream for Component { } }, ComponentState::Connected(mut stream) => { + // Poll sink + match stream.poll_complete() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(())) => (), + Err(e) => + return Err(e.description().to_owned()), + }; + + // Poll stream match stream.poll() { Ok(Async::NotReady) => { self.state = ComponentState::Connected(stream);