//! `XMPPStream` provides encoding/decoding for XMPP use futures::sink::Send; use futures::{sink::SinkExt, task::Poll, Sink, Stream}; use std::ops::DerefMut; use std::pin::Pin; use std::sync::Mutex; use std::task::Context; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; use xmpp_parsers::{Element, Jid}; use crate::stream_features::StreamFeatures; use crate::stream_start; use crate::xmpp_codec::{Packet, XMPPCodec}; use crate::Error; /// Wraps a binary stream (tokio's `AsyncRead + AsyncWrite`) to decode /// and encode XMPP packets. /// /// Implements `Sink + Stream` pub struct XMPPStream { /// The local Jabber-Id pub jid: Jid, /// Codec instance pub stream: Mutex>, /// `` for XMPP version 1.0 pub stream_features: StreamFeatures, /// Root namespace /// /// This is different for either c2s, s2s, or component /// connections. pub ns: String, /// Stream `id` attribute pub id: String, } impl XMPPStream { /// Constructor pub fn new( jid: Jid, stream: Framed, ns: String, id: String, stream_features: Element, ) -> Self { XMPPStream { jid, stream: Mutex::new(stream), stream_features: StreamFeatures::new(stream_features), ns, id, } } /// Send a `` start tag pub async fn start<'a>(stream: S, jid: Jid, ns: String) -> Result { let xmpp_stream = Framed::new(stream, XMPPCodec::new()); stream_start::start(xmpp_stream, jid, ns).await } /// Unwraps the inner stream pub fn into_inner(self) -> S { self.stream.into_inner().unwrap().into_inner() } /// Re-run `start()` pub async fn restart<'a>(self) -> Result { let stream = self.stream.into_inner().unwrap().into_inner(); Self::start(stream, self.jid, self.ns).await } } impl XMPPStream { /// Convenience method pub fn send_stanza>(&mut self, e: E) -> Send { self.send(Packet::Stanza(e.into())) } } /// Proxy to self.stream impl Sink for XMPPStream { type Error = crate::Error; fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll> { // Pin::new(&mut self.stream).poll_ready(ctx) // .map_err(|e| e.into()) Poll::Ready(Ok(())) } fn start_send(#[cfg_attr(rustc_least_1_48, allow(unused_mut))] mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> { Pin::new(&mut self.stream.lock().unwrap().deref_mut()) .start_send(item) .map_err(|e| e.into()) } fn poll_flush(#[cfg_attr(rustc_least_1_48, allow(unused_mut))] mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.stream.lock().unwrap().deref_mut()) .poll_flush(cx) .map_err(|e| e.into()) } fn poll_close(#[cfg_attr(rustc_least_1_48, allow(unused_mut))] mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.stream.lock().unwrap().deref_mut()) .poll_close(cx) .map_err(|e| e.into()) } } /// Proxy to self.stream impl Stream for XMPPStream { type Item = Result; fn poll_next(#[cfg_attr(rustc_least_1_48, allow(unused_mut))] mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.stream.lock().unwrap().deref_mut()) .poll_next(cx) .map(|result| result.map(|result| result.map_err(|e| e.into()))) } }