Split web and xmpp parts into different tasks

Signed-off-by: pep <pep@bouah.net>
This commit is contained in:
pep 2024-11-21 17:11:43 +01:00
parent fc0b13079f
commit d16ead8bc9
Signed by: pep
GPG key ID: DEDA74AEECA9D0F2
2 changed files with 40 additions and 28 deletions

View file

@ -20,6 +20,7 @@ use xmpp::jid::{BareJid, Jid};
use xmpp::parsers::message::MessageType; use xmpp::parsers::message::MessageType;
use xmpp::tokio_xmpp::connect::StartTlsServerConnector; use xmpp::tokio_xmpp::connect::StartTlsServerConnector;
use xmpp::{Agent, ClientBuilder, ClientFeature, ClientType, Event}; use xmpp::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
use tokio::sync::mpsc;
pub struct XmppClient { pub struct XmppClient {
is_online: bool, is_online: bool,
@ -75,8 +76,22 @@ impl XmppClient {
} }
} }
pub async fn receive(&mut self, mut rx: mpsc::UnboundedReceiver<Hook>) {
loop {
tokio::select! {
_ = self.next() => (),
wh = rx.recv() => {
if let Some(hook) = wh {
debug!("XMPP Bot Received Hook");
self.hook(hook).await
}
}
}
}
}
pub async fn hook(&mut self, wh: Hook) { pub async fn hook(&mut self, wh: Hook) {
debug!("Received Hook"); debug!("XMPP Bot Processing Hook");
if let Some(display) = format_hook(&wh) { if let Some(display) = format_hook(&wh) {
debug!("Hook: {}", display); debug!("Hook: {}", display);
for room in &self.rooms { for room in &self.rooms {
@ -89,6 +104,7 @@ impl XmppClient {
) )
.await .await
} }
debug!("XMPP Bot Processed Hook");
} }
} }
} }

View file

@ -28,6 +28,7 @@ use crate::error::Error;
use crate::hooks::Hook; use crate::hooks::Hook;
use crate::web::hooks; use crate::web::hooks;
use log::error;
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
use clap::{command, value_parser, Arg}; use clap::{command, value_parser, Arg};
use hyper::{server::conn::http1, service::service_fn}; use hyper::{server::conn::http1, service::service_fn};
@ -50,48 +51,43 @@ async fn main() -> Result<!, Error> {
let config = Config::from_arg(matches.get_one::<Utf8PathBuf>("config")).await?; let config = Config::from_arg(matches.get_one::<Utf8PathBuf>("config")).await?;
let (value_tx, mut value_rx) = mpsc::unbounded_channel::<Hook>(); let (value_tx, value_rx) = mpsc::unbounded_channel::<Hook>();
let mut client = XmppClient::new( let mut bot = XmppClient::new(
config.jid, config.jid,
config.password.as_str(), config.password.as_str(),
config.rooms, config.rooms,
config.nickname, config.nickname,
); );
tokio::task::spawn(async move {
bot.receive(value_rx).await;
error!("XMPP client exited early");
});
let tcp_bind = TcpListener::bind(config.addr).await?; let tcp_bind = TcpListener::bind(config.addr).await?;
loop { loop {
let value_tx = value_tx.clone(); let value_tx = value_tx.clone();
let secret = config.secret.clone(); let secret = config.secret.clone();
tokio::select! { if let Ok((tcp, _)) = tcp_bind.accept().await {
_ = client.next() => (), let io = TokioIo::new(tcp);
accept = tcp_bind.accept() => { tokio::task::spawn(async move {
if let Ok((tcp, _)) = accept { if let Err(err) = http1::Builder::new()
let io = TokioIo::new(tcp); .timer(TokioTimer::new())
tokio::task::spawn(async move { .serve_connection(io, service_fn(|request| {
if let Err(err) = http1::Builder::new() let value_tx = value_tx.clone();
.timer(TokioTimer::new()) let secret = secret.clone();
.serve_connection(io, service_fn(|request| { async move {
let value_tx = value_tx.clone(); hooks(request, &secret, value_tx).await
let secret = secret.clone();
async move {
hooks(request, &secret, value_tx).await
}
}))
.await
{
println!("Error serving connection: {:?}", err);
} }
}); }))
.await
{
println!("Error serving connection: {:?}", err);
} }
} });
wh = value_rx.recv() => {
if let Some(hook) = wh {
client.hook(hook).await
}
}
} }
} }
} }