diff --git a/tokio-xmpp/src/xmlstream/common.rs b/tokio-xmpp/src/xmlstream/common.rs index c0969564..2ea0526b 100644 --- a/tokio-xmpp/src/xmlstream/common.rs +++ b/tokio-xmpp/src/xmlstream/common.rs @@ -460,13 +460,14 @@ impl ReadXsoState { *self = ReadXsoState::Done; return Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidData, - "end of parent element before XSO started", + "eof before XSO started", ) .into())); } } } ReadXsoState::Parsing(builder) => { + log::trace!("ReadXsoState::Parsing ev = {:?}", ev); let Some(ev) = ev else { *self = ReadXsoState::Done; return Poll::Ready(Err(io::Error::new( diff --git a/tokio-xmpp/src/xmlstream/mod.rs b/tokio-xmpp/src/xmlstream/mod.rs index 04af83c4..a4d88a6a 100644 --- a/tokio-xmpp/src/xmlstream/mod.rs +++ b/tokio-xmpp/src/xmlstream/mod.rs @@ -321,7 +321,14 @@ impl Stream for XmlStream, cx: &mut Context<'_>) -> Poll> { let this = self.project(); let result = match this.read_state.as_mut() { - None => return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))), + None => { + // awaiting eof. + return match ready!(this.inner.poll_next(cx)) { + None => Poll::Ready(None), + Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"), + Some(Err(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))), + }; + } Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)), }; let result = match result { @@ -330,7 +337,9 @@ impl Stream for XmlStream Poll::Ready(Some(Err(ReadError::ParseError(e)))), Err(ReadXsoError::Footer) => { *this.read_state = None; - Poll::Ready(Some(Err(ReadError::StreamFooterReceived))) + // Return early here, because we cannot allow recreation of + // another read state. + return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))); } }; *this.read_state = Some(ReadXsoState::default()); @@ -367,17 +376,26 @@ impl<'x, Io: AsyncWrite, T: FromXml + AsXml + fmt::Debug> Sink<&'x T> for XmlStr match ready!(this.inner.as_mut().poll_ready(cx)) .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot)) { - Ok(()) => (), + Ok(()) => { + log::trace!("stream footer sent successfully"); + } // If it fails, we fail the sink immediately. Err(e) => { + log::debug!( + "omitting stream footer: failed to make stream ready: {}", + e + ); *this.write_state = WriteState::Failed; return Poll::Ready(Err(e)); } } *this.write_state = WriteState::FooterSent; } - // Footer sent => just poll the inner sink for closure. - WriteState::FooterSent => break, + // Footer sent => just poll the inner sink for flush. + WriteState::FooterSent => { + ready!(this.inner.as_mut().poll_flush(cx)?); + break; + } WriteState::Failed => unreachable!(), // caught by check_ok() } }