tokio-xmpp: Poll packets in a loop

This needs to be a loop in order to ignore packets we don’t care about,
or those we want to handle elsewhere.  Returning something isn’t correct
in those two cases because it would signal to tokio that the XMPPStream
is also done, while there could be additional packets waiting for us.

The proper solution is thus a loop which we exit once we have something
to return.

Fixes a deadlock when we ignore some packets.
This commit is contained in:
Emmanuel Gil Peyrot 2023-06-21 13:43:24 +02:00 committed by pep
parent ac22765f21
commit 6ccc5ccace

View file

@ -243,42 +243,50 @@ impl Stream for Client {
};
// Poll stream
match Pin::new(&mut stream).poll_next(cx) {
Poll::Ready(None) => {
// EOF
self.state = ClientState::Disconnected;
Poll::Ready(Some(Event::Disconnected(Error::Disconnected)))
}
Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
// Receive stanza
self.state = ClientState::Connected(stream);
Poll::Ready(Some(Event::Stanza(stanza)))
}
Poll::Ready(Some(Ok(Packet::Text(_)))) => {
// Ignore text between stanzas
self.state = ClientState::Connected(stream);
Poll::Pending
}
Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
// <stream:stream>
self.state = ClientState::Disconnected;
Poll::Ready(Some(Event::Disconnected(
ProtocolError::InvalidStreamStart.into(),
)))
}
Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
// End of stream: </stream:stream>
self.state = ClientState::Disconnected;
Poll::Ready(Some(Event::Disconnected(Error::Disconnected)))
}
Poll::Pending => {
// Try again later
self.state = ClientState::Connected(stream);
Poll::Pending
}
Poll::Ready(Some(Err(e))) => {
self.state = ClientState::Disconnected;
Poll::Ready(Some(Event::Disconnected(e.into())))
//
// This needs to be a loop in order to ignore packets we dont care about, or those
// we want to handle elsewhere. Returning something isnt correct in those two
// cases because it would signal to tokio that the XMPPStream is also done, while
// there could be additional packets waiting for us.
//
// The proper solution is thus a loop which we exit once we have something to
// return.
loop {
match Pin::new(&mut stream).poll_next(cx) {
Poll::Ready(None) => {
// EOF
self.state = ClientState::Disconnected;
return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
}
Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
// Receive stanza
self.state = ClientState::Connected(stream);
return Poll::Ready(Some(Event::Stanza(stanza)));
}
Poll::Ready(Some(Ok(Packet::Text(_)))) => {
// Ignore text between stanzas
}
Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
// <stream:stream>
self.state = ClientState::Disconnected;
return Poll::Ready(Some(Event::Disconnected(
ProtocolError::InvalidStreamStart.into(),
)));
}
Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
// End of stream: </stream:stream>
self.state = ClientState::Disconnected;
return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
}
Poll::Pending => {
// Try again later
self.state = ClientState::Connected(stream);
return Poll::Pending;
}
Poll::Ready(Some(Err(e))) => {
self.state = ClientState::Disconnected;
return Poll::Ready(Some(Event::Disconnected(e.into())));
}
}
}
}