Connect to XMPP, join room, send message

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
Maxime “pep” Buquet 2023-05-20 20:30:56 +02:00
parent 36673bd5bf
commit 0cba90dd68
5 changed files with 257 additions and 19 deletions

View file

@ -13,3 +13,5 @@ log = "0.4"
tokio = { version = "1", features = [ "full" ] }
pretty_env_logger = "0.5"
serde_json = "1.0"
xmpp = "0.4"
xmpp-parsers = "0.19"

View file

@ -17,27 +17,53 @@
mod error;
mod web;
mod webhook;
mod xmpp;
use crate::web::webhooks;
use crate::webhook::WebHook;
use crate::xmpp::XmppClient;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use hyper::{
service::{make_service_fn, service_fn},
Server,
};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(webhooks)) });
let (value_tx, mut value_rx) = mpsc::unbounded_channel::<WebHook>();
let value_tx = Arc::new(Mutex::new(value_tx));
let make_svc = make_service_fn(move |_conn| {
let value_tx = value_tx.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let value_tx = value_tx.clone();
webhooks(req, value_tx)
}))
}
});
let server = Server::bind(&addr).serve(make_svc);
println!("Listening on http://{}", addr);
if let Err(e) = server.await {
eprintln!("Server error: {e}");
let _join = tokio::spawn(server);
let mut client = XmppClient::new("JID", "PASSWD");
loop {
tokio::select! {
_ = client.next() => (),
wh = value_rx.recv() => {
if let Some(wh) = wh {
client.webhook(wh).await
}
}
}
}
}

View file

@ -14,13 +14,15 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::error::Error;
use crate::webhook::WebHook;
use std::convert::Infallible;
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use gitlab::webhooks::WebHook;
use hyper::{body, header, Body, Method, Request, Response};
use log::{debug, error};
use tokio::sync::mpsc::UnboundedSender;
fn error_res<E: std::fmt::Debug>(e: E) -> Result<Response<Body>, Infallible> {
error!("error response: {:?}", e);
@ -33,7 +35,7 @@ fn error_res<E: std::fmt::Debug>(e: E) -> Result<Response<Body>, Infallible> {
Ok(res)
}
async fn webhooks_inner(req: Request<Body>) -> Result<Response<Body>, Error> {
async fn webhooks_inner(req: Request<Body>) -> Result<WebHook, Error> {
match req.method() {
&Method::POST => (),
_ => return Err(Error::MethodMismatch),
@ -43,24 +45,33 @@ async fn webhooks_inner(req: Request<Body>) -> Result<Response<Body>, Error> {
let headers = req.headers();
if let Some(content_type) = headers.get(header::CONTENT_TYPE) &&
let Some(token) = headers.get("X-Gitlab-Token") {
if content_type != "application/json" {
return Err(Error::InvalidContentType);
}
let Some(token) = headers.get("X-Gitlab-Token") {
if content_type != "application/json" {
return Err(Error::InvalidContentType);
}
if token != "secret" {
return Err(Error::InvalidToken);
}
}
if token != "secret" {
return Err(Error::InvalidToken);
}
}
let tmp = body::to_bytes(req.into_body()).await?;
let text: &str = from_utf8(&tmp)?;
let json: WebHook = serde_json::from_str(text)?;
debug!("Passed: {:?}", json);
Ok(Response::new("Hello world".into()))
Ok(serde_json::from_str(text)?)
}
pub async fn webhooks(req: Request<Body>) -> Result<Response<Body>, Infallible> {
webhooks_inner(req).await.or_else(error_res)
pub async fn webhooks(
req: Request<Body>,
value_tx: Arc<Mutex<UnboundedSender<WebHook>>>,
) -> Result<Response<Body>, Infallible> {
match webhooks_inner(req).await {
Ok(wh) => {
debug!("Passed: {:?}", wh);
value_tx.lock().unwrap().send(wh).unwrap();
Ok(Response::new("Hello world".into()))
}
Err(err) => error_res(err),
}
}

114
src/webhook.rs Normal file
View file

@ -0,0 +1,114 @@
// Copyright (C) 2023-2099 The crate authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by the
// Free Software Foundation, either version 3 of the License, or (at your
// option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
// for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub use gitlab::webhooks::{IssueAction, MergeRequestAction, WebHook, WikiPageAction};
use log::debug;
pub fn format_webhook(wh: &WebHook) -> Option<String> {
Some(match wh {
WebHook::Push(push) => {
let mut text = format!(
"{} pushed {} commits to {} branch {}",
push.user_name,
push.commits.len(),
push.project.name,
push.ref_
);
for commit in &push.commits {
match commit.message.lines().nth(0) {
Some(subject) => {
text = format!("{}\n{} <{}>", text, subject, commit.url);
}
None => {}
}
}
text
}
WebHook::Issue(issue) => {
let action = match issue.object_attributes.action {
Some(IssueAction::Update) => "updated",
Some(IssueAction::Open) => "opened",
Some(IssueAction::Close) => "closed",
Some(IssueAction::Reopen) => "reopened",
None => return None,
};
format!(
"{} {} issue {} in {}: {}{}",
issue.user.name,
action,
issue.object_attributes.iid,
issue.project.name,
issue.object_attributes.title,
issue
.object_attributes
.url
.as_ref()
.map(|url| format!(" <{}>", url))
.unwrap_or("".to_owned())
)
}
WebHook::MergeRequest(merge_req) => {
let action = match merge_req.object_attributes.action {
Some(MergeRequestAction::Update) => "updated",
Some(MergeRequestAction::Open) => "opened",
Some(MergeRequestAction::Close) => "closed",
Some(MergeRequestAction::Reopen) => "reopened",
Some(MergeRequestAction::Merge) => "merged",
None => return None,
_ => todo!(),
};
format!(
"{} {} merge request {} in {}: {}{}",
merge_req.user.name,
action,
merge_req.object_attributes.iid,
merge_req.project.name,
merge_req.object_attributes.title,
merge_req
.object_attributes
.url
.as_ref()
.map(|url| format!(" <{}>", url))
.unwrap_or("".to_owned())
)
}
WebHook::Note(note) => {
println!("Note: {:?}", note);
return None;
}
WebHook::Build(build) => {
println!("Build: {:?}", build);
return None;
}
WebHook::WikiPage(page) => {
let action = match page.object_attributes.action {
WikiPageAction::Update => "updated",
WikiPageAction::Create => "created",
};
format!(
"{} {} {} wiki page {} <{}>",
page.user.name,
action,
page.project.name,
page.object_attributes.title,
page.object_attributes.url,
)
}
_wh => {
debug!("Webhook not supported");
return None;
}
})
}

85
src/xmpp.rs Normal file
View file

@ -0,0 +1,85 @@
// Copyright (C) 2023-2099 The crate authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by the
// Free Software Foundation, either version 3 of the License, or (at your
// option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
// for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::webhook::{format_webhook, WebHook};
use std::str::FromStr;
use log::debug;
use xmpp::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
use xmpp_parsers::{message::MessageType, BareJid, Jid};
pub struct XmppClient {
is_online: bool,
agent: Agent,
}
impl XmppClient {
pub fn new(jid: &str, password: &str) -> XmppClient {
let agent = ClientBuilder::new(jid, password)
.set_client(ClientType::Bot, "xmpp-rs")
.set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
.set_default_nick("bot")
.enable_feature(ClientFeature::JoinRooms)
.build()
.unwrap();
XmppClient {
is_online: false,
agent,
}
}
pub async fn next(&mut self) {
if let Some(events) = self.agent.wait_for_events().await {
for event in events {
match event {
Event::Online => {
self.is_online = true;
debug!("XMPP Online");
self.agent
.join_room(
BareJid::from_str("chat@xmpp.rs").unwrap(),
Some(String::from("bot")),
None,
"en",
"Hi there!",
)
.await
}
_ => {
debug!("XMPP Event not supported")
}
}
}
}
}
pub async fn webhook(&mut self, wh: WebHook) {
debug!("Received Webhook");
if let Some(display) = format_webhook(&wh) {
debug!("Webhook: {}", display);
self.agent
.send_message(
Jid::Bare(BareJid::from_str("chat@xmpp.rs").unwrap()),
MessageType::Groupchat,
"en",
&display,
)
.await
}
}
}