Add broadcast_presence method

Also add bits of the next commit because there are too many changes now
and I failed to properly dissociate with add -p :(

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
This commit is contained in:
Maxime “pep” Buquet 2022-09-18 11:44:20 +02:00
parent 4383b7de88
commit 3b49a5ae40
Signed by: pep
GPG key ID: DEDA74AEECA9D0F2
4 changed files with 439 additions and 119 deletions

View file

@ -165,9 +165,11 @@ pub struct TestComponent {
impl TestComponent {
pub fn new(in_buffer: Vec<Element>) -> Self {
TestComponent {
in_buffer: VecDeque::from(in_buffer.into_iter()
.map(|el| TestElement(el))
.collect::<Vec<_>>()
in_buffer: VecDeque::from(
in_buffer
.into_iter()
.map(|el| TestElement(el))
.collect::<Vec<_>>(),
),
out_buffer: VecDeque::new(),
expect_buffer: VecDeque::new(),

View file

@ -175,7 +175,7 @@ async fn handle_presence<C: ComponentTrait>(
).into()
]);
if let Some(mut room) = rooms.remove(&roomjid) {
match room.remove_session(component, realjid).await {
match room.remove_session(component, realjid, participant.resource.clone()).await {
Ok(()) => (),
Err(Error::NonexistantSession(_)) => {
component.send_stanza(error).await.unwrap();

View file

@ -16,7 +16,7 @@
use crate::component::ComponentTrait;
use crate::error::Error;
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::iter::IntoIterator;
use chrono::{FixedOffset, Utc};
@ -34,11 +34,25 @@ use xmpp_parsers::{
};
pub type Nick = String;
type Session = FullJid;
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum BroadcastPresence {
/// Resource joined the room. It needs to know about all other participants, and other
/// participants needs to know about it.
Join,
/// Resource lost sync. It needs to know about all other participants.
Resync,
/// Resource change status (becomes busy, etc.), tell all other participants.
Update,
/// Resource leaves. It only needs confirmation that it leaves. Tell all other participants.
Leave,
}
#[derive(Debug, PartialEq)]
pub struct Room {
pub jid: BareJid,
pub occupants: HashMap<BareJid, Occupant>,
pub occupants: BTreeMap<Nick, Occupant>,
// TODO: Subject struct.
// TODO: Store subject lang
pub subject: Option<(String, Occupant, DateTime)>,
@ -48,81 +62,179 @@ impl Room {
pub fn new(jid: BareJid) -> Self {
Room {
jid,
occupants: HashMap::new(),
occupants: BTreeMap::new(),
subject: None,
}
}
pub async fn broadcast_presence<C: ComponentTrait>(
&self,
component: &mut C,
own_participant: &FullJid,
own_session: &Session,
own_nick: &Nick,
mode: BroadcastPresence,
) -> Result<(), Error> {
let leave = mode == BroadcastPresence::Leave;
// All participants to new participant
let presence_to_new = Presence::new(if leave {
PresenceType::Unavailable
} else {
PresenceType::None
})
.with_to(own_session.clone())
.with_payloads(vec![MucUser {
status: Vec::new(),
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]);
// New participant to all other sessions
let presence_to_old = Presence::new(if leave {
PresenceType::Unavailable
} else {
PresenceType::None
})
.with_from(Jid::Full(own_participant.clone()))
.with_payloads(vec![MucUser {
status: Vec::new(),
items: vec![MucItem::new(
Affiliation::Owner,
if leave { Role::None } else { Role::Moderator },
)],
}
.into()]);
let sync = match mode {
BroadcastPresence::Join | BroadcastPresence::Resync => true,
_ => false,
};
let update = match mode {
BroadcastPresence::Join | BroadcastPresence::Update => true,
_ => false,
};
/*
// MucItems to be sent to sessions of this occupant
let self_items = occupant.iter().map(|session| {
MucItem {
affiliation: Affiliation::Owner,
role: Role::Moderator,
jid: Some(session.clone()),
nick: None,
actor: None,
continue_: None,
reason: None,
}
}).collect::<Vec<MucItem>>();
*/
for (_, other) in self.occupants.iter() {
if sync {
// Send presences from others to participant.
if own_nick != &other.nick {
let presence = presence_to_new
.clone()
.with_from(Jid::Full(other.participant.clone()));
component.send_stanza(presence).await?;
}
}
if update || leave {
// Send presence from participant to others.
for session in other.iter() {
// Skip sending if it's us.
if session == own_session {
continue;
}
let presence = presence_to_old.clone().with_to(Jid::Full(session.clone()));
component.send_stanza(presence).await?;
}
}
}
// Send self-presence
if sync || leave {
// New participant to all other sessions
let self_presence = Presence::new(if leave {
PresenceType::Unavailable
} else {
PresenceType::None
})
.with_from(Jid::Full(own_participant.clone()))
.with_to(own_session.clone())
.with_payloads(vec![MucUser {
status: if leave {
vec![MucStatus::SelfPresence]
} else {
vec![MucStatus::SelfPresence, MucStatus::AssignedNick]
},
items: vec![MucItem::new(
Affiliation::Owner,
if leave { Role::None } else { Role::Moderator },
)],
}
.into()]);
component.send_stanza(self_presence).await?;
}
Ok(())
}
pub async fn add_session<C: ComponentTrait>(
&mut self,
component: &mut C,
realjid: FullJid,
nick: Nick,
realjid: Session,
new_nick: Nick,
) -> Result<(), Error> {
let bare = BareJid::from(realjid.clone());
if let Some(occupant) = self.occupants.get_mut(&bare) {
occupant.add_session(realjid)?;
// Ensure nick isn't already assigned
let _ = self.occupants.iter().try_for_each(|(nick, occupant)| {
let new_nick = new_nick.as_str();
if new_nick == nick && occupant.real != BareJid::from(realjid.clone()) {
return Err(Error::NickAlreadyAssigned(String::from(new_nick)));
}
Ok(())
})?;
if let Some(occupant) = self.occupants.get(&new_nick) {
// TODO: Use get_mut combined with immutable usage of occupant in the
// broadcast_presence call below. Instead of get / reinsert.
let mut occupant = occupant.clone();
match occupant.add_session(realjid.clone()) {
Ok(_) => (),
Err(Error::SessionAlreadyExists(_)) => {
self.broadcast_presence(
component,
&occupant.participant,
&realjid,
&new_nick,
BroadcastPresence::Resync,
)
.await?;
}
err => err?,
}
self.occupants.insert(new_nick, occupant);
// TODO: Send presence
} else {
debug!("{} is joining {}", realjid, self.jid);
let new_occupant = Occupant::new(&self, realjid.clone(), nick.clone());
// Ensure nick isn't already assigned
let _ = self.occupants.iter().try_for_each(|(_, occupant)| {
let nick = nick.clone();
if occupant.nick == nick {
return Err(Error::NickAlreadyAssigned(nick));
}
Ok(())
})?;
// Send occupants
debug!("Sending occupants for {}", realjid);
// Other participants to new participant
let presence_to_new = Presence::new(PresenceType::None)
// New occupant with a single session
.with_to(new_occupant.sessions[0].clone())
.with_payloads(vec![MucUser {
status: Vec::new(),
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]);
// New participant to other participants
let presence_to_old = Presence::new(PresenceType::None)
.with_from(Jid::Full(new_occupant.participant.clone()))
.with_payloads(vec![MucUser {
status: Vec::new(),
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]);
for (_, occupant) in self.occupants.iter() {
component
.send_stanza(
presence_to_new
.clone()
.with_from(occupant.participant.clone()),
)
.await?;
for session in occupant.iter() {
component
.send_stanza(presence_to_old.clone().with_to(session.clone()))
.await?;
}
}
let new_occupant = Occupant::new(&self, realjid.clone(), new_nick.clone());
self.broadcast_presence(
component,
&new_occupant.participant,
&realjid,
&new_nick,
BroadcastPresence::Join,
)
.await?;
// Add into occupants
let _ = self.occupants.insert(bare.clone(), new_occupant.clone());
// Self-presence
debug!("Sending self-presence for {}", realjid);
let participant: FullJid = self.jid.clone().with_resource(nick);
let status = vec![MucStatus::SelfPresence, MucStatus::AssignedNick];
let items = vec![MucItem::new(Affiliation::Owner, Role::Moderator)];
let self_presence = Presence::new(PresenceType::None)
.with_from(participant.clone())
.with_to(realjid.clone())
.with_payloads(vec![MucUser { status, items }.into()]);
component.send_stanza(self_presence).await?;
let _ = self
.occupants
.insert(new_nick.clone(), new_occupant.clone());
// Send subject
debug!("Sending subject!");
@ -157,56 +269,36 @@ impl Room {
pub async fn remove_session<C: ComponentTrait>(
&mut self,
component: &mut C,
realjid: FullJid,
realjid: Session,
nick: Nick,
) -> Result<(), Error> {
let bare = BareJid::from(realjid.clone());
// If occupant doesn't exist, ignore.
if let Some(mut occupant) = self.occupants.remove(&bare) {
let self_presence = Presence::new(PresenceType::Unavailable)
.with_from(occupant.participant.clone())
.with_to(realjid.clone())
.with_payloads(vec![MucUser {
status: vec![MucStatus::SelfPresence],
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]);
if let Some(mut occupant) = self.occupants.remove(&nick) {
self.broadcast_presence(
component,
&occupant.participant,
&realjid,
&nick,
BroadcastPresence::Leave,
)
.await?;
component.send_stanza(self_presence).await?;
occupant.remove_session(realjid)?;
let presence = Presence::new(PresenceType::Unavailable)
.with_from(occupant.participant.clone())
.with_payloads(vec![MucUser {
status: Vec::new(),
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]);
// Add remaining occupant sessions in the occupant pool
if occupant.sessions.len() > 0 {
self.occupants.insert(bare, occupant);
}
for (_, occupant) in self.occupants.iter() {
for session in occupant.iter() {
let presence = presence.clone().with_to(Jid::Full(session.clone()));
component.send_stanza(presence).await?;
}
}
} else {
// TODO: Error
}
Ok(())
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct Occupant {
/// Public Jid for the Occupant
real: BareJid,
participant: FullJid,
nick: Nick,
sessions: Vec<FullJid>,
pub real: BareJid,
pub participant: FullJid,
pub nick: Nick,
pub sessions: Vec<FullJid>,
}
impl Occupant {
@ -275,15 +367,241 @@ impl Occupant {
#[cfg(test)]
mod tests {
use super::*;
use crate::component::TestComponent;
use std::str::FromStr;
use xmpp_parsers::{BareJid, FullJid};
use xmpp_parsers::{
muc::{
user::{Affiliation, Item as MucItem, Role, Status as MucStatus},
MucUser,
},
presence::{Presence, Type as PresenceType},
BareJid,
};
#[test]
fn occupant_ignore_dup_session() {
let room = Room::new(BareJid::from_str("room@muc").unwrap());
let real = FullJid::from_str("foo@bar/meh").unwrap();
let mut occupant = Occupant::new(&room, real.clone(), String::from("nick"));
occupant.add_session(real.clone()).unwrap();
assert_eq!(occupant.iter().count(), 1);
#[tokio::test]
async fn test_broadcast_presence() {
let own_nick = String::from("nick3");
let roomjid = BareJid::from_str("room@muc").unwrap();
let realjid1 = FullJid::from_str("foo@bar/qxx").unwrap();
let participant1 = roomjid.clone().with_resource(String::from("nick1"));
let realjid2 = FullJid::from_str("qxx@foo/bar").unwrap();
let participant2 = roomjid.clone().with_resource(String::from("nick2"));
let realjid3 = FullJid::from_str("bar@qxx/foo").unwrap();
let participant3 = roomjid.clone().with_resource(String::from("nick3"));
let mut room = Room::new(roomjid.clone());
room.occupants = BTreeMap::new();
room.occupants.insert(
participant1.resource.clone(),
Occupant::new(&room, realjid1.clone(), String::from("nick1")),
);
room.occupants.insert(
participant2.resource.clone(),
Occupant::new(&room, realjid2.clone(), String::from("nick2")),
);
room.occupants.insert(
participant3.resource.clone(),
Occupant::new(&room, realjid3.clone(), String::from("nick3")),
);
// BroadcastPresence::Resync
let mut component = TestComponent::new(vec![]);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant1.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant2.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant3.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![MucStatus::SelfPresence, MucStatus::AssignedNick],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
room.broadcast_presence(
&mut component,
&participant3,
&realjid3,
&own_nick,
BroadcastPresence::Resync,
)
.await
.unwrap();
component.assert();
// BroadcastPresence::Update
let mut component = TestComponent::new(vec![]);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant3.clone())
.with_to(realjid1.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant3.clone())
.with_to(realjid2.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
room.broadcast_presence(
&mut component,
&participant3,
&realjid3,
&own_nick,
BroadcastPresence::Update,
)
.await
.unwrap();
component.assert();
// BroadcastPresence::Join
let mut component = TestComponent::new(vec![]);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant1.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant3.clone())
.with_to(realjid1.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant2.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant3.clone())
.with_to(realjid2.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::None)
.with_from(participant3.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![MucStatus::SelfPresence, MucStatus::AssignedNick],
items: vec![MucItem::new(Affiliation::Owner, Role::Moderator)],
}
.into()]),
);
room.broadcast_presence(
&mut component,
&participant3,
&realjid3,
&own_nick,
BroadcastPresence::Join,
)
.await
.unwrap();
component.assert();
// BroadcastPresence::Leave
let mut component = TestComponent::new(vec![]);
component.expect(
Presence::new(PresenceType::Unavailable)
.with_from(participant3.clone())
.with_to(realjid1.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::Unavailable)
.with_from(participant3.clone())
.with_to(realjid2.clone())
.with_payloads(vec![MucUser {
status: vec![],
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]),
);
component.expect(
Presence::new(PresenceType::Unavailable)
.with_from(participant3.clone())
.with_to(realjid3.clone())
.with_payloads(vec![MucUser {
status: vec![MucStatus::SelfPresence],
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]),
);
room.broadcast_presence(
&mut component,
&participant3,
&realjid3,
&own_nick,
BroadcastPresence::Leave,
)
.await
.unwrap();
component.assert();
}
}

View file

@ -410,9 +410,9 @@ async fn test_leave_room_not_last() {
component.expect(
Presence::new(PresenceType::Unavailable)
.with_from(Jid::Full(participant2.clone()))
.with_to(Jid::Full(realjid2.clone()))
.with_to(Jid::Full(realjid1.clone()))
.with_payloads(vec![MucUser {
status: vec![MucStatus::SelfPresence],
status: Vec::new(),
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]),
@ -421,9 +421,9 @@ async fn test_leave_room_not_last() {
component.expect(
Presence::new(PresenceType::Unavailable)
.with_from(Jid::Full(participant2.clone()))
.with_to(Jid::Full(realjid1.clone()))
.with_to(Jid::Full(realjid2.clone()))
.with_payloads(vec![MucUser {
status: Vec::new(),
status: vec![MucStatus::SelfPresence],
items: vec![MucItem::new(Affiliation::Owner, Role::None)],
}
.into()]),