diff --git a/tokio-xmpp/examples/send_message.rs b/tokio-xmpp/examples/send_message.rs new file mode 100644 index 0000000..fd69612 --- /dev/null +++ b/tokio-xmpp/examples/send_message.rs @@ -0,0 +1,37 @@ +use std::io::{Read, stdin}; +use std::env::args; +use std::process::exit; +use std::str::FromStr; +use tokio; +use tokio_xmpp::OneshotClient as Client; +use xmpp_parsers::message::{Body, Message}; +use xmpp_parsers::Jid; + +#[tokio::main] +async fn main() { + let args: Vec<String> = args().collect(); + if args.len() != 4 { + println!("Usage: {} <jid> <password> <recipient>", args[0]); + exit(1); + } + // Configuration + let jid = &args[1]; + let password = &args[2]; + let recipient = Jid::from_str(&args[3]).unwrap(); + + // Client instance + let mut client = Client::new(jid, password.to_owned()).await.unwrap(); + + // Read from stdin + println!("Client connected, type message and submit with Ctrl-D"); + let mut body = String::new(); + stdin().lock().read_to_string(&mut body).unwrap(); + + // Send message + let mut message = Message::new(Some(recipient)); + message.bodies.insert(String::new(), Body(body.to_owned())); + client.send_stanza(message).await.unwrap(); + + // Close client connection + client.end().await.unwrap(); +} diff --git a/tokio-xmpp/src/client/mod.rs b/tokio-xmpp/src/client/mod.rs index 96ca288..277bc4c 100644 --- a/tokio-xmpp/src/client/mod.rs +++ b/tokio-xmpp/src/client/mod.rs @@ -23,6 +23,8 @@ use auth::auth; mod bind; use bind::bind; +pub mod oneshot_client; + pub const NS_XMPP_SASL: &str = "urn:ietf:params:xml:ns:xmpp-sasl"; pub const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind"; diff --git a/tokio-xmpp/src/client/oneshot_client.rs b/tokio-xmpp/src/client/oneshot_client.rs new file mode 100644 index 0000000..c677a3d --- /dev/null +++ b/tokio-xmpp/src/client/oneshot_client.rs @@ -0,0 +1,154 @@ +use futures::{sink::SinkExt, Sink, Stream}; +use idna; +use sasl::common::{ChannelBinding, Credentials}; +use std::pin::Pin; +use std::str::FromStr; +use std::task::{Context, Poll}; +use tokio::{net::TcpStream, stream::StreamExt}; +use tokio_tls::TlsStream; +use xmpp_parsers::{Element, Jid}; + +use crate::happy_eyeballs::connect; +use crate::starttls::starttls; +use crate::xmpp_codec::Packet; +use crate::xmpp_stream; +use crate::{Error, ProtocolError}; +use super::auth::auth; +use super::bind::bind; + +/// XMPP client connection and state +/// +/// This implements the `futures` crate's [`Stream`](#impl-Stream) and +/// [`Sink`](#impl-Sink<Packet>) traits. +pub struct Client { + stream: XMPPStream, +} + +type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>; +const NS_JABBER_CLIENT: &str = "jabber:client"; + +impl Client { + /// Start a new XMPP client and wait for a usable session + pub async fn new<P: Into<String>>(jid: &str, password: P) -> Result<Self, Error> { + let jid = Jid::from_str(jid)?; + let client = Self::new_with_jid(jid, password.into()).await?; + Ok(client) + } + + /// Start a new client given that the JID is already parsed. + pub async fn new_with_jid(jid: Jid, password: String) -> Result<Self, Error> { + let stream = Self::connect(jid.clone(), password.clone()).await?; + Ok(Client { stream }) + } + + async fn connect(jid: Jid, password: String) -> Result<XMPPStream, Error> { + let username = jid.clone().node().unwrap(); + let password = password; + let domain = idna::domain_to_ascii(&jid.clone().domain()).map_err(|_| Error::Idna)?; + + // TCP connection + let tcp_stream = connect(&domain, Some("_xmpp-client._tcp"), 5222).await?; + + // Unencryped XMPPStream + let xmpp_stream = + xmpp_stream::XMPPStream::start(tcp_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await?; + + let xmpp_stream = if xmpp_stream.stream_features.can_starttls() { + // TlsStream + let tls_stream = starttls(xmpp_stream).await?; + // Encrypted XMPPStream + xmpp_stream::XMPPStream::start(tls_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await? + } else { + return Err(Error::Protocol(ProtocolError::NoTls)); + }; + + let creds = Credentials::default() + .with_username(username) + .with_password(password) + .with_channel_binding(ChannelBinding::None); + // Authenticated (unspecified) stream + let stream = auth(xmpp_stream, creds).await?; + // Authenticated XMPPStream + let xmpp_stream = xmpp_stream::XMPPStream::start(stream, jid, NS_JABBER_CLIENT.to_owned()).await?; + + // XMPPStream bound to user session + let xmpp_stream = bind(xmpp_stream).await?; + Ok(xmpp_stream) + } + + /// Get the client's bound JID (the one reported by the XMPP + /// server). + pub fn bound_jid(&self) -> &Jid { + &self.stream.jid + } + + /// Send stanza + pub async fn send_stanza<E>(&mut self, stanza: E) -> Result<(), Error> + where + E: Into<Element>, + { + self.send(Packet::Stanza(stanza.into())).await + } + + /// End connection by sending `</stream:stream>` + /// + /// You may expect the server to respond with the same. This + /// client will then drop its connection. + pub async fn end(mut self) -> Result<(), Error> { + self.send(Packet::StreamEnd).await?; + + // Wait for stream end from server + while let Some(Ok(_)) = self.next().await {} + + Ok(()) + } +} + +/// Incoming XMPP events +/// +/// In an `async fn` you may want to use this with `use +/// futures::stream::StreamExt;` +impl Stream for Client { + type Item = Result<Element, Error>; + + /// Low-level read on the XMPP stream + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { + loop { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => + return Poll::Pending, + Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => + return Poll::Ready(Some(Ok(stanza))), + Poll::Ready(Some(Ok(Packet::Text(_)))) => { + // Ignore, retry + } + Poll::Ready(_) => + // Unexpected and errors, just end + return Poll::Ready(None), + } + } + } +} + +/// Outgoing XMPP packets +/// +/// See `send_stanza()` for an `async fn` +impl Sink<Packet> for Client { + type Error = Error; + + fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> { + Pin::new(&mut self.stream).start_send(item) + } + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Pin::new(&mut self.stream).poll_ready(cx) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Pin::new(&mut self.stream).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Pin::new(&mut self.stream).poll_close(cx) + } +} diff --git a/tokio-xmpp/src/lib.rs b/tokio-xmpp/src/lib.rs index 9a8081a..b11ff07 100644 --- a/tokio-xmpp/src/lib.rs +++ b/tokio-xmpp/src/lib.rs @@ -12,7 +12,7 @@ pub mod xmpp_stream; pub mod stream_features; pub use crate::event::Event; mod client; -pub use crate::client::Client; +pub use client::{Client, oneshot_client::Client as OneshotClient}; mod component; pub use crate::component::Component; mod error;