From 3a6166f827471edd82f8ca39ada656ba8979f10b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maxime=20=E2=80=9Cpep=E2=80=9D=20Buquet?= Date: Sun, 21 Jan 2024 21:20:40 +0100 Subject: [PATCH] WIP: stream-worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maxime “pep” Buquet --- xmpp/examples/foo.rs | 39 +++++++++ xmpp/src/agent.rs | 77 ++++++++++++----- xmpp/src/builder.rs | 9 +- xmpp/src/disco/mod.rs | 19 ++--- xmpp/src/error.rs | 71 ++++++++++++++++ xmpp/src/event.rs | 9 +- xmpp/src/event_loop.rs | 10 +-- xmpp/src/iq/get.rs | 11 ++- xmpp/src/iq/mod.rs | 5 +- xmpp/src/iq/result.rs | 7 +- xmpp/src/iq/set.rs | 7 +- xmpp/src/lib.rs | 6 +- xmpp/src/message/receive/chat.rs | 5 +- xmpp/src/message/receive/group_chat.rs | 5 +- xmpp/src/message/receive/mod.rs | 6 +- xmpp/src/message/send.rs | 7 +- xmpp/src/muc/private_message.rs | 7 +- xmpp/src/muc/room.rs | 13 ++- xmpp/src/presence/receive.rs | 6 +- xmpp/src/pubsub/avatar.rs | 7 +- xmpp/src/pubsub/mod.rs | 19 ++--- xmpp/src/stream.rs | 110 +++++++++++++++++++++++++ xmpp/src/upload/receive.rs | 5 +- xmpp/src/upload/send.rs | 9 +- 24 files changed, 344 insertions(+), 125 deletions(-) create mode 100644 xmpp/examples/foo.rs create mode 100644 xmpp/src/error.rs create mode 100644 xmpp/src/stream.rs diff --git a/xmpp/examples/foo.rs b/xmpp/examples/foo.rs new file mode 100644 index 00000000..36dcec45 --- /dev/null +++ b/xmpp/examples/foo.rs @@ -0,0 +1,39 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use env_logger; +use std::env::args; +use std::str::FromStr; +use tokio_xmpp::parsers::{message::MessageType, BareJid, Jid}; +use xmpp::{ClientBuilder, ClientFeature, ClientType, Event}; + +#[tokio::main] +async fn main() -> Result<(), Option<()>> { + env_logger::init(); + + let args: Vec = args().collect(); + if args.len() != 3 { + println!("Usage: {} ", args[0]); + return Err(None); + } + + let jid = BareJid::from_str(&args[1]).expect(&format!("Invalid JID: {}", &args[1])); + let password = &args[2]; + + // Client instance + let mut client = ClientBuilder::new(jid, password) + .set_client(ClientType::Bot, "xmpp-rs") + .set_website("https://gitlab.com/xmpp-rs/xmpp-rs") + .set_default_nick("bot") + .enable_feature(ClientFeature::Avatars) + .enable_feature(ClientFeature::ContactList) + .enable_feature(ClientFeature::JoinRooms) + .build(); + + println!("FOO0: {:?}", client); + + Ok(()) +} diff --git a/xmpp/src/agent.rs b/xmpp/src/agent.rs index 7630a649..8e2a600e 100644 --- a/xmpp/src/agent.rs +++ b/xmpp/src/agent.rs @@ -4,46 +4,88 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; + +use tokio::sync::{mpsc, oneshot}; use tokio_xmpp::connect::ServerConnector; pub use tokio_xmpp::parsers; use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType}; pub use tokio_xmpp::{AsyncClient as TokioXmppClient, BareJid, Element, FullJid, Jid}; -use crate::{event_loop, message, muc, upload, Error, Event, RoomNick}; +use crate::stream::{xml_stream_worker, IqRequest, IqResponse, NonTransactional, Request}; +use crate::{message, muc, upload, Error, Event, RoomNick}; -pub struct Agent { - pub(crate) client: TokioXmppClient, +#[derive(Debug)] +pub struct Agent { + // pub(crate) client: TokioXmppClient, + boundjid: Jid, pub(crate) default_nick: Arc>, pub(crate) lang: Arc>, pub(crate) disco: DiscoInfoResult, pub(crate) node: String, pub(crate) uploads: Vec<(String, Jid, PathBuf)>, pub(crate) awaiting_disco_bookmarks_type: bool, + cmdq: mpsc::UnboundedSender, + miscq: mpsc::UnboundedSender, } -impl Agent { - async fn new( - client: TokioXmppClient, +impl Agent { + pub(crate) fn new( + client: TokioXmppClient, default_nick: String, - lang: String, - diso: DiscoInfoResult, + lang: Vec, + disco: DiscoInfoResult, node: String, - ) -> Agent { - Agent { - client, + ) -> Result { + let (cmdtx, cmdrx) = mpsc::unbounded_channel(); + let (misctx, miscrx) = mpsc::unbounded_channel(); + let _ = tokio::spawn(xml_stream_worker(client, cmdrx, miscrx)); + + Ok(Agent { + cmdq: cmdtx, + miscq: misctx, + // client, + boundjid: Jid::new("foo@bar/meh").unwrap(), default_nick: Arc::new(RwLock::new(default_nick)), lang: Arc::new(lang), disco, node, uploads: Vec::new(), awaiting_disco_bookmarks_type: false, - } + }) } + pub async fn send_stanza(&mut self, _stanza: Element) -> Result<(), Error> { + Ok(()) + } + + /* + pub async fn send_iq(&self, req: IqRequest) -> io::Result { + let (tx, rx) = oneshot::channel(); + let req = Request::SendIq { + to: req.to, + data: req.data, + response: tx, + }; + Ok(self.cmdq.send(req).unwrap()); + Ok(rx.await.unwrap()?) + } + */ + pub async fn disconnect(&mut self) -> Result<(), Error> { - self.client.send_end().await + let (tx, rx) = oneshot::channel(); + let req = Request::Disconnect { response: tx }; + let _ = Ok::<(), io::Error>(self.cmdq.send(req).unwrap()); + Ok(rx.await.unwrap()?) + } + + /// Get the bound jid of the client. + /// + /// If the client is not connected, this will be None. + pub fn bound_jid(&self) -> Option<&Jid> { + Some(&self.boundjid) } pub async fn join_room( @@ -98,6 +140,7 @@ impl Agent { muc::private_message::send_room_private_message(self, room, recipient, lang, text).await } + /* /// Wait for new events. /// /// # Returns @@ -107,15 +150,9 @@ impl Agent { pub async fn wait_for_events(&mut self) -> Option> { event_loop::wait_for_events(self).await } + */ pub async fn upload_file_with(&mut self, service: &str, path: &Path) { upload::send::upload_file_with(self, service, path).await } - - /// Get the bound jid of the client. - /// - /// If the client is not connected, this will be None. - pub fn bound_jid(&self) -> Option<&Jid> { - self.client.bound_jid() - } } diff --git a/xmpp/src/builder.rs b/xmpp/src/builder.rs index 9c996b00..81a66f20 100644 --- a/xmpp/src/builder.rs +++ b/xmpp/src/builder.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use std::sync::{Arc, RwLock}; use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ @@ -14,7 +13,7 @@ use tokio_xmpp::{ AsyncClient as TokioXmppClient, AsyncConfig, BareJid, Jid, }; -use crate::{Agent, ClientFeature}; +use crate::{Agent, ClientFeature, Error}; #[derive(Debug)] pub enum ClientType { @@ -134,7 +133,7 @@ impl ClientBuilder<'_, C> { } } - pub fn build(self) -> Agent { + pub fn build(self) -> Result { let jid: Jid = if let Some(resource) = &self.resource { self.jid.with_resource_str(resource).unwrap().into() } else { @@ -151,10 +150,10 @@ impl ClientBuilder<'_, C> { } // This function is meant to be used for testing build - pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent { + pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result { let disco = self.make_disco(); let node = self.website; - Agent::new(client, default_nick, lang, disco, node) + Agent::new(client, self.default_nick, self.lang, disco, node) } } diff --git a/xmpp/src/disco/mod.rs b/xmpp/src/disco/mod.rs index eee104a9..7e821785 100644 --- a/xmpp/src/disco/mod.rs +++ b/xmpp/src/disco/mod.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ bookmarks, @@ -24,11 +23,7 @@ use crate::Agent; // FIXME: To be removed in the future // The server doesn't return disco#info feature when querying the account // so we add it manually because we know it's true -pub async fn handle_disco_info_result_payload( - agent: &mut Agent, - payload: Element, - from: Jid, -) { +pub async fn handle_disco_info_result_payload(agent: &mut Agent, payload: Element, from: Jid) { match DiscoInfoResult::try_from(payload.clone()) { Ok(disco) => { handle_disco_info_result(agent, disco, from).await; @@ -60,13 +55,9 @@ pub async fn handle_disco_info_result_payload( } } -pub async fn handle_disco_info_result( - agent: &mut Agent, - disco: DiscoInfoResult, - from: Jid, -) { +pub async fn handle_disco_info_result(agent: &mut Agent, disco: DiscoInfoResult, from: Jid) { // Safe unwrap because no DISCO is received when we are not online - if from == agent.client.bound_jid().unwrap().to_bare() && agent.awaiting_disco_bookmarks_type { + if from == agent.bound_jid().unwrap().to_bare() && agent.awaiting_disco_bookmarks_type { info!("Received disco info about bookmarks type"); // Trigger bookmarks query // TODO: only send this when the JoinRooms feature is enabled. @@ -82,7 +73,7 @@ pub async fn handle_disco_info_result( if perform_bookmarks2 { // XEP-0402 bookmarks (modern) let iq = Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; } else { // XEP-0048 v1.0 bookmarks (legacy) let iq = Iq::from_get( @@ -92,7 +83,7 @@ pub async fn handle_disco_info_result( }, ) .into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; } } else { unimplemented!("Ignored disco#info response from {}", from); diff --git a/xmpp/src/error.rs b/xmpp/src/error.rs new file mode 100644 index 00000000..48732059 --- /dev/null +++ b/xmpp/src/error.rs @@ -0,0 +1,71 @@ +// Copyright (c) 2024-2099 xmpp-rs contributors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// use tokio::sync::mpsc::error as mpsc_error; +// use tokio::sync::oneshot::error as oneshot_error; +use tokio_xmpp::Error as XmppError; + +use std::error::Error as StdError; +use std::fmt; +use std::io::Error as IOError; + +#[derive(Debug)] +pub enum Error { + /// IO Errors + IO(IOError), + /// Errors from tokio-xmpp + Xmpp(XmppError), + // MpscSend(mpsc_error::SendError), + // OneshotRecv(oneshot_error::RecvError), +} + +impl StdError for Error { + fn cause(&self) -> Option<&dyn StdError> { + match self { + Error::IO(e) => Some(e), + Error::Xmpp(e) => Some(e), + // Error::MpscSend(e) => Some(e), + // Error::OneshotRecv(e) => Some(e), + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::IO(e) => write!(fmt, "IO Error: {}", e), + Error::Xmpp(e) => write!(fmt, "XMPP Error: {}", e), + // Error::MpscSend(e) => write!(fmt, "Mpsc Send Error: {}", e), + // Error::OneshotRecv(e) => write!(fmt, "Oneshot Recv Error: {}", e), + } + } +} + +impl From for Error { + fn from(err: IOError) -> Error { + Error::IO(err) + } +} + +impl From for Error { + fn from(err: tokio_xmpp::Error) -> Error { + Error::Xmpp(err) + } +} + +/* +impl From for Error { + fn from(err: mpsc_error::SendError) -> Error { + Error::MpscSend(err) + } +} + +impl From for Error { + fn from(err: oneshot_error::RecvError) -> Error { + Error::OneshotRecv(err) + } +} +*/ diff --git a/xmpp/src/event.rs b/xmpp/src/event.rs index c7a8e3a2..4fffd3b6 100644 --- a/xmpp/src/event.rs +++ b/xmpp/src/event.rs @@ -4,16 +4,15 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#[cfg(feature = "avatars")] -use tokio_xmpp::parsers::Jid; -use tokio_xmpp::parsers::{bookmarks2, message::Body, roster::Item as RosterItem, BareJid}; +use tokio_xmpp::parsers::{bookmarks2, message::Body, roster::Item as RosterItem, BareJid, Jid}; +use tokio_xmpp::Error as TokioXmppError; -use crate::{delay::StanzaTimeInfo, Error, Id, RoomNick}; +use crate::{delay::StanzaTimeInfo, Id, RoomNick}; #[derive(Debug)] pub enum Event { Online, - Disconnected(Error), + Disconnected(TokioXmppError), ContactAdded(RosterItem), ContactRemoved(RosterItem), ContactChanged(RosterItem), diff --git a/xmpp/src/event_loop.rs b/xmpp/src/event_loop.rs index a0c1774a..9d816fab 100644 --- a/xmpp/src/event_loop.rs +++ b/xmpp/src/event_loop.rs @@ -4,8 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use futures::StreamExt; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ disco::DiscoInfoQuery, iq::Iq, message::Message, presence::Presence, roster::Roster, @@ -21,7 +19,7 @@ use crate::{iq, message, presence, Agent, Event}; /// /// - `Some(events)` if there are new events; multiple may be returned at once. /// - `None` if the underlying stream is closed. -pub async fn wait_for_events(agent: &mut Agent) -> Option> { +pub async fn wait_for_events(agent: &mut Agent) -> Option> { if let Some(event) = agent.client.next().await { let mut events = Vec::new(); @@ -29,7 +27,7 @@ pub async fn wait_for_events(agent: &mut Agent) -> Option TokioXmppEvent::Online { resumed: false, .. } => { let presence = presence::send::make_initial_presence(&agent.disco, &agent.node).into(); - let _ = agent.client.send_stanza(presence).await; + let _ = agent.send_stanza(presence).await; events.push(Event::Online); // TODO: only send this when the ContactList feature is enabled. let iq = Iq::from_get( @@ -40,11 +38,11 @@ pub async fn wait_for_events(agent: &mut Agent) -> Option }, ) .into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; // Query account disco to know what bookmarks spec is used let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; agent.awaiting_disco_bookmarks_type = true; } TokioXmppEvent::Online { resumed: true, .. } => {} diff --git a/xmpp/src/iq/get.rs b/xmpp/src/iq/get.rs index f64a950f..316cc24b 100644 --- a/xmpp/src/iq/get.rs +++ b/xmpp/src/iq/get.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ disco::DiscoInfoQuery, @@ -17,8 +16,8 @@ use tokio_xmpp::{ use crate::{Agent, Event}; -pub async fn handle_iq_get( - agent: &mut Agent, +pub async fn handle_iq_get( + agent: &mut Agent, _events: &mut Vec, from: Jid, _to: Option, @@ -32,7 +31,7 @@ pub async fn handle_iq_get( let mut disco_info = agent.disco.clone(); disco_info.node = query.node; let iq = Iq::from_result(id, Some(disco_info)).with_to(from).into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; } Err(err) => { let error = StanzaError::new( @@ -42,7 +41,7 @@ pub async fn handle_iq_get( &format!("{}", err), ); let iq = Iq::from_error(id, error).with_to(from).into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; } } } else { @@ -54,6 +53,6 @@ pub async fn handle_iq_get( "No handler defined for this kind of iq.", ); let iq = Iq::from_error(id, error).with_to(from).into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; } } diff --git a/xmpp/src/iq/mod.rs b/xmpp/src/iq/mod.rs index 3d097bbc..ef5188d6 100644 --- a/xmpp/src/iq/mod.rs +++ b/xmpp/src/iq/mod.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::iq::{Iq, IqType}; use crate::{Agent, Event}; @@ -13,12 +12,12 @@ pub mod get; pub mod result; pub mod set; -pub async fn handle_iq(agent: &mut Agent, iq: Iq) -> Vec { +pub async fn handle_iq(agent: &mut Agent, iq: Iq) -> Vec { let mut events = vec![]; let from = iq .from .clone() - .unwrap_or_else(|| agent.client.bound_jid().unwrap().to_bare().into()); + .unwrap_or_else(|| agent.bound_jid().unwrap().to_bare().into()); if let IqType::Get(payload) = iq.payload { get::handle_iq_get(agent, &mut events, from, iq.to, iq.id, payload).await; } else if let IqType::Result(Some(payload)) = iq.payload { diff --git a/xmpp/src/iq/result.rs b/xmpp/src/iq/result.rs index da889607..8d097bff 100644 --- a/xmpp/src/iq/result.rs +++ b/xmpp/src/iq/result.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ns, private::Query as PrivateXMLQuery, roster::Roster}, Element, Jid, @@ -12,8 +11,8 @@ use tokio_xmpp::{ use crate::{disco, pubsub, upload, Agent, Event}; -pub async fn handle_iq_result( - agent: &mut Agent, +pub async fn handle_iq_result( + agent: &mut Agent, events: &mut Vec, from: Jid, _to: Option, @@ -22,7 +21,7 @@ pub async fn handle_iq_result( ) { // TODO: move private iqs like this one somewhere else, for // security reasons. - if payload.is("query", ns::ROSTER) && from == agent.client.bound_jid().unwrap().to_bare() { + if payload.is("query", ns::ROSTER) && from == agent.bound_jid().unwrap().to_bare() { let roster = Roster::try_from(payload).unwrap(); for item in roster.items.into_iter() { events.push(Event::ContactAdded(item)); diff --git a/xmpp/src/iq/set.rs b/xmpp/src/iq/set.rs index 4793d8d4..fe26d9fc 100644 --- a/xmpp/src/iq/set.rs +++ b/xmpp/src/iq/set.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ iq::Iq, @@ -15,8 +14,8 @@ use tokio_xmpp::{ use crate::{Agent, Event}; -pub async fn handle_iq_set( - agent: &mut Agent, +pub async fn handle_iq_set( + agent: &mut Agent, _events: &mut Vec, from: Jid, _to: Option, @@ -31,5 +30,5 @@ pub async fn handle_iq_set( "No handler defined for this kind of iq.", ); let iq = Iq::from_error(id, error).with_to(from).into(); - let _ = agent.client.send_stanza(iq).await; + let _ = agent.send_stanza(iq).await; } diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index 780fe4eb..da3e0396 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -15,23 +15,25 @@ pub mod agent; pub mod builder; pub mod delay; pub mod disco; +pub mod error; pub mod event; -pub mod event_loop; +// pub mod event_loop; pub mod feature; pub mod iq; pub mod message; pub mod muc; pub mod presence; pub mod pubsub; +mod stream; pub mod upload; // Module re-exports pub use agent::Agent; pub use builder::{ClientBuilder, ClientType}; +pub use error::Error; pub use event::Event; pub use feature::ClientFeature; -pub type Error = tokio_xmpp::Error; pub type Id = Option; pub type RoomNick = String; diff --git a/xmpp/src/message/receive/chat.rs b/xmpp/src/message/receive/chat.rs index 198ad6ac..685eea57 100644 --- a/xmpp/src/message/receive/chat.rs +++ b/xmpp/src/message/receive/chat.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{message::Message, muc::user::MucUser}, Jid, @@ -12,8 +11,8 @@ use tokio_xmpp::{ use crate::{delay::StanzaTimeInfo, Agent, Event}; -pub async fn handle_message_chat( - agent: &mut Agent, +pub async fn handle_message_chat( + agent: &mut Agent, events: &mut Vec, from: Jid, message: &Message, diff --git a/xmpp/src/message/receive/group_chat.rs b/xmpp/src/message/receive/group_chat.rs index 9fa17cd1..417794f3 100644 --- a/xmpp/src/message/receive/group_chat.rs +++ b/xmpp/src/message/receive/group_chat.rs @@ -4,13 +4,12 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{parsers::message::Message, Jid}; use crate::{delay::StanzaTimeInfo, Agent, Event}; -pub async fn handle_message_group_chat( - agent: &mut Agent, +pub async fn handle_message_group_chat( + agent: &mut Agent, events: &mut Vec, from: Jid, message: &Message, diff --git a/xmpp/src/message/receive/mod.rs b/xmpp/src/message/receive/mod.rs index 5dbe8608..0b03112d 100644 --- a/xmpp/src/message/receive/mod.rs +++ b/xmpp/src/message/receive/mod.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::{ message::{Message, MessageType}, ns, @@ -15,10 +14,7 @@ use crate::{delay::message_time_info, pubsub, Agent, Event}; pub mod chat; pub mod group_chat; -pub async fn handle_message( - agent: &mut Agent, - message: Message, -) -> Vec { +pub async fn handle_message(agent: &mut Agent, message: Message) -> Vec { let mut events = vec![]; let from = message.from.clone().unwrap(); let time_info = message_time_info(&message); diff --git a/xmpp/src/message/send.rs b/xmpp/src/message/send.rs index 877aca82..b08bb93b 100644 --- a/xmpp/src/message/send.rs +++ b/xmpp/src/message/send.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::message::{Body, Message, MessageType}, Jid, @@ -12,8 +11,8 @@ use tokio_xmpp::{ use crate::Agent; -pub async fn send_message( - agent: &mut Agent, +pub async fn send_message( + agent: &mut Agent, recipient: Jid, type_: MessageType, lang: &str, @@ -24,5 +23,5 @@ pub async fn send_message( message .bodies .insert(String::from(lang), Body(String::from(text))); - let _ = agent.client.send_stanza(message.into()).await; + let _ = agent.send_stanza(message.into()).await; } diff --git a/xmpp/src/muc/private_message.rs b/xmpp/src/muc/private_message.rs index 7b5883ff..40bb4204 100644 --- a/xmpp/src/muc/private_message.rs +++ b/xmpp/src/muc/private_message.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ message::{Body, Message, MessageType}, @@ -15,8 +14,8 @@ use tokio_xmpp::{ use crate::{Agent, RoomNick}; -pub async fn send_room_private_message( - agent: &mut Agent, +pub async fn send_room_private_message( + agent: &mut Agent, room: BareJid, recipient: RoomNick, lang: &str, @@ -28,5 +27,5 @@ pub async fn send_room_private_message( message .bodies .insert(String::from(lang), Body(String::from(text))); - let _ = agent.client.send_stanza(message.into()).await; + let _ = agent.send_stanza(message.into()).await; } diff --git a/xmpp/src/muc/room.rs b/xmpp/src/muc/room.rs index 17d73d18..41a05a8c 100644 --- a/xmpp/src/muc/room.rs +++ b/xmpp/src/muc/room.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{ muc::Muc, @@ -15,8 +14,8 @@ use tokio_xmpp::{ use crate::{Agent, RoomNick}; -pub async fn join_room( - agent: &mut Agent, +pub async fn join_room( + agent: &mut Agent, room: BareJid, nick: Option, password: Option, @@ -33,7 +32,7 @@ pub async fn join_room( let mut presence = Presence::new(PresenceType::None).with_to(room_jid); presence.add_payload(muc); presence.set_status(String::from(lang), String::from(status)); - let _ = agent.client.send_stanza(presence.into()).await; + let _ = agent.send_stanza(presence.into()).await; } /// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza). @@ -55,8 +54,8 @@ pub async fn join_room( /// * `nickname`: The nickname to use in the room. /// * `lang`: The language of the status message. /// * `status`: The status message to send. -pub async fn leave_room( - agent: &mut Agent, +pub async fn leave_room( + agent: &mut Agent, room_jid: BareJid, nickname: RoomNick, lang: impl Into, @@ -76,7 +75,7 @@ pub async fn leave_room( presence.set_status(lang, status); // Send the presence stanza. - if let Err(e) = agent.client.send_stanza(presence.into()).await { + if let Err(e) = agent.send_stanza(presence.into()).await { // Report any errors to the log. error!("Failed to send leave room presence: {}", e); } diff --git a/xmpp/src/presence/receive.rs b/xmpp/src/presence/receive.rs index 4e6720e8..123f4d7b 100644 --- a/xmpp/src/presence/receive.rs +++ b/xmpp/src/presence/receive.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::{ muc::user::{MucUser, Status}, presence::{Presence, Type as PresenceType}, @@ -13,10 +12,7 @@ use tokio_xmpp::parsers::{ use crate::{Agent, Event}; /// Translate a `Presence` stanza into a list of higher-level `Event`s. -pub async fn handle_presence( - _agent: &mut Agent, - presence: Presence, -) -> Vec { +pub async fn handle_presence(_agent: &mut Agent, presence: Presence) -> Vec { // Allocate an empty vector to store the events. let mut events = vec![]; diff --git a/xmpp/src/pubsub/avatar.rs b/xmpp/src/pubsub/avatar.rs index 83728ad0..da099056 100644 --- a/xmpp/src/pubsub/avatar.rs +++ b/xmpp/src/pubsub/avatar.rs @@ -8,7 +8,6 @@ use super::Agent; use crate::Event; use std::fs::{self, File}; use std::io::{self, Write}; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::{ avatar::{Data, Metadata}, iq::Iq, @@ -21,9 +20,9 @@ use tokio_xmpp::parsers::{ Jid, }; -pub(crate) async fn handle_metadata_pubsub_event( +pub(crate) async fn handle_metadata_pubsub_event( from: &Jid, - agent: &mut Agent, + agent: &mut Agent, items: Vec, ) -> Vec { let mut events = Vec::new(); @@ -43,7 +42,7 @@ pub(crate) async fn handle_metadata_pubsub_event( events.push(Event::AvatarRetrieved(from.clone(), filename)); } else { let iq = download_avatar(from); - let _ = agent.client.send_stanza(iq.into()).await; + let _ = agent.send_stanza(iq.into()).await; } } } diff --git a/xmpp/src/pubsub/mod.rs b/xmpp/src/pubsub/mod.rs index 921966dd..e74d7d82 100644 --- a/xmpp/src/pubsub/mod.rs +++ b/xmpp/src/pubsub/mod.rs @@ -7,24 +7,21 @@ use super::Agent; use crate::Event; use std::str::FromStr; -use tokio_xmpp::{ - connect::ServerConnector, - parsers::{ - bookmarks2::{self, Autojoin}, - ns, - pubsub::event::PubSubEvent, - pubsub::pubsub::PubSub, - BareJid, Element, Jid, - }, +use tokio_xmpp::parsers::{ + bookmarks2::{self, Autojoin}, + ns, + pubsub::event::PubSubEvent, + pubsub::pubsub::PubSub, + BareJid, Element, Jid, }; #[cfg(feature = "avatars")] pub(crate) mod avatar; -pub(crate) async fn handle_event( +pub(crate) async fn handle_event( #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid, elem: Element, - #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent, + #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent, ) -> Vec { let mut events = Vec::new(); let event = PubSubEvent::try_from(elem); diff --git a/xmpp/src/stream.rs b/xmpp/src/stream.rs new file mode 100644 index 00000000..06c5365f --- /dev/null +++ b/xmpp/src/stream.rs @@ -0,0 +1,110 @@ +// Copyright (c) 2024-2099 xmpp-rs contributors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use std::collections::HashMap; +use std::io; + +use futures::stream::StreamExt; +use tokio::{sync::mpsc::UnboundedReceiver, sync::oneshot}; +use tokio_xmpp::parsers::{ + iq::IqType, message::Message, presence::Presence, stanza_error::StanzaError, Element, Jid, +}; +use tokio_xmpp::{connect::ServerConnector, AsyncClient as TokioXmppClient}; + +#[derive(Debug)] +pub enum IqRequestType { + Get(Element), + Set(Element), +} + +#[derive(Debug)] +pub struct IqRequest { + pub to: Jid, + pub data: IqRequestType, +} + +#[derive(Debug)] +pub enum IqResponseType { + Result(Option), + Error(StanzaError), +} + +#[derive(Debug)] +pub struct IqResponse { + pub from: Option, + pub to: Option, + pub data: IqResponseType, +} + +impl From for IqType { + fn from(other: IqRequestType) -> IqType { + match other { + IqRequestType::Get(e) => IqType::Get(e), + IqRequestType::Set(e) => IqType::Set(e), + } + } +} + +impl From for IqType { + fn from(other: IqResponseType) -> IqType { + match other { + IqResponseType::Result(e) => IqType::Result(e), + IqResponseType::Error(e) => IqType::Error(e), + } + } +} + +#[derive(Debug)] +pub enum Request { + SendMessage { + message: Message, + response: oneshot::Sender>, + }, + SendPresence { + presence: Presence, + response: oneshot::Sender>, + }, + SendIq { + to: Jid, + data: IqRequestType, + response: oneshot::Sender>, + }, + Disconnect { + response: oneshot::Sender>, + }, +} + +#[derive(Debug)] +pub enum NonTransactional { + Presence(Presence), + Message(Message), +} + +pub(crate) async fn xml_stream_worker( + mut client: TokioXmppClient, + mut local_rx: UnboundedReceiver, + mut _misc_rx: UnboundedReceiver, +) { + println!("BAR0"); + let _pending_iqs: HashMap<(String, Option), oneshot::Sender>> = + HashMap::new(); + + loop { + println!("BAR1"); + tokio::select! { + req = local_rx.recv() => match req { + Some(_) => (), + None => { + // Lost client. + } + }, + msg = client.next() => match msg { + Some(_) => println!("FOO0"), + None => break, + } + } + } +} diff --git a/xmpp/src/upload/receive.rs b/xmpp/src/upload/receive.rs index ff6ef557..f90af993 100644 --- a/xmpp/src/upload/receive.rs +++ b/xmpp/src/upload/receive.rs @@ -10,7 +10,6 @@ use reqwest::{ use std::path::PathBuf; use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::http_upload::{Header as HttpUploadHeader, SlotResult}, Element, Jid, @@ -18,11 +17,11 @@ use tokio_xmpp::{ use crate::{Agent, Event}; -pub async fn handle_upload_result( +pub async fn handle_upload_result( from: &Jid, iqid: String, elem: Element, - agent: &mut Agent, + agent: &mut Agent, ) -> impl IntoIterator { let mut res: Option<(usize, PathBuf)> = None; diff --git a/xmpp/src/upload/send.rs b/xmpp/src/upload/send.rs index 54edc90f..564f6d8b 100644 --- a/xmpp/src/upload/send.rs +++ b/xmpp/src/upload/send.rs @@ -6,7 +6,6 @@ use std::path::Path; use tokio::fs::File; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{http_upload::SlotRequest, iq::Iq}, Jid, @@ -14,11 +13,7 @@ use tokio_xmpp::{ use crate::Agent; -pub async fn upload_file_with( - agent: &mut Agent, - service: &str, - path: &Path, -) { +pub async fn upload_file_with(agent: &mut Agent, service: &str, path: &Path) { let name = path.file_name().unwrap().to_str().unwrap().to_string(); let file = File::open(path).await.unwrap(); let size = file.metadata().await.unwrap().len(); @@ -32,5 +27,5 @@ pub async fn upload_file_with( agent .uploads .push((String::from("upload1"), to, path.to_path_buf())); - agent.client.send_stanza(request.into()).await.unwrap(); + agent.send_stanza(request.into()).await.unwrap(); }