From 379d3cfbe69bea15292f5ff0dedfb18362f4eed6 Mon Sep 17 00:00:00 2001 From: "xmppftw@kl.netlib.re" Date: Sat, 30 Dec 2023 00:13:25 +0100 Subject: [PATCH] Move wait_for_events to event_loop module --- xmpp/src/event_loop.rs | 77 ++++++++++++++++++++++++++++++++++++++++++ xmpp/src/lib.rs | 62 +++------------------------------- 2 files changed, 81 insertions(+), 58 deletions(-) create mode 100644 xmpp/src/event_loop.rs diff --git a/xmpp/src/event_loop.rs b/xmpp/src/event_loop.rs new file mode 100644 index 0000000..f28649a --- /dev/null +++ b/xmpp/src/event_loop.rs @@ -0,0 +1,77 @@ +// Copyright (c) 2023 xmpp-rs contributors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use futures::StreamExt; +use tokio_xmpp::{ + parsers::{ + disco::DiscoInfoQuery, iq::Iq, message::Message, presence::Presence, roster::Roster, + }, + Event as TokioXmppEvent, +}; + +use crate::{iq, message, presence, Agent, Event}; + +/// Wait for new events. +/// +/// # Returns +/// +/// - `Some(events)` if there are new events; multiple may be returned at once. +/// - `None` if the underlying stream is closed. +pub async fn wait_for_events(agent: &mut Agent) -> Option> { + if let Some(event) = agent.client.next().await { + let mut events = Vec::new(); + + match event { + TokioXmppEvent::Online { resumed: false, .. } => { + let presence = Agent::make_initial_presence(&agent.disco, &agent.node).into(); + let _ = agent.client.send_stanza(presence).await; + events.push(Event::Online); + // TODO: only send this when the ContactList feature is enabled. + let iq = Iq::from_get( + "roster", + Roster { + ver: None, + items: vec![], + }, + ) + .into(); + let _ = agent.client.send_stanza(iq).await; + + // Query account disco to know what bookmarks spec is used + let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into(); + let _ = agent.client.send_stanza(iq).await; + agent.awaiting_disco_bookmarks_type = true; + } + TokioXmppEvent::Online { resumed: true, .. } => {} + TokioXmppEvent::Disconnected(e) => { + events.push(Event::Disconnected(e)); + } + TokioXmppEvent::Stanza(elem) => { + if elem.is("iq", "jabber:client") { + let iq = Iq::try_from(elem).unwrap(); + let new_events = iq::handle_iq(agent, iq).await; + events.extend(new_events); + } else if elem.is("message", "jabber:client") { + let message = Message::try_from(elem).unwrap(); + let new_events = message::handle_message(agent, message).await; + events.extend(new_events); + } else if elem.is("presence", "jabber:client") { + let presence = Presence::try_from(elem).unwrap(); + let new_events = presence::handle_presence(agent, presence).await; + events.extend(new_events); + } else if elem.is("error", "http://etherx.jabber.org/streams") { + println!("Received a fatal stream error: {}", String::from(&elem)); + } else { + panic!("Unknown stanza: {}", String::from(&elem)); + } + } + } + + Some(events) + } else { + None + } +} diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index 3c65fdb..d5388fd 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -6,21 +6,18 @@ #![deny(bare_trait_objects)] -use futures::stream::StreamExt; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; pub use tokio_xmpp::parsers; use tokio_xmpp::parsers::{ caps::{compute_disco, hash_caps, Caps}, - disco::{DiscoInfoQuery, DiscoInfoResult}, + disco::DiscoInfoResult, hashes::Algo, - iq::Iq, message::{Body, Message, MessageType}, muc::{user::MucUser, Muc}, presence::{Presence, Type as PresenceType}, - roster::Roster, }; -use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent}; +use tokio_xmpp::AsyncClient as TokioXmppClient; pub use tokio_xmpp::{BareJid, Element, FullJid, Jid}; #[macro_use] extern crate log; @@ -28,6 +25,7 @@ extern crate log; pub mod builder; pub mod disco; pub mod event; +pub mod event_loop; pub mod feature; pub mod iq; pub mod message; @@ -177,59 +175,7 @@ impl Agent { /// - `Some(events)` if there are new events; multiple may be returned at once. /// - `None` if the underlying stream is closed. pub async fn wait_for_events(&mut self) -> Option> { - if let Some(event) = self.client.next().await { - let mut events = Vec::new(); - - match event { - TokioXmppEvent::Online { resumed: false, .. } => { - let presence = Self::make_initial_presence(&self.disco, &self.node).into(); - let _ = self.client.send_stanza(presence).await; - events.push(Event::Online); - // TODO: only send this when the ContactList feature is enabled. - let iq = Iq::from_get( - "roster", - Roster { - ver: None, - items: vec![], - }, - ) - .into(); - let _ = self.client.send_stanza(iq).await; - - // Query account disco to know what bookmarks spec is used - let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into(); - let _ = self.client.send_stanza(iq).await; - self.awaiting_disco_bookmarks_type = true; - } - TokioXmppEvent::Online { resumed: true, .. } => {} - TokioXmppEvent::Disconnected(e) => { - events.push(Event::Disconnected(e)); - } - TokioXmppEvent::Stanza(elem) => { - if elem.is("iq", "jabber:client") { - let iq = Iq::try_from(elem).unwrap(); - let new_events = iq::handle_iq(self, iq).await; - events.extend(new_events); - } else if elem.is("message", "jabber:client") { - let message = Message::try_from(elem).unwrap(); - let new_events = message::handle_message(self, message).await; - events.extend(new_events); - } else if elem.is("presence", "jabber:client") { - let presence = Presence::try_from(elem).unwrap(); - let new_events = presence::handle_presence(self, presence).await; - events.extend(new_events); - } else if elem.is("error", "http://etherx.jabber.org/streams") { - println!("Received a fatal stream error: {}", String::from(&elem)); - } else { - panic!("Unknown stanza: {}", String::from(&elem)); - } - } - } - - Some(events) - } else { - None - } + event_loop::wait_for_events(self).await } pub async fn upload_file_with(&mut self, service: &str, path: &Path) {