From 537b4825a85cc997e136999126dbdb881b1bb0e9 Mon Sep 17 00:00:00 2001 From: Astro Date: Sat, 30 May 2020 00:07:36 +0200 Subject: [PATCH] tokio-xmpp: rename Client to AsyncClient --- tokio-xmpp/examples/contact_addr.rs | 2 +- tokio-xmpp/examples/download_avatars.rs | 2 +- tokio-xmpp/examples/echo_bot.rs | 2 +- tokio-xmpp/src/client/async_client.rs | 293 ++++++++++++++++++++++++ tokio-xmpp/src/client/mod.rs | 291 +---------------------- tokio-xmpp/src/lib.rs | 2 +- 6 files changed, 298 insertions(+), 294 deletions(-) create mode 100644 tokio-xmpp/src/client/async_client.rs diff --git a/tokio-xmpp/examples/contact_addr.rs b/tokio-xmpp/examples/contact_addr.rs index c81f663..fcf65eb 100644 --- a/tokio-xmpp/examples/contact_addr.rs +++ b/tokio-xmpp/examples/contact_addr.rs @@ -2,7 +2,7 @@ use futures::stream::StreamExt; use std::convert::TryFrom; use std::env::args; use std::process::exit; -use tokio_xmpp::Client; +use tokio_xmpp::AsyncClient as Client; use xmpp_parsers::{ disco::{DiscoInfoQuery, DiscoInfoResult}, iq::{Iq, IqType}, diff --git a/tokio-xmpp/examples/download_avatars.rs b/tokio-xmpp/examples/download_avatars.rs index 7562d07..7fbdf60 100644 --- a/tokio-xmpp/examples/download_avatars.rs +++ b/tokio-xmpp/examples/download_avatars.rs @@ -6,7 +6,7 @@ use std::io::{self, Write}; use std::process::exit; use std::str::FromStr; use tokio; -use tokio_xmpp::Client; +use tokio_xmpp::AsyncClient as Client; use xmpp_parsers::{ avatar::{Data as AvatarData, Metadata as AvatarMetadata}, caps::{compute_disco, hash_caps, Caps}, diff --git a/tokio-xmpp/examples/echo_bot.rs b/tokio-xmpp/examples/echo_bot.rs index 4ba837c..dcc4b02 100644 --- a/tokio-xmpp/examples/echo_bot.rs +++ b/tokio-xmpp/examples/echo_bot.rs @@ -3,7 +3,7 @@ use std::convert::TryFrom; use std::env::args; use std::process::exit; use tokio; -use tokio_xmpp::Client; +use tokio_xmpp::AsyncClient as Client; use xmpp_parsers::message::{Body, Message, MessageType}; use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType}; use xmpp_parsers::{Element, Jid}; diff --git a/tokio-xmpp/src/client/async_client.rs b/tokio-xmpp/src/client/async_client.rs new file mode 100644 index 0000000..4df3407 --- /dev/null +++ b/tokio-xmpp/src/client/async_client.rs @@ -0,0 +1,293 @@ +use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream}; +use idna; +use sasl::common::{ChannelBinding, Credentials}; +use std::mem::replace; +use std::pin::Pin; +use std::str::FromStr; +use std::task::Context; +use tokio::net::TcpStream; +use tokio::task::JoinHandle; +use tokio::task::LocalSet; +use tokio_tls::TlsStream; +use xmpp_parsers::{Element, Jid, JidParseError}; + +use super::auth::auth; +use super::bind::bind; +use crate::event::Event; +use crate::happy_eyeballs::connect; +use crate::starttls::starttls; +use crate::xmpp_codec::Packet; +use crate::xmpp_stream; +use crate::{Error, ProtocolError}; + + +/// XMPP client connection and state +/// +/// It is able to reconnect. TODO: implement session management. +/// +/// This implements the `futures` crate's [`Stream`](#impl-Stream) and +/// [`Sink`](#impl-Sink) traits. +pub struct Client { + state: ClientState, + jid: Jid, + password: String, + reconnect: bool, + // TODO: tls_required=true +} + +type XMPPStream = xmpp_stream::XMPPStream>; +const NS_JABBER_CLIENT: &str = "jabber:client"; + +enum ClientState { + Invalid, + Disconnected, + Connecting(JoinHandle>, LocalSet), + Connected(XMPPStream), +} + +impl Client { + /// Start a new XMPP client + /// + /// Start polling the returned instance so that it will connect + /// and yield events. + pub fn new>(jid: &str, password: P) -> Result { + let jid = Jid::from_str(jid)?; + let client = Self::new_with_jid(jid, password.into()); + Ok(client) + } + + /// Start a new client given that the JID is already parsed. + pub fn new_with_jid(jid: Jid, password: String) -> Self { + let local = LocalSet::new(); + let connect = local.spawn_local(Self::connect(jid.clone(), password.clone())); + let client = Client { + jid, + password, + state: ClientState::Connecting(connect, local), + reconnect: false, + }; + client + } + + /// Set whether to reconnect (`true`) or let the stream end + /// (`false`) when a connection to the server has ended. + pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self { + self.reconnect = reconnect; + self + } + + async fn connect(jid: Jid, password: String) -> Result { + let username = jid.clone().node().unwrap(); + let password = password; + let domain = idna::domain_to_ascii(&jid.clone().domain()).map_err(|_| Error::Idna)?; + + // TCP connection + let tcp_stream = connect(&domain, Some("_xmpp-client._tcp"), 5222).await?; + + // Unencryped XMPPStream + let xmpp_stream = + xmpp_stream::XMPPStream::start(tcp_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await?; + + let xmpp_stream = if xmpp_stream.stream_features.can_starttls() { + // TlsStream + let tls_stream = starttls(xmpp_stream).await?; + // Encrypted XMPPStream + xmpp_stream::XMPPStream::start(tls_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await? + } else { + return Err(Error::Protocol(ProtocolError::NoTls)); + }; + + let creds = Credentials::default() + .with_username(username) + .with_password(password) + .with_channel_binding(ChannelBinding::None); + // Authenticated (unspecified) stream + let stream = auth(xmpp_stream, creds).await?; + // Authenticated XMPPStream + let xmpp_stream = xmpp_stream::XMPPStream::start(stream, jid, NS_JABBER_CLIENT.to_owned()).await?; + + // XMPPStream bound to user session + let xmpp_stream = bind(xmpp_stream).await?; + Ok(xmpp_stream) + } + + /// Get the client's bound JID (the one reported by the XMPP + /// server). + pub fn bound_jid(&self) -> Option<&Jid> { + match self.state { + ClientState::Connected(ref stream) => Some(&stream.jid), + _ => None, + } + } + + /// Send stanza + pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> { + self.send(Packet::Stanza(stanza)).await + } + + /// End connection by sending `` + /// + /// You may expect the server to respond with the same. This + /// client will then drop its connection. + /// + /// Make sure to disable reconnect. + pub async fn send_end(&mut self) -> Result<(), Error> { + self.send(Packet::StreamEnd).await + } +} + +/// Incoming XMPP events +/// +/// In an `async fn` you may want to use this with `use +/// futures::stream::StreamExt;` +impl Stream for Client { + type Item = Event; + + /// Low-level read on the XMPP stream, allowing the underlying + /// machinery to: + /// + /// * connect, + /// * starttls, + /// * authenticate, + /// * bind a session, and finally + /// * receive stanzas + /// + /// ...for your client + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let state = replace(&mut self.state, ClientState::Invalid); + + match state { + ClientState::Invalid => panic!("Invalid client state"), + ClientState::Disconnected if self.reconnect => { + // TODO: add timeout + let mut local = LocalSet::new(); + let connect = + local.spawn_local(Self::connect(self.jid.clone(), self.password.clone())); + let _ = Pin::new(&mut local).poll(cx); + self.state = ClientState::Connecting(connect, local); + self.poll_next(cx) + } + ClientState::Disconnected => Poll::Ready(None), + ClientState::Connecting(mut connect, mut local) => { + match Pin::new(&mut connect).poll(cx) { + Poll::Ready(Ok(Ok(stream))) => { + let bound_jid = stream.jid.clone(); + self.state = ClientState::Connected(stream); + Poll::Ready(Some(Event::Online { + bound_jid, + resumed: false, + })) + } + Poll::Ready(Ok(Err(e))) => { + self.state = ClientState::Disconnected; + return Poll::Ready(Some(Event::Disconnected(e.into()))); + } + Poll::Ready(Err(e)) => { + self.state = ClientState::Disconnected; + panic!("connect task: {}", e); + } + Poll::Pending => { + let _ = Pin::new(&mut local).poll(cx); + + self.state = ClientState::Connecting(connect, local); + Poll::Pending + } + } + } + ClientState::Connected(mut stream) => { + // Poll sink + match Pin::new(&mut stream).poll_ready(cx) { + Poll::Pending => (), + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(e)) => { + self.state = ClientState::Disconnected; + return Poll::Ready(Some(Event::Disconnected(e.into()))); + } + }; + + // Poll stream + match Pin::new(&mut stream).poll_next(cx) { + Poll::Ready(None) => { + // EOF + self.state = ClientState::Disconnected; + Poll::Ready(Some(Event::Disconnected(Error::Disconnected))) + } + Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => { + // Receive stanza + self.state = ClientState::Connected(stream); + Poll::Ready(Some(Event::Stanza(stanza))) + } + Poll::Ready(Some(Ok(Packet::Text(_)))) => { + // Ignore text between stanzas + self.state = ClientState::Connected(stream); + Poll::Pending + } + Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => { + // + self.state = ClientState::Disconnected; + Poll::Ready(Some(Event::Disconnected( + ProtocolError::InvalidStreamStart.into(), + ))) + } + Poll::Ready(Some(Ok(Packet::StreamEnd))) => { + // End of stream: + self.state = ClientState::Disconnected; + Poll::Ready(Some(Event::Disconnected(Error::Disconnected))) + } + Poll::Pending => { + // Try again later + self.state = ClientState::Connected(stream); + Poll::Pending + } + Poll::Ready(Some(Err(e))) => { + self.state = ClientState::Disconnected; + Poll::Ready(Some(Event::Disconnected(e.into()))) + } + } + } + } + } +} + +/// Outgoing XMPP packets +/// +/// See `send_stanza()` for an `async fn` +impl Sink for Client { + type Error = Error; + + fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> { + match self.state { + ClientState::Connected(ref mut stream) => { + Pin::new(stream).start_send(item).map_err(|e| e.into()) + } + _ => Err(Error::InvalidState), + } + } + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match self.state { + ClientState::Connected(ref mut stream) => { + Pin::new(stream).poll_ready(cx).map_err(|e| e.into()) + } + _ => Poll::Pending, + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match self.state { + ClientState::Connected(ref mut stream) => { + Pin::new(stream).poll_flush(cx).map_err(|e| e.into()) + } + _ => Poll::Pending, + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match self.state { + ClientState::Connected(ref mut stream) => { + Pin::new(stream).poll_close(cx).map_err(|e| e.into()) + } + _ => Poll::Pending, + } + } +} diff --git a/tokio-xmpp/src/client/mod.rs b/tokio-xmpp/src/client/mod.rs index 13ed923..c100064 100644 --- a/tokio-xmpp/src/client/mod.rs +++ b/tokio-xmpp/src/client/mod.rs @@ -1,297 +1,8 @@ -use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream}; -use idna; -use sasl::common::{ChannelBinding, Credentials}; -use std::mem::replace; -use std::pin::Pin; -use std::str::FromStr; -use std::task::Context; -use tokio::net::TcpStream; -use tokio::task::JoinHandle; -use tokio::task::LocalSet; -use tokio_tls::TlsStream; -use xmpp_parsers::{Element, Jid, JidParseError}; - -use super::event::Event; -use super::happy_eyeballs::connect; -use super::starttls::starttls; -use super::xmpp_codec::Packet; -use super::xmpp_stream; -use super::{Error, ProtocolError}; - mod auth; -use auth::auth; mod bind; -use bind::bind; pub mod simple_client; +pub mod async_client; pub const NS_XMPP_SASL: &str = "urn:ietf:params:xml:ns:xmpp-sasl"; pub const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind"; - -/// XMPP client connection and state -/// -/// This implements the `futures` crate's [`Stream`](#impl-Stream) and -/// [`Sink`](#impl-Sink) traits. -pub struct Client { - state: ClientState, - jid: Jid, - password: String, - reconnect: bool, -} - -type XMPPStream = xmpp_stream::XMPPStream>; -const NS_JABBER_CLIENT: &str = "jabber:client"; - -enum ClientState { - Invalid, - Disconnected, - Connecting(JoinHandle>, LocalSet), - Connected(XMPPStream), -} - -impl Client { - /// Start a new XMPP client - /// - /// Start polling the returned instance so that it will connect - /// and yield events. - pub fn new>(jid: &str, password: P) -> Result { - let jid = Jid::from_str(jid)?; - let client = Self::new_with_jid(jid, password.into()); - Ok(client) - } - - /// Start a new client given that the JID is already parsed. - pub fn new_with_jid(jid: Jid, password: String) -> Self { - let local = LocalSet::new(); - let connect = local.spawn_local(Self::connect(jid.clone(), password.clone())); - let client = Client { - jid, - password, - state: ClientState::Connecting(connect, local), - reconnect: false, - }; - client - } - - /// Set whether to reconnect (`true`) or let the stream end - /// (`false`) when a connection to the server has ended. - pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self { - self.reconnect = reconnect; - self - } - - async fn connect(jid: Jid, password: String) -> Result { - let username = jid.clone().node().unwrap(); - let password = password; - let domain = idna::domain_to_ascii(&jid.clone().domain()).map_err(|_| Error::Idna)?; - - // TCP connection - let tcp_stream = connect(&domain, Some("_xmpp-client._tcp"), 5222).await?; - - // Unencryped XMPPStream - let xmpp_stream = - xmpp_stream::XMPPStream::start(tcp_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await?; - - let xmpp_stream = if xmpp_stream.stream_features.can_starttls() { - // TlsStream - let tls_stream = starttls(xmpp_stream).await?; - // Encrypted XMPPStream - xmpp_stream::XMPPStream::start(tls_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await? - } else { - return Err(Error::Protocol(ProtocolError::NoTls)); - }; - - let creds = Credentials::default() - .with_username(username) - .with_password(password) - .with_channel_binding(ChannelBinding::None); - // Authenticated (unspecified) stream - let stream = auth(xmpp_stream, creds).await?; - // Authenticated XMPPStream - let xmpp_stream = xmpp_stream::XMPPStream::start(stream, jid, NS_JABBER_CLIENT.to_owned()).await?; - - // XMPPStream bound to user session - let xmpp_stream = bind(xmpp_stream).await?; - Ok(xmpp_stream) - } - - /// Get the client's bound JID (the one reported by the XMPP - /// server). - pub fn bound_jid(&self) -> Option<&Jid> { - match self.state { - ClientState::Connected(ref stream) => Some(&stream.jid), - _ => None, - } - } - - /// Send stanza - pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> { - self.send(Packet::Stanza(stanza)).await - } - - /// End connection by sending `` - /// - /// You may expect the server to respond with the same. This - /// client will then drop its connection. - /// - /// Make sure to disable reconnect. - pub async fn send_end(&mut self) -> Result<(), Error> { - self.send(Packet::StreamEnd).await - } -} - -/// Incoming XMPP events -/// -/// In an `async fn` you may want to use this with `use -/// futures::stream::StreamExt;` -impl Stream for Client { - type Item = Event; - - /// Low-level read on the XMPP stream, allowing the underlying - /// machinery to: - /// - /// * connect, - /// * starttls, - /// * authenticate, - /// * bind a session, and finally - /// * receive stanzas - /// - /// ...for your client - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let state = replace(&mut self.state, ClientState::Invalid); - - match state { - ClientState::Invalid => panic!("Invalid client state"), - ClientState::Disconnected if self.reconnect => { - // TODO: add timeout - let mut local = LocalSet::new(); - let connect = - local.spawn_local(Self::connect(self.jid.clone(), self.password.clone())); - let _ = Pin::new(&mut local).poll(cx); - self.state = ClientState::Connecting(connect, local); - self.poll_next(cx) - } - ClientState::Disconnected => Poll::Ready(None), - ClientState::Connecting(mut connect, mut local) => { - match Pin::new(&mut connect).poll(cx) { - Poll::Ready(Ok(Ok(stream))) => { - let bound_jid = stream.jid.clone(); - self.state = ClientState::Connected(stream); - Poll::Ready(Some(Event::Online { - bound_jid, - resumed: false, - })) - } - Poll::Ready(Ok(Err(e))) => { - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected(e.into()))); - } - Poll::Ready(Err(e)) => { - self.state = ClientState::Disconnected; - panic!("connect task: {}", e); - } - Poll::Pending => { - let _ = Pin::new(&mut local).poll(cx); - - self.state = ClientState::Connecting(connect, local); - Poll::Pending - } - } - } - ClientState::Connected(mut stream) => { - // Poll sink - match Pin::new(&mut stream).poll_ready(cx) { - Poll::Pending => (), - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(e)) => { - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected(e.into()))); - } - }; - - // Poll stream - match Pin::new(&mut stream).poll_next(cx) { - Poll::Ready(None) => { - // EOF - self.state = ClientState::Disconnected; - Poll::Ready(Some(Event::Disconnected(Error::Disconnected))) - } - Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => { - // Receive stanza - self.state = ClientState::Connected(stream); - Poll::Ready(Some(Event::Stanza(stanza))) - } - Poll::Ready(Some(Ok(Packet::Text(_)))) => { - // Ignore text between stanzas - self.state = ClientState::Connected(stream); - Poll::Pending - } - Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => { - // - self.state = ClientState::Disconnected; - Poll::Ready(Some(Event::Disconnected( - ProtocolError::InvalidStreamStart.into(), - ))) - } - Poll::Ready(Some(Ok(Packet::StreamEnd))) => { - // End of stream: - self.state = ClientState::Disconnected; - Poll::Ready(Some(Event::Disconnected(Error::Disconnected))) - } - Poll::Pending => { - // Try again later - self.state = ClientState::Connected(stream); - Poll::Pending - } - Poll::Ready(Some(Err(e))) => { - self.state = ClientState::Disconnected; - Poll::Ready(Some(Event::Disconnected(e.into()))) - } - } - } - } - } -} - -/// Outgoing XMPP packets -/// -/// See `send_stanza()` for an `async fn` -impl Sink for Client { - type Error = Error; - - fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> { - match self.state { - ClientState::Connected(ref mut stream) => { - Pin::new(stream).start_send(item).map_err(|e| e.into()) - } - _ => Err(Error::InvalidState), - } - } - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.state { - ClientState::Connected(ref mut stream) => { - Pin::new(stream).poll_ready(cx).map_err(|e| e.into()) - } - _ => Poll::Pending, - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.state { - ClientState::Connected(ref mut stream) => { - Pin::new(stream).poll_flush(cx).map_err(|e| e.into()) - } - _ => Poll::Pending, - } - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.state { - ClientState::Connected(ref mut stream) => { - Pin::new(stream).poll_close(cx).map_err(|e| e.into()) - } - _ => Poll::Pending, - } - } -} diff --git a/tokio-xmpp/src/lib.rs b/tokio-xmpp/src/lib.rs index bef7a47..11fc5fa 100644 --- a/tokio-xmpp/src/lib.rs +++ b/tokio-xmpp/src/lib.rs @@ -12,7 +12,7 @@ pub mod xmpp_stream; pub mod stream_features; pub use crate::event::Event; mod client; -pub use client::{Client, simple_client::Client as SimpleClient}; +pub use client::{async_client::Client as AsyncClient, simple_client::Client as SimpleClient}; mod component; pub use crate::component::Component; mod error;