//! `XMPPStream` provides encoding/decoding for XMPP use futures::sink::Send; use futures::{sink::SinkExt, task::Poll, Sink, Stream}; use std::pin::Pin; use std::task::Context; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; use xmpp_parsers::{Element, Jid}; use crate::stream_features::StreamFeatures; use crate::stream_start; use crate::xmpp_codec::{Packet, XMPPCodec}; use crate::Error; /// Wraps a binary stream (tokio's `AsyncRead + AsyncWrite`) to decode /// and encode XMPP packets. /// /// Implements `Sink + Stream` pub struct XMPPStream { /// The local Jabber-Id pub jid: Jid, /// Codec instance pub stream: Framed, /// `` for XMPP version 1.0 pub stream_features: StreamFeatures, /// Root namespace /// /// This is different for either c2s, s2s, or component /// connections. pub ns: String, /// Stream `id` attribute pub id: String, } impl XMPPStream { /// Constructor pub fn new( jid: Jid, stream: Framed, ns: String, id: String, stream_features: Element, ) -> Self { XMPPStream { jid, stream, stream_features: StreamFeatures::new(stream_features), ns, id, } } /// Send a `` start tag pub async fn start(stream: S, jid: Jid, ns: String) -> Result { let xmpp_stream = Framed::new(stream, XMPPCodec::new()); stream_start::start(xmpp_stream, jid, ns).await } /// Unwraps the inner stream pub fn into_inner(self) -> S { self.stream.into_inner() } /// Re-run `start()` pub async fn restart(self) -> Result { let stream = self.stream.into_inner(); Self::start(stream, self.jid, self.ns).await } } impl XMPPStream { /// Convenience method pub fn send_stanza>(&mut self, e: E) -> Send { self.send(Packet::Stanza(e.into())) } } /// Proxy to self.stream impl Sink for XMPPStream { type Error = crate::Error; fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll> { // Pin::new(&mut self.stream).poll_ready(ctx) // .map_err(|e| e.into()) Poll::Ready(Ok(())) } fn start_send( #[cfg_attr(rustc_least_1_46, allow(unused_mut))] mut self: Pin<&mut Self>, item: Packet, ) -> Result<(), Self::Error> { Pin::new(&mut self.stream) .start_send(item) .map_err(|e| e.into()) } fn poll_flush( #[cfg_attr(rustc_least_1_46, allow(unused_mut))] mut self: Pin<&mut Self>, cx: &mut Context, ) -> Poll> { Pin::new(&mut self.stream) .poll_flush(cx) .map_err(|e| e.into()) } fn poll_close( #[cfg_attr(rustc_least_1_46, allow(unused_mut))] mut self: Pin<&mut Self>, cx: &mut Context, ) -> Poll> { Pin::new(&mut self.stream) .poll_close(cx) .map_err(|e| e.into()) } } /// Proxy to self.stream impl Stream for XMPPStream { type Item = Result; fn poll_next( #[cfg_attr(rustc_least_1_46, allow(unused_mut))] mut self: Pin<&mut Self>, cx: &mut Context, ) -> Poll> { Pin::new(&mut self.stream) .poll_next(cx) .map(|result| result.map(|result| result.map_err(|e| e.into()))) } }