From 58b5a84391399b31be551371e1986541df873e41 Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 24 Aug 2017 20:10:58 +0200 Subject: [PATCH] client: stream.poll_complete() for ease of use --- examples/echo_bot.rs | 13 +++---------- examples/echo_component.rs | 13 +++---------- src/client/mod.rs | 9 +++++++++ src/component/mod.rs | 9 +++++++++ 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 2cb09a91..462f7fe8 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -10,7 +10,7 @@ use std::env::args; use std::process::exit; use try_from::TryFrom; use tokio_core::reactor::Core; -use futures::{Future, Stream, Sink, future}; +use futures::{Stream, Sink, future}; use tokio_xmpp::Client; use minidom::Element; use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; @@ -33,18 +33,11 @@ fn main() { // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. - let (sink, stream) = client.split(); + let (mut sink, stream) = client.split(); // Wrap sink in Option so that we can take() it for the send(self) // to consume and return it back when ready. - let mut sink = Some(sink); let mut send = move |stanza| { - sink = Some( - sink.take(). - expect("sink") - .send(stanza) - .wait() - .expect("sink.send") - ); + sink.start_send(stanza).expect("start_send"); }; // Main loop, processes events let done = stream.for_each(|event| { diff --git a/examples/echo_component.rs b/examples/echo_component.rs index ab385732..745e0771 100644 --- a/examples/echo_component.rs +++ b/examples/echo_component.rs @@ -11,7 +11,7 @@ use std::process::exit; use std::str::FromStr; use try_from::TryFrom; use tokio_core::reactor::Core; -use futures::{Future, Stream, Sink, future}; +use futures::{Stream, Sink, future}; use tokio_xmpp::Component; use minidom::Element; use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; @@ -38,18 +38,11 @@ fn main() { // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. println!("Got it: {}", component.jid); - let (sink, stream) = component.split(); + let (mut sink, stream) = component.split(); // Wrap sink in Option so that we can take() it for the send(self) // to consume and return it back when ready. - let mut sink = Some(sink); let mut send = move |stanza| { - sink = Some( - sink.take(). - expect("sink") - .send(stanza) - .wait() - .expect("sink.send") - ); + sink.start_send(stanza).expect("start_send"); }; // Main loop, processes events let done = stream.for_each(|event| { diff --git a/src/client/mod.rs b/src/client/mod.rs index e814f9b6..dc948fa5 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -137,6 +137,15 @@ impl Stream for Client { } }, ClientState::Connected(mut stream) => { + // Poll sink + match stream.poll_complete() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(())) => (), + Err(e) => + return Err(e.description().to_owned()), + }; + + // Poll stream match stream.poll() { Ok(Async::Ready(None)) => { // EOF diff --git a/src/component/mod.rs b/src/component/mod.rs index 2964700f..04613c50 100644 --- a/src/component/mod.rs +++ b/src/component/mod.rs @@ -92,6 +92,15 @@ impl Stream for Component { } }, ComponentState::Connected(mut stream) => { + // Poll sink + match stream.poll_complete() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(())) => (), + Err(e) => + return Err(e.description().to_owned()), + }; + + // Poll stream match stream.poll() { Ok(Async::NotReady) => { self.state = ComponentState::Connected(stream);