use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; use idna; use xmpp_parsers::{Jid, JidParseError, Element}; use sasl::common::{ChannelBinding, Credentials}; use std::mem::replace; use std::str::FromStr; use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; use super::event::Event; use super::happy_eyeballs::Connecter; use super::starttls::{StartTlsClient, NS_XMPP_TLS}; use super::xmpp_codec::Packet; use super::xmpp_stream; use super::{Error, ProtocolError}; mod auth; use self::auth::ClientAuth; mod bind; use self::bind::ClientBind; /// XMPP client connection and state pub struct Client { /// The client's current Jabber-Id pub jid: Jid, state: ClientState, } type XMPPStream = xmpp_stream::XMPPStream>; const NS_JABBER_CLIENT: &str = "jabber:client"; enum ClientState { Invalid, Disconnected, Connecting(Box>), Connected(XMPPStream), ClosingSendEnd(futures::sink::Send), ClosingClose(XMPPStream), } impl Client { /// Start a new XMPP client /// /// Start polling the returned instance so that it will connect /// and yield events. pub fn new(jid: &str, password: &str) -> Result { let jid = Jid::from_str(jid)?; let client = Self::new_with_jid(jid, password); Ok(client) } /// Start a new client given that the JID is already parsed. pub fn new_with_jid(jid: Jid, password: &str) -> Self { let password = password.to_owned(); let connect = Self::make_connect(jid.clone(), password.clone()); let client = Client { jid, state: ClientState::Connecting(Box::new(connect)), }; client } fn make_connect(jid: Jid, password: String) -> impl Future { let username = jid.node.as_ref().unwrap().to_owned(); let jid1 = jid.clone(); let jid2 = jid.clone(); let password = password; done(idna::domain_to_ascii(&jid.domain)) .map_err(|_| Error::Idna) .and_then(|domain| { done(Connecter::from_lookup( &domain, Some("_xmpp-client._tcp"), 5222, )) }) .flatten() .and_then(move |tcp_stream| { xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned()) }) .and_then(|xmpp_stream| { if Self::can_starttls(&xmpp_stream) { Ok(Self::starttls(xmpp_stream)) } else { Err(Error::Protocol(ProtocolError::NoTls)) } }) .flatten() .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned())) .and_then( move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten? ) .and_then(|auth| auth) .and_then(|xmpp_stream| Self::bind(xmpp_stream)) .and_then(|xmpp_stream| { // println!("Bound to {}", xmpp_stream.jid); Ok(xmpp_stream) }) } fn can_starttls(stream: &xmpp_stream::XMPPStream) -> bool { stream .stream_features .get_child("starttls", NS_XMPP_TLS) .is_some() } fn starttls( stream: xmpp_stream::XMPPStream, ) -> StartTlsClient { StartTlsClient::from_stream(stream) } fn auth( stream: xmpp_stream::XMPPStream, username: String, password: String, ) -> Result, Error> { let creds = Credentials::default() .with_username(username) .with_password(password) .with_channel_binding(ChannelBinding::None); ClientAuth::new(stream, creds) } fn bind(stream: xmpp_stream::XMPPStream) -> ClientBind { ClientBind::new(stream) } } impl Stream for Client { type Item = Event; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { let state = replace(&mut self.state, ClientState::Invalid); match state { ClientState::Invalid => Err(Error::InvalidState), ClientState::Disconnected => Ok(Async::Ready(None)), ClientState::Connecting(mut connect) => match connect.poll() { Ok(Async::Ready(stream)) => { self.state = ClientState::Connected(stream); Ok(Async::Ready(Some(Event::Online))) } Ok(Async::NotReady) => { self.state = ClientState::Connecting(connect); Ok(Async::NotReady) } Err(e) => Err(e), }, ClientState::Connected(mut stream) => { // Poll sink match stream.poll_complete() { Ok(Async::NotReady) => (), Ok(Async::Ready(())) => (), Err(e) => return Err(e)?, }; // Poll stream match stream.poll() { Ok(Async::Ready(None)) => { // EOF self.state = ClientState::Disconnected; Ok(Async::Ready(Some(Event::Disconnected))) } Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { // Receive stanza self.state = ClientState::Connected(stream); Ok(Async::Ready(Some(Event::Stanza(stanza)))) } Ok(Async::Ready(Some(Packet::Text(_)))) => { // Ignore text between stanzas Ok(Async::NotReady) } Ok(Async::Ready(Some(Packet::StreamStart(_)))) => { // Err(ProtocolError::InvalidStreamStart.into()) } Ok(Async::Ready(Some(Packet::StreamEnd))) => { // End of stream: Ok(Async::Ready(None)) } Ok(Async::NotReady) => { // Try again later self.state = ClientState::Connected(stream); Ok(Async::NotReady) } Err(e) => Err(e)?, } } ClientState::ClosingSendEnd(_) => { self.state = state; Ok(Async::NotReady) } ClientState::ClosingClose(_) => { self.state = state; Ok(Async::NotReady) } } } } impl Sink for Client { type SinkItem = Element; type SinkError = Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { match self.state { ClientState::Connected(ref mut stream) => { match stream.start_send(Packet::Stanza(item)) { Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)), Ok(AsyncSink::NotReady(_)) => { panic!("Client.start_send with stanza but got something else back") } Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), Err(e) => Err(e)?, } } _ => Ok(AsyncSink::NotReady(item)), } } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { match self.state { ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()), ClientState::ClosingSendEnd(ref mut send) => { match send.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(stream) => { self.state = ClientState::ClosingClose(stream); self.poll_complete() } } } ClientState::ClosingClose(ref mut stream) => { match stream.close()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(()) => { self.state = ClientState::Disconnected; Ok(Async::Ready(())) } } } _ => Ok(Async::Ready(())), } } /// Send ` and later close the inner TCP stream. fn close(&mut self) -> Poll<(), Self::SinkError> { let state = replace(&mut self.state, ClientState::Disconnected); match state { ClientState::Connected(stream) => { let send = stream.send(Packet::StreamEnd); self.state = ClientState::ClosingSendEnd(send); self.poll_complete() } _ => Ok(Async::Ready(())), } } }