From 92386fc48856888235cef586e49198620bb666ce Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Thu, 21 Mar 2019 18:41:29 +0100 Subject: [PATCH] Hello world! --- .gitignore | 3 + Cargo.toml | 11 ++ examples/hello_bot.rs | 74 +++++++++++ src/avatar.rs | 65 ++++++++++ src/lib.rs | 286 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 439 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 examples/hello_bot.rs create mode 100644 src/avatar.rs create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6936990 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +**/*.rs.bk +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..916a6d1 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "xmpp" +version = "0.3.0" +authors = ["Emmanuel Gil Peyrot "] +edition = "2018" + +[dependencies] +tokio-xmpp = "1" +xmpp-parsers = "0.13" +futures = "0.1" +tokio = "0.1" diff --git a/examples/hello_bot.rs b/examples/hello_bot.rs new file mode 100644 index 0000000..db5f412 --- /dev/null +++ b/examples/hello_bot.rs @@ -0,0 +1,74 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use futures::{Future, Stream, sync::mpsc}; +use std::env::args; +use std::process::exit; +use std::str::FromStr; +use tokio::runtime::current_thread::Runtime; +use xmpp_parsers::{Jid, message::MessageType}; +use xmpp::{ClientBuilder, ClientType, ClientFeature, Event}; + +fn main() { + let args: Vec = args().collect(); + if args.len() != 5 { + println!("Usage: {} ", args[0]); + exit(1); + } + let jid = &args[1]; + let password = &args[2]; + let room_jid = &args[3]; + let nick: &str = &args[4]; + + // tokio_core context + let mut rt = Runtime::new().unwrap(); + + let (value_tx, value_rx) = mpsc::unbounded(); + + // Client instance + let (client, mut agent) = ClientBuilder::new(jid, password) + .set_client(ClientType::Bot, "xmpp-rs") + .set_website("https://gitlab.com/xmpp-rs/xmpp-rs") + .enable_feature(ClientFeature::Avatars) + .build(value_tx) + .unwrap(); + + let forwarder = value_rx.for_each(|evt: Event| { + match evt { + Event::Online => { + println!("Online."); + let room_jid = Jid::from_str(room_jid).unwrap().with_resource(nick); + agent.join_room(room_jid, "en", "Yet another bot!"); + }, + Event::Disconnected => { + println!("Disconnected."); + return Err(()); + }, + Event::RoomJoined(jid) => { + println!("Joined room {}.", jid); + agent.send_message(jid.into_bare_jid(), MessageType::Groupchat, "en", "Hello world!"); + }, + Event::AvatarRetrieved(jid, path) => { + println!("Received avatar for {} in {}.", jid, path); + }, + } + Ok(()) + }) + .map_err(|e| println!("{:?}", e)); + + // Start polling + match rt.block_on(client + .select2(forwarder) + .map(|_| ()) + .map_err(|_| ()) + ) { + Ok(_) => (), + Err(e) => { + println!("Fatal: {:?}", e); + () + } + } +} diff --git a/src/avatar.rs b/src/avatar.rs new file mode 100644 index 0000000..9368bc8 --- /dev/null +++ b/src/avatar.rs @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use futures::{Sink, sync::mpsc}; +use std::fs::{create_dir_all, File}; +use std::io::{self, Write}; +use tokio_xmpp::Packet; +use xmpp_parsers::{ + avatar::{Data, Metadata}, + iq::Iq, + ns, + pubsub::{ + event::Item, + pubsub::{Items, PubSub}, + NodeName, + }, + Jid, TryFrom, +}; +use crate::Event; + +pub(crate) fn handle_metadata_pubsub_event(from: &Jid, tx: &mut mpsc::UnboundedSender, items: Vec) { + for item in items { + let payload = item.payload.clone().unwrap(); + if payload.is("metadata", ns::AVATAR_METADATA) { + // TODO: do something with these metadata. + let _metadata = Metadata::try_from(payload).unwrap(); + let iq = download_avatar(from); + tx.start_send(Packet::Stanza(iq.into())).unwrap(); + } + } +} + +fn download_avatar(from: &Jid) -> Iq { + Iq::from_get("coucou", PubSub::Items(Items { + max_items: None, + node: NodeName(String::from(ns::AVATAR_DATA)), + subid: None, + items: Vec::new(), + })) + .with_to(from.clone()) +} + +pub(crate) fn handle_data_pubsub_iq(from: &Jid, tx: &mut mpsc::UnboundedSender, items: Items) { + for item in items.items { + if let Some(id) = item.id.clone() { + if let Some(payload) = &item.payload { + let data = Data::try_from(payload.clone()).unwrap(); + let filename = save_avatar(from, id.0, &data.data).unwrap(); + tx.unbounded_send(Event::AvatarRetrieved(from.clone(), filename)).unwrap(); + } + } + } +} + +fn save_avatar(from: &Jid, id: String, data: &[u8]) -> io::Result { + let directory = format!("data/{}", from); + let filename = format!("data/{}/{}", from, id); + create_dir_all(directory)?; + let mut file = File::create(&filename)?; + file.write_all(data)?; + Ok(filename) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..56eda8a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,286 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use std::str::FromStr; +use futures::{Future,Stream, Sink, sync::mpsc}; +use tokio_xmpp::{ + Client as TokioXmppClient, + Event as TokioXmppEvent, + Packet, +}; +use xmpp_parsers::{ + caps::{compute_disco, hash_caps, Caps}, + disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity}, + hashes::Algo, + iq::{Iq, IqType}, + message::{Message, MessageType, Body}, + muc::{ + Muc, + user::{MucUser, Status}, + }, + ns, + presence::{Presence, Type as PresenceType}, + pubsub::{ + event::PubSubEvent, + pubsub::PubSub, + }, + stanza_error::{StanzaError, ErrorType, DefinedCondition}, + Jid, JidParseError, TryFrom, +}; + +mod avatar; + +#[derive(Debug)] +pub enum ClientType { + Bot, + Pc, +} + +impl Default for ClientType { + fn default() -> Self { + ClientType::Bot + } +} + +impl ToString for ClientType { + fn to_string(&self) -> String { + String::from( + match self { + ClientType::Bot => "bot", + ClientType::Pc => "pc", + } + ) + } +} + +#[derive(PartialEq)] +pub enum ClientFeature { + Avatars, +} + +pub enum Event { + Online, + Disconnected, + AvatarRetrieved(Jid, String), + RoomJoined(Jid), +} + +#[derive(Default)] +pub struct ClientBuilder<'a> { + jid: &'a str, + password: &'a str, + website: String, + disco: (ClientType, String), + features: Vec, +} + +impl ClientBuilder<'_> { + pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> { + ClientBuilder { + jid, + password, + website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"), + disco: (ClientType::default(), String::from("tokio-xmpp")), + features: vec![], + } + } + + pub fn set_client(mut self, type_: ClientType, name: &str) -> Self { + self.disco = (type_, String::from(name)); + self + } + + pub fn set_website(mut self, url: &str) -> Self { + self.website = String::from(url); + self + } + + pub fn enable_feature(mut self, feature: ClientFeature) -> Self { + self.features.push(feature); + self + } + + fn make_disco(&self) -> DiscoInfoResult { + let identities = vec![Identity::new("client", self.disco.0.to_string(), + "en", self.disco.1.to_string())]; + let mut features = vec![ + Feature::new(ns::DISCO_INFO), + ]; + if self.features.contains(&ClientFeature::Avatars) { + features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA))); + } + DiscoInfoResult { + node: None, + identities, + features, + extensions: vec![], + } + } + + fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence { + let caps_data = compute_disco(disco); + let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap(); + let caps = Caps::new(node, hash); + + let mut presence = Presence::new(PresenceType::None); + presence.add_payload(caps); + presence + } + + pub fn build(self, mut app_tx: mpsc::UnboundedSender) -> Result<(Box>, Client), JidParseError> { + let disco = self.make_disco(); + let node = self.website; + let (sender_tx, sender_rx) = mpsc::unbounded(); + + let client = TokioXmppClient::new(self.jid, self.password)?; + let (sink, stream) = client.split(); + + let reader = { + let mut sender_tx = sender_tx.clone(); + let jid = self.jid.to_owned(); + stream.for_each(move |event| { + // Helper function to send an iq error. + let send_error = |to, id, type_, condition, text: &str| { + let error = StanzaError::new(type_, condition, "en", text); + let iq = Iq::from_error(id, error) + .with_to(to) + .into(); + sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap(); + }; + + match event { + TokioXmppEvent::Online => { + let presence = ClientBuilder::make_initial_presence(&disco, &node).into(); + let packet = Packet::Stanza(presence); + sender_tx.unbounded_send(packet) + .unwrap(); + app_tx.unbounded_send(Event::Online).unwrap(); + } + TokioXmppEvent::Disconnected => { + app_tx.unbounded_send(Event::Disconnected).unwrap(); + } + TokioXmppEvent::Stanza(stanza) => { + if stanza.is("iq", "jabber:client") { + let iq = Iq::try_from(stanza).unwrap(); + if let IqType::Get(payload) = iq.payload { + if payload.is("query", ns::DISCO_INFO) { + let query = DiscoInfoQuery::try_from(payload); + match query { + Ok(query) => { + let mut disco_info = disco.clone(); + disco_info.node = query.node; + let iq = Iq::from_result(iq.id, Some(disco_info)) + .with_to(iq.from.unwrap()) + .into(); + sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap(); + }, + Err(err) => { + send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err)); + }, + } + } else { + // We MUST answer unhandled get iqs with a service-unavailable error. + send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq."); + } + } else if let IqType::Result(Some(payload)) = iq.payload { + if payload.is("pubsub", ns::PUBSUB) { + let pubsub = PubSub::try_from(payload).unwrap(); + let from = + iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap()); + if let PubSub::Items(items) = pubsub { + if items.node.0 == ns::AVATAR_DATA { + avatar::handle_data_pubsub_iq(&from, &mut app_tx, items); + } + } + } + } else if let IqType::Set(_) = iq.payload { + // We MUST answer unhandled set iqs with a service-unavailable error. + send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq."); + } + } else if stanza.is("message", "jabber:client") { + let message = Message::try_from(stanza).unwrap(); + let from = message.from.clone().unwrap(); + for child in message.payloads { + if child.is("event", ns::PUBSUB_EVENT) { + let event = PubSubEvent::try_from(child).unwrap(); + if let PubSubEvent::PublishedItems { node, items } = event { + if node.0 == ns::AVATAR_METADATA { + avatar::handle_metadata_pubsub_event(&from, &mut sender_tx, items); + } + } + } + } + } else if stanza.is("presence", "jabber:client") { + let presence = Presence::try_from(stanza).unwrap(); + let from = presence.from.clone().unwrap(); + for payload in presence.payloads.into_iter() { + let muc_user = match MucUser::try_from(payload) { + Ok(muc_user) => muc_user, + _ => continue + }; + for status in muc_user.status.into_iter() { + if status == Status::SelfPresence { + app_tx.unbounded_send(Event::RoomJoined(from.clone())).unwrap(); + break; + } + } + } + } else if stanza.is("error", "http://etherx.jabber.org/streams") { + println!("Received a fatal stream error: {}", String::from(&stanza)); + } else { + panic!("Unknown stanza: {}", String::from(&stanza)); + } + } + } + + Ok(()) + }) + }; + + let sender = sender_rx + .map_err(|e| panic!("Sink error: {:?}", e)) + .forward(sink) + .map(|(rx, mut sink)| { + drop(rx); + let _ = sink.close(); + }); + + let future = reader.select(sender) + .map(|_| ()) + .map_err(|_| ()); + + let agent = Client { + sender_tx, + }; + + Ok((Box::new(future), agent)) + } +} + +pub struct Client { + sender_tx: mpsc::UnboundedSender, +} + +impl Client { + pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) { + let mut presence = Presence::new(PresenceType::None) + .with_to(Some(room)) + .with_payloads(vec![Muc::new().into()]); + presence.set_status(String::from(lang), String::from(status)); + let presence = presence.into(); + self.sender_tx.unbounded_send(Packet::Stanza(presence)) + .unwrap(); + } + + pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) { + let mut message = Message::new(Some(recipient)); + message.type_ = type_; + message.bodies.insert(String::from(lang), Body(String::from(text))); + let message = message.into(); + self.sender_tx.unbounded_send(Packet::Stanza(message)) + .unwrap(); + } +}