2020-05-29 22:07:36 +00:00
|
|
|
|
use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream};
|
|
|
|
|
use std::mem::replace;
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::task::Context;
|
|
|
|
|
use tokio::task::JoinHandle;
|
2023-06-01 09:58:04 +00:00
|
|
|
|
use xmpp_parsers::{ns, Element, Jid};
|
2020-05-29 22:07:36 +00:00
|
|
|
|
|
2023-12-31 03:08:37 +00:00
|
|
|
|
use super::connect::client_login;
|
|
|
|
|
use crate::connect::{AsyncReadAndWrite, ServerConnector};
|
2020-05-29 22:07:36 +00:00
|
|
|
|
use crate::event::Event;
|
|
|
|
|
use crate::xmpp_codec::Packet;
|
2023-12-31 03:08:37 +00:00
|
|
|
|
use crate::xmpp_stream::{add_stanza_id, XMPPStream};
|
|
|
|
|
use crate::{Error, ProtocolError};
|
2020-05-29 22:07:36 +00:00
|
|
|
|
|
|
|
|
|
/// XMPP client connection and state
|
|
|
|
|
///
|
|
|
|
|
/// It is able to reconnect. TODO: implement session management.
|
|
|
|
|
///
|
|
|
|
|
/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
|
|
|
|
|
/// [`Sink`](#impl-Sink<Packet>) traits.
|
2023-12-29 06:09:33 +00:00
|
|
|
|
pub struct Client<C: ServerConnector> {
|
|
|
|
|
config: Config<C>,
|
|
|
|
|
state: ClientState<C::Stream>,
|
2020-09-11 15:39:12 +00:00
|
|
|
|
reconnect: bool,
|
|
|
|
|
// TODO: tls_required=true
|
|
|
|
|
}
|
|
|
|
|
|
2023-12-29 06:09:33 +00:00
|
|
|
|
/// XMPP client configuration
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
|
pub struct Config<C> {
|
|
|
|
|
/// jid of the account
|
|
|
|
|
pub jid: Jid,
|
|
|
|
|
/// password of the account
|
|
|
|
|
pub password: String,
|
|
|
|
|
/// server configuration for the account
|
|
|
|
|
pub server: C,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum ClientState<S: AsyncReadAndWrite> {
|
2020-05-29 22:07:36 +00:00
|
|
|
|
Invalid,
|
|
|
|
|
Disconnected,
|
2023-12-29 06:09:33 +00:00
|
|
|
|
Connecting(JoinHandle<Result<XMPPStream<S>, Error>>),
|
|
|
|
|
Connected(XMPPStream<S>),
|
2020-05-29 22:07:36 +00:00
|
|
|
|
}
|
|
|
|
|
|
2023-12-29 06:09:33 +00:00
|
|
|
|
impl<C: ServerConnector> Client<C> {
|
2020-05-29 22:07:36 +00:00
|
|
|
|
/// Start a new client given that the JID is already parsed.
|
2023-12-29 06:09:33 +00:00
|
|
|
|
pub fn new_with_config(config: Config<C>) -> Self {
|
2023-12-30 05:07:59 +00:00
|
|
|
|
let connect = tokio::spawn(client_login(
|
2020-09-11 15:39:12 +00:00
|
|
|
|
config.server.clone(),
|
|
|
|
|
config.jid.clone(),
|
|
|
|
|
config.password.clone(),
|
2020-09-11 15:05:49 +00:00
|
|
|
|
));
|
2020-05-29 22:07:36 +00:00
|
|
|
|
let client = Client {
|
2020-12-06 13:28:57 +00:00
|
|
|
|
config,
|
2022-03-24 02:26:30 +00:00
|
|
|
|
state: ClientState::Connecting(connect),
|
2020-05-29 22:07:36 +00:00
|
|
|
|
reconnect: false,
|
|
|
|
|
};
|
|
|
|
|
client
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Set whether to reconnect (`true`) or let the stream end
|
|
|
|
|
/// (`false`) when a connection to the server has ended.
|
|
|
|
|
pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self {
|
|
|
|
|
self.reconnect = reconnect;
|
|
|
|
|
self
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get the client's bound JID (the one reported by the XMPP
|
|
|
|
|
/// server).
|
|
|
|
|
pub fn bound_jid(&self) -> Option<&Jid> {
|
|
|
|
|
match self.state {
|
|
|
|
|
ClientState::Connected(ref stream) => Some(&stream.jid),
|
|
|
|
|
_ => None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send stanza
|
|
|
|
|
pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
|
2023-06-04 17:27:46 +00:00
|
|
|
|
self.send(Packet::Stanza(add_stanza_id(stanza, ns::JABBER_CLIENT)))
|
|
|
|
|
.await
|
2020-05-29 22:07:36 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// End connection by sending `</stream:stream>`
|
|
|
|
|
///
|
|
|
|
|
/// You may expect the server to respond with the same. This
|
|
|
|
|
/// client will then drop its connection.
|
|
|
|
|
///
|
|
|
|
|
/// Make sure to disable reconnect.
|
|
|
|
|
pub async fn send_end(&mut self) -> Result<(), Error> {
|
|
|
|
|
self.send(Packet::StreamEnd).await
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Incoming XMPP events
|
|
|
|
|
///
|
|
|
|
|
/// In an `async fn` you may want to use this with `use
|
|
|
|
|
/// futures::stream::StreamExt;`
|
2023-12-29 06:09:33 +00:00
|
|
|
|
impl<C: ServerConnector> Stream for Client<C> {
|
2020-05-29 22:07:36 +00:00
|
|
|
|
type Item = Event;
|
|
|
|
|
|
|
|
|
|
/// Low-level read on the XMPP stream, allowing the underlying
|
|
|
|
|
/// machinery to:
|
|
|
|
|
///
|
|
|
|
|
/// * connect,
|
|
|
|
|
/// * starttls,
|
|
|
|
|
/// * authenticate,
|
|
|
|
|
/// * bind a session, and finally
|
|
|
|
|
/// * receive stanzas
|
|
|
|
|
///
|
|
|
|
|
/// ...for your client
|
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
|
|
|
let state = replace(&mut self.state, ClientState::Invalid);
|
|
|
|
|
|
|
|
|
|
match state {
|
|
|
|
|
ClientState::Invalid => panic!("Invalid client state"),
|
|
|
|
|
ClientState::Disconnected if self.reconnect => {
|
|
|
|
|
// TODO: add timeout
|
2023-12-30 05:07:59 +00:00
|
|
|
|
let connect = tokio::spawn(client_login(
|
2020-09-11 15:39:12 +00:00
|
|
|
|
self.config.server.clone(),
|
|
|
|
|
self.config.jid.clone(),
|
|
|
|
|
self.config.password.clone(),
|
2020-09-11 15:05:49 +00:00
|
|
|
|
));
|
2022-03-24 02:26:30 +00:00
|
|
|
|
self.state = ClientState::Connecting(connect);
|
2020-05-29 22:07:36 +00:00
|
|
|
|
self.poll_next(cx)
|
|
|
|
|
}
|
|
|
|
|
ClientState::Disconnected => Poll::Ready(None),
|
2022-03-22 22:29:25 +00:00
|
|
|
|
ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
|
|
|
|
|
Poll::Ready(Ok(Ok(stream))) => {
|
|
|
|
|
let bound_jid = stream.jid.clone();
|
|
|
|
|
self.state = ClientState::Connected(stream);
|
|
|
|
|
Poll::Ready(Some(Event::Online {
|
|
|
|
|
bound_jid,
|
|
|
|
|
resumed: false,
|
|
|
|
|
}))
|
2020-05-29 22:07:36 +00:00
|
|
|
|
}
|
2022-03-22 22:29:25 +00:00
|
|
|
|
Poll::Ready(Ok(Err(e))) => {
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
return Poll::Ready(Some(Event::Disconnected(e.into())));
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Err(e)) => {
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
panic!("connect task: {}", e);
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => {
|
|
|
|
|
self.state = ClientState::Connecting(connect);
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
},
|
2020-05-29 22:07:36 +00:00
|
|
|
|
ClientState::Connected(mut stream) => {
|
|
|
|
|
// Poll sink
|
|
|
|
|
match Pin::new(&mut stream).poll_ready(cx) {
|
|
|
|
|
Poll::Pending => (),
|
|
|
|
|
Poll::Ready(Ok(())) => (),
|
|
|
|
|
Poll::Ready(Err(e)) => {
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
return Poll::Ready(Some(Event::Disconnected(e.into())));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Poll stream
|
2023-06-21 11:43:24 +00:00
|
|
|
|
//
|
|
|
|
|
// This needs to be a loop in order to ignore packets we don’t care about, or those
|
|
|
|
|
// we want to handle elsewhere. Returning something isn’t correct in those two
|
|
|
|
|
// cases because it would signal to tokio that the XMPPStream is also done, while
|
|
|
|
|
// there could be additional packets waiting for us.
|
|
|
|
|
//
|
|
|
|
|
// The proper solution is thus a loop which we exit once we have something to
|
|
|
|
|
// return.
|
|
|
|
|
loop {
|
|
|
|
|
match Pin::new(&mut stream).poll_next(cx) {
|
|
|
|
|
Poll::Ready(None) => {
|
|
|
|
|
// EOF
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
|
|
|
|
|
// Receive stanza
|
|
|
|
|
self.state = ClientState::Connected(stream);
|
|
|
|
|
return Poll::Ready(Some(Event::Stanza(stanza)));
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Some(Ok(Packet::Text(_)))) => {
|
|
|
|
|
// Ignore text between stanzas
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
|
|
|
|
|
// <stream:stream>
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
return Poll::Ready(Some(Event::Disconnected(
|
|
|
|
|
ProtocolError::InvalidStreamStart.into(),
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
|
|
|
|
|
// End of stream: </stream:stream>
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => {
|
|
|
|
|
// Try again later
|
|
|
|
|
self.state = ClientState::Connected(stream);
|
|
|
|
|
return Poll::Pending;
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Some(Err(e))) => {
|
|
|
|
|
self.state = ClientState::Disconnected;
|
|
|
|
|
return Poll::Ready(Some(Event::Disconnected(e.into())));
|
|
|
|
|
}
|
2020-05-29 22:07:36 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Outgoing XMPP packets
|
|
|
|
|
///
|
|
|
|
|
/// See `send_stanza()` for an `async fn`
|
2023-12-29 06:09:33 +00:00
|
|
|
|
impl<C: ServerConnector> Sink<Packet> for Client<C> {
|
2020-05-29 22:07:36 +00:00
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
|
|
fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
|
|
|
|
|
match self.state {
|
|
|
|
|
ClientState::Connected(ref mut stream) => {
|
|
|
|
|
Pin::new(stream).start_send(item).map_err(|e| e.into())
|
|
|
|
|
}
|
|
|
|
|
_ => Err(Error::InvalidState),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
|
|
|
|
match self.state {
|
|
|
|
|
ClientState::Connected(ref mut stream) => {
|
|
|
|
|
Pin::new(stream).poll_ready(cx).map_err(|e| e.into())
|
|
|
|
|
}
|
|
|
|
|
_ => Poll::Pending,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
|
|
|
|
match self.state {
|
|
|
|
|
ClientState::Connected(ref mut stream) => {
|
|
|
|
|
Pin::new(stream).poll_flush(cx).map_err(|e| e.into())
|
|
|
|
|
}
|
|
|
|
|
_ => Poll::Pending,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
|
|
|
|
match self.state {
|
|
|
|
|
ClientState::Connected(ref mut stream) => {
|
|
|
|
|
Pin::new(stream).poll_close(cx).map_err(|e| e.into())
|
|
|
|
|
}
|
|
|
|
|
_ => Poll::Pending,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|