From d16ead8bc9ce4e3e540278f621ff0f4b79bc18c2 Mon Sep 17 00:00:00 2001 From: pep Date: Thu, 21 Nov 2024 17:11:43 +0100 Subject: [PATCH] Split web and xmpp parts into different tasks Signed-off-by: pep --- src/bot.rs | 18 +++++++++++++++++- src/main.rs | 50 +++++++++++++++++++++++--------------------------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/src/bot.rs b/src/bot.rs index a901d9d..4614f39 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -20,6 +20,7 @@ use xmpp::jid::{BareJid, Jid}; use xmpp::parsers::message::MessageType; use xmpp::tokio_xmpp::connect::StartTlsServerConnector; use xmpp::{Agent, ClientBuilder, ClientFeature, ClientType, Event}; +use tokio::sync::mpsc; pub struct XmppClient { is_online: bool, @@ -75,8 +76,22 @@ impl XmppClient { } } + pub async fn receive(&mut self, mut rx: mpsc::UnboundedReceiver) { + loop { + tokio::select! { + _ = self.next() => (), + wh = rx.recv() => { + if let Some(hook) = wh { + debug!("XMPP Bot Received Hook"); + self.hook(hook).await + } + } + } + } + } + pub async fn hook(&mut self, wh: Hook) { - debug!("Received Hook"); + debug!("XMPP Bot Processing Hook"); if let Some(display) = format_hook(&wh) { debug!("Hook: {}", display); for room in &self.rooms { @@ -89,6 +104,7 @@ impl XmppClient { ) .await } + debug!("XMPP Bot Processed Hook"); } } } diff --git a/src/main.rs b/src/main.rs index bd5a4e3..e82bd47 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,6 +28,7 @@ use crate::error::Error; use crate::hooks::Hook; use crate::web::hooks; +use log::error; use camino::Utf8PathBuf; use clap::{command, value_parser, Arg}; use hyper::{server::conn::http1, service::service_fn}; @@ -50,48 +51,43 @@ async fn main() -> Result { let config = Config::from_arg(matches.get_one::("config")).await?; - let (value_tx, mut value_rx) = mpsc::unbounded_channel::(); + let (value_tx, value_rx) = mpsc::unbounded_channel::(); - let mut client = XmppClient::new( + let mut bot = XmppClient::new( config.jid, config.password.as_str(), config.rooms, config.nickname, ); + tokio::task::spawn(async move { + bot.receive(value_rx).await; + error!("XMPP client exited early"); + }); + let tcp_bind = TcpListener::bind(config.addr).await?; loop { let value_tx = value_tx.clone(); let secret = config.secret.clone(); - tokio::select! { - _ = client.next() => (), - accept = tcp_bind.accept() => { - if let Ok((tcp, _)) = accept { - let io = TokioIo::new(tcp); - tokio::task::spawn(async move { - if let Err(err) = http1::Builder::new() - .timer(TokioTimer::new()) - .serve_connection(io, service_fn(|request| { - let value_tx = value_tx.clone(); - let secret = secret.clone(); - async move { - hooks(request, &secret, value_tx).await - } - })) - .await - { - println!("Error serving connection: {:?}", err); + if let Ok((tcp, _)) = tcp_bind.accept().await { + let io = TokioIo::new(tcp); + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .timer(TokioTimer::new()) + .serve_connection(io, service_fn(|request| { + let value_tx = value_tx.clone(); + let secret = secret.clone(); + async move { + hooks(request, &secret, value_tx).await } - }); + })) + .await + { + println!("Error serving connection: {:?}", err); } - } - wh = value_rx.recv() => { - if let Some(hook) = wh { - client.hook(hook).await - } - } + }); } } }