From 9d17a798138fec1140c25f8d10af3ad543700140 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maxime=20=E2=80=9Cpep=E2=80=9D=20Buquet?= Date: Mon, 7 Sep 2020 09:56:04 +0200 Subject: [PATCH] xmpp: split wait_for_events methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maxime “pep” Buquet --- xmpp/src/lib.rs | 233 +++++++++++++++++++++++------------------------- 1 file changed, 112 insertions(+), 121 deletions(-) diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index ffe5980c..799fd019 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -27,7 +27,7 @@ use xmpp_parsers::{ pubsub::pubsub::{Items, PubSub}, roster::{Item as RosterItem, Roster}, stanza_error::{DefinedCondition, ErrorType, StanzaError}, - BareJid, FullJid, Jid, + BareJid, Element, FullJid, Jid, }; #[macro_use] extern crate log; @@ -236,6 +236,102 @@ impl Agent { presence } + async fn handle_iq(&mut self, iq: Iq) -> () { + if let IqType::Get(payload) = iq.payload { + if payload.is("query", ns::DISCO_INFO) { + let query = DiscoInfoQuery::try_from(payload); + match query { + Ok(query) => { + let mut disco_info = self.disco.clone(); + disco_info.node = query.node; + let iq = Iq::from_result(iq.id, Some(disco_info)) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + Err(err) => { + let error = StanzaError::new( + ErrorType::Modify, + DefinedCondition::BadRequest, + "en", + &format!("{}", err), + ); + let iq = Iq::from_error(iq.id, error) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + } + } else { + // We MUST answer unhandled get iqs with a service-unavailable error. + let error = StanzaError::new( + ErrorType::Cancel, + DefinedCondition::ServiceUnavailable, + "en", + "No handler defined for this kind of iq.", + ); + let iq = Iq::from_error(iq.id, error) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + } + } + + async fn handle_message(&mut self, message: Message) -> Vec { + let mut events = vec![]; + let from = message.from.clone().unwrap(); + let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect(); + match message.get_best_body(langs) { + Some((_lang, body)) => match message.type_ { + MessageType::Groupchat => { + let event = Event::RoomMessage( + from.clone().into(), + FullJid::try_from(from.clone()).unwrap().resource, + body.clone(), + ); + events.push(event) + } + MessageType::Chat | MessageType::Normal => { + let event = Event::ChatMessage(from.clone().into(), body.clone()); + events.push(event) + } + _ => (), + }, + None => (), + } + for child in message.payloads { + if child.is("event", ns::PUBSUB_EVENT) { + let new_events = pubsub::handle_event(&from, child, self).await; + events.extend(new_events); + } + } + + events + } + + async fn handle_presence(&mut self, presence: Presence) -> Vec { + let mut events = vec![]; + let from: BareJid = match presence.from.clone().unwrap() { + Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain }, + Jid::Bare(bare) => bare, + }; + for payload in presence.payloads.into_iter() { + let muc_user = match MucUser::try_from(payload) { + Ok(muc_user) => muc_user, + _ => continue, + }; + for status in muc_user.status.into_iter() { + if status == Status::SelfPresence { + events.push(Event::RoomJoined(from.clone())); + break; + } + } + } + + events + } + pub async fn wait_for_events(&mut self) -> Option> { if let Some(event) = self.client.next().await { let mut events = Vec::new(); @@ -264,127 +360,22 @@ impl Agent { TokioXmppEvent::Disconnected(_) => { events.push(Event::Disconnected); } - TokioXmppEvent::Stanza(stanza) => { - if stanza.is("iq", "jabber:client") { - let iq = Iq::try_from(stanza).unwrap(); - let from = iq - .from - .clone() - .unwrap_or_else(|| self.client.bound_jid().unwrap().clone()); - if let IqType::Get(payload) = iq.payload { - if payload.is("query", ns::DISCO_INFO) { - let query = DiscoInfoQuery::try_from(payload); - match query { - Ok(query) => { - let mut disco_info = self.disco.clone(); - disco_info.node = query.node; - let iq = Iq::from_result(iq.id, Some(disco_info)) - .with_to(iq.from.unwrap()) - .into(); - let _ = self.client.send_stanza(iq).await; - } - Err(err) => { - let error = StanzaError::new( - ErrorType::Modify, - DefinedCondition::BadRequest, - "en", - &format!("{}", err), - ); - let iq = Iq::from_error(iq.id, error) - .with_to(iq.from.unwrap()) - .into(); - let _ = self.client.send_stanza(iq).await; - } - } - } else { - // We MUST answer unhandled get iqs with a service-unavailable error. - let error = StanzaError::new( - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "en", - "No handler defined for this kind of iq.", - ); - let iq = Iq::from_error(iq.id, error) - .with_to(iq.from.unwrap()) - .into(); - let _ = self.client.send_stanza(iq).await; - } - } else if let IqType::Result(Some(payload)) = iq.payload { - // TODO: move private iqs like this one somewhere else, for - // security reasons. - if payload.is("query", ns::ROSTER) && iq.from.is_none() { - let roster = Roster::try_from(payload).unwrap(); - for item in roster.items.into_iter() { - events.push(Event::ContactAdded(item)); - } - } else if payload.is("pubsub", ns::PUBSUB) { - let new_events = pubsub::handle_iq_result(&from, payload); - events.extend(new_events); - } - } else if let IqType::Set(_) = iq.payload { - // We MUST answer unhandled set iqs with a service-unavailable error. - let error = StanzaError::new( - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "en", - "No handler defined for this kind of iq.", - ); - let iq = Iq::from_error(iq.id, error) - .with_to(iq.from.unwrap()) - .into(); - let _ = self.client.send_stanza(iq).await; - } - } else if stanza.is("message", "jabber:client") { - let message = Message::try_from(stanza).unwrap(); - let from = message.from.clone().unwrap(); - let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect(); - match message.get_best_body(langs) { - Some((_lang, body)) => match message.type_ { - MessageType::Groupchat => { - let event = Event::RoomMessage( - from.clone().into(), - FullJid::try_from(from.clone()).unwrap().resource, - body.clone(), - ); - events.push(event) - } - MessageType::Chat | MessageType::Normal => { - let event = - Event::ChatMessage(from.clone().into(), body.clone()); - events.push(event) - } - _ => (), - }, - None => (), - } - for child in message.payloads { - if child.is("event", ns::PUBSUB_EVENT) { - let new_events = pubsub::handle_event(&from, child, self).await; - events.extend(new_events); - } - } - } else if stanza.is("presence", "jabber:client") { - let presence = Presence::try_from(stanza).unwrap(); - let from: BareJid = match presence.from.clone().unwrap() { - Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain }, - Jid::Bare(bare) => bare, - }; - for payload in presence.payloads.into_iter() { - let muc_user = match MucUser::try_from(payload) { - Ok(muc_user) => muc_user, - _ => continue, - }; - for status in muc_user.status.into_iter() { - if status == Status::SelfPresence { - events.push(Event::RoomJoined(from.clone())); - break; - } - } - } - } else if stanza.is("error", "http://etherx.jabber.org/streams") { - println!("Received a fatal stream error: {}", String::from(&stanza)); + TokioXmppEvent::Stanza(elem) => { + if elem.is("iq", "jabber:client") { + let iq = Iq::try_from(elem).unwrap(); + self.handle_iq(iq).await; + } else if elem.is("message", "jabber:client") { + let message = Message::try_from(elem).unwrap(); + let new_events = self.handle_message(message).await; + events.extend(new_events); + } else if elem.is("presence", "jabber:client") { + let presence = Presence::try_from(elem).unwrap(); + let new_events = self.handle_presence(presence).await; + events.extend(new_events); + } else if elem.is("error", "http://etherx.jabber.org/streams") { + println!("Received a fatal stream error: {}", String::from(&elem)); } else { - panic!("Unknown stanza: {}", String::from(&stanza)); + panic!("Unknown stanza: {}", String::from(&elem)); } } }