From 8566adb376e34c7000b511cb64bf67ccf2c56273 Mon Sep 17 00:00:00 2001 From: xmppftw Date: Sat, 17 Jun 2023 14:30:08 +0200 Subject: [PATCH] WHY DOES THIS DEADLOCK?! --- xmpp/src/action/join_room.rs | 41 +++++++++++++++-- xmpp/src/lib.rs | 88 ++++++++++++++++++++++++++++++++++-- 2 files changed, 122 insertions(+), 7 deletions(-) diff --git a/xmpp/src/action/join_room.rs b/xmpp/src/action/join_room.rs index 64b2a83..1042b2e 100644 --- a/xmpp/src/action/join_room.rs +++ b/xmpp/src/action/join_room.rs @@ -17,11 +17,46 @@ pub struct JoinRoomAction { pub history: History, } +#[derive(Clone, Debug, Default)] +pub struct JoinRoomResult { + pub nicks: Vec, + pub topic: String, +} + #[async_trait] -impl Act<()> for JoinRoomAction { - async fn act(self, agent: &mut Agent) -> Result<(), Error> { +impl Act for JoinRoomAction { + async fn act(self, agent: &mut Agent) -> Result { agent.client.send_stanza(self.clone().into()).await?; - Ok(()) + + let mut recv_presence = agent.hook_muc_presence(self.room.clone()); + let mut recv_topic = agent.hook_muc_topic(self.room.clone()); + let mut res = JoinRoomResult::default(); + + info!("Waiting for room response"); + // TODO: Why deadlock?! + //let res = tokio::spawn(async move { + //loop { + tokio::select! { + hook_res = recv_presence.recv() => { + if let Some(nick) = hook_res { + res.nicks.push(nick); + } + // break + }, + hook_res = recv_topic.recv() => { + if let Some((_opt_nick, subject, _lang)) = hook_res { + res.topic = subject.0; + } + //break + } + } + //} + + //return res; + + //}).await.unwrap(); + + Ok(res) } } diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index f90d9cc..52163a3 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -10,6 +10,7 @@ use futures::stream::StreamExt; use reqwest::{ header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient, }; +use std::collections::HashMap; use std::convert::TryFrom; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; @@ -23,7 +24,7 @@ use xmpp_parsers::{ hashes::Algo, http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult}, iq::{Iq, IqType}, - message::{Body, Message, MessageType}, + message::{Body, Message, MessageType, Subject}, muc::{ user::{MucUser, Status}, Muc, @@ -210,6 +211,7 @@ impl ClientBuilder<'_> { disco, node, uploads: Vec::new(), + hooks: Hooks::default(), } } } @@ -221,6 +223,7 @@ pub struct Agent { disco: DiscoInfoResult, node: String, uploads: Vec<(String, Jid, PathBuf)>, + hooks: Hooks, } impl Agent { @@ -374,7 +377,7 @@ impl Agent { let mut events = vec![]; let from = message.from.clone().unwrap(); let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect(); - match message.get_best_body(langs) { + match message.get_best_body(langs.clone()) { Some((_lang, body)) => match message.type_ { MessageType::Groupchat => { let event = match from.clone() { @@ -425,7 +428,31 @@ impl Agent { } _ => (), }, - None => (), + None => { + // Check if we have a MUC topic. + // TODO: handle other subject cases + match message.get_best_subject(langs) { + Some((lang, subject)) => { + match message.type_ { + MessageType::Groupchat => { + // from.into() here drops the resource part and allows to check for room-hook + if let Some(hook_sender) = + self.hooks.muc_topic.get(&from.clone().into()) + { + // TODO: Error + hook_sender + .send((from.clone().resource(), subject.clone(), lang)) + .await + .unwrap(); + //hook_sender.send(message.clone()).await.unwrap(); + } + } + _ => {} + } + } + _ => {} + } + } } for child in message.payloads { if child.is("event", ns::PUBSUB_EVENT) { @@ -439,15 +466,28 @@ impl Agent { async fn handle_presence(&mut self, presence: Presence) -> Vec { let mut events = vec![]; + + // A MUC presence comes from the participant's FullJid, however we are + // interested in the room where this happened stored as `from` here, + // because we only handle the "self-presence" of when a room was joined. let from: BareJid = match presence.from.clone().unwrap() { Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain }, Jid::Bare(bare) => bare, }; - for payload in presence.payloads.into_iter() { + for payload in presence.clone().payloads.into_iter() { let muc_user = match MucUser::try_from(payload) { Ok(muc_user) => muc_user, _ => continue, }; + if let Some(hook_sender) = self.hooks.muc_presence.get(&from) { + // Check that there is an associated nick with this presence + if let Some(nick) = presence.from.clone().unwrap().resource() { + // TODO: Error + hook_sender.send(nick).await.unwrap(); + break; + } + } + for status in muc_user.status.into_iter() { if status == Status::SelfPresence { events.push(Event::RoomJoined(from.clone())); @@ -529,6 +569,35 @@ impl Agent { .push((String::from("upload1"), to, path.to_path_buf())); self.client.send_stanza(request.into()).await.unwrap(); } + + pub fn hook_muc_topic(&mut self, room: BareJid) -> tokio::sync::mpsc::Receiver { + let (tx, rx) = tokio::sync::mpsc::channel(100); + + // TODO: what about multiple hooks on same room? + self.hooks.muc_topic.insert(room, tx); + + rx + } + + pub fn remove_hook_muc_topic(&mut self, room: BareJid) { + self.hooks.muc_topic.remove(&room); + } + + pub fn hook_muc_presence( + &mut self, + room: BareJid, + ) -> tokio::sync::mpsc::Receiver { + let (tx, rx) = tokio::sync::mpsc::channel(100); + + // TODO: what about multiple hooks on same room? + self.hooks.muc_presence.insert(room, tx); + + rx + } + + pub fn remove_hook_muc_presence(&mut self, room: BareJid) { + self.hooks.muc_presence.remove(&room); + } } async fn handle_upload_result( @@ -578,6 +647,17 @@ async fn handle_upload_result( return vec![]; } +// TODO: Lang type +pub type MucTopicHook = (Option, Subject, String); + +pub type MucPresenceHook = RoomNick; + +#[derive(Clone, Debug, Default)] +pub struct Hooks { + muc_presence: HashMap>, + muc_topic: HashMap>, +} + #[cfg(test)] mod tests { use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};