xmpp: Add to listen on NonTransactional stanza

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
Maxime “pep” Buquet 2024-08-22 01:04:51 +02:00
parent 10471bad63
commit 42130dd927
3 changed files with 20 additions and 10 deletions

View file

@ -8,7 +8,10 @@ use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot, Mutex,
};
use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::connect::ServerConnector;
pub use tokio_xmpp::parsers; pub use tokio_xmpp::parsers;
use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType}; use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType};
@ -31,8 +34,8 @@ pub struct Agent {
pub(crate) node: String, pub(crate) node: String,
pub(crate) uploads: Vec<(String, Jid, PathBuf)>, pub(crate) uploads: Vec<(String, Jid, PathBuf)>,
pub(crate) awaiting_disco_bookmarks_type: bool, pub(crate) awaiting_disco_bookmarks_type: bool,
cmdq: mpsc::UnboundedSender<Request>, cmdq: UnboundedSender<Request>,
miscq: mpsc::UnboundedSender<NonTransactional>, miscq: Arc<Mutex<UnboundedReceiver<NonTransactional>>>,
} }
impl Agent { impl Agent {
@ -45,11 +48,11 @@ impl Agent {
) -> Result<Agent, Error> { ) -> Result<Agent, Error> {
let (cmdtx, cmdrx) = mpsc::unbounded_channel(); let (cmdtx, cmdrx) = mpsc::unbounded_channel();
let (misctx, miscrx) = mpsc::unbounded_channel(); let (misctx, miscrx) = mpsc::unbounded_channel();
let _ = tokio::spawn(xml_stream_worker(client, cmdrx, miscrx)); let _ = tokio::spawn(xml_stream_worker(client, cmdrx, misctx));
Ok(Agent { Ok(Agent {
cmdq: cmdtx, cmdq: cmdtx,
miscq: misctx, miscq: Arc::new(Mutex::new(miscrx)),
// client, // client,
boundjid: Jid::new("foo@bar/meh").unwrap(), boundjid: Jid::new("foo@bar/meh").unwrap(),
default_nick: Arc::new(RwLock::new(default_nick)), default_nick: Arc::new(RwLock::new(default_nick)),
@ -61,6 +64,10 @@ impl Agent {
}) })
} }
pub async fn misc_receiver(&self) -> Arc<Mutex<UnboundedReceiver<NonTransactional>>> {
Arc::clone(&self.miscq)
}
pub async fn send_stanza(&mut self, _stanza: Element) -> Result<(), Error> { pub async fn send_stanza(&mut self, _stanza: Element) -> Result<(), Error> {
Ok(()) Ok(())
} }

View file

@ -25,7 +25,7 @@ pub mod message;
pub mod muc; pub mod muc;
pub mod presence; pub mod presence;
pub mod pubsub; pub mod pubsub;
mod stream; pub mod stream;
pub mod upload; pub mod upload;
// Module re-exports // Module re-exports

View file

@ -8,7 +8,10 @@ use std::collections::HashMap;
use std::io; use std::io;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::{sync::mpsc::UnboundedReceiver, sync::oneshot}; use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
sync::oneshot,
};
use tokio_xmpp::parsers::{ use tokio_xmpp::parsers::{
iq::IqType, message::Message, minidom::Element, presence::Presence, stanza_error::StanzaError, iq::IqType, message::Message, minidom::Element, presence::Presence, stanza_error::StanzaError,
}; };
@ -87,8 +90,8 @@ pub enum NonTransactional {
pub(crate) async fn xml_stream_worker<C: ServerConnector>( pub(crate) async fn xml_stream_worker<C: ServerConnector>(
mut client: TokioXmppClient<C>, mut client: TokioXmppClient<C>,
mut local_rx: UnboundedReceiver<Request>, mut _local_rx: UnboundedReceiver<Request>,
mut _misc_rx: UnboundedReceiver<NonTransactional>, mut _misc_tx: UnboundedSender<NonTransactional>,
) { ) {
println!("BAR0"); println!("BAR0");
let _pending_iqs: HashMap<(String, Option<String>), oneshot::Sender<io::Result<()>>> = let _pending_iqs: HashMap<(String, Option<String>), oneshot::Sender<io::Result<()>>> =
@ -99,7 +102,7 @@ pub(crate) async fn xml_stream_worker<C: ServerConnector>(
loop { loop {
println!("BAR1"); println!("BAR1");
tokio::select! { tokio::select! {
req = local_rx.recv() => match req { req = _local_rx.recv() => match req {
Some(_) => (), Some(_) => (),
None => { None => {
// Lost client. // Lost client.