xmpp: split wait_for_events methods
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
parent
e23c161c0a
commit
9d17a79813
1 changed files with 112 additions and 121 deletions
233
xmpp/src/lib.rs
233
xmpp/src/lib.rs
|
@ -27,7 +27,7 @@ use xmpp_parsers::{
|
|||
pubsub::pubsub::{Items, PubSub},
|
||||
roster::{Item as RosterItem, Roster},
|
||||
stanza_error::{DefinedCondition, ErrorType, StanzaError},
|
||||
BareJid, FullJid, Jid,
|
||||
BareJid, Element, FullJid, Jid,
|
||||
};
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
@ -236,6 +236,102 @@ impl Agent {
|
|||
presence
|
||||
}
|
||||
|
||||
async fn handle_iq(&mut self, iq: Iq) -> () {
|
||||
if let IqType::Get(payload) = iq.payload {
|
||||
if payload.is("query", ns::DISCO_INFO) {
|
||||
let query = DiscoInfoQuery::try_from(payload);
|
||||
match query {
|
||||
Ok(query) => {
|
||||
let mut disco_info = self.disco.clone();
|
||||
disco_info.node = query.node;
|
||||
let iq = Iq::from_result(iq.id, Some(disco_info))
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = StanzaError::new(
|
||||
ErrorType::Modify,
|
||||
DefinedCondition::BadRequest,
|
||||
"en",
|
||||
&format!("{}", err),
|
||||
);
|
||||
let iq = Iq::from_error(iq.id, error)
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We MUST answer unhandled get iqs with a service-unavailable error.
|
||||
let error = StanzaError::new(
|
||||
ErrorType::Cancel,
|
||||
DefinedCondition::ServiceUnavailable,
|
||||
"en",
|
||||
"No handler defined for this kind of iq.",
|
||||
);
|
||||
let iq = Iq::from_error(iq.id, error)
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_message(&mut self, message: Message) -> Vec<Event> {
|
||||
let mut events = vec![];
|
||||
let from = message.from.clone().unwrap();
|
||||
let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
|
||||
match message.get_best_body(langs) {
|
||||
Some((_lang, body)) => match message.type_ {
|
||||
MessageType::Groupchat => {
|
||||
let event = Event::RoomMessage(
|
||||
from.clone().into(),
|
||||
FullJid::try_from(from.clone()).unwrap().resource,
|
||||
body.clone(),
|
||||
);
|
||||
events.push(event)
|
||||
}
|
||||
MessageType::Chat | MessageType::Normal => {
|
||||
let event = Event::ChatMessage(from.clone().into(), body.clone());
|
||||
events.push(event)
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
for child in message.payloads {
|
||||
if child.is("event", ns::PUBSUB_EVENT) {
|
||||
let new_events = pubsub::handle_event(&from, child, self).await;
|
||||
events.extend(new_events);
|
||||
}
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
|
||||
async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
|
||||
let mut events = vec![];
|
||||
let from: BareJid = match presence.from.clone().unwrap() {
|
||||
Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
|
||||
Jid::Bare(bare) => bare,
|
||||
};
|
||||
for payload in presence.payloads.into_iter() {
|
||||
let muc_user = match MucUser::try_from(payload) {
|
||||
Ok(muc_user) => muc_user,
|
||||
_ => continue,
|
||||
};
|
||||
for status in muc_user.status.into_iter() {
|
||||
if status == Status::SelfPresence {
|
||||
events.push(Event::RoomJoined(from.clone()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
|
||||
pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
|
||||
if let Some(event) = self.client.next().await {
|
||||
let mut events = Vec::new();
|
||||
|
@ -264,127 +360,22 @@ impl Agent {
|
|||
TokioXmppEvent::Disconnected(_) => {
|
||||
events.push(Event::Disconnected);
|
||||
}
|
||||
TokioXmppEvent::Stanza(stanza) => {
|
||||
if stanza.is("iq", "jabber:client") {
|
||||
let iq = Iq::try_from(stanza).unwrap();
|
||||
let from = iq
|
||||
.from
|
||||
.clone()
|
||||
.unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
|
||||
if let IqType::Get(payload) = iq.payload {
|
||||
if payload.is("query", ns::DISCO_INFO) {
|
||||
let query = DiscoInfoQuery::try_from(payload);
|
||||
match query {
|
||||
Ok(query) => {
|
||||
let mut disco_info = self.disco.clone();
|
||||
disco_info.node = query.node;
|
||||
let iq = Iq::from_result(iq.id, Some(disco_info))
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = StanzaError::new(
|
||||
ErrorType::Modify,
|
||||
DefinedCondition::BadRequest,
|
||||
"en",
|
||||
&format!("{}", err),
|
||||
);
|
||||
let iq = Iq::from_error(iq.id, error)
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We MUST answer unhandled get iqs with a service-unavailable error.
|
||||
let error = StanzaError::new(
|
||||
ErrorType::Cancel,
|
||||
DefinedCondition::ServiceUnavailable,
|
||||
"en",
|
||||
"No handler defined for this kind of iq.",
|
||||
);
|
||||
let iq = Iq::from_error(iq.id, error)
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
} else if let IqType::Result(Some(payload)) = iq.payload {
|
||||
// TODO: move private iqs like this one somewhere else, for
|
||||
// security reasons.
|
||||
if payload.is("query", ns::ROSTER) && iq.from.is_none() {
|
||||
let roster = Roster::try_from(payload).unwrap();
|
||||
for item in roster.items.into_iter() {
|
||||
events.push(Event::ContactAdded(item));
|
||||
}
|
||||
} else if payload.is("pubsub", ns::PUBSUB) {
|
||||
let new_events = pubsub::handle_iq_result(&from, payload);
|
||||
events.extend(new_events);
|
||||
}
|
||||
} else if let IqType::Set(_) = iq.payload {
|
||||
// We MUST answer unhandled set iqs with a service-unavailable error.
|
||||
let error = StanzaError::new(
|
||||
ErrorType::Cancel,
|
||||
DefinedCondition::ServiceUnavailable,
|
||||
"en",
|
||||
"No handler defined for this kind of iq.",
|
||||
);
|
||||
let iq = Iq::from_error(iq.id, error)
|
||||
.with_to(iq.from.unwrap())
|
||||
.into();
|
||||
let _ = self.client.send_stanza(iq).await;
|
||||
}
|
||||
} else if stanza.is("message", "jabber:client") {
|
||||
let message = Message::try_from(stanza).unwrap();
|
||||
let from = message.from.clone().unwrap();
|
||||
let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
|
||||
match message.get_best_body(langs) {
|
||||
Some((_lang, body)) => match message.type_ {
|
||||
MessageType::Groupchat => {
|
||||
let event = Event::RoomMessage(
|
||||
from.clone().into(),
|
||||
FullJid::try_from(from.clone()).unwrap().resource,
|
||||
body.clone(),
|
||||
);
|
||||
events.push(event)
|
||||
}
|
||||
MessageType::Chat | MessageType::Normal => {
|
||||
let event =
|
||||
Event::ChatMessage(from.clone().into(), body.clone());
|
||||
events.push(event)
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
for child in message.payloads {
|
||||
if child.is("event", ns::PUBSUB_EVENT) {
|
||||
let new_events = pubsub::handle_event(&from, child, self).await;
|
||||
events.extend(new_events);
|
||||
}
|
||||
}
|
||||
} else if stanza.is("presence", "jabber:client") {
|
||||
let presence = Presence::try_from(stanza).unwrap();
|
||||
let from: BareJid = match presence.from.clone().unwrap() {
|
||||
Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
|
||||
Jid::Bare(bare) => bare,
|
||||
};
|
||||
for payload in presence.payloads.into_iter() {
|
||||
let muc_user = match MucUser::try_from(payload) {
|
||||
Ok(muc_user) => muc_user,
|
||||
_ => continue,
|
||||
};
|
||||
for status in muc_user.status.into_iter() {
|
||||
if status == Status::SelfPresence {
|
||||
events.push(Event::RoomJoined(from.clone()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if stanza.is("error", "http://etherx.jabber.org/streams") {
|
||||
println!("Received a fatal stream error: {}", String::from(&stanza));
|
||||
TokioXmppEvent::Stanza(elem) => {
|
||||
if elem.is("iq", "jabber:client") {
|
||||
let iq = Iq::try_from(elem).unwrap();
|
||||
self.handle_iq(iq).await;
|
||||
} else if elem.is("message", "jabber:client") {
|
||||
let message = Message::try_from(elem).unwrap();
|
||||
let new_events = self.handle_message(message).await;
|
||||
events.extend(new_events);
|
||||
} else if elem.is("presence", "jabber:client") {
|
||||
let presence = Presence::try_from(elem).unwrap();
|
||||
let new_events = self.handle_presence(presence).await;
|
||||
events.extend(new_events);
|
||||
} else if elem.is("error", "http://etherx.jabber.org/streams") {
|
||||
println!("Received a fatal stream error: {}", String::from(&elem));
|
||||
} else {
|
||||
panic!("Unknown stanza: {}", String::from(&stanza));
|
||||
panic!("Unknown stanza: {}", String::from(&elem));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue