diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index e9761c9..9f39dfe 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -6,53 +6,85 @@ extern crate xml; use std::str::FromStr; use tokio_core::reactor::Core; -use futures::{Future, Stream, Sink}; -use jid::Jid; -use tokio_xmpp::TcpClient; +use futures::{Future, Stream, Sink, future}; +use tokio_xmpp::{Client, ClientEvent}; use tokio_xmpp::xmpp_codec::Packet; fn main() { - let jid = Jid::from_str("astrobot@example.net").expect("JID"); - let password = "".to_owned(); - - use std::net::ToSocketAddrs; - let addr = "[2a01:4f8:a0:33d0::5]:5222" - .to_socket_addrs().unwrap() - .next().unwrap(); - let mut core = Core::new().unwrap(); - let client = TcpClient::connect( - jid.clone(), - &addr, - &core.handle() - ).map_err(|e| format!("{}", e) - ).and_then(|stream| { - if stream.can_starttls() { - stream.starttls() - } else { - panic!("No STARTTLS") - } - }).and_then(|stream| { - let username = jid.node.as_ref().unwrap().to_owned(); - stream.auth(username, password).expect("auth") - }).and_then(|stream| { - stream.bind() - }).and_then(|stream| { - println!("Bound to {}", stream.jid); + let client = Client::new("astrobot@example.org", "", &core.handle()).unwrap(); + // let client = TcpClient::connect( + // jid.clone(), + // &addr, + // &core.handle() + // ).map_err(|e| format!("{}", e) + // ).and_then(|stream| { + // if stream.can_starttls() { + // stream.starttls() + // } else { + // panic!("No STARTTLS") + // } + // }).and_then(|stream| { + // let username = jid.node.as_ref().unwrap().to_owned(); + // stream.auth(username, password).expect("auth") + // }).and_then(|stream| { + // stream.bind() + // }).and_then(|stream| { + // println!("Bound to {}", stream.jid); - let presence = xml::Element::new("presence".to_owned(), None, vec![]); - stream.send(Packet::Stanza(presence)) - .map_err(|e| format!("{}", e)) - }).and_then(|stream| { - stream.for_each(|event| { - match event { - Packet::Stanza(el) => println!("<< {}", el), - _ => println!("!! {:?}", event), - } - Ok(()) - }).map_err(|e| format!("{}", e)) + // let presence = xml::Element::new("presence".to_owned(), None, vec![]); + // stream.send(Packet::Stanza(presence)) + // .map_err(|e| format!("{}", e)) + // }).and_then(|stream| { + // let main_loop = |stream| { + // stream.into_future() + // .and_then(|(event, stream)| { + // stream.send(Packet::Stanza(unreachable!())) + // }).and_then(main_loop) + // }; + // main_loop(stream) + // }).and_then(|(event, stream)| { + // let (mut sink, stream) = stream.split(); + // stream.for_each(move |event| { + // match event { + // Packet::Stanza(ref message) + // if message.name == "message" => { + // let ty = message.get_attribute("type", None); + // let body = message.get_child("body", Some("jabber:client")) + // .map(|body_el| body_el.content_str()); + // match ty { + // None | Some("normal") | Some("chat") + // if body.is_some() => { + // let from = message.get_attribute("from", None).unwrap(); + // println!("Got message from {}: {:?}", from, body); + // let reply = make_reply(from, body.unwrap()); + // sink.send(Packet::Stanza(reply)) + // .and_then(|_| Ok(())) + // }, + // _ => future::ok(()), + // } + // }, + // _ => future::ok(()), + // } + // }).map_err(|e| format!("{}", e)) + // }); + + let done = client.for_each(|event| { + match event { + ClientEvent::Online => { + println!("Online!"); + }, + ClientEvent::Stanza(stanza) => { + }, + _ => { + println!("Event: {:?}", event); + }, + } + + Ok(()) }); - match core.run(client) { + + match core.run(done) { Ok(_) => (), Err(e) => { println!("Fatal: {}", e); @@ -60,3 +92,15 @@ fn main() { } } } + +fn make_reply(to: &str, body: String) -> xml::Element { + let mut message = xml::Element::new( + "message".to_owned(), + None, + vec![("type".to_owned(), None, "chat".to_owned()), + ("to".to_owned(), None, to.to_owned())] + ); + message.tag(xml::Element::new("body".to_owned(), None, vec![])) + .text(body); + message +} diff --git a/src/client_auth.rs b/src/client/auth.rs similarity index 100% rename from src/client_auth.rs rename to src/client/auth.rs diff --git a/src/client_bind.rs b/src/client/bind.rs similarity index 100% rename from src/client_bind.rs rename to src/client/bind.rs diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..624bf72 --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,167 @@ +use std::mem::replace; +use std::str::FromStr; +use std::error::Error; +use tokio_core::reactor::{Core, Handle}; +use tokio_core::net::TcpStream; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_tls::TlsStream; +use futures::*; +use jid::{Jid, JidParseError}; +use xml; +use sasl::common::{Credentials, ChannelBinding}; + +use super::xmpp_codec::Packet; +use super::xmpp_stream; +use super::tcp::TcpClient; +use super::starttls::{NS_XMPP_TLS, StartTlsClient}; + +mod auth; +use self::auth::*; +mod bind; +use self::bind::*; + +pub struct Client { + pub jid: Jid, + password: String, + state: ClientState, +} + +type XMPPStream = xmpp_stream::XMPPStream>; + +enum ClientState { + Invalid, + Disconnected, + Connecting(Box>), + Connected(XMPPStream), + // Sending, + // Drain, +} + +impl Client { + pub fn new(jid: &str, password: &str, handle: &Handle) -> Result { + let jid = try!(Jid::from_str(jid)); + let password = password.to_owned(); + let connect = Self::make_connect(jid.clone(), password.clone(), handle); + Ok(Client { + jid, password, + state: ClientState::Connecting(connect), + }) + } + + fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box> { + use std::net::ToSocketAddrs; + let addr = "89.238.79.220:5222" + .to_socket_addrs().unwrap() + .next().unwrap(); + let username = jid.node.as_ref().unwrap().to_owned(); + let password = password; + Box::new( + TcpClient::connect( + jid, + &addr, + handle + ).map_err(|e| format!("{}", e) + ).and_then(|stream| { + if Self::can_starttls(&stream) { + Self::starttls(stream) + } else { + panic!("No STARTTLS") + } + }).and_then(move |stream| { + Self::auth(stream, username, password).expect("auth") + }).and_then(|stream| { + Self::bind(stream) + }).and_then(|stream| { + println!("Bound to {}", stream.jid); + + let presence = xml::Element::new("presence".to_owned(), None, vec![]); + stream.send(Packet::Stanza(presence)) + .map_err(|e| format!("{}", e)) + }) + ) + } + + fn can_starttls(stream: &xmpp_stream::XMPPStream) -> bool { + stream.stream_features + .get_child("starttls", Some(NS_XMPP_TLS)) + .is_some() + } + + fn starttls(stream: xmpp_stream::XMPPStream) -> StartTlsClient { + StartTlsClient::from_stream(stream) + } + + fn auth(stream: xmpp_stream::XMPPStream, username: String, password: String) -> Result, String> { + let creds = Credentials::default() + .with_username(username) + .with_password(password) + .with_channel_binding(ChannelBinding::None); + ClientAuth::new(stream, creds) + } + + fn bind(stream: xmpp_stream::XMPPStream) -> ClientBind { + ClientBind::new(stream) + } +} + +#[derive(Debug)] +pub enum ClientEvent { + Online, + Disconnected, + Stanza(xml::Element), +} + +impl Stream for Client { + type Item = ClientEvent; + type Error = String; + + fn poll(&mut self) -> Poll, Self::Error> { + println!("stream.poll"); + let state = replace(&mut self.state, ClientState::Invalid); + + match state { + ClientState::Invalid => + Err("invalid client state".to_owned()), + ClientState::Disconnected => + Ok(Async::NotReady), + ClientState::Connecting(mut connect) => { + match connect.poll() { + Ok(Async::Ready(stream)) => { + println!("connected"); + self.state = ClientState::Connected(stream); + self.poll() + }, + Ok(Async::NotReady) => { + self.state = ClientState::Connecting(connect); + Ok(Async::NotReady) + }, + Err(e) => + Err(e), + } + }, + ClientState::Connected(mut stream) => { + match stream.poll() { + Ok(Async::NotReady) => { + self.state = ClientState::Connected(stream); + Ok(Async::NotReady) + }, + Ok(Async::Ready(None)) => { + // EOF + self.state = ClientState::Disconnected; + Ok(Async::Ready(Some(ClientEvent::Disconnected))) + }, + Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { + self.state = ClientState::Connected(stream); + Ok(Async::Ready(Some(ClientEvent::Stanza(stanza)))) + }, + Ok(Async::Ready(_)) => { + self.state = ClientState::Connected(stream); + Ok(Async::NotReady) + }, + Err(e) => + Err(e.description().to_owned()), + } + }, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 12b710e..49cab6e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,10 +18,8 @@ mod tcp; pub use tcp::*; mod starttls; pub use starttls::*; -mod client_auth; -pub use client_auth::*; -mod client_bind; -pub use client_bind::*; +mod client; +pub use client::{Client, ClientEvent}; // type FullClient = sasl::Client> diff --git a/src/starttls.rs b/src/starttls.rs index 97a65c5..79ee731 100644 --- a/src/starttls.rs +++ b/src/starttls.rs @@ -15,6 +15,7 @@ use stream_start::StreamStart; pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls"; + pub struct StartTlsClient { state: StartTlsClientState, jid: Jid, diff --git a/src/xmpp_stream.rs b/src/xmpp_stream.rs index 6eb2b24..01d4ace 100644 --- a/src/xmpp_stream.rs +++ b/src/xmpp_stream.rs @@ -4,14 +4,10 @@ use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; use xml; -use sasl::common::{Credentials, ChannelBinding}; use jid::Jid; use xmpp_codec::*; use stream_start::*; -use starttls::{NS_XMPP_TLS, StartTlsClient}; -use client_auth::ClientAuth; -use client_bind::ClientBind; pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams"; @@ -42,28 +38,6 @@ impl XMPPStream { pub fn restart(self) -> StreamStart { Self::from_stream(self.stream.into_inner(), self.jid) } - - pub fn can_starttls(&self) -> bool { - self.stream_features - .get_child("starttls", Some(NS_XMPP_TLS)) - .is_some() - } - - pub fn starttls(self) -> StartTlsClient { - StartTlsClient::from_stream(self) - } - - pub fn auth(self, username: String, password: String) -> Result, String> { - let creds = Credentials::default() - .with_username(username) - .with_password(password) - .with_channel_binding(ChannelBinding::None); - ClientAuth::new(self, creds) - } - - pub fn bind(self) -> ClientBind { - ClientBind::new(self) - } } /// Proxy to self.stream