diff --git a/Cargo.toml b/Cargo.toml index 714c5d3..40b1512 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,10 @@ license = "AGPL-3.0-or-later" description = "MUC implementation allowing participants to play the Hanabi game." [dependencies] +async-trait = "^0.1" env_logger = "^0.9" futures = "^0.3" +lazy_static = "^1.4" log = "^0.4" tokio = "^1.20" tokio-xmpp = { version = "^3.2", default-features = false, features = ["tls-rust"] } diff --git a/src/component.rs b/src/component.rs new file mode 100644 index 0000000..5dd1e28 --- /dev/null +++ b/src/component.rs @@ -0,0 +1,148 @@ +// +// component.rs +// Copyright (C) 2022 Maxime “pep” Buquet +// Distributed under terms of the GPLv3+ license. +// + +use crate::error::Error; + +use std::marker::Send; +use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::task::Context; + +use async_trait::async_trait; +use futures::{task::Poll, Stream}; +use log::debug; +use tokio_xmpp::Component as TokioXMPPComponent; +use xmpp_parsers::Element; + +// Testable interface +#[async_trait] +pub trait ComponentTrait: Stream + Unpin { + async fn send_stanza + Send>(&mut self, el: E) -> Result<(), Error>; +} + +pub struct Component(TokioXMPPComponent); + +impl Stream for Component { + type Item = Element; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } +} + +#[async_trait] +impl ComponentTrait for Component { + async fn send_stanza + Send>(&mut self, el: E) -> Result<(), Error> { + let el: Element = el.into(); + debug!("SEND: {}", String::from(&el)); + self.0.send_stanza(el).await?; + Ok(()) + } +} + +#[async_trait] +impl ComponentTrait for &mut Component { + async fn send_stanza + Send>(&mut self, el: E) -> Result<(), Error> { + let el: Element = el.into(); + debug!("SEND: {}", String::from(&el)); + self.0.send_stanza(el).await?; + Ok(()) + } +} + +impl Deref for Component { + type Target = TokioXMPPComponent; + + fn deref(&self) -> &TokioXMPPComponent { + &self.0 + } +} + +impl DerefMut for Component { + fn deref_mut(&mut self) -> &mut TokioXMPPComponent { + &mut self.0 + } +} + +impl Component { + pub async fn new( + jid: &str, + password: &str, + server: &str, + port: u16, + ) -> Result { + Ok(Component( + TokioXMPPComponent::new(jid, password, server, port).await?, + )) + } +} + +#[derive(Debug)] +pub struct TestComponent { + in_buffer: Vec, + out_buffer: Vec, + expect_buffer: Vec, +} + +impl TestComponent { + pub fn new(in_buffer: Vec) -> Self { + TestComponent { + in_buffer, + out_buffer: Vec::new(), + expect_buffer: Vec::new(), + } + } + + /// Adds elements to be expected, in the order they're being added + pub fn expect>(&mut self, el: E) { + self.expect_buffer.push(el.into()) + } + + /// Asserts expected output and actual output are the same + pub fn assert(&mut self) { + loop { + let out = self.out_buffer.pop(); + let expected = self.expect_buffer.pop(); + + match (out, expected) { + (None, None) => break, + (Some(out), Some(expected)) => assert_eq!(String::from(&expected), String::from(&out)), + (Some(out), None) => assert_eq!(format!(""), String::from(&out)), + (None, Some(expected)) => assert_eq!(String::from(&expected), format!("")), + } + } + } + + fn _send_stanza + Send>(&mut self, el: E) -> Result<(), Error> { + Ok(self.out_buffer.push(el.into())) + } +} + +impl Stream for TestComponent { + type Item = Element; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + while self.in_buffer.len() > 0 { + return Poll::Ready(self.in_buffer.pop()); + } + + Poll::Ready(None) + } +} + +#[async_trait] +impl ComponentTrait for TestComponent { + async fn send_stanza + Send>(&mut self, el: E) -> Result<(), Error> { + self._send_stanza(el) + } +} + +#[async_trait] +impl ComponentTrait for &mut TestComponent { + async fn send_stanza + Send>(&mut self, el: E) -> Result<(), Error> { + self._send_stanza(el) + } +} diff --git a/src/error.rs b/src/error.rs index 9aaf764..990df26 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,7 +20,7 @@ use tokio_xmpp::Error as TokioXMPPError; use xmpp_parsers::Jid; #[derive(Debug)] -pub(crate) enum Error { +pub enum Error { MismatchJids(Jid), NickAlreadyAssigned(String), XMPPError(TokioXMPPError), diff --git a/src/handlers.rs b/src/handlers.rs new file mode 100644 index 0000000..6655c12 --- /dev/null +++ b/src/handlers.rs @@ -0,0 +1,164 @@ +// Copyright (C) 2022-2099 The crate authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +// for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use crate::component::ComponentTrait; +use crate::error::Error; +use crate::types::{Nick, Room, ROOMS}; + +use std::ops::ControlFlow; + +use futures::stream::StreamExt; +use log::{debug, error}; +use xmpp_parsers::{ + disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity}, + iq::{Iq, IqType}, + message::Message, + muc::Muc, + ns, + presence::Presence, + stanza_error::{DefinedCondition, ErrorType, StanzaError}, + BareJid, Element, Jid, +}; + +async fn handle_iq_disco( + component: &mut C, + iq: Iq, + payload: Element, +) -> Result<(), Error> { + match DiscoInfoQuery::try_from(payload) { + Ok(DiscoInfoQuery { node }) if node.is_none() => { + let identities = vec![Identity::new("conference", "text", "en", "Hanabi")]; + let features = vec![ + Feature::new("http://jabber.org/protocol/disco#info"), + Feature::new("xmpp:bouah.net:hanabi:muc:0"), + ]; + let extensions = Vec::new(); + let payload = DiscoInfoResult { + node: None, + identities, + features, + extensions, + }; + let reply = Iq::from_result(iq.id, Some(payload)) + .with_from(iq.to.unwrap()) + .with_to(iq.from.unwrap()); + component.send_stanza(reply).await?; + } + Ok(DiscoInfoQuery { .. }) => { + let error = StanzaError::new( + ErrorType::Modify, + DefinedCondition::BadRequest, + "en", + format!("Unknown disco#info node"), + ); + let reply = Iq::from_error(iq.id, error) + .with_from(iq.to.unwrap()) + .with_to(iq.from.unwrap()); + component.send_stanza(reply).await?; + } + Err(err) => error!("Failed to parse iq: {}", err), + } + + Ok(()) +} + +async fn handle_iq(component: &mut C, iq: Iq) -> Result<(), Error> { + match iq.payload { + IqType::Get(ref payload) => { + if payload.is("query", ns::DISCO_INFO) { + handle_iq_disco(component, iq.clone(), payload.clone()).await? + } else { + // We MUST answer unhandled get iqs with a service-unavailable error. + let error = StanzaError::new( + ErrorType::Cancel, + DefinedCondition::ServiceUnavailable, + "en", + "No handler defined for this kind of iq.", + ); + let iq: Element = Iq::from_error(iq.id, error) + .with_from(iq.to.unwrap()) + .with_to(iq.from.unwrap()) + .into(); + component.send_stanza(iq).await?; + } + } + _ => error!("Not handled iq: {:?}", iq), + } + + Ok(()) +} + +async fn handle_presence( + component: &mut C, + presence: Presence, +) -> Result<(), Error> { + let muc = presence + .payloads + .into_iter() + .try_for_each(|payload| match Muc::try_from(payload) { + Ok(muc) => ControlFlow::Break(muc), + _ => ControlFlow::Continue(()), + }); + + if let ControlFlow::Continue(_) = muc { + return Ok(()); + } + + // Presences to MUC come from resources not accounts + if let Jid::Full(realjid) = presence.from.unwrap() && + let Jid::Full(participant) = presence.to.unwrap() { + + let roomjid = BareJid::from(participant.clone()); + let nick: Nick = participant.resource.clone(); + + // Room already exists + if let Some(room) = unsafe { ROOMS.lock().unwrap().get_mut(&roomjid) } { + debug!("Presence received to existing room: {}", &roomjid); + room.add_session(component, realjid, nick).await.unwrap(); + } else { + debug!("Presence received to new room: {}", &roomjid); + let mut room = Room::new(roomjid.clone()); + room.add_session(component, realjid, nick).await.unwrap(); + let _ = unsafe { ROOMS.lock().unwrap().insert(roomjid, room) }; + } + } + + Ok(()) +} + +async fn handle_message( + _component: &mut C, + _message: Message, +) -> Result<(), Error> { + Ok(()) +} + +pub(crate) async fn handle_stanza(component: &mut C) -> Result<(), Error> { + while let Some(elem) = component.next().await { + debug!("RECV {}", String::from(&elem)); + if elem.is("iq", ns::COMPONENT_ACCEPT) { + let iq = Iq::try_from(elem).unwrap(); + handle_iq(component, iq).await?; + } else if elem.is("message", ns::COMPONENT_ACCEPT) { + let message = Message::try_from(elem).unwrap(); + handle_message(component, message).await?; + } else if elem.is("presence", ns::COMPONENT_ACCEPT) { + let presence = Presence::try_from(elem).unwrap(); + handle_presence(component, presence).await?; + } + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 0b9e917..99e1e3f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,128 +14,28 @@ // along with this program. If not, see . #![feature(once_cell)] +#![feature(let_chains)] +mod component; mod error; +mod handlers; mod types; -use crate::types::{send_stanza, Nick, Room, ROOMS}; +#[cfg(test)] +mod tests; + +use crate::component::Component; +use crate::error::Error; +use crate::handlers::handle_stanza; use std::env::args; -use std::ops::ControlFlow; use std::process::exit; use env_logger; -use futures::stream::StreamExt; -use log::{debug, error, info}; -use tokio_xmpp::Component; -use xmpp_parsers::{ - disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity}, - iq::{Iq, IqType}, - message::Message, - muc::Muc, - ns, - presence::Presence, - stanza_error::{DefinedCondition, ErrorType, StanzaError}, - BareJid, Element, Jid, -}; - -async fn handle_iq_disco(component: &mut Component, iq: Iq, payload: Element) { - match DiscoInfoQuery::try_from(payload) { - Ok(DiscoInfoQuery { node }) if node.is_none() => { - let identities = vec![Identity::new("conference", "text", "en", "Hanabi")]; - let features = vec![ - Feature::new("http://jabber.org/protocol/disco#info"), - Feature::new("xmpp:bouah.net:hanabi:muc:0"), - ]; - let extensions = Vec::new(); - let payload = DiscoInfoResult { - node: None, - identities, - features, - extensions, - }; - let reply = Iq::from_result(iq.id, Some(payload)) - .with_from(iq.to.unwrap()) - .with_to(iq.from.unwrap()); - send_stanza(component, reply).await.unwrap(); - } - Ok(DiscoInfoQuery { .. }) => { - let error = StanzaError::new( - ErrorType::Modify, - DefinedCondition::BadRequest, - "en", - format!("Unknown disco#info node"), - ); - let reply = Iq::from_error(iq.id, error) - .with_from(iq.to.unwrap()) - .with_to(iq.from.unwrap()); - send_stanza(component, reply).await.unwrap(); - } - Err(err) => error!("Failed to parse iq: {}", err), - } -} - -async fn handle_iq(component: &mut Component, iq: Iq) { - match iq.payload { - IqType::Get(ref payload) => { - if payload.is("query", ns::DISCO_INFO) { - handle_iq_disco(component, iq.clone(), payload.clone()).await - } else { - // We MUST answer unhandled get iqs with a service-unavailable error. - let error = StanzaError::new( - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "en", - "No handler defined for this kind of iq.", - ); - let iq = Iq::from_error(iq.id, error) - .with_from(iq.to.unwrap()) - .with_to(iq.from.unwrap()) - .into(); - let _ = component.send_stanza(iq).await; - } - } - _ => error!("Not handled iq: {:?}", iq), - } -} - -async fn handle_presence(component: &mut Component, presence: Presence) { - let muc = presence - .payloads - .into_iter() - .try_for_each(|payload| match Muc::try_from(payload) { - Ok(muc) => ControlFlow::Break(muc), - _ => ControlFlow::Continue(()), - }); - - if let ControlFlow::Continue(_) = muc { - return; - } - - // Presences to MUC come from resources not accounts - if let Jid::Full(realjid) = presence.from.unwrap() && - let Jid::Full(participant) = presence.to.unwrap() { - - let roomjid = BareJid::from(participant.clone()); - let nick: Nick = participant.resource.clone(); - - // Room already exists - if let Some(room) = unsafe { ROOMS.lock().unwrap().get_mut(&roomjid) } { - debug!("Presence received to existing room: {}", &roomjid); - room.add_session(component, realjid, nick).await.unwrap(); - } else { - debug!("Presence received to new room: {}", &roomjid); - let mut room = Room::new(roomjid.clone()); - room.add_session(component, realjid, nick).await.unwrap(); - let _ = unsafe { ROOMS.lock().unwrap().insert(roomjid, room) }; - } - } -} - -async fn handle_message(_component: &mut Component, _message: Message) {} +use log::info; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Error> { let args: Vec = args().collect(); if args.len() != 3 { println!("Usage: {} ", args[0]); @@ -152,17 +52,7 @@ async fn main() { let mut component = Component::new(jid, passwd, server, port).await.unwrap(); info!("Online as {}!", component.jid); - while let Some(elem) = component.next().await { - debug!("RECV {}", String::from(&elem)); - if elem.is("iq", ns::COMPONENT_ACCEPT) { - let iq = Iq::try_from(elem).unwrap(); - handle_iq(&mut component, iq).await; - } else if elem.is("message", ns::COMPONENT_ACCEPT) { - let message = Message::try_from(elem).unwrap(); - handle_message(&mut component, message).await; - } else if elem.is("presence", ns::COMPONENT_ACCEPT) { - let presence = Presence::try_from(elem).unwrap(); - handle_presence(&mut component, presence).await; - } - } + handle_stanza(&mut component).await?; + + Ok(()) } diff --git a/src/tests.rs b/src/tests.rs new file mode 100644 index 0000000..3ab6a1d --- /dev/null +++ b/src/tests.rs @@ -0,0 +1,60 @@ +// Copyright (C) 2022-2099 The crate authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +// for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::str::FromStr; + +use crate::component::TestComponent; +use crate::handlers::handle_stanza; + +use lazy_static::lazy_static; +use xmpp_parsers::{ + iq::{Iq, IqType}, + BareJid, Element, FullJid, Jid, + stanza_error::{DefinedCondition, ErrorType, StanzaError}, +}; + +lazy_static! { + static ref COMPONENT_JID: BareJid = BareJid::from_str("muc.component").unwrap(); +} + +#[tokio::test] +async fn test_iq_unimplemented() { + let from = Jid::Full(FullJid::from_str("foo@bar/qxx").unwrap()); + let to = Jid::Bare(COMPONENT_JID.clone()); + + let disco: Element = Iq { + from: Some(from.clone()), + to: Some(to.clone()), + id: String::from("disco"), + payload: IqType::Get(Element::builder("x", "urn:example:unimplemented").build()), + }.into(); + + let reply: Element = Iq::from_error("disco", StanzaError::new( + ErrorType::Cancel, + DefinedCondition::ServiceUnavailable, + "en", + "No handler defined for this kind of iq.", + )) + .with_from(to) + .with_to(from) + .into(); + + let mut component = TestComponent::new(vec![disco]); + component.expect(reply); + + handle_stanza(&mut component).await.unwrap(); + + component.assert(); +} diff --git a/src/types.rs b/src/types.rs index 27ce752..1a97fd0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -13,6 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use crate::component::ComponentTrait; use crate::error::Error; use std::collections::HashMap; @@ -20,7 +21,6 @@ use std::iter::IntoIterator; use std::sync::{LazyLock, Mutex}; use log::debug; -use tokio_xmpp::Component; use xmpp_parsers::{ message::{Message, MessageType, Subject}, muc::{ @@ -28,19 +28,9 @@ use xmpp_parsers::{ MucUser, }, presence::{Presence, Type as PresenceType}, - BareJid, Element, FullJid, Jid, + BareJid, FullJid, Jid, }; -pub(crate) async fn send_stanza>( - component: &mut Component, - elem: E, -) -> Result<(), Error> { - let elem: Element = elem.into(); - debug!("SEND: {}", String::from(&elem)); - component.send_stanza(elem).await?; - Ok(()) -} - pub(crate) type Nick = String; #[derive(Debug)] @@ -57,9 +47,9 @@ impl Room { } } - pub(crate) async fn add_session( + pub(crate) async fn add_session( &mut self, - component: &mut Component, + component: &mut C, realjid: FullJid, nick: Nick, ) -> Result<(), Error> { @@ -84,7 +74,7 @@ impl Room { for (_, occupant) in self.occupants.iter() { for session in occupant.iter() { let presence = presence.clone().with_from(session.clone()); - send_stanza(component, presence).await?; + component.send_stanza(presence).await?; } } @@ -102,7 +92,7 @@ impl Room { .with_from(participant) .with_to(realjid.clone()) .with_payloads(vec![MucUser { status, items }.into()]); - send_stanza(component, self_presence).await?; + component.send_stanza(self_presence).await?; // Send subject debug!("Sending subject!"); @@ -112,7 +102,7 @@ impl Room { .subjects .insert(String::from("en"), Subject(String::from("Hanabi"))); subject.type_ = MessageType::Groupchat; - send_stanza(component, subject).await?; + component.send_stanza(subject).await?; } Ok(())