From 42130dd927e256050d6f66320dbe172cada58801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maxime=20=E2=80=9Cpep=E2=80=9D=20Buquet?= Date: Thu, 22 Aug 2024 01:04:51 +0200 Subject: [PATCH] xmpp: Add to listen on NonTransactional stanza MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maxime “pep” Buquet --- xmpp/src/agent.rs | 17 ++++++++++++----- xmpp/src/lib.rs | 2 +- xmpp/src/stream.rs | 11 +++++++---- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/xmpp/src/agent.rs b/xmpp/src/agent.rs index 69c8e806..d2570ca7 100644 --- a/xmpp/src/agent.rs +++ b/xmpp/src/agent.rs @@ -8,7 +8,10 @@ use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot, Mutex, +}; use tokio_xmpp::connect::ServerConnector; pub use tokio_xmpp::parsers; use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType}; @@ -31,8 +34,8 @@ pub struct Agent { pub(crate) node: String, pub(crate) uploads: Vec<(String, Jid, PathBuf)>, pub(crate) awaiting_disco_bookmarks_type: bool, - cmdq: mpsc::UnboundedSender, - miscq: mpsc::UnboundedSender, + cmdq: UnboundedSender, + miscq: Arc>>, } impl Agent { @@ -45,11 +48,11 @@ impl Agent { ) -> Result { let (cmdtx, cmdrx) = mpsc::unbounded_channel(); let (misctx, miscrx) = mpsc::unbounded_channel(); - let _ = tokio::spawn(xml_stream_worker(client, cmdrx, miscrx)); + let _ = tokio::spawn(xml_stream_worker(client, cmdrx, misctx)); Ok(Agent { cmdq: cmdtx, - miscq: misctx, + miscq: Arc::new(Mutex::new(miscrx)), // client, boundjid: Jid::new("foo@bar/meh").unwrap(), default_nick: Arc::new(RwLock::new(default_nick)), @@ -61,6 +64,10 @@ impl Agent { }) } + pub async fn misc_receiver(&self) -> Arc>> { + Arc::clone(&self.miscq) + } + pub async fn send_stanza(&mut self, _stanza: Element) -> Result<(), Error> { Ok(()) } diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index 4e9b3902..7c0ae6ca 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -25,7 +25,7 @@ pub mod message; pub mod muc; pub mod presence; pub mod pubsub; -mod stream; +pub mod stream; pub mod upload; // Module re-exports diff --git a/xmpp/src/stream.rs b/xmpp/src/stream.rs index bc26e897..2dbcc0ea 100644 --- a/xmpp/src/stream.rs +++ b/xmpp/src/stream.rs @@ -8,7 +8,10 @@ use std::collections::HashMap; use std::io; use futures::stream::StreamExt; -use tokio::{sync::mpsc::UnboundedReceiver, sync::oneshot}; +use tokio::{ + sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::oneshot, +}; use tokio_xmpp::parsers::{ iq::IqType, message::Message, minidom::Element, presence::Presence, stanza_error::StanzaError, }; @@ -87,8 +90,8 @@ pub enum NonTransactional { pub(crate) async fn xml_stream_worker( mut client: TokioXmppClient, - mut local_rx: UnboundedReceiver, - mut _misc_rx: UnboundedReceiver, + mut _local_rx: UnboundedReceiver, + mut _misc_tx: UnboundedSender, ) { println!("BAR0"); let _pending_iqs: HashMap<(String, Option), oneshot::Sender>> = @@ -99,7 +102,7 @@ pub(crate) async fn xml_stream_worker( loop { println!("BAR1"); tokio::select! { - req = local_rx.recv() => match req { + req = _local_rx.recv() => match req { Some(_) => (), None => { // Lost client.