From 1d69b1a256f9d64deb4dce0422736056798b30be Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 29 Jan 2019 23:04:42 +0100 Subject: [PATCH] improve echo_bot example --- examples/echo_bot.rs | 49 ++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 9a47ccb..cda7053 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -24,17 +24,32 @@ fn main() { // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. let (sink, stream) = client.split(); - let mut sink_state = Some(sink); - // Main loop, processes events - let done = stream.for_each(move |event| { - let mut sink_future = None; - if event.is_online() { + // Create outgoing pipe + let (mut tx, rx) = futures::unsync::mpsc::unbounded(); + rt.spawn( + rx.forward( + sink.sink_map_err(|_| panic!("Pipe")) + ) + .map(|(rx, mut sink)| { + drop(rx); + let _ = sink.close(); + }) + .map_err(|e| { + panic!("Send error: {:?}", e); + }) + ); + + // Main loop, processes events + let mut wait_for_stream_end = false; + let done = stream.for_each(move |event| { + if wait_for_stream_end { + /* Do nothing */ + } else if event.is_online() { println!("Online!"); let presence = make_presence(); - let sink = sink_state.take().unwrap(); - sink_future = Some(Box::new(sink.send(Packet::Stanza(presence)))); + tx.start_send(Packet::Stanza(presence)).unwrap(); } else if let Some(message) = event .into_stanza() .and_then(|stanza| Message::try_from(stanza).ok()) @@ -42,31 +57,21 @@ fn main() { match (message.from, message.bodies.get("")) { (Some(ref from), Some(ref body)) if body.0 == "die" => { println!("Secret die command triggered by {}", from); - let sink = sink_state.take().unwrap(); - sink_future = Some(Box::new(sink.send(Packet::StreamEnd))); + wait_for_stream_end = true; + tx.start_send(Packet::StreamEnd).unwrap(); } (Some(ref from), Some(ref body)) => { if message.type_ != MessageType::Error { // This is a message we'll echo let reply = make_reply(from.clone(), &body.0); - let sink = sink_state.take().unwrap(); - sink_future = Some(Box::new(sink.send(Packet::Stanza(reply)))); + tx.start_send(Packet::Stanza(reply)).unwrap(); } } _ => {} } - }; + } - sink_future - .map(|future| { - let wait_send: Box> = - Box::new(future - .map(|sink| { - sink_state = Some(sink); - })); - wait_send - }) - .unwrap_or_else(|| Box::new(future::ok(()))) + future::ok(()) }); // Start polling `done`