diff --git a/xmpp/Cargo.toml b/xmpp/Cargo.toml index 68be247..ce5145e 100644 --- a/xmpp/Cargo.toml +++ b/xmpp/Cargo.toml @@ -17,8 +17,10 @@ edition = "2018" tokio-xmpp = "3.0.0" xmpp-parsers = "0.19" futures = "0.3" -tokio = "1" +tokio = { version = "1", features = ["full"] } log = "0.4" +reqwest = { version = "0.11.8", features = ["stream"] } +tokio-util = { version = "0.6.9", features = ["codec"] } [dev-dependencies] env_logger = "0.8" diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index 82c5df4..0f5b87a 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -7,15 +7,20 @@ #![deny(bare_trait_objects)] use futures::stream::StreamExt; +use reqwest::{Body as ReqwestBody, Client as ReqwestClient}; use std::cell::RefCell; use std::convert::TryFrom; +use std::path::{Path, PathBuf}; use std::rc::Rc; +use tokio::fs::File; +use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent}; use xmpp_parsers::{ bookmarks2::Conference, caps::{compute_disco, hash_caps, Caps}, disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity}, hashes::Algo, + http_upload::{SlotRequest, SlotResult}, iq::{Iq, IqType}, message::{Body, Message, MessageType}, muc::{ @@ -27,7 +32,7 @@ use xmpp_parsers::{ pubsub::pubsub::{Items, PubSub}, roster::{Item as RosterItem, Roster}, stanza_error::{DefinedCondition, ErrorType, StanzaError}, - BareJid, FullJid, Jid, + BareJid, Element, FullJid, Jid, }; #[macro_use] extern crate log; @@ -83,6 +88,7 @@ pub enum Event { RoomJoined(BareJid), RoomLeft(BareJid), RoomMessage(BareJid, RoomNick, Body), + HttpUploadedFile(String), } #[derive(Default)] @@ -175,6 +181,7 @@ impl ClientBuilder<'_> { lang: Rc::new(self.lang), disco, node, + uploads: Vec::new(), }; Ok(agent) @@ -187,6 +194,7 @@ pub struct Agent { lang: Rc>, disco: DiscoInfoResult, node: String, + uploads: Vec<(String, Jid, PathBuf)>, } impl Agent { @@ -291,6 +299,9 @@ impl Agent { } else if payload.is("pubsub", ns::PUBSUB) { let new_events = pubsub::handle_iq_result(&from, payload); events.extend(new_events); + } else if payload.is("slot", ns::HTTP_UPLOAD) { + let new_events = handle_upload_result(&from, iq.id, payload, self).await; + events.extend(new_events); } } else if let IqType::Set(_) = iq.payload { // We MUST answer unhandled set iqs with a service-unavailable error. @@ -417,6 +428,57 @@ impl Agent { None } } + + pub async fn upload_file_with(&mut self, service: &str, path: &Path) { + let name = path.file_name().unwrap().to_str().unwrap().to_string(); + let file = File::open(path).await.unwrap(); + let size = file.metadata().await.unwrap().len(); + let slot_request = SlotRequest { + filename: name, + size: size, + content_type: None, + }; + let to = service.parse::().unwrap(); + let request = Iq::from_get("upload1", slot_request).with_to(to.clone()); + self.uploads + .push((String::from("upload1"), to, path.to_path_buf())); + self.client.send_stanza(request.into()).await.unwrap(); + } +} + +async fn handle_upload_result( + from: &Jid, + iqid: String, + elem: Element, + agent: &mut Agent, +) -> impl IntoIterator { + let mut res: Option<(usize, PathBuf)> = None; + + for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() { + if to == from && id == &iqid { + res = Some((i, filepath.to_path_buf())); + break; + } + } + + if let Some((index, file)) = res { + agent.uploads.remove(index); + let slot = SlotResult::try_from(elem).unwrap(); + let web = ReqwestClient::new(); + let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new()); + let body = ReqwestBody::wrap_stream(stream); + let res = web + .put(slot.put.url.as_str()) + .body(body) + .send() + .await + .unwrap(); + if res.status() == 201 { + return vec![Event::HttpUploadedFile(slot.get.url)]; + } + } + + return vec![]; } #[cfg(test)]