From 3c952e47d13dc9851f2518609791da85e5a05bb9 Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 2 Jul 2017 01:25:22 +0200 Subject: [PATCH] impl Sink for Client + complete echo_bot --- examples/echo_bot.rs | 114 +++++++++++++++++++------------------------ src/client/mod.rs | 52 +++++++++++++++----- src/xmpp_stream.rs | 1 - 3 files changed, 89 insertions(+), 78 deletions(-) diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 9f39dfe4..03053dbd 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -4,84 +4,61 @@ extern crate tokio_xmpp; extern crate jid; extern crate xml; -use std::str::FromStr; use tokio_core::reactor::Core; use futures::{Future, Stream, Sink, future}; use tokio_xmpp::{Client, ClientEvent}; -use tokio_xmpp::xmpp_codec::Packet; fn main() { let mut core = Core::new().unwrap(); let client = Client::new("astrobot@example.org", "", &core.handle()).unwrap(); - // let client = TcpClient::connect( - // jid.clone(), - // &addr, - // &core.handle() - // ).map_err(|e| format!("{}", e) - // ).and_then(|stream| { - // if stream.can_starttls() { - // stream.starttls() - // } else { - // panic!("No STARTTLS") - // } - // }).and_then(|stream| { - // let username = jid.node.as_ref().unwrap().to_owned(); - // stream.auth(username, password).expect("auth") - // }).and_then(|stream| { - // stream.bind() - // }).and_then(|stream| { - // println!("Bound to {}", stream.jid); - // let presence = xml::Element::new("presence".to_owned(), None, vec![]); - // stream.send(Packet::Stanza(presence)) - // .map_err(|e| format!("{}", e)) - // }).and_then(|stream| { - // let main_loop = |stream| { - // stream.into_future() - // .and_then(|(event, stream)| { - // stream.send(Packet::Stanza(unreachable!())) - // }).and_then(main_loop) - // }; - // main_loop(stream) - // }).and_then(|(event, stream)| { - // let (mut sink, stream) = stream.split(); - // stream.for_each(move |event| { - // match event { - // Packet::Stanza(ref message) - // if message.name == "message" => { - // let ty = message.get_attribute("type", None); - // let body = message.get_child("body", Some("jabber:client")) - // .map(|body_el| body_el.content_str()); - // match ty { - // None | Some("normal") | Some("chat") - // if body.is_some() => { - // let from = message.get_attribute("from", None).unwrap(); - // println!("Got message from {}: {:?}", from, body); - // let reply = make_reply(from, body.unwrap()); - // sink.send(Packet::Stanza(reply)) - // .and_then(|_| Ok(())) - // }, - // _ => future::ok(()), - // } - // }, - // _ => future::ok(()), - // } - // }).map_err(|e| format!("{}", e)) - // }); - - let done = client.for_each(|event| { - match event { + let (sink, stream) = client.split(); + let mut sink = Some(sink); + let done = stream.for_each(move |event| { + let result: Box> = match event { ClientEvent::Online => { println!("Online!"); + + let presence = make_presence(); + sink = Some( + sink.take(). + expect("sink") + .send(presence) + .wait() + .expect("sink.send") + ); + Box::new( + future::ok(()) + ) }, - ClientEvent::Stanza(stanza) => { + ClientEvent::Stanza(ref stanza) + if stanza.name == "message" + && stanza.get_attribute("type", None) != Some("error") => + { + let from = stanza.get_attribute("from", None); + let body = stanza.get_child("body", Some("jabber:client")) + .map(|el| el.content_str()); + + match (from.as_ref(), body) { + (Some(from), Some(body)) => { + let reply = make_reply(from, body); + sink = Some( + sink.take(). + expect("sink") + .send(reply) + .wait() + .expect("sink.send") + ); + }, + _ => (), + }; + Box::new(future::ok(())) }, _ => { - println!("Event: {:?}", event); + Box::new(future::ok(())) }, - } - - Ok(()) + }; + result }); match core.run(done) { @@ -93,6 +70,15 @@ fn main() { } } +fn make_presence() -> xml::Element { + let mut presence = xml::Element::new("presence".to_owned(), None, vec![]); + presence.tag(xml::Element::new("status".to_owned(), None, vec![])) + .text("chat".to_owned()); + presence.tag(xml::Element::new("show".to_owned(), None, vec![])) + .text("Echoing messages".to_owned()); + presence +} + fn make_reply(to: &str, body: String) -> xml::Element { let mut message = xml::Element::new( "message".to_owned(), diff --git a/src/client/mod.rs b/src/client/mod.rs index 624bf724..6554a6bc 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,7 +1,7 @@ use std::mem::replace; use std::str::FromStr; use std::error::Error; -use tokio_core::reactor::{Core, Handle}; +use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; @@ -22,7 +22,6 @@ use self::bind::*; pub struct Client { pub jid: Jid, - password: String, state: ClientState, } @@ -33,8 +32,6 @@ enum ClientState { Disconnected, Connecting(Box>), Connected(XMPPStream), - // Sending, - // Drain, } impl Client { @@ -43,7 +40,7 @@ impl Client { let password = password.to_owned(); let connect = Self::make_connect(jid.clone(), password.clone(), handle); Ok(Client { - jid, password, + jid, state: ClientState::Connecting(connect), }) } @@ -73,10 +70,7 @@ impl Client { Self::bind(stream) }).and_then(|stream| { println!("Bound to {}", stream.jid); - - let presence = xml::Element::new("presence".to_owned(), None, vec![]); - stream.send(Packet::Stanza(presence)) - .map_err(|e| format!("{}", e)) + Ok(stream) }) ) } @@ -116,20 +110,18 @@ impl Stream for Client { type Error = String; fn poll(&mut self) -> Poll, Self::Error> { - println!("stream.poll"); let state = replace(&mut self.state, ClientState::Invalid); match state { ClientState::Invalid => Err("invalid client state".to_owned()), ClientState::Disconnected => - Ok(Async::NotReady), + Ok(Async::Ready(None)), ClientState::Connecting(mut connect) => { match connect.poll() { Ok(Async::Ready(stream)) => { - println!("connected"); self.state = ClientState::Connected(stream); - self.poll() + Ok(Async::Ready(Some(ClientEvent::Online))) }, Ok(Async::NotReady) => { self.state = ClientState::Connecting(connect); @@ -165,3 +157,37 @@ impl Stream for Client { } } } + +impl Sink for Client { + type SinkItem = xml::Element; + type SinkError = String; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match self.state { + ClientState::Connected(ref mut stream) => + match stream.start_send(Packet::Stanza(item)) { + Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => + Ok(AsyncSink::NotReady(stanza)), + Ok(AsyncSink::NotReady(_)) => + panic!("Client.start_send with stanza but got something else back"), + Ok(AsyncSink::Ready) => { + Ok(AsyncSink::Ready) + }, + Err(e) => + Err(e.description().to_owned()), + }, + _ => + Ok(AsyncSink::NotReady(item)), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + match &mut self.state { + &mut ClientState::Connected(ref mut stream) => + stream.poll_complete() + .map_err(|e| e.description().to_owned()), + _ => + Ok(Async::Ready(())), + } + } +} diff --git a/src/xmpp_stream.rs b/src/xmpp_stream.rs index 01d4aceb..a09b5a98 100644 --- a/src/xmpp_stream.rs +++ b/src/xmpp_stream.rs @@ -1,4 +1,3 @@ -use std::default::Default; use std::collections::HashMap; use futures::*; use tokio_io::{AsyncRead, AsyncWrite};