xmpp-rs-mirror/tokio-xmpp/src/component/mod.rs

116 lines
3.6 KiB
Rust
Raw Normal View History

2018-08-02 17:58:19 +00:00
//! Components in XMPP are services/gateways that are logged into an
//! XMPP server under a JID consisting of just a domain name. They are
//! allowed to use any user and resource identifiers in their stanzas.
2020-03-05 00:25:24 +00:00
use futures::{sink::SinkExt, task::Poll, Sink, Stream};
use std::pin::Pin;
2017-07-22 00:59:51 +00:00
use std::str::FromStr;
2020-03-05 00:25:24 +00:00
use std::task::Context;
2018-09-01 19:59:02 +00:00
use tokio::net::TcpStream;
2020-03-05 00:25:24 +00:00
use xmpp_parsers::{Element, Jid};
2017-07-22 00:59:51 +00:00
2020-03-05 00:25:24 +00:00
use super::happy_eyeballs::connect;
2017-07-22 00:59:51 +00:00
use super::xmpp_codec::Packet;
use super::xmpp_stream;
2018-09-06 15:46:06 +00:00
use super::Error;
2017-07-22 00:59:51 +00:00
mod auth;
2018-08-02 17:58:19 +00:00
/// Component connection to an XMPP server
2020-03-05 00:25:24 +00:00
///
/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
/// (stanzas). Connection handling however is up to the user.
2017-07-22 00:59:51 +00:00
pub struct Component {
2018-08-02 17:58:19 +00:00
/// The component's Jabber-Id
2017-07-22 00:59:51 +00:00
pub jid: Jid,
2020-03-05 00:25:24 +00:00
stream: XMPPStream,
2017-07-22 00:59:51 +00:00
}
type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
impl Component {
2018-08-02 17:58:19 +00:00
/// Start a new XMPP component
2020-03-05 00:25:24 +00:00
pub async fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, Error> {
2018-08-02 18:10:26 +00:00
let jid = Jid::from_str(jid)?;
2017-07-22 00:59:51 +00:00
let password = password.to_owned();
2020-03-05 00:25:24 +00:00
let stream = Self::connect(jid.clone(), password, server, port).await?;
Ok(Component { jid, stream })
2017-07-22 00:59:51 +00:00
}
2020-03-05 00:25:24 +00:00
async fn connect(
2018-12-18 18:04:31 +00:00
jid: Jid,
password: String,
server: &str,
port: u16,
2020-03-05 00:25:24 +00:00
) -> Result<XMPPStream, Error> {
2017-07-22 00:59:51 +00:00
let password = password;
2020-03-05 00:25:24 +00:00
let tcp_stream = connect(server, None, port).await?;
let mut xmpp_stream =
xmpp_stream::XMPPStream::start(tcp_stream, jid, NS_JABBER_COMPONENT_ACCEPT.to_owned())
.await?;
auth::auth(&mut xmpp_stream, password).await?;
Ok(xmpp_stream)
2017-07-22 00:59:51 +00:00
}
2020-03-05 00:25:24 +00:00
/// Send stanza
pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
self.send(stanza).await
}
/// End connection
pub async fn send_end(&mut self) -> Result<(), Error> {
self.close().await
2017-07-22 00:59:51 +00:00
}
}
impl Stream for Component {
2020-03-05 00:25:24 +00:00
type Item = Element;
2017-07-22 00:59:51 +00:00
2020-03-05 00:25:24 +00:00
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
Poll::Ready(Some(Ok(Packet::Text(_)))) => {
// retry
2018-12-18 18:04:31 +00:00
}
2020-03-05 00:25:24 +00:00
Poll::Ready(Some(Ok(_))) =>
// unexpected
{
return Poll::Ready(None)
2017-07-22 00:59:51 +00:00
}
2020-03-05 00:25:24 +00:00
Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
2018-12-18 18:04:31 +00:00
}
2017-07-22 00:59:51 +00:00
}
}
}
2020-03-05 00:25:24 +00:00
impl Sink<Element> for Component {
type Error = Error;
2017-07-22 00:59:51 +00:00
2020-03-05 00:25:24 +00:00
fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
Pin::new(&mut self.stream)
.start_send(Packet::Stanza(item))
.map_err(|e| e.into())
2017-07-22 00:59:51 +00:00
}
2020-03-05 00:25:24 +00:00
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.stream)
.poll_ready(cx)
.map_err(|e| e.into())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.stream)
.poll_flush(cx)
.map_err(|e| e.into())
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.stream)
.poll_close(cx)
.map_err(|e| e.into())
2017-07-22 00:59:51 +00:00
}
}