New ComponentTrait for testing

Move handlers in their own module. Add a component module for the
Component newtype and TestComponent. Add a tests component.

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
Maxime “pep” Buquet 2022-09-08 10:49:16 +02:00
parent 6019772e64
commit 88828da67e
Signed by: pep
GPG key ID: DEDA74AEECA9D0F2
7 changed files with 396 additions and 142 deletions

View file

@ -6,8 +6,10 @@ license = "AGPL-3.0-or-later"
description = "MUC implementation allowing participants to play the Hanabi game."
[dependencies]
async-trait = "^0.1"
env_logger = "^0.9"
futures = "^0.3"
lazy_static = "^1.4"
log = "^0.4"
tokio = "^1.20"
tokio-xmpp = { version = "^3.2", default-features = false, features = ["tls-rust"] }

148
src/component.rs Normal file
View file

@ -0,0 +1,148 @@
//
// component.rs
// Copyright (C) 2022 Maxime “pep” Buquet <pep@bouah.net>
// Distributed under terms of the GPLv3+ license.
//
use crate::error::Error;
use std::marker::Send;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::Context;
use async_trait::async_trait;
use futures::{task::Poll, Stream};
use log::debug;
use tokio_xmpp::Component as TokioXMPPComponent;
use xmpp_parsers::Element;
// Testable interface
#[async_trait]
pub trait ComponentTrait: Stream<Item = Element> + Unpin {
async fn send_stanza<E: Into<Element> + Send>(&mut self, el: E) -> Result<(), Error>;
}
pub struct Component(TokioXMPPComponent);
impl Stream for Component {
type Item = Element;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_next(cx)
}
}
#[async_trait]
impl ComponentTrait for Component {
async fn send_stanza<E: Into<Element> + Send>(&mut self, el: E) -> Result<(), Error> {
let el: Element = el.into();
debug!("SEND: {}", String::from(&el));
self.0.send_stanza(el).await?;
Ok(())
}
}
#[async_trait]
impl ComponentTrait for &mut Component {
async fn send_stanza<E: Into<Element> + Send>(&mut self, el: E) -> Result<(), Error> {
let el: Element = el.into();
debug!("SEND: {}", String::from(&el));
self.0.send_stanza(el).await?;
Ok(())
}
}
impl Deref for Component {
type Target = TokioXMPPComponent;
fn deref(&self) -> &TokioXMPPComponent {
&self.0
}
}
impl DerefMut for Component {
fn deref_mut(&mut self) -> &mut TokioXMPPComponent {
&mut self.0
}
}
impl Component {
pub async fn new(
jid: &str,
password: &str,
server: &str,
port: u16,
) -> Result<Self, Error> {
Ok(Component(
TokioXMPPComponent::new(jid, password, server, port).await?,
))
}
}
#[derive(Debug)]
pub struct TestComponent {
in_buffer: Vec<Element>,
out_buffer: Vec<Element>,
expect_buffer: Vec<Element>,
}
impl TestComponent {
pub fn new(in_buffer: Vec<Element>) -> Self {
TestComponent {
in_buffer,
out_buffer: Vec::new(),
expect_buffer: Vec::new(),
}
}
/// Adds elements to be expected, in the order they're being added
pub fn expect<E: Into<Element>>(&mut self, el: E) {
self.expect_buffer.push(el.into())
}
/// Asserts expected output and actual output are the same
pub fn assert(&mut self) {
loop {
let out = self.out_buffer.pop();
let expected = self.expect_buffer.pop();
match (out, expected) {
(None, None) => break,
(Some(out), Some(expected)) => assert_eq!(String::from(&expected), String::from(&out)),
(Some(out), None) => assert_eq!(format!(""), String::from(&out)),
(None, Some(expected)) => assert_eq!(String::from(&expected), format!("")),
}
}
}
fn _send_stanza<E: Into<Element> + Send>(&mut self, el: E) -> Result<(), Error> {
Ok(self.out_buffer.push(el.into()))
}
}
impl Stream for TestComponent {
type Item = Element;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
while self.in_buffer.len() > 0 {
return Poll::Ready(self.in_buffer.pop());
}
Poll::Ready(None)
}
}
#[async_trait]
impl ComponentTrait for TestComponent {
async fn send_stanza<E: Into<Element> + Send>(&mut self, el: E) -> Result<(), Error> {
self._send_stanza(el)
}
}
#[async_trait]
impl ComponentTrait for &mut TestComponent {
async fn send_stanza<E: Into<Element> + Send>(&mut self, el: E) -> Result<(), Error> {
self._send_stanza(el)
}
}

View file

@ -20,7 +20,7 @@ use tokio_xmpp::Error as TokioXMPPError;
use xmpp_parsers::Jid;
#[derive(Debug)]
pub(crate) enum Error {
pub enum Error {
MismatchJids(Jid),
NickAlreadyAssigned(String),
XMPPError(TokioXMPPError),

164
src/handlers.rs Normal file
View file

@ -0,0 +1,164 @@
// Copyright (C) 2022-2099 The crate authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by the
// Free Software Foundation, either version 3 of the License, or (at your
// option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
// for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::component::ComponentTrait;
use crate::error::Error;
use crate::types::{Nick, Room, ROOMS};
use std::ops::ControlFlow;
use futures::stream::StreamExt;
use log::{debug, error};
use xmpp_parsers::{
disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
iq::{Iq, IqType},
message::Message,
muc::Muc,
ns,
presence::Presence,
stanza_error::{DefinedCondition, ErrorType, StanzaError},
BareJid, Element, Jid,
};
async fn handle_iq_disco<C: ComponentTrait>(
component: &mut C,
iq: Iq,
payload: Element,
) -> Result<(), Error> {
match DiscoInfoQuery::try_from(payload) {
Ok(DiscoInfoQuery { node }) if node.is_none() => {
let identities = vec![Identity::new("conference", "text", "en", "Hanabi")];
let features = vec![
Feature::new("http://jabber.org/protocol/disco#info"),
Feature::new("xmpp:bouah.net:hanabi:muc:0"),
];
let extensions = Vec::new();
let payload = DiscoInfoResult {
node: None,
identities,
features,
extensions,
};
let reply = Iq::from_result(iq.id, Some(payload))
.with_from(iq.to.unwrap())
.with_to(iq.from.unwrap());
component.send_stanza(reply).await?;
}
Ok(DiscoInfoQuery { .. }) => {
let error = StanzaError::new(
ErrorType::Modify,
DefinedCondition::BadRequest,
"en",
format!("Unknown disco#info node"),
);
let reply = Iq::from_error(iq.id, error)
.with_from(iq.to.unwrap())
.with_to(iq.from.unwrap());
component.send_stanza(reply).await?;
}
Err(err) => error!("Failed to parse iq: {}", err),
}
Ok(())
}
async fn handle_iq<C: ComponentTrait>(component: &mut C, iq: Iq) -> Result<(), Error> {
match iq.payload {
IqType::Get(ref payload) => {
if payload.is("query", ns::DISCO_INFO) {
handle_iq_disco(component, iq.clone(), payload.clone()).await?
} else {
// We MUST answer unhandled get iqs with a service-unavailable error.
let error = StanzaError::new(
ErrorType::Cancel,
DefinedCondition::ServiceUnavailable,
"en",
"No handler defined for this kind of iq.",
);
let iq: Element = Iq::from_error(iq.id, error)
.with_from(iq.to.unwrap())
.with_to(iq.from.unwrap())
.into();
component.send_stanza(iq).await?;
}
}
_ => error!("Not handled iq: {:?}", iq),
}
Ok(())
}
async fn handle_presence<C: ComponentTrait>(
component: &mut C,
presence: Presence,
) -> Result<(), Error> {
let muc = presence
.payloads
.into_iter()
.try_for_each(|payload| match Muc::try_from(payload) {
Ok(muc) => ControlFlow::Break(muc),
_ => ControlFlow::Continue(()),
});
if let ControlFlow::Continue(_) = muc {
return Ok(());
}
// Presences to MUC come from resources not accounts
if let Jid::Full(realjid) = presence.from.unwrap() &&
let Jid::Full(participant) = presence.to.unwrap() {
let roomjid = BareJid::from(participant.clone());
let nick: Nick = participant.resource.clone();
// Room already exists
if let Some(room) = unsafe { ROOMS.lock().unwrap().get_mut(&roomjid) } {
debug!("Presence received to existing room: {}", &roomjid);
room.add_session(component, realjid, nick).await.unwrap();
} else {
debug!("Presence received to new room: {}", &roomjid);
let mut room = Room::new(roomjid.clone());
room.add_session(component, realjid, nick).await.unwrap();
let _ = unsafe { ROOMS.lock().unwrap().insert(roomjid, room) };
}
}
Ok(())
}
async fn handle_message<C: ComponentTrait>(
_component: &mut C,
_message: Message,
) -> Result<(), Error> {
Ok(())
}
pub(crate) async fn handle_stanza<C: ComponentTrait>(component: &mut C) -> Result<(), Error> {
while let Some(elem) = component.next().await {
debug!("RECV {}", String::from(&elem));
if elem.is("iq", ns::COMPONENT_ACCEPT) {
let iq = Iq::try_from(elem).unwrap();
handle_iq(component, iq).await?;
} else if elem.is("message", ns::COMPONENT_ACCEPT) {
let message = Message::try_from(elem).unwrap();
handle_message(component, message).await?;
} else if elem.is("presence", ns::COMPONENT_ACCEPT) {
let presence = Presence::try_from(elem).unwrap();
handle_presence(component, presence).await?;
}
}
Ok(())
}

View file

@ -14,128 +14,28 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![feature(once_cell)]
#![feature(let_chains)]
mod component;
mod error;
mod handlers;
mod types;
use crate::types::{send_stanza, Nick, Room, ROOMS};
#[cfg(test)]
mod tests;
use crate::component::Component;
use crate::error::Error;
use crate::handlers::handle_stanza;
use std::env::args;
use std::ops::ControlFlow;
use std::process::exit;
use env_logger;
use futures::stream::StreamExt;
use log::{debug, error, info};
use tokio_xmpp::Component;
use xmpp_parsers::{
disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
iq::{Iq, IqType},
message::Message,
muc::Muc,
ns,
presence::Presence,
stanza_error::{DefinedCondition, ErrorType, StanzaError},
BareJid, Element, Jid,
};
async fn handle_iq_disco(component: &mut Component, iq: Iq, payload: Element) {
match DiscoInfoQuery::try_from(payload) {
Ok(DiscoInfoQuery { node }) if node.is_none() => {
let identities = vec![Identity::new("conference", "text", "en", "Hanabi")];
let features = vec![
Feature::new("http://jabber.org/protocol/disco#info"),
Feature::new("xmpp:bouah.net:hanabi:muc:0"),
];
let extensions = Vec::new();
let payload = DiscoInfoResult {
node: None,
identities,
features,
extensions,
};
let reply = Iq::from_result(iq.id, Some(payload))
.with_from(iq.to.unwrap())
.with_to(iq.from.unwrap());
send_stanza(component, reply).await.unwrap();
}
Ok(DiscoInfoQuery { .. }) => {
let error = StanzaError::new(
ErrorType::Modify,
DefinedCondition::BadRequest,
"en",
format!("Unknown disco#info node"),
);
let reply = Iq::from_error(iq.id, error)
.with_from(iq.to.unwrap())
.with_to(iq.from.unwrap());
send_stanza(component, reply).await.unwrap();
}
Err(err) => error!("Failed to parse iq: {}", err),
}
}
async fn handle_iq(component: &mut Component, iq: Iq) {
match iq.payload {
IqType::Get(ref payload) => {
if payload.is("query", ns::DISCO_INFO) {
handle_iq_disco(component, iq.clone(), payload.clone()).await
} else {
// We MUST answer unhandled get iqs with a service-unavailable error.
let error = StanzaError::new(
ErrorType::Cancel,
DefinedCondition::ServiceUnavailable,
"en",
"No handler defined for this kind of iq.",
);
let iq = Iq::from_error(iq.id, error)
.with_from(iq.to.unwrap())
.with_to(iq.from.unwrap())
.into();
let _ = component.send_stanza(iq).await;
}
}
_ => error!("Not handled iq: {:?}", iq),
}
}
async fn handle_presence(component: &mut Component, presence: Presence) {
let muc = presence
.payloads
.into_iter()
.try_for_each(|payload| match Muc::try_from(payload) {
Ok(muc) => ControlFlow::Break(muc),
_ => ControlFlow::Continue(()),
});
if let ControlFlow::Continue(_) = muc {
return;
}
// Presences to MUC come from resources not accounts
if let Jid::Full(realjid) = presence.from.unwrap() &&
let Jid::Full(participant) = presence.to.unwrap() {
let roomjid = BareJid::from(participant.clone());
let nick: Nick = participant.resource.clone();
// Room already exists
if let Some(room) = unsafe { ROOMS.lock().unwrap().get_mut(&roomjid) } {
debug!("Presence received to existing room: {}", &roomjid);
room.add_session(component, realjid, nick).await.unwrap();
} else {
debug!("Presence received to new room: {}", &roomjid);
let mut room = Room::new(roomjid.clone());
room.add_session(component, realjid, nick).await.unwrap();
let _ = unsafe { ROOMS.lock().unwrap().insert(roomjid, room) };
}
}
}
async fn handle_message(_component: &mut Component, _message: Message) {}
use log::info;
#[tokio::main]
async fn main() {
async fn main() -> Result<(), Error> {
let args: Vec<String> = args().collect();
if args.len() != 3 {
println!("Usage: {} <jid> <password>", args[0]);
@ -152,17 +52,7 @@ async fn main() {
let mut component = Component::new(jid, passwd, server, port).await.unwrap();
info!("Online as {}!", component.jid);
while let Some(elem) = component.next().await {
debug!("RECV {}", String::from(&elem));
if elem.is("iq", ns::COMPONENT_ACCEPT) {
let iq = Iq::try_from(elem).unwrap();
handle_iq(&mut component, iq).await;
} else if elem.is("message", ns::COMPONENT_ACCEPT) {
let message = Message::try_from(elem).unwrap();
handle_message(&mut component, message).await;
} else if elem.is("presence", ns::COMPONENT_ACCEPT) {
let presence = Presence::try_from(elem).unwrap();
handle_presence(&mut component, presence).await;
}
}
handle_stanza(&mut component).await?;
Ok(())
}

60
src/tests.rs Normal file
View file

@ -0,0 +1,60 @@
// Copyright (C) 2022-2099 The crate authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by the
// Free Software Foundation, either version 3 of the License, or (at your
// option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
// for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::str::FromStr;
use crate::component::TestComponent;
use crate::handlers::handle_stanza;
use lazy_static::lazy_static;
use xmpp_parsers::{
iq::{Iq, IqType},
BareJid, Element, FullJid, Jid,
stanza_error::{DefinedCondition, ErrorType, StanzaError},
};
lazy_static! {
static ref COMPONENT_JID: BareJid = BareJid::from_str("muc.component").unwrap();
}
#[tokio::test]
async fn test_iq_unimplemented() {
let from = Jid::Full(FullJid::from_str("foo@bar/qxx").unwrap());
let to = Jid::Bare(COMPONENT_JID.clone());
let disco: Element = Iq {
from: Some(from.clone()),
to: Some(to.clone()),
id: String::from("disco"),
payload: IqType::Get(Element::builder("x", "urn:example:unimplemented").build()),
}.into();
let reply: Element = Iq::from_error("disco", StanzaError::new(
ErrorType::Cancel,
DefinedCondition::ServiceUnavailable,
"en",
"No handler defined for this kind of iq.",
))
.with_from(to)
.with_to(from)
.into();
let mut component = TestComponent::new(vec![disco]);
component.expect(reply);
handle_stanza(&mut component).await.unwrap();
component.assert();
}

View file

@ -13,6 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::component::ComponentTrait;
use crate::error::Error;
use std::collections::HashMap;
@ -20,7 +21,6 @@ use std::iter::IntoIterator;
use std::sync::{LazyLock, Mutex};
use log::debug;
use tokio_xmpp::Component;
use xmpp_parsers::{
message::{Message, MessageType, Subject},
muc::{
@ -28,19 +28,9 @@ use xmpp_parsers::{
MucUser,
},
presence::{Presence, Type as PresenceType},
BareJid, Element, FullJid, Jid,
BareJid, FullJid, Jid,
};
pub(crate) async fn send_stanza<E: Into<Element>>(
component: &mut Component,
elem: E,
) -> Result<(), Error> {
let elem: Element = elem.into();
debug!("SEND: {}", String::from(&elem));
component.send_stanza(elem).await?;
Ok(())
}
pub(crate) type Nick = String;
#[derive(Debug)]
@ -57,9 +47,9 @@ impl Room {
}
}
pub(crate) async fn add_session(
pub(crate) async fn add_session<C: ComponentTrait>(
&mut self,
component: &mut Component,
component: &mut C,
realjid: FullJid,
nick: Nick,
) -> Result<(), Error> {
@ -84,7 +74,7 @@ impl Room {
for (_, occupant) in self.occupants.iter() {
for session in occupant.iter() {
let presence = presence.clone().with_from(session.clone());
send_stanza(component, presence).await?;
component.send_stanza(presence).await?;
}
}
@ -102,7 +92,7 @@ impl Room {
.with_from(participant)
.with_to(realjid.clone())
.with_payloads(vec![MucUser { status, items }.into()]);
send_stanza(component, self_presence).await?;
component.send_stanza(self_presence).await?;
// Send subject
debug!("Sending subject!");
@ -112,7 +102,7 @@ impl Room {
.subjects
.insert(String::from("en"), Subject(String::from("Hanabi")));
subject.type_ = MessageType::Groupchat;
send_stanza(component, subject).await?;
component.send_stanza(subject).await?;
}
Ok(())