diff --git a/examples/hello_bot.rs b/examples/hello_bot.rs index e21e1490..2d8aae6d 100644 --- a/examples/hello_bot.rs +++ b/examples/hello_bot.rs @@ -4,7 +4,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use futures::{Future, Stream, sync::mpsc}; +use futures::prelude::*; use std::env::args; use std::process::exit; use std::str::FromStr; @@ -26,18 +26,19 @@ fn main() { // tokio_core context let mut rt = Runtime::new().unwrap(); - let (value_tx, value_rx) = mpsc::unbounded(); // Client instance - let (client, mut agent) = ClientBuilder::new(jid, password) + let (mut agent, stream) = ClientBuilder::new(jid, password) .set_client(ClientType::Bot, "xmpp-rs") .set_website("https://gitlab.com/xmpp-rs/xmpp-rs") .enable_feature(ClientFeature::Avatars) .enable_feature(ClientFeature::ContactList) - .build(value_tx) + .build() .unwrap(); - let forwarder = value_rx.for_each(|evt: Event| { + // We return either Some(Error) if an error was encountered + // or None, if we were simply disconnected + let handler = stream.map_err(Some).for_each(|evt: Event| { match evt { Event::Online => { println!("Online."); @@ -46,7 +47,7 @@ fn main() { }, Event::Disconnected => { println!("Disconnected."); - return Err(()); + return Err(None); }, Event::ContactAdded(contact) => { println!("Contact {:?} added.", contact); @@ -66,19 +67,10 @@ fn main() { }, } Ok(()) - }) - .map_err(|e| println!("{:?}", e)); + }); - // Start polling - match rt.block_on(client - .select2(forwarder) - .map(|_| ()) - .map_err(|_| ()) - ) { - Ok(_) => (), - Err(e) => { - println!("Fatal: {:?}", e); - () - } - } + rt.block_on(handler).unwrap_or_else(|e| match e { + Some(e) => println!("Error: {:?}", e), + None => println!("Disconnected."), + }); } diff --git a/src/avatar.rs b/src/avatar.rs index 9368bc84..b0e1f444 100644 --- a/src/avatar.rs +++ b/src/avatar.rs @@ -4,7 +4,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use futures::{Sink, sync::mpsc}; +use crate::Event; +use futures::{sync::mpsc, Sink}; use std::fs::{create_dir_all, File}; use std::io::{self, Write}; use tokio_xmpp::Packet; @@ -19,7 +20,6 @@ use xmpp_parsers::{ }, Jid, TryFrom, }; -use crate::Event; pub(crate) fn handle_metadata_pubsub_event(from: &Jid, tx: &mut mpsc::UnboundedSender, items: Vec) { for item in items { @@ -43,16 +43,24 @@ fn download_avatar(from: &Jid) -> Iq { .with_to(from.clone()) } -pub(crate) fn handle_data_pubsub_iq(from: &Jid, tx: &mut mpsc::UnboundedSender, items: Items) { - for item in items.items { - if let Some(id) = item.id.clone() { - if let Some(payload) = &item.payload { +// The return value of this function will be simply pushed to a Vec in the caller function, +// so it makes no sense to allocate a Vec here - we're lazy instead +pub(crate) fn handle_data_pubsub_iq<'a>( + from: &'a Jid, + items: &'a Items, +) -> impl IntoIterator + 'a { + let from = from.clone(); + items + .items + .iter() + .filter_map(move |item| match (&item.id, &item.payload) { + (Some(id), Some(payload)) => { let data = Data::try_from(payload.clone()).unwrap(); - let filename = save_avatar(from, id.0, &data.data).unwrap(); - tx.unbounded_send(Event::AvatarRetrieved(from.clone(), filename)).unwrap(); + let filename = save_avatar(&from, id.0.clone(), &data.data).unwrap(); + Some(Event::AvatarRetrieved(from.clone(), filename)) } - } - } + _ => None, + }) } fn save_avatar(from: &Jid, id: String, data: &[u8]) -> io::Result { diff --git a/src/lib.rs b/src/lib.rs index aca578f6..0f0afe85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,8 @@ use xmpp_parsers::{ mod avatar; +pub type Error = tokio_xmpp::Error; + #[derive(Debug)] pub enum ClientType { Bot, @@ -135,19 +137,35 @@ impl ClientBuilder<'_> { presence } - pub fn build(self, mut app_tx: mpsc::UnboundedSender) -> Result<(Box>, Client), JidParseError> { + pub fn build( + self, + ) -> Result<(Agent, impl Stream), JidParseError> { + let client = TokioXmppClient::new(self.jid, self.password)?; + Ok(self.build_impl(client)) + } + + // This function is meant to be used for testing build + pub(crate) fn build_impl( + self, + stream: S, + ) -> (Agent, impl Stream) + where + S: Stream + + Sink, + { let disco = self.make_disco(); let node = self.website; let (sender_tx, sender_rx) = mpsc::unbounded(); - let client = TokioXmppClient::new(self.jid, self.password)?; + let client = stream; let (sink, stream) = client.split(); let reader = { let mut sender_tx = sender_tx.clone(); let jid = self.jid.to_owned(); - stream.for_each(move |event| { + stream.map(move |event| { // Helper function to send an iq error. + let mut events = Vec::new(); let send_error = |to, id, type_, condition, text: &str| { let error = StanzaError::new(type_, condition, "en", text); let iq = Iq::from_error(id, error) @@ -162,13 +180,13 @@ impl ClientBuilder<'_> { let packet = Packet::Stanza(presence); sender_tx.unbounded_send(packet) .unwrap(); - app_tx.unbounded_send(Event::Online).unwrap(); + events.push(Event::Online); let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] }) .into(); sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap(); } TokioXmppEvent::Disconnected => { - app_tx.unbounded_send(Event::Disconnected).unwrap(); + events.push(Event::Disconnected); } TokioXmppEvent::Stanza(stanza) => { if stanza.is("iq", "jabber:client") { @@ -197,7 +215,7 @@ impl ClientBuilder<'_> { if payload.is("query", ns::ROSTER) { let roster = Roster::try_from(payload).unwrap(); for item in roster.items.into_iter() { - app_tx.unbounded_send(Event::ContactAdded(item)).unwrap(); + events.push(Event::ContactAdded(item)); } } else if payload.is("pubsub", ns::PUBSUB) { let pubsub = PubSub::try_from(payload).unwrap(); @@ -205,7 +223,8 @@ impl ClientBuilder<'_> { iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap()); if let PubSub::Items(items) = pubsub { if items.node.0 == ns::AVATAR_DATA { - avatar::handle_data_pubsub_iq(&from, &mut app_tx, items); + let new_events = avatar::handle_data_pubsub_iq(&from, &items); + events.extend(new_events); } } } @@ -236,7 +255,7 @@ impl ClientBuilder<'_> { }; for status in muc_user.status.into_iter() { if status == Status::SelfPresence { - app_tx.unbounded_send(Event::RoomJoined(from.clone())).unwrap(); + events.push(Event::RoomJoined(from.clone())); break; } } @@ -249,8 +268,9 @@ impl ClientBuilder<'_> { } } - Ok(()) + futures::stream::iter_ok(events) }) + .flatten() }; let sender = sender_rx @@ -259,25 +279,45 @@ impl ClientBuilder<'_> { .map(|(rx, mut sink)| { drop(rx); let _ = sink.close(); + None }); - let future = reader.select(sender) - .map(|_| ()) - .map_err(|_| ()); + // TODO is this correct? + // Some(Error) means a real error + // None means the end of the sender stream and can be ignored + let future = reader + .map(Some) + .select(sender.into_stream()) + .filter_map(|x| x); - let agent = Client { - sender_tx, - }; + let agent = Agent { sender_tx }; - Ok((Box::new(future), agent)) + (agent, future) } } pub struct Client { sender_tx: mpsc::UnboundedSender, + stream: Box>, } impl Client { + pub fn get_agent(&self) -> Agent { + Agent { + sender_tx: self.sender_tx.clone(), + } + } + + pub fn listen(self) -> Box> { + self.stream + } +} + +pub struct Agent { + sender_tx: mpsc::UnboundedSender, +} + +impl Agent { pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) { let mut presence = Presence::new(PresenceType::None) .with_to(Some(room))