WIP: stream-worker

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
Maxime “pep” Buquet 2024-01-21 21:20:40 +01:00
parent 320ef0781e
commit 85826c5ece
Signed by: pep
GPG key ID: DEDA74AEECA9D0F2
26 changed files with 382 additions and 127 deletions

View file

@ -1,4 +1,5 @@
use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream};
use std::fmt;
use std::mem::replace;
use std::pin::Pin;
use std::task::Context;
@ -19,7 +20,8 @@ use crate::{Error, ProtocolError};
///
/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
/// [`Sink`](#impl-Sink<Packet>) traits.
pub struct Client<C: ServerConnector> {
#[derive(Debug)]
pub struct Client<C: ServerConnector + fmt::Debug> {
config: Config<C>,
state: ClientState<C::Stream>,
reconnect: bool,
@ -44,9 +46,25 @@ enum ClientState<S: AsyncReadAndWrite> {
Connected(XMPPStream<S>),
}
impl<S: AsyncReadAndWrite> fmt::Debug for ClientState<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ClientState::{}",
match self {
ClientState::Invalid => "Invalid",
ClientState::Disconnected => "Disconnected",
ClientState::Connecting(_) => "Connecting(_)",
ClientState::Connected(_) => "Connected(_)",
}
)
}
}
impl<C: ServerConnector> Client<C> {
/// Start a new client given that the JID is already parsed.
pub fn new_with_config(config: Config<C>) -> Self {
println!("MEH0: {:?}", config);
let connect = tokio::spawn(client_login(
config.server.clone(),
config.jid.clone(),
@ -119,6 +137,7 @@ impl<C: ServerConnector> Stream for Client<C> {
///
/// ...for your client
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
println!("BAZ0: State: {:?}", self.state);
let state = replace(&mut self.state, ClientState::Invalid);
match state {
@ -136,6 +155,7 @@ impl<C: ServerConnector> Stream for Client<C> {
ClientState::Disconnected => Poll::Ready(None),
ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
Poll::Ready(Ok(Ok(stream))) => {
println!("BAZ1");
let bound_jid = stream.jid.clone();
self.state = ClientState::Connected(stream);
Poll::Ready(Some(Event::Online {
@ -144,14 +164,17 @@ impl<C: ServerConnector> Stream for Client<C> {
}))
}
Poll::Ready(Ok(Err(e))) => {
println!("BAZ2");
self.state = ClientState::Disconnected;
return Poll::Ready(Some(Event::Disconnected(e.into())));
}
Poll::Ready(Err(e)) => {
println!("BAZ3");
self.state = ClientState::Disconnected;
panic!("connect task: {}", e);
}
Poll::Pending => {
println!("BAZ4");
self.state = ClientState::Connecting(connect);
Poll::Pending
}

View file

@ -16,7 +16,7 @@ pub trait ServerConnectorError: std::error::Error + Sync + Send {}
/// Trait called to connect to an XMPP server, perhaps called multiple times
pub trait ServerConnector: Clone + core::fmt::Debug + Send + Unpin + 'static {
/// The type of Stream this ServerConnector produces
type Stream: AsyncReadAndWrite;
type Stream: AsyncReadAndWrite + core::fmt::Debug;
/// Error type to return
type Error: ServerConnectorError;
/// This must return the connection ready to login, ie if starttls is involved, after TLS has been started, and then after the <stream headers are exchanged

41
xmpp/examples/foo.rs Normal file
View file

@ -0,0 +1,41 @@
// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use env_logger;
use std::env::args;
use std::str::FromStr;
use tokio_xmpp::parsers::{message::MessageType, BareJid, Jid};
use xmpp::{ClientBuilder, ClientFeature, ClientType, Event};
#[tokio::main]
async fn main() -> Result<(), Option<()>> {
env_logger::init();
let args: Vec<String> = args().collect();
if args.len() != 3 {
println!("Usage: {} <jid> <password>", args[0]);
return Err(None);
}
let jid = BareJid::from_str(&args[1]).expect(&format!("Invalid JID: {}", &args[1]));
let password = &args[2];
// Client instance
let mut client = ClientBuilder::new(jid, password)
.set_client(ClientType::Bot, "xmpp-rs")
.set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
.set_default_nick("bot")
.enable_feature(ClientFeature::Avatars)
.enable_feature(ClientFeature::ContactList)
.enable_feature(ClientFeature::JoinRooms)
.build();
println!("FOO0: {:?}", client);
tokio::time::sleep(tokio::time::Duration::from_millis(10000)).await;
Ok(())
}

View file

@ -4,46 +4,88 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tokio::sync::{mpsc, oneshot};
use tokio_xmpp::connect::ServerConnector;
pub use tokio_xmpp::parsers;
use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType};
pub use tokio_xmpp::{AsyncClient as TokioXmppClient, BareJid, Element, FullJid, Jid};
use crate::{event_loop, message, muc, upload, Error, Event, RoomNick};
use crate::stream::{xml_stream_worker, IqRequest, IqResponse, NonTransactional, Request};
use crate::{message, muc, upload, Error, Event, RoomNick};
pub struct Agent<C: ServerConnector> {
pub(crate) client: TokioXmppClient<C>,
#[derive(Debug)]
pub struct Agent {
// pub(crate) client: TokioXmppClient<C>,
boundjid: Jid,
pub(crate) default_nick: Arc<RwLock<String>>,
pub(crate) lang: Arc<Vec<String>>,
pub(crate) disco: DiscoInfoResult,
pub(crate) node: String,
pub(crate) uploads: Vec<(String, Jid, PathBuf)>,
pub(crate) awaiting_disco_bookmarks_type: bool,
cmdq: mpsc::UnboundedSender<Request>,
miscq: mpsc::UnboundedSender<NonTransactional>,
}
impl<C: ServerConnector> Agent<C> {
async fn new(
client: TokioXmppClient,
impl Agent {
pub(crate) fn new<C: ServerConnector>(
client: TokioXmppClient<C>,
default_nick: String,
lang: String,
diso: DiscoInfoResult,
lang: Vec<String>,
disco: DiscoInfoResult,
node: String,
) -> Agent {
Agent {
client,
) -> Result<Agent, Error> {
let (cmdtx, cmdrx) = mpsc::unbounded_channel();
let (misctx, miscrx) = mpsc::unbounded_channel();
let _ = tokio::spawn(xml_stream_worker(client, cmdrx, miscrx));
Ok(Agent {
cmdq: cmdtx,
miscq: misctx,
// client,
boundjid: Jid::new("foo@bar/meh").unwrap(),
default_nick: Arc::new(RwLock::new(default_nick)),
lang: Arc::new(lang),
disco,
node,
uploads: Vec::new(),
awaiting_disco_bookmarks_type: false,
}
})
}
pub async fn send_stanza(&mut self, _stanza: Element) -> Result<(), Error> {
Ok(())
}
/*
pub async fn send_iq(&self, req: IqRequest) -> io::Result<IqResponse> {
let (tx, rx) = oneshot::channel();
let req = Request::SendIq {
to: req.to,
data: req.data,
response: tx,
};
Ok(self.cmdq.send(req).unwrap());
Ok(rx.await.unwrap()?)
}
*/
pub async fn disconnect(&mut self) -> Result<(), Error> {
self.client.send_end().await
let (tx, rx) = oneshot::channel();
let req = Request::Disconnect { response: tx };
let _ = Ok::<(), io::Error>(self.cmdq.send(req).unwrap());
Ok(rx.await.unwrap()?)
}
/// Get the bound jid of the client.
///
/// If the client is not connected, this will be None.
pub fn bound_jid(&self) -> Option<&Jid> {
Some(&self.boundjid)
}
pub async fn join_room(
@ -98,6 +140,7 @@ impl<C: ServerConnector> Agent<C> {
muc::private_message::send_room_private_message(self, room, recipient, lang, text).await
}
/*
/// Wait for new events.
///
/// # Returns
@ -107,15 +150,9 @@ impl<C: ServerConnector> Agent<C> {
pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
event_loop::wait_for_events(self).await
}
*/
pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
upload::send::upload_file_with(self, service, path).await
}
/// Get the bound jid of the client.
///
/// If the client is not connected, this will be None.
pub fn bound_jid(&self) -> Option<&Jid> {
self.client.bound_jid()
}
}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::sync::{Arc, RwLock};
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
@ -14,7 +13,7 @@ use tokio_xmpp::{
AsyncClient as TokioXmppClient, AsyncConfig, BareJid, Jid,
};
use crate::{Agent, ClientFeature};
use crate::{Agent, ClientFeature, Error};
#[derive(Debug)]
pub enum ClientType {
@ -134,7 +133,7 @@ impl<C: ServerConnector> ClientBuilder<'_, C> {
}
}
pub fn build(self) -> Agent<C> {
pub fn build(self) -> Result<Agent, Error> {
let jid: Jid = if let Some(resource) = &self.resource {
self.jid.with_resource_str(resource).unwrap().into()
} else {
@ -151,10 +150,10 @@ impl<C: ServerConnector> ClientBuilder<'_, C> {
}
// This function is meant to be used for testing build
pub(crate) fn build_impl(self, client: TokioXmppClient<C>) -> Agent<C> {
pub(crate) fn build_impl(self, client: TokioXmppClient<C>) -> Result<Agent, Error> {
let disco = self.make_disco();
let node = self.website;
Agent::new(client, default_nick, lang, disco, node)
Agent::new(client, self.default_nick, self.lang, disco, node)
}
}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
bookmarks,
@ -24,11 +23,7 @@ use crate::Agent;
// FIXME: To be removed in the future
// The server doesn't return disco#info feature when querying the account
// so we add it manually because we know it's true
pub async fn handle_disco_info_result_payload<C: ServerConnector>(
agent: &mut Agent<C>,
payload: Element,
from: Jid,
) {
pub async fn handle_disco_info_result_payload(agent: &mut Agent, payload: Element, from: Jid) {
match DiscoInfoResult::try_from(payload.clone()) {
Ok(disco) => {
handle_disco_info_result(agent, disco, from).await;
@ -60,13 +55,9 @@ pub async fn handle_disco_info_result_payload<C: ServerConnector>(
}
}
pub async fn handle_disco_info_result<C: ServerConnector>(
agent: &mut Agent<C>,
disco: DiscoInfoResult,
from: Jid,
) {
pub async fn handle_disco_info_result(agent: &mut Agent, disco: DiscoInfoResult, from: Jid) {
// Safe unwrap because no DISCO is received when we are not online
if from == agent.client.bound_jid().unwrap().to_bare() && agent.awaiting_disco_bookmarks_type {
if from == agent.bound_jid().unwrap().to_bare() && agent.awaiting_disco_bookmarks_type {
info!("Received disco info about bookmarks type");
// Trigger bookmarks query
// TODO: only send this when the JoinRooms feature is enabled.
@ -82,7 +73,7 @@ pub async fn handle_disco_info_result<C: ServerConnector>(
if perform_bookmarks2 {
// XEP-0402 bookmarks (modern)
let iq = Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
} else {
// XEP-0048 v1.0 bookmarks (legacy)
let iq = Iq::from_get(
@ -92,7 +83,7 @@ pub async fn handle_disco_info_result<C: ServerConnector>(
},
)
.into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
}
} else {
unimplemented!("Ignored disco#info response from {}", from);

71
xmpp/src/error.rs Normal file
View file

@ -0,0 +1,71 @@
// Copyright (c) 2024-2099 xmpp-rs contributors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// use tokio::sync::mpsc::error as mpsc_error;
// use tokio::sync::oneshot::error as oneshot_error;
use tokio_xmpp::Error as XmppError;
use std::error::Error as StdError;
use std::fmt;
use std::io::Error as IOError;
#[derive(Debug)]
pub enum Error {
/// IO Errors
IO(IOError),
/// Errors from tokio-xmpp
Xmpp(XmppError),
// MpscSend(mpsc_error::SendError),
// OneshotRecv(oneshot_error::RecvError),
}
impl StdError for Error {
fn cause(&self) -> Option<&dyn StdError> {
match self {
Error::IO(e) => Some(e),
Error::Xmpp(e) => Some(e),
// Error::MpscSend(e) => Some(e),
// Error::OneshotRecv(e) => Some(e),
}
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::IO(e) => write!(fmt, "IO Error: {}", e),
Error::Xmpp(e) => write!(fmt, "XMPP Error: {}", e),
// Error::MpscSend(e) => write!(fmt, "Mpsc Send Error: {}", e),
// Error::OneshotRecv(e) => write!(fmt, "Oneshot Recv Error: {}", e),
}
}
}
impl From<IOError> for Error {
fn from(err: IOError) -> Error {
Error::IO(err)
}
}
impl From<tokio_xmpp::Error> for Error {
fn from(err: tokio_xmpp::Error) -> Error {
Error::Xmpp(err)
}
}
/*
impl From<mpsc_error::SendError> for Error {
fn from(err: mpsc_error::SendError) -> Error {
Error::MpscSend(err)
}
}
impl From<oneshot_error::RecvError> for Error {
fn from(err: oneshot_error::RecvError) -> Error {
Error::OneshotRecv(err)
}
}
*/

View file

@ -4,16 +4,15 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#[cfg(feature = "avatars")]
use tokio_xmpp::parsers::Jid;
use tokio_xmpp::parsers::{bookmarks2, message::Body, roster::Item as RosterItem, BareJid};
use tokio_xmpp::parsers::{bookmarks2, message::Body, roster::Item as RosterItem, BareJid, Jid};
use tokio_xmpp::Error as TokioXmppError;
use crate::{delay::StanzaTimeInfo, Error, Id, RoomNick};
use crate::{delay::StanzaTimeInfo, Id, RoomNick};
#[derive(Debug)]
pub enum Event {
Online,
Disconnected(Error),
Disconnected(TokioXmppError),
ContactAdded(RosterItem),
ContactRemoved(RosterItem),
ContactChanged(RosterItem),

View file

@ -4,8 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures::StreamExt;
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
disco::DiscoInfoQuery, iq::Iq, message::Message, presence::Presence, roster::Roster,
@ -21,7 +19,7 @@ use crate::{iq, message, presence, Agent, Event};
///
/// - `Some(events)` if there are new events; multiple may be returned at once.
/// - `None` if the underlying stream is closed.
pub async fn wait_for_events<C: ServerConnector>(agent: &mut Agent<C>) -> Option<Vec<Event>> {
pub async fn wait_for_events(agent: &mut Agent) -> Option<Vec<Event>> {
if let Some(event) = agent.client.next().await {
let mut events = Vec::new();
@ -29,7 +27,7 @@ pub async fn wait_for_events<C: ServerConnector>(agent: &mut Agent<C>) -> Option
TokioXmppEvent::Online { resumed: false, .. } => {
let presence =
presence::send::make_initial_presence(&agent.disco, &agent.node).into();
let _ = agent.client.send_stanza(presence).await;
let _ = agent.send_stanza(presence).await;
events.push(Event::Online);
// TODO: only send this when the ContactList feature is enabled.
let iq = Iq::from_get(
@ -40,11 +38,11 @@ pub async fn wait_for_events<C: ServerConnector>(agent: &mut Agent<C>) -> Option
},
)
.into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
// Query account disco to know what bookmarks spec is used
let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
agent.awaiting_disco_bookmarks_type = true;
}
TokioXmppEvent::Online { resumed: true, .. } => {}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
disco::DiscoInfoQuery,
@ -17,8 +16,8 @@ use tokio_xmpp::{
use crate::{Agent, Event};
pub async fn handle_iq_get<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn handle_iq_get(
agent: &mut Agent,
_events: &mut Vec<Event>,
from: Jid,
_to: Option<Jid>,
@ -32,7 +31,7 @@ pub async fn handle_iq_get<C: ServerConnector>(
let mut disco_info = agent.disco.clone();
disco_info.node = query.node;
let iq = Iq::from_result(id, Some(disco_info)).with_to(from).into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
}
Err(err) => {
let error = StanzaError::new(
@ -42,7 +41,7 @@ pub async fn handle_iq_get<C: ServerConnector>(
&format!("{}", err),
);
let iq = Iq::from_error(id, error).with_to(from).into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
}
}
} else {
@ -54,6 +53,6 @@ pub async fn handle_iq_get<C: ServerConnector>(
"No handler defined for this kind of iq.",
);
let iq = Iq::from_error(id, error).with_to(from).into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
}
}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::iq::{Iq, IqType};
use crate::{Agent, Event};
@ -13,12 +12,12 @@ pub mod get;
pub mod result;
pub mod set;
pub async fn handle_iq<C: ServerConnector>(agent: &mut Agent<C>, iq: Iq) -> Vec<Event> {
pub async fn handle_iq(agent: &mut Agent, iq: Iq) -> Vec<Event> {
let mut events = vec![];
let from = iq
.from
.clone()
.unwrap_or_else(|| agent.client.bound_jid().unwrap().to_bare().into());
.unwrap_or_else(|| agent.bound_jid().unwrap().to_bare().into());
if let IqType::Get(payload) = iq.payload {
get::handle_iq_get(agent, &mut events, from, iq.to, iq.id, payload).await;
} else if let IqType::Result(Some(payload)) = iq.payload {

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{ns, private::Query as PrivateXMLQuery, roster::Roster},
Element, Jid,
@ -12,8 +11,8 @@ use tokio_xmpp::{
use crate::{disco, pubsub, upload, Agent, Event};
pub async fn handle_iq_result<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn handle_iq_result(
agent: &mut Agent,
events: &mut Vec<Event>,
from: Jid,
_to: Option<Jid>,
@ -22,7 +21,7 @@ pub async fn handle_iq_result<C: ServerConnector>(
) {
// TODO: move private iqs like this one somewhere else, for
// security reasons.
if payload.is("query", ns::ROSTER) && from == agent.client.bound_jid().unwrap().to_bare() {
if payload.is("query", ns::ROSTER) && from == agent.bound_jid().unwrap().to_bare() {
let roster = Roster::try_from(payload).unwrap();
for item in roster.items.into_iter() {
events.push(Event::ContactAdded(item));

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
iq::Iq,
@ -15,8 +14,8 @@ use tokio_xmpp::{
use crate::{Agent, Event};
pub async fn handle_iq_set<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn handle_iq_set(
agent: &mut Agent,
_events: &mut Vec<Event>,
from: Jid,
_to: Option<Jid>,
@ -31,5 +30,5 @@ pub async fn handle_iq_set<C: ServerConnector>(
"No handler defined for this kind of iq.",
);
let iq = Iq::from_error(id, error).with_to(from).into();
let _ = agent.client.send_stanza(iq).await;
let _ = agent.send_stanza(iq).await;
}

View file

@ -15,23 +15,25 @@ pub mod agent;
pub mod builder;
pub mod delay;
pub mod disco;
pub mod error;
pub mod event;
pub mod event_loop;
// pub mod event_loop;
pub mod feature;
pub mod iq;
pub mod message;
pub mod muc;
pub mod presence;
pub mod pubsub;
mod stream;
pub mod upload;
// Module re-exports
pub use agent::Agent;
pub use builder::{ClientBuilder, ClientType};
pub use error::Error;
pub use event::Event;
pub use feature::ClientFeature;
pub type Error = tokio_xmpp::Error;
pub type Id = Option<String>;
pub type RoomNick = String;

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{message::Message, muc::user::MucUser},
Jid,
@ -12,8 +11,8 @@ use tokio_xmpp::{
use crate::{delay::StanzaTimeInfo, Agent, Event};
pub async fn handle_message_chat<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn handle_message_chat(
agent: &mut Agent,
events: &mut Vec<Event>,
from: Jid,
message: &Message,

View file

@ -4,13 +4,12 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{parsers::message::Message, Jid};
use crate::{delay::StanzaTimeInfo, Agent, Event};
pub async fn handle_message_group_chat<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn handle_message_group_chat(
agent: &mut Agent,
events: &mut Vec<Event>,
from: Jid,
message: &Message,

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::{
message::{Message, MessageType},
ns,
@ -15,10 +14,7 @@ use crate::{delay::message_time_info, pubsub, Agent, Event};
pub mod chat;
pub mod group_chat;
pub async fn handle_message<C: ServerConnector>(
agent: &mut Agent<C>,
message: Message,
) -> Vec<Event> {
pub async fn handle_message(agent: &mut Agent, message: Message) -> Vec<Event> {
let mut events = vec![];
let from = message.from.clone().unwrap();
let time_info = message_time_info(&message);

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::message::{Body, Message, MessageType},
Jid,
@ -12,8 +11,8 @@ use tokio_xmpp::{
use crate::Agent;
pub async fn send_message<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn send_message(
agent: &mut Agent,
recipient: Jid,
type_: MessageType,
lang: &str,
@ -24,5 +23,5 @@ pub async fn send_message<C: ServerConnector>(
message
.bodies
.insert(String::from(lang), Body(String::from(text)));
let _ = agent.client.send_stanza(message.into()).await;
let _ = agent.send_stanza(message.into()).await;
}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
message::{Body, Message, MessageType},
@ -15,8 +14,8 @@ use tokio_xmpp::{
use crate::{Agent, RoomNick};
pub async fn send_room_private_message<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn send_room_private_message(
agent: &mut Agent,
room: BareJid,
recipient: RoomNick,
lang: &str,
@ -28,5 +27,5 @@ pub async fn send_room_private_message<C: ServerConnector>(
message
.bodies
.insert(String::from(lang), Body(String::from(text)));
let _ = agent.client.send_stanza(message.into()).await;
let _ = agent.send_stanza(message.into()).await;
}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{
muc::Muc,
@ -15,8 +14,8 @@ use tokio_xmpp::{
use crate::{Agent, RoomNick};
pub async fn join_room<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn join_room(
agent: &mut Agent,
room: BareJid,
nick: Option<String>,
password: Option<String>,
@ -33,7 +32,7 @@ pub async fn join_room<C: ServerConnector>(
let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
presence.add_payload(muc);
presence.set_status(String::from(lang), String::from(status));
let _ = agent.client.send_stanza(presence.into()).await;
let _ = agent.send_stanza(presence.into()).await;
}
/// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza).
@ -55,8 +54,8 @@ pub async fn join_room<C: ServerConnector>(
/// * `nickname`: The nickname to use in the room.
/// * `lang`: The language of the status message.
/// * `status`: The status message to send.
pub async fn leave_room<C: ServerConnector>(
agent: &mut Agent<C>,
pub async fn leave_room(
agent: &mut Agent,
room_jid: BareJid,
nickname: RoomNick,
lang: impl Into<String>,
@ -76,7 +75,7 @@ pub async fn leave_room<C: ServerConnector>(
presence.set_status(lang, status);
// Send the presence stanza.
if let Err(e) = agent.client.send_stanza(presence.into()).await {
if let Err(e) = agent.send_stanza(presence.into()).await {
// Report any errors to the log.
error!("Failed to send leave room presence: {}", e);
}

View file

@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::{
muc::user::{MucUser, Status},
presence::{Presence, Type as PresenceType},
@ -13,10 +12,7 @@ use tokio_xmpp::parsers::{
use crate::{Agent, Event};
/// Translate a `Presence` stanza into a list of higher-level `Event`s.
pub async fn handle_presence<C: ServerConnector>(
_agent: &mut Agent<C>,
presence: Presence,
) -> Vec<Event> {
pub async fn handle_presence(_agent: &mut Agent, presence: Presence) -> Vec<Event> {
// Allocate an empty vector to store the events.
let mut events = vec![];

View file

@ -8,7 +8,6 @@ use super::Agent;
use crate::Event;
use std::fs::{self, File};
use std::io::{self, Write};
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::{
avatar::{Data, Metadata},
iq::Iq,
@ -21,9 +20,9 @@ use tokio_xmpp::parsers::{
Jid,
};
pub(crate) async fn handle_metadata_pubsub_event<C: ServerConnector>(
pub(crate) async fn handle_metadata_pubsub_event(
from: &Jid,
agent: &mut Agent<C>,
agent: &mut Agent,
items: Vec<Item>,
) -> Vec<Event> {
let mut events = Vec::new();
@ -43,7 +42,7 @@ pub(crate) async fn handle_metadata_pubsub_event<C: ServerConnector>(
events.push(Event::AvatarRetrieved(from.clone(), filename));
} else {
let iq = download_avatar(from);
let _ = agent.client.send_stanza(iq.into()).await;
let _ = agent.send_stanza(iq.into()).await;
}
}
}

View file

@ -7,24 +7,21 @@
use super::Agent;
use crate::Event;
use std::str::FromStr;
use tokio_xmpp::{
connect::ServerConnector,
parsers::{
bookmarks2::{self, Autojoin},
ns,
pubsub::event::PubSubEvent,
pubsub::pubsub::PubSub,
BareJid, Element, Jid,
},
use tokio_xmpp::parsers::{
bookmarks2::{self, Autojoin},
ns,
pubsub::event::PubSubEvent,
pubsub::pubsub::PubSub,
BareJid, Element, Jid,
};
#[cfg(feature = "avatars")]
pub(crate) mod avatar;
pub(crate) async fn handle_event<C: ServerConnector>(
pub(crate) async fn handle_event(
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid,
elem: Element,
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent<C>,
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent,
) -> Vec<Event> {
let mut events = Vec::new();
let event = PubSubEvent::try_from(elem);

121
xmpp/src/stream.rs Normal file
View file

@ -0,0 +1,121 @@
// Copyright (c) 2024-2099 xmpp-rs contributors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::collections::HashMap;
use std::io;
use futures::stream::StreamExt;
use tokio::{sync::mpsc::UnboundedReceiver, sync::oneshot};
use tokio_xmpp::parsers::{
iq::IqType, message::Message, presence::Presence, stanza_error::StanzaError, Element, Jid,
};
use tokio_xmpp::{connect::ServerConnector, AsyncClient as TokioXmppClient};
#[derive(Debug)]
pub enum IqRequestType {
Get(Element),
Set(Element),
}
#[derive(Debug)]
pub struct IqRequest {
pub to: Jid,
pub data: IqRequestType,
}
#[derive(Debug)]
pub enum IqResponseType {
Result(Option<Element>),
Error(StanzaError),
}
#[derive(Debug)]
pub struct IqResponse {
pub from: Option<Jid>,
pub to: Option<Jid>,
pub data: IqResponseType,
}
impl From<IqRequestType> for IqType {
fn from(other: IqRequestType) -> IqType {
match other {
IqRequestType::Get(e) => IqType::Get(e),
IqRequestType::Set(e) => IqType::Set(e),
}
}
}
impl From<IqResponseType> for IqType {
fn from(other: IqResponseType) -> IqType {
match other {
IqResponseType::Result(e) => IqType::Result(e),
IqResponseType::Error(e) => IqType::Error(e),
}
}
}
#[derive(Debug)]
pub enum Request {
SendMessage {
message: Message,
response: oneshot::Sender<io::Result<()>>,
},
SendPresence {
presence: Presence,
response: oneshot::Sender<io::Result<()>>,
},
SendIq {
to: Jid,
data: IqRequestType,
response: oneshot::Sender<io::Result<IqResponse>>,
},
Disconnect {
response: oneshot::Sender<io::Result<()>>,
},
}
#[derive(Debug)]
pub enum NonTransactional {
Presence(Presence),
Message(Message),
}
pub(crate) async fn xml_stream_worker<C: ServerConnector>(
mut client: TokioXmppClient<C>,
mut local_rx: UnboundedReceiver<Request>,
mut _misc_rx: UnboundedReceiver<NonTransactional>,
) {
println!("BAR0");
let _pending_iqs: HashMap<(String, Option<String>), oneshot::Sender<io::Result<()>>> =
HashMap::new();
println!("BAR1");
/*
loop {
println!("BAR1");
tokio::select! {
req = local_rx.recv() => match req {
Some(_) => (),
None => {
// Lost client.
}
},
msg = client.next() => match msg {
Some(_) => println!("FOO0"),
None => break,
}
}
}
*/
println!("BAR2, {:?}", client);
println!("BAR3: {:?}", client.next().await);
loop {
while let Some(event) = client.next().await {
println!("BAR4, {:?}", event);
}
}
}

View file

@ -10,7 +10,6 @@ use reqwest::{
use std::path::PathBuf;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::http_upload::{Header as HttpUploadHeader, SlotResult},
Element, Jid,
@ -18,11 +17,11 @@ use tokio_xmpp::{
use crate::{Agent, Event};
pub async fn handle_upload_result<C: ServerConnector>(
pub async fn handle_upload_result(
from: &Jid,
iqid: String,
elem: Element,
agent: &mut Agent<C>,
agent: &mut Agent,
) -> impl IntoIterator<Item = Event> {
let mut res: Option<(usize, PathBuf)> = None;

View file

@ -6,7 +6,6 @@
use std::path::Path;
use tokio::fs::File;
use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{http_upload::SlotRequest, iq::Iq},
Jid,
@ -14,11 +13,7 @@ use tokio_xmpp::{
use crate::Agent;
pub async fn upload_file_with<C: ServerConnector>(
agent: &mut Agent<C>,
service: &str,
path: &Path,
) {
pub async fn upload_file_with(agent: &mut Agent, service: &str, path: &Path) {
let name = path.file_name().unwrap().to_str().unwrap().to_string();
let file = File::open(path).await.unwrap();
let size = file.metadata().await.unwrap().len();
@ -32,5 +27,5 @@ pub async fn upload_file_with<C: ServerConnector>(
agent
.uploads
.push((String::from("upload1"), to, path.to_path_buf()));
agent.client.send_stanza(request.into()).await.unwrap();
agent.send_stanza(request.into()).await.unwrap();
}