diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 24b1010..0b69b93 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -1,4 +1,4 @@ -use futures::{future, Sink, Stream}; +use futures::{future, Future, Sink, Stream}; use std::env::args; use std::process::exit; use tokio::runtime::current_thread::Runtime; @@ -23,36 +23,50 @@ fn main() { // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. - let (mut sink, stream) = client.split(); - // Wrap sink in Option so that we can take() it for the send(self) - // to consume and return it back when ready. - let mut send = move |stanza| { - sink.start_send(stanza).expect("start_send"); - }; + let (sink, stream) = client.split(); + let mut sink_state = Some(sink); // Main loop, processes events - let done = stream.for_each(|event| { + let done = stream.for_each(move |event| { + let mut sink_future = None; + if event.is_online() { println!("Online!"); let presence = make_presence(); - send(presence); + let sink = sink_state.take().unwrap(); + sink_future = Some(Box::new(sink.send(presence))); } else if let Some(message) = event .into_stanza() .and_then(|stanza| Message::try_from(stanza).ok()) { - // This is a message we'll echo match (message.from, message.bodies.get("")) { - (Some(from), Some(body)) => { + (Some(ref from), Some(ref body)) if body.0 == "die" => { + println!("Secret die command triggered by {}", from); + let sink = sink_state.as_mut().unwrap(); + sink.close().expect("close"); + } + (Some(ref from), Some(ref body)) => { if message.type_ != MessageType::Error { - let reply = make_reply(from, &body.0); - send(reply); + // This is a message we'll echo + let reply = make_reply(from.clone(), &body.0); + let sink = sink_state.take().unwrap(); + sink_future = Some(Box::new(sink.send(reply))); } } - _ => (), + _ => {} } - } + }; - Box::new(future::ok(())) + sink_future + .map(|future| { + let wait_send: Box> = + Box::new(future + .map(|sink| { + sink_state = Some(sink); + })); + wait_send + }) + .unwrap_or_else(|| Box::new(future::ok(()))) }); // Start polling `done`