diff --git a/Cargo.toml b/Cargo.toml index 6f7eacf..dafcb44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,5 @@ log = "0.4" tokio = { version = "1", features = [ "full" ] } pretty_env_logger = "0.5" serde_json = "1.0" +xmpp = "0.4" +xmpp-parsers = "0.19" diff --git a/src/main.rs b/src/main.rs index e9ec8c4..356d5e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,27 +17,53 @@ mod error; mod web; +mod webhook; +mod xmpp; use crate::web::webhooks; +use crate::webhook::WebHook; +use crate::xmpp::XmppClient; use std::convert::Infallible; use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use hyper::{ service::{make_service_fn, service_fn}, Server, }; +use tokio::sync::mpsc; #[tokio::main] async fn main() { pretty_env_logger::init(); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(webhooks)) }); + let (value_tx, mut value_rx) = mpsc::unbounded_channel::(); + let value_tx = Arc::new(Mutex::new(value_tx)); + let make_svc = make_service_fn(move |_conn| { + let value_tx = value_tx.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let value_tx = value_tx.clone(); + webhooks(req, value_tx) + })) + } + }); let server = Server::bind(&addr).serve(make_svc); println!("Listening on http://{}", addr); - if let Err(e) = server.await { - eprintln!("Server error: {e}"); + let _join = tokio::spawn(server); + let mut client = XmppClient::new("JID", "PASSWD"); + + loop { + tokio::select! { + _ = client.next() => (), + wh = value_rx.recv() => { + if let Some(wh) = wh { + client.webhook(wh).await + } + } + } } } diff --git a/src/web.rs b/src/web.rs index dd34f85..d3df0f9 100644 --- a/src/web.rs +++ b/src/web.rs @@ -14,13 +14,15 @@ // along with this program. If not, see . use crate::error::Error; +use crate::webhook::WebHook; use std::convert::Infallible; use std::str::from_utf8; +use std::sync::{Arc, Mutex}; -use gitlab::webhooks::WebHook; use hyper::{body, header, Body, Method, Request, Response}; use log::{debug, error}; +use tokio::sync::mpsc::UnboundedSender; fn error_res(e: E) -> Result, Infallible> { error!("error response: {:?}", e); @@ -33,7 +35,7 @@ fn error_res(e: E) -> Result, Infallible> { Ok(res) } -async fn webhooks_inner(req: Request) -> Result, Error> { +async fn webhooks_inner(req: Request) -> Result { match req.method() { &Method::POST => (), _ => return Err(Error::MethodMismatch), @@ -43,24 +45,33 @@ async fn webhooks_inner(req: Request) -> Result, Error> { let headers = req.headers(); if let Some(content_type) = headers.get(header::CONTENT_TYPE) && - let Some(token) = headers.get("X-Gitlab-Token") { - if content_type != "application/json" { - return Err(Error::InvalidContentType); - } + let Some(token) = headers.get("X-Gitlab-Token") { + if content_type != "application/json" { + return Err(Error::InvalidContentType); + } - if token != "secret" { - return Err(Error::InvalidToken); - } - } + if token != "secret" { + return Err(Error::InvalidToken); + } + } let tmp = body::to_bytes(req.into_body()).await?; let text: &str = from_utf8(&tmp)?; - let json: WebHook = serde_json::from_str(text)?; - debug!("Passed: {:?}", json); - - Ok(Response::new("Hello world".into())) + Ok(serde_json::from_str(text)?) } -pub async fn webhooks(req: Request) -> Result, Infallible> { - webhooks_inner(req).await.or_else(error_res) +pub async fn webhooks( + req: Request, + value_tx: Arc>>, +) -> Result, Infallible> { + match webhooks_inner(req).await { + Ok(wh) => { + debug!("Passed: {:?}", wh); + + value_tx.lock().unwrap().send(wh).unwrap(); + + Ok(Response::new("Hello world".into())) + } + Err(err) => error_res(err), + } } diff --git a/src/webhook.rs b/src/webhook.rs new file mode 100644 index 0000000..a09d5dd --- /dev/null +++ b/src/webhook.rs @@ -0,0 +1,114 @@ +// Copyright (C) 2023-2099 The crate authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +// for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +pub use gitlab::webhooks::{IssueAction, MergeRequestAction, WebHook, WikiPageAction}; +use log::debug; + +pub fn format_webhook(wh: &WebHook) -> Option { + Some(match wh { + WebHook::Push(push) => { + let mut text = format!( + "{} pushed {} commits to {} branch {}", + push.user_name, + push.commits.len(), + push.project.name, + push.ref_ + ); + for commit in &push.commits { + match commit.message.lines().nth(0) { + Some(subject) => { + text = format!("{}\n• {} <{}>", text, subject, commit.url); + } + None => {} + } + } + text + } + WebHook::Issue(issue) => { + let action = match issue.object_attributes.action { + Some(IssueAction::Update) => "updated", + Some(IssueAction::Open) => "opened", + Some(IssueAction::Close) => "closed", + Some(IssueAction::Reopen) => "reopened", + None => return None, + }; + format!( + "{} {} issue {} in {}: {}{}", + issue.user.name, + action, + issue.object_attributes.iid, + issue.project.name, + issue.object_attributes.title, + issue + .object_attributes + .url + .as_ref() + .map(|url| format!(" <{}>", url)) + .unwrap_or("".to_owned()) + ) + } + WebHook::MergeRequest(merge_req) => { + let action = match merge_req.object_attributes.action { + Some(MergeRequestAction::Update) => "updated", + Some(MergeRequestAction::Open) => "opened", + Some(MergeRequestAction::Close) => "closed", + Some(MergeRequestAction::Reopen) => "reopened", + Some(MergeRequestAction::Merge) => "merged", + None => return None, + _ => todo!(), + }; + format!( + "{} {} merge request {} in {}: {}{}", + merge_req.user.name, + action, + merge_req.object_attributes.iid, + merge_req.project.name, + merge_req.object_attributes.title, + merge_req + .object_attributes + .url + .as_ref() + .map(|url| format!(" <{}>", url)) + .unwrap_or("".to_owned()) + ) + } + WebHook::Note(note) => { + println!("Note: {:?}", note); + return None; + } + WebHook::Build(build) => { + println!("Build: {:?}", build); + return None; + } + WebHook::WikiPage(page) => { + let action = match page.object_attributes.action { + WikiPageAction::Update => "updated", + WikiPageAction::Create => "created", + }; + format!( + "{} {} {} wiki page {} <{}>", + page.user.name, + action, + page.project.name, + page.object_attributes.title, + page.object_attributes.url, + ) + } + _wh => { + debug!("Webhook not supported"); + return None; + } + }) +} diff --git a/src/xmpp.rs b/src/xmpp.rs new file mode 100644 index 0000000..b909dbd --- /dev/null +++ b/src/xmpp.rs @@ -0,0 +1,85 @@ +// Copyright (C) 2023-2099 The crate authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +// for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use crate::webhook::{format_webhook, WebHook}; + +use std::str::FromStr; + +use log::debug; +use xmpp::{Agent, ClientBuilder, ClientFeature, ClientType, Event}; +use xmpp_parsers::{message::MessageType, BareJid, Jid}; + +pub struct XmppClient { + is_online: bool, + agent: Agent, +} + +impl XmppClient { + pub fn new(jid: &str, password: &str) -> XmppClient { + let agent = ClientBuilder::new(jid, password) + .set_client(ClientType::Bot, "xmpp-rs") + .set_website("https://gitlab.com/xmpp-rs/xmpp-rs") + .set_default_nick("bot") + .enable_feature(ClientFeature::JoinRooms) + .build() + .unwrap(); + + XmppClient { + is_online: false, + agent, + } + } + + pub async fn next(&mut self) { + if let Some(events) = self.agent.wait_for_events().await { + for event in events { + match event { + Event::Online => { + self.is_online = true; + debug!("XMPP Online"); + + self.agent + .join_room( + BareJid::from_str("chat@xmpp.rs").unwrap(), + Some(String::from("bot")), + None, + "en", + "Hi there!", + ) + .await + } + _ => { + debug!("XMPP Event not supported") + } + } + } + } + } + + pub async fn webhook(&mut self, wh: WebHook) { + debug!("Received Webhook"); + if let Some(display) = format_webhook(&wh) { + debug!("Webhook: {}", display); + self.agent + .send_message( + Jid::Bare(BareJid::from_str("chat@xmpp.rs").unwrap()), + MessageType::Groupchat, + "en", + &display, + ) + .await + } + } +}