Simplify the API by removing explicit channels.

This commit is contained in:
Marcin Mielniczuk 2019-07-05 18:59:05 +02:00
parent 08af035eb1
commit 3f056813ed
No known key found for this signature in database
GPG key ID: 2CF0CE66660B8CC9
3 changed files with 86 additions and 46 deletions

View file

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures::{Future, Stream, sync::mpsc};
use futures::prelude::*;
use std::env::args;
use std::process::exit;
use std::str::FromStr;
@ -26,18 +26,19 @@ fn main() {
// tokio_core context
let mut rt = Runtime::new().unwrap();
let (value_tx, value_rx) = mpsc::unbounded();
// Client instance
let (client, mut agent) = ClientBuilder::new(jid, password)
let (mut agent, stream) = ClientBuilder::new(jid, password)
.set_client(ClientType::Bot, "xmpp-rs")
.set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
.enable_feature(ClientFeature::Avatars)
.enable_feature(ClientFeature::ContactList)
.build(value_tx)
.build()
.unwrap();
let forwarder = value_rx.for_each(|evt: Event| {
// We return either Some(Error) if an error was encountered
// or None, if we were simply disconnected
let handler = stream.map_err(Some).for_each(|evt: Event| {
match evt {
Event::Online => {
println!("Online.");
@ -46,7 +47,7 @@ fn main() {
},
Event::Disconnected => {
println!("Disconnected.");
return Err(());
return Err(None);
},
Event::ContactAdded(contact) => {
println!("Contact {:?} added.", contact);
@ -66,19 +67,10 @@ fn main() {
},
}
Ok(())
})
.map_err(|e| println!("{:?}", e));
});
// Start polling
match rt.block_on(client
.select2(forwarder)
.map(|_| ())
.map_err(|_| ())
) {
Ok(_) => (),
Err(e) => {
println!("Fatal: {:?}", e);
()
}
}
rt.block_on(handler).unwrap_or_else(|e| match e {
Some(e) => println!("Error: {:?}", e),
None => println!("Disconnected."),
});
}

View file

@ -4,7 +4,8 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures::{Sink, sync::mpsc};
use crate::Event;
use futures::{sync::mpsc, Sink};
use std::fs::{create_dir_all, File};
use std::io::{self, Write};
use tokio_xmpp::Packet;
@ -19,7 +20,6 @@ use xmpp_parsers::{
},
Jid, TryFrom,
};
use crate::Event;
pub(crate) fn handle_metadata_pubsub_event(from: &Jid, tx: &mut mpsc::UnboundedSender<Packet>, items: Vec<Item>) {
for item in items {
@ -43,16 +43,24 @@ fn download_avatar(from: &Jid) -> Iq {
.with_to(from.clone())
}
pub(crate) fn handle_data_pubsub_iq(from: &Jid, tx: &mut mpsc::UnboundedSender<Event>, items: Items) {
for item in items.items {
if let Some(id) = item.id.clone() {
if let Some(payload) = &item.payload {
// The return value of this function will be simply pushed to a Vec in the caller function,
// so it makes no sense to allocate a Vec here - we're lazy instead
pub(crate) fn handle_data_pubsub_iq<'a>(
from: &'a Jid,
items: &'a Items,
) -> impl IntoIterator<Item = Event> + 'a {
let from = from.clone();
items
.items
.iter()
.filter_map(move |item| match (&item.id, &item.payload) {
(Some(id), Some(payload)) => {
let data = Data::try_from(payload.clone()).unwrap();
let filename = save_avatar(from, id.0, &data.data).unwrap();
tx.unbounded_send(Event::AvatarRetrieved(from.clone(), filename)).unwrap();
let filename = save_avatar(&from, id.0.clone(), &data.data).unwrap();
Some(Event::AvatarRetrieved(from.clone(), filename))
}
}
}
_ => None,
})
}
fn save_avatar(from: &Jid, id: String, data: &[u8]) -> io::Result<String> {

View file

@ -34,6 +34,8 @@ use xmpp_parsers::{
mod avatar;
pub type Error = tokio_xmpp::Error;
#[derive(Debug)]
pub enum ClientType {
Bot,
@ -135,19 +137,35 @@ impl ClientBuilder<'_> {
presence
}
pub fn build(self, mut app_tx: mpsc::UnboundedSender<Event>) -> Result<(Box<Future<Item = (), Error = ()>>, Client), JidParseError> {
pub fn build(
self,
) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
let client = TokioXmppClient::new(self.jid, self.password)?;
Ok(self.build_impl(client))
}
// This function is meant to be used for testing build
pub(crate) fn build_impl<S>(
self,
stream: S,
) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
where
S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
+ Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
{
let disco = self.make_disco();
let node = self.website;
let (sender_tx, sender_rx) = mpsc::unbounded();
let client = TokioXmppClient::new(self.jid, self.password)?;
let client = stream;
let (sink, stream) = client.split();
let reader = {
let mut sender_tx = sender_tx.clone();
let jid = self.jid.to_owned();
stream.for_each(move |event| {
stream.map(move |event| {
// Helper function to send an iq error.
let mut events = Vec::new();
let send_error = |to, id, type_, condition, text: &str| {
let error = StanzaError::new(type_, condition, "en", text);
let iq = Iq::from_error(id, error)
@ -162,13 +180,13 @@ impl ClientBuilder<'_> {
let packet = Packet::Stanza(presence);
sender_tx.unbounded_send(packet)
.unwrap();
app_tx.unbounded_send(Event::Online).unwrap();
events.push(Event::Online);
let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
.into();
sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
}
TokioXmppEvent::Disconnected => {
app_tx.unbounded_send(Event::Disconnected).unwrap();
events.push(Event::Disconnected);
}
TokioXmppEvent::Stanza(stanza) => {
if stanza.is("iq", "jabber:client") {
@ -197,7 +215,7 @@ impl ClientBuilder<'_> {
if payload.is("query", ns::ROSTER) {
let roster = Roster::try_from(payload).unwrap();
for item in roster.items.into_iter() {
app_tx.unbounded_send(Event::ContactAdded(item)).unwrap();
events.push(Event::ContactAdded(item));
}
} else if payload.is("pubsub", ns::PUBSUB) {
let pubsub = PubSub::try_from(payload).unwrap();
@ -205,7 +223,8 @@ impl ClientBuilder<'_> {
iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
if let PubSub::Items(items) = pubsub {
if items.node.0 == ns::AVATAR_DATA {
avatar::handle_data_pubsub_iq(&from, &mut app_tx, items);
let new_events = avatar::handle_data_pubsub_iq(&from, &items);
events.extend(new_events);
}
}
}
@ -236,7 +255,7 @@ impl ClientBuilder<'_> {
};
for status in muc_user.status.into_iter() {
if status == Status::SelfPresence {
app_tx.unbounded_send(Event::RoomJoined(from.clone())).unwrap();
events.push(Event::RoomJoined(from.clone()));
break;
}
}
@ -249,8 +268,9 @@ impl ClientBuilder<'_> {
}
}
Ok(())
futures::stream::iter_ok(events)
})
.flatten()
};
let sender = sender_rx
@ -259,25 +279,45 @@ impl ClientBuilder<'_> {
.map(|(rx, mut sink)| {
drop(rx);
let _ = sink.close();
None
});
let future = reader.select(sender)
.map(|_| ())
.map_err(|_| ());
// TODO is this correct?
// Some(Error) means a real error
// None means the end of the sender stream and can be ignored
let future = reader
.map(Some)
.select(sender.into_stream())
.filter_map(|x| x);
let agent = Client {
sender_tx,
};
let agent = Agent { sender_tx };
Ok((Box::new(future), agent))
(agent, future)
}
}
pub struct Client {
sender_tx: mpsc::UnboundedSender<Packet>,
stream: Box<Stream<Item = Event, Error = Error>>,
}
impl Client {
pub fn get_agent(&self) -> Agent {
Agent {
sender_tx: self.sender_tx.clone(),
}
}
pub fn listen(self) -> Box<Stream<Item = Event, Error = Error>> {
self.stream
}
}
pub struct Agent {
sender_tx: mpsc::UnboundedSender<Packet>,
}
impl Agent {
pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
let mut presence = Presence::new(PresenceType::None)
.with_to(Some(room))