diff --git a/xmpp-rs/Cargo.toml b/xmpp-rs/Cargo.toml index 30541f62..19c81cac 100644 --- a/xmpp-rs/Cargo.toml +++ b/xmpp-rs/Cargo.toml @@ -14,10 +14,10 @@ license = "MPL-2.0" edition = "2018" [dependencies] -tokio-xmpp = "1.0.1" +tokio-xmpp = "2.0.0" xmpp-parsers = "0.17" -futures = "0.1" -tokio = "0.1" +futures = "0.3" +tokio = "0.2" log = "0.4" [features] diff --git a/xmpp-rs/examples/hello_bot.rs b/xmpp-rs/examples/hello_bot.rs index 42ad0d14..abce4b11 100644 --- a/xmpp-rs/examples/hello_bot.rs +++ b/xmpp-rs/examples/hello_bot.rs @@ -4,27 +4,22 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use futures::prelude::*; use std::env::args; -use std::process::exit; -use tokio::runtime::current_thread::Runtime; use xmpp::{ClientBuilder, ClientFeature, ClientType, Event}; use xmpp_parsers::{message::MessageType, Jid}; -fn main() { +#[tokio::main] +async fn main() -> Result<(), Option<()>> { let args: Vec = args().collect(); if args.len() != 3 { println!("Usage: {} ", args[0]); - exit(1); + return Err(None); } let jid = &args[1]; let password = &args[2]; - // tokio_core context - let mut rt = Runtime::new().unwrap(); - // Client instance - let (mut agent, stream) = ClientBuilder::new(jid, password) + let mut client = ClientBuilder::new(jid, password) .set_client(ClientType::Bot, "xmpp-rs") .set_website("https://gitlab.com/xmpp-rs/xmpp-rs") .set_default_nick("bot") @@ -34,58 +29,58 @@ fn main() { .build() .unwrap(); - // We return either Some(Error) if an error was encountered - // or None, if we were simply disconnected - let handler = stream.map_err(Some).for_each(|evt: Event| { - match evt { - Event::Online => { - println!("Online."); - } - Event::Disconnected => { - println!("Disconnected."); - return Err(None); - } - Event::ContactAdded(contact) => { - println!("Contact {} added.", contact.jid); - } - Event::ContactRemoved(contact) => { - println!("Contact {} removed.", contact.jid); - } - Event::ContactChanged(contact) => { - println!("Contact {} changed.", contact.jid); - } - Event::JoinRoom(jid, conference) => { - println!("Joining room {} ({:?})…", jid, conference.name); - agent.join_room( - jid, - conference.nick, - conference.password, - "en", - "Yet another bot!", - ); - } - Event::LeaveRoom(jid) => { - println!("Leaving room {}…", jid); - } - Event::LeaveAllRooms => { - println!("Leaving all rooms…"); - } - Event::RoomJoined(jid) => { - println!("Joined room {}.", jid); - agent.send_message(Jid::Bare(jid), MessageType::Groupchat, "en", "Hello world!"); - } - Event::RoomLeft(jid) => { - println!("Left room {}.", jid); - } - Event::AvatarRetrieved(jid, path) => { - println!("Received avatar for {} in {}.", jid, path); + while let Some(events) = client.wait_for_events().await { + for event in events { + match event { + Event::Online => { + println!("Online."); + } + Event::Disconnected => { + println!("Disconnected"); + return Err(None); + } + Event::ContactAdded(contact) => { + println!("Contact {} added.", contact.jid); + } + Event::ContactRemoved(contact) => { + println!("Contact {} removed.", contact.jid); + } + Event::ContactChanged(contact) => { + println!("Contact {} changed.", contact.jid); + } + Event::JoinRoom(jid, conference) => { + println!("Joining room {} ({:?})…", jid, conference.name); + client + .join_room( + jid, + conference.nick, + conference.password, + "en", + "Yet another bot!", + ) + .await; + } + Event::LeaveRoom(jid) => { + println!("Leaving room {}…", jid); + } + Event::LeaveAllRooms => { + println!("Leaving all rooms…"); + } + Event::RoomJoined(jid) => { + println!("Joined room {}.", jid); + client + .send_message(Jid::Bare(jid), MessageType::Groupchat, "en", "Hello world!") + .await; + } + Event::RoomLeft(jid) => { + println!("Left room {}.", jid); + } + Event::AvatarRetrieved(jid, path) => { + println!("Received avatar for {} in {}.", jid, path); + } } } - Ok(()) - }); + } - rt.block_on(handler).unwrap_or_else(|e| match e { - Some(e) => println!("Error: {:?}", e), - None => println!("Disconnected."), - }); + Ok(()) } diff --git a/xmpp-rs/src/lib.rs b/xmpp-rs/src/lib.rs index ee0277ff..8f7d1790 100644 --- a/xmpp-rs/src/lib.rs +++ b/xmpp-rs/src/lib.rs @@ -6,12 +6,11 @@ #![deny(bare_trait_objects)] -use futures::{sync::mpsc, Future, Sink, Stream}; +use futures::stream::StreamExt; use std::cell::RefCell; use std::convert::TryFrom; use std::rc::Rc; -use std::str::FromStr; -use tokio_xmpp::{Client as TokioXmppClient, Event as TokioXmppEvent, Packet}; +use tokio_xmpp::{Client as TokioXmppClient, Event as TokioXmppEvent}; use xmpp_parsers::{ bookmarks2::Conference, caps::{compute_disco, hash_caps, Caps}, @@ -28,7 +27,7 @@ use xmpp_parsers::{ pubsub::pubsub::{Items, PubSub}, roster::{Item as RosterItem, Roster}, stanza_error::{DefinedCondition, ErrorType, StanzaError}, - BareJid, FullJid, Jid, JidParseError, + BareJid, FullJid, Jid, }; #[macro_use] extern crate log; @@ -149,225 +148,36 @@ impl ClientBuilder<'_> { } } - fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence { - let caps_data = compute_disco(disco); - let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap(); - let caps = Caps::new(node, hash); - - let mut presence = Presence::new(PresenceType::None); - presence.add_payload(caps); - presence - } - - pub fn build( - self, - ) -> Result<(Agent, impl Stream), JidParseError> { + pub fn build(self) -> Result { let client = TokioXmppClient::new(self.jid, self.password)?; - let (sender_tx, sender_rx) = mpsc::unbounded(); - Ok(self.build_impl(client, sender_tx, sender_rx)?) + Ok(self.build_impl(client)?) } // This function is meant to be used for testing build - pub(crate) fn build_impl( - self, - stream: S, - sender_tx: mpsc::UnboundedSender, - sender_rx: mpsc::UnboundedReceiver, - ) -> Result<(Agent, impl Stream), JidParseError> - where - S: Stream - + Sink, - { + pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result { let disco = self.make_disco(); let node = self.website; - let client = stream; - let (sink, stream) = client.split(); - - let reader = { - let mut sender_tx = sender_tx.clone(); - let jid = self.jid.to_owned(); - stream - .map(move |event| { - // Helper function to send an iq error. - let mut events = Vec::new(); - let send_error = |to, id, type_, condition, text: &str| { - let error = StanzaError::new(type_, condition, "en", text); - let iq = Iq::from_error(id, error).with_to(to).into(); - sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap(); - }; - - match event { - TokioXmppEvent::Online(_) => { - let presence = - ClientBuilder::make_initial_presence(&disco, &node).into(); - let packet = Packet::Stanza(presence); - sender_tx.unbounded_send(packet).unwrap(); - events.push(Event::Online); - // TODO: only send this when the ContactList feature is enabled. - let iq = Iq::from_get( - "roster", - Roster { - ver: None, - items: vec![], - }, - ) - .into(); - sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap(); - // TODO: only send this when the JoinRooms feature is enabled. - let iq = Iq::from_get( - "bookmarks", - PubSub::Items(Items::new(ns::BOOKMARKS2)), - ) - .into(); - sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap(); - } - TokioXmppEvent::Disconnected => { - events.push(Event::Disconnected); - } - TokioXmppEvent::Stanza(stanza) => { - if stanza.is("iq", "jabber:client") { - let iq = Iq::try_from(stanza).unwrap(); - let from = iq - .from - .clone() - .unwrap_or_else(|| Jid::from_str(&jid).unwrap()); - if let IqType::Get(payload) = iq.payload { - if payload.is("query", ns::DISCO_INFO) { - let query = DiscoInfoQuery::try_from(payload); - match query { - Ok(query) => { - let mut disco_info = disco.clone(); - disco_info.node = query.node; - let iq = Iq::from_result(iq.id, Some(disco_info)) - .with_to(iq.from.unwrap()) - .into(); - sender_tx - .unbounded_send(Packet::Stanza(iq)) - .unwrap(); - } - Err(err) => { - send_error( - iq.from.unwrap(), - iq.id, - ErrorType::Modify, - DefinedCondition::BadRequest, - &format!("{}", err), - ); - } - } - } else { - // We MUST answer unhandled get iqs with a service-unavailable error. - send_error( - iq.from.unwrap(), - iq.id, - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "No handler defined for this kind of iq.", - ); - } - } else if let IqType::Result(Some(payload)) = iq.payload { - // TODO: move private iqs like this one somewhere else, for - // security reasons. - if payload.is("query", ns::ROSTER) && iq.from.is_none() { - let roster = Roster::try_from(payload).unwrap(); - for item in roster.items.into_iter() { - events.push(Event::ContactAdded(item)); - } - } else if payload.is("pubsub", ns::PUBSUB) { - let new_events = pubsub::handle_iq_result(&from, payload); - events.extend(new_events); - } - } else if let IqType::Set(_) = iq.payload { - // We MUST answer unhandled set iqs with a service-unavailable error. - send_error( - iq.from.unwrap(), - iq.id, - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "No handler defined for this kind of iq.", - ); - } - } else if stanza.is("message", "jabber:client") { - let message = Message::try_from(stanza).unwrap(); - let from = message.from.clone().unwrap(); - for child in message.payloads { - if child.is("event", ns::PUBSUB_EVENT) { - let new_events = - pubsub::handle_event(&from, child, &mut sender_tx); - events.extend(new_events); - } - } - } else if stanza.is("presence", "jabber:client") { - let presence = Presence::try_from(stanza).unwrap(); - let from: BareJid = match presence.from.clone().unwrap() { - Jid::Full(FullJid { node, domain, .. }) => { - BareJid { node, domain } - } - Jid::Bare(bare) => bare, - }; - for payload in presence.payloads.into_iter() { - let muc_user = match MucUser::try_from(payload) { - Ok(muc_user) => muc_user, - _ => continue, - }; - for status in muc_user.status.into_iter() { - if status == Status::SelfPresence { - events.push(Event::RoomJoined(from.clone())); - break; - } - } - } - } else if stanza.is("error", "http://etherx.jabber.org/streams") { - println!( - "Received a fatal stream error: {}", - String::from(&stanza) - ); - } else { - panic!("Unknown stanza: {}", String::from(&stanza)); - } - } - } - - futures::stream::iter_ok(events) - }) - .flatten() - }; - - let sender = sender_rx - .map_err(|e| panic!("Sink error: {:?}", e)) - .forward(sink) - .map(|(rx, mut sink)| { - drop(rx); - let _ = sink.close(); - None - }); - - // TODO is this correct? - // Some(Error) means a real error - // None means the end of the sender stream and can be ignored - let future = reader - .map(Some) - .select(sender.into_stream()) - .filter_map(|x| x); - let agent = Agent { - sender_tx, + client, default_nick: Rc::new(RefCell::new(self.default_nick)), + disco, + node, }; - Ok((agent, future)) + Ok(agent) } } -#[derive(Clone, Debug)] pub struct Agent { - sender_tx: mpsc::UnboundedSender, + client: TokioXmppClient, default_nick: Rc>, + disco: DiscoInfoResult, + node: String, } impl Agent { - pub fn join_room( + pub async fn join_room( &mut self, room: BareJid, nick: Option, @@ -385,39 +195,181 @@ impl Agent { let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid)); presence.add_payload(muc); presence.set_status(String::from(lang), String::from(status)); - let presence = presence.into(); - self.sender_tx - .unbounded_send(Packet::Stanza(presence)) - .unwrap(); + let _ = self.client.send_stanza(presence.into()).await; } - pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) { + pub async fn send_message( + &mut self, + recipient: Jid, + type_: MessageType, + lang: &str, + text: &str, + ) { let mut message = Message::new(Some(recipient)); message.type_ = type_; message .bodies .insert(String::from(lang), Body(String::from(text))); - let message = message.into(); - self.sender_tx - .unbounded_send(Packet::Stanza(message)) - .unwrap(); + let _ = self.client.send_stanza(message.into()).await; + } + + fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence { + let caps_data = compute_disco(disco); + let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap(); + let caps = Caps::new(node, hash); + + let mut presence = Presence::new(PresenceType::None); + presence.add_payload(caps); + presence + } + + pub async fn wait_for_events(&mut self) -> Option> { + if let Some(event) = self.client.next().await { + let mut events = Vec::new(); + + match event { + TokioXmppEvent::Online(_) => { + let presence = Self::make_initial_presence(&self.disco, &self.node).into(); + let _ = self.client.send_stanza(presence).await; + events.push(Event::Online); + // TODO: only send this when the ContactList feature is enabled. + let iq = Iq::from_get( + "roster", + Roster { + ver: None, + items: vec![], + }, + ) + .into(); + let _ = self.client.send_stanza(iq).await; + // TODO: only send this when the JoinRooms feature is enabled. + let iq = + Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into(); + let _ = self.client.send_stanza(iq).await; + } + TokioXmppEvent::Disconnected(_) => { + events.push(Event::Disconnected); + } + TokioXmppEvent::Stanza(stanza) => { + if stanza.is("iq", "jabber:client") { + let iq = Iq::try_from(stanza).unwrap(); + let from = iq + .from + .clone() + .unwrap_or_else(|| self.client.bound_jid().unwrap().clone()); + if let IqType::Get(payload) = iq.payload { + if payload.is("query", ns::DISCO_INFO) { + let query = DiscoInfoQuery::try_from(payload); + match query { + Ok(query) => { + let mut disco_info = self.disco.clone(); + disco_info.node = query.node; + let iq = Iq::from_result(iq.id, Some(disco_info)) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + Err(err) => { + let error = StanzaError::new( + ErrorType::Modify, + DefinedCondition::BadRequest, + "en", + &format!("{}", err), + ); + let iq = Iq::from_error(iq.id, error) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + } + } else { + // We MUST answer unhandled get iqs with a service-unavailable error. + let error = StanzaError::new( + ErrorType::Cancel, + DefinedCondition::ServiceUnavailable, + "en", + "No handler defined for this kind of iq.", + ); + let iq = Iq::from_error(iq.id, error) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + } else if let IqType::Result(Some(payload)) = iq.payload { + // TODO: move private iqs like this one somewhere else, for + // security reasons. + if payload.is("query", ns::ROSTER) && iq.from.is_none() { + let roster = Roster::try_from(payload).unwrap(); + for item in roster.items.into_iter() { + events.push(Event::ContactAdded(item)); + } + } else if payload.is("pubsub", ns::PUBSUB) { + let new_events = pubsub::handle_iq_result(&from, payload); + events.extend(new_events); + } + } else if let IqType::Set(_) = iq.payload { + // We MUST answer unhandled set iqs with a service-unavailable error. + let error = StanzaError::new( + ErrorType::Cancel, + DefinedCondition::ServiceUnavailable, + "en", + "No handler defined for this kind of iq.", + ); + let iq = Iq::from_error(iq.id, error) + .with_to(iq.from.unwrap()) + .into(); + let _ = self.client.send_stanza(iq).await; + } + } else if stanza.is("message", "jabber:client") { + let message = Message::try_from(stanza).unwrap(); + let from = message.from.clone().unwrap(); + for child in message.payloads { + if child.is("event", ns::PUBSUB_EVENT) { + let new_events = pubsub::handle_event(&from, child, self).await; + events.extend(new_events); + } + } + } else if stanza.is("presence", "jabber:client") { + let presence = Presence::try_from(stanza).unwrap(); + let from: BareJid = match presence.from.clone().unwrap() { + Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain }, + Jid::Bare(bare) => bare, + }; + for payload in presence.payloads.into_iter() { + let muc_user = match MucUser::try_from(payload) { + Ok(muc_user) => muc_user, + _ => continue, + }; + for status in muc_user.status.into_iter() { + if status == Status::SelfPresence { + events.push(Event::RoomJoined(from.clone())); + break; + } + } + } + } else if stanza.is("error", "http://etherx.jabber.org/streams") { + println!("Received a fatal stream error: {}", String::from(&stanza)); + } else { + panic!("Unknown stanza: {}", String::from(&stanza)); + } + } + } + + Some(events) + } else { + None + } } } #[cfg(test)] mod tests { use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event}; - use futures::prelude::*; - use futures::sync::mpsc; - use tokio::runtime::current_thread::Runtime; use tokio_xmpp::Client as TokioXmppClient; - #[test] - fn test_simple() { - // tokio_core context - let mut rt = Runtime::new().unwrap(); + #[tokio::test] + async fn test_simple() { let client = TokioXmppClient::new("foo@bar", "meh").unwrap(); - let (sender_tx, sender_rx) = mpsc::unbounded(); // Client instance let client_builder = ClientBuilder::new("foo@bar", "meh") @@ -427,16 +379,15 @@ mod tests { .enable_feature(ClientFeature::Avatars) .enable_feature(ClientFeature::ContactList); - let (_agent, stream): (Agent, _) = client_builder - .build_impl(client, sender_tx.clone(), sender_rx) - .unwrap(); + let mut agent: Agent = client_builder.build_impl(client).unwrap(); - let handler = stream.map_err(Some).for_each(|_evt: Event| { - return Err(None); - }); - - rt.block_on(handler).unwrap_or_else(|e| match e { - _ => (), - }); + while let Some(events) = agent.wait_for_events().await { + assert!(match events[0] { + Event::Disconnected => true, + _ => false, + }); + assert_eq!(events.len(), 1); + break; + } } } diff --git a/xmpp-rs/src/pubsub/avatar.rs b/xmpp-rs/src/pubsub/avatar.rs index bdfcceae..54a5db79 100644 --- a/xmpp-rs/src/pubsub/avatar.rs +++ b/xmpp-rs/src/pubsub/avatar.rs @@ -4,12 +4,11 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +use super::Agent; use crate::Event; -use futures::{sync::mpsc, Sink}; use std::convert::TryFrom; use std::fs::{self, File}; use std::io::{self, Write}; -use tokio_xmpp::Packet; use xmpp_parsers::{ avatar::{Data, Metadata}, iq::Iq, @@ -22,11 +21,11 @@ use xmpp_parsers::{ Jid, }; -pub(crate) fn handle_metadata_pubsub_event( +pub(crate) async fn handle_metadata_pubsub_event( from: &Jid, - tx: &mut mpsc::UnboundedSender, + agent: &mut Agent, items: Vec, -) -> impl IntoIterator { +) -> Vec { let mut events = Vec::new(); for item in items { let payload = item.payload.clone().unwrap(); @@ -43,7 +42,7 @@ pub(crate) fn handle_metadata_pubsub_event( events.push(Event::AvatarRetrieved(from.clone(), filename)); } else { let iq = download_avatar(from); - tx.start_send(Packet::Stanza(iq.into())).unwrap(); + let _ = agent.client.send_stanza(iq.into()).await; } } } diff --git a/xmpp-rs/src/pubsub/mod.rs b/xmpp-rs/src/pubsub/mod.rs index f7cb91e7..f0a4b753 100644 --- a/xmpp-rs/src/pubsub/mod.rs +++ b/xmpp-rs/src/pubsub/mod.rs @@ -4,11 +4,10 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +use super::Agent; use crate::Event; -use futures::sync::mpsc; use std::convert::TryFrom; use std::str::FromStr; -use tokio_xmpp::Packet; use xmpp_parsers::{ bookmarks2::{Autojoin, Conference}, ns, @@ -20,11 +19,7 @@ use xmpp_parsers::{ #[cfg(feature = "avatars")] pub(crate) mod avatar; -pub(crate) fn handle_event( - from: &Jid, - elem: Element, - mut tx: &mut mpsc::UnboundedSender, -) -> impl IntoIterator { +pub(crate) async fn handle_event(from: &Jid, elem: Element, agent: &mut Agent) -> Vec { let mut events = Vec::new(); let event = PubSubEvent::try_from(elem); trace!("PubSub event: {:#?}", event); @@ -33,7 +28,8 @@ pub(crate) fn handle_event( match node.0 { #[cfg(feature = "avatars")] ref node if node == ns::AVATAR_METADATA => { - let new_events = avatar::handle_metadata_pubsub_event(&from, &mut tx, items); + let new_events = + avatar::handle_metadata_pubsub_event(&from, agent, items).await; events.extend(new_events); } ref node if node == ns::BOOKMARKS2 => {