use std::mem::replace; use std::str::FromStr; use std::error::Error; use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; use minidom::Element; use jid::{Jid, JidParseError}; use sasl::common::{Credentials, ChannelBinding}; use super::xmpp_codec::Packet; use super::xmpp_stream; use super::starttls::{NS_XMPP_TLS, StartTlsClient}; use super::happy_eyeballs::Connecter; mod auth; use self::auth::ClientAuth; mod bind; use self::bind::ClientBind; mod event; pub use self::event::Event as ClientEvent; pub struct Client { pub jid: Jid, state: ClientState, } type XMPPStream = xmpp_stream::XMPPStream>; const NS_JABBER_CLIENT: &str = "jabber:client"; enum ClientState { Invalid, Disconnected, Connecting(Box>), Connected(XMPPStream), } impl Client { pub fn new(jid: &str, password: &str, handle: Handle) -> Result { let jid = try!(Jid::from_str(jid)); let password = password.to_owned(); let connect = Self::make_connect(jid.clone(), password.clone(), handle); Ok(Client { jid, state: ClientState::Connecting(connect), }) } fn make_connect(jid: Jid, password: String, handle: Handle) -> Box> { let username = jid.node.as_ref().unwrap().to_owned(); let jid1 = jid.clone(); let jid2 = jid.clone(); let password = password; Box::new( Connecter::from_lookup(handle, &jid.domain, "_xmpp-client._tcp", 5222) .expect("Connector::from_lookup") .and_then(move |tcp_stream| xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned()) .map_err(|e| format!("{}", e)) ).and_then(|xmpp_stream| { if Self::can_starttls(&xmpp_stream) { Self::starttls(xmpp_stream) } else { panic!("No STARTTLS") } }).and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()) .map_err(|e| format!("{}", e)) ).and_then(move |xmpp_stream| { Self::auth(xmpp_stream, username, password).expect("auth") }).and_then(|xmpp_stream| { Self::bind(xmpp_stream) }).and_then(|xmpp_stream| { println!("Bound to {}", xmpp_stream.jid); Ok(xmpp_stream) }) ) } fn can_starttls(stream: &xmpp_stream::XMPPStream) -> bool { stream.stream_features .get_child("starttls", NS_XMPP_TLS) .is_some() } fn starttls(stream: xmpp_stream::XMPPStream) -> StartTlsClient { StartTlsClient::from_stream(stream) } fn auth(stream: xmpp_stream::XMPPStream, username: String, password: String) -> Result, String> { let creds = Credentials::default() .with_username(username) .with_password(password) .with_channel_binding(ChannelBinding::None); ClientAuth::new(stream, creds) } fn bind(stream: xmpp_stream::XMPPStream) -> ClientBind { ClientBind::new(stream) } } impl Stream for Client { type Item = ClientEvent; type Error = String; fn poll(&mut self) -> Poll, Self::Error> { let state = replace(&mut self.state, ClientState::Invalid); match state { ClientState::Invalid => Err("invalid client state".to_owned()), ClientState::Disconnected => Ok(Async::Ready(None)), ClientState::Connecting(mut connect) => { match connect.poll() { Ok(Async::Ready(stream)) => { self.state = ClientState::Connected(stream); Ok(Async::Ready(Some(ClientEvent::Online))) }, Ok(Async::NotReady) => { self.state = ClientState::Connecting(connect); Ok(Async::NotReady) }, Err(e) => Err(e), } }, ClientState::Connected(mut stream) => { match stream.poll() { Ok(Async::Ready(None)) => { // EOF self.state = ClientState::Disconnected; Ok(Async::Ready(Some(ClientEvent::Disconnected))) }, Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { self.state = ClientState::Connected(stream); Ok(Async::Ready(Some(ClientEvent::Stanza(stanza)))) }, Ok(Async::NotReady) | Ok(Async::Ready(_)) => { self.state = ClientState::Connected(stream); Ok(Async::NotReady) }, Err(e) => Err(e.description().to_owned()), } }, } } } impl Sink for Client { type SinkItem = Element; type SinkError = String; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { match self.state { ClientState::Connected(ref mut stream) => match stream.start_send(Packet::Stanza(item)) { Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)), Ok(AsyncSink::NotReady(_)) => panic!("Client.start_send with stanza but got something else back"), Ok(AsyncSink::Ready) => { Ok(AsyncSink::Ready) }, Err(e) => Err(e.description().to_owned()), }, _ => Ok(AsyncSink::NotReady(item)), } } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { match self.state { ClientState::Connected(ref mut stream) => stream.poll_complete() .map_err(|e| e.description().to_owned()), _ => Ok(Async::Ready(())), } } }