xmlstream: fix clean shutdown sequence

Without the early return in XmlStream::poll_next in case of the stream
footer, the read state gets recreated and the logic at the top of that
function to actually handle stream shutdown gracefully is never
triggered.

Also that logic was incorrect; the correct behaviour is to wait for the
true EOF.
This commit is contained in:
Jonas Schäfer 2024-08-18 10:49:29 +02:00
parent efc859abc0
commit 960fd782bd
2 changed files with 25 additions and 6 deletions

View file

@ -460,13 +460,14 @@ impl<T: FromXml> ReadXsoState<T> {
*self = ReadXsoState::Done; *self = ReadXsoState::Done;
return Poll::Ready(Err(io::Error::new( return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
"end of parent element before XSO started", "eof before XSO started",
) )
.into())); .into()));
} }
} }
} }
ReadXsoState::Parsing(builder) => { ReadXsoState::Parsing(builder) => {
log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
let Some(ev) = ev else { let Some(ev) = ev else {
*self = ReadXsoState::Done; *self = ReadXsoState::Done;
return Poll::Ready(Err(io::Error::new( return Poll::Ready(Err(io::Error::new(

View file

@ -321,7 +321,14 @@ impl<Io: AsyncBufRead, T: FromXml + AsXml + fmt::Debug> Stream for XmlStream<Io,
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project(); let this = self.project();
let result = match this.read_state.as_mut() { let result = match this.read_state.as_mut() {
None => return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))), None => {
// awaiting eof.
return match ready!(this.inner.poll_next(cx)) {
None => Poll::Ready(None),
Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"),
Some(Err(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
};
}
Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)), Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
}; };
let result = match result { let result = match result {
@ -330,7 +337,9 @@ impl<Io: AsyncBufRead, T: FromXml + AsXml + fmt::Debug> Stream for XmlStream<Io,
Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))), Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
Err(ReadXsoError::Footer) => { Err(ReadXsoError::Footer) => {
*this.read_state = None; *this.read_state = None;
Poll::Ready(Some(Err(ReadError::StreamFooterReceived))) // Return early here, because we cannot allow recreation of
// another read state.
return Poll::Ready(Some(Err(ReadError::StreamFooterReceived)));
} }
}; };
*this.read_state = Some(ReadXsoState::default()); *this.read_state = Some(ReadXsoState::default());
@ -367,17 +376,26 @@ impl<'x, Io: AsyncWrite, T: FromXml + AsXml + fmt::Debug> Sink<&'x T> for XmlStr
match ready!(this.inner.as_mut().poll_ready(cx)) match ready!(this.inner.as_mut().poll_ready(cx))
.and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot)) .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
{ {
Ok(()) => (), Ok(()) => {
log::trace!("stream footer sent successfully");
}
// If it fails, we fail the sink immediately. // If it fails, we fail the sink immediately.
Err(e) => { Err(e) => {
log::debug!(
"omitting stream footer: failed to make stream ready: {}",
e
);
*this.write_state = WriteState::Failed; *this.write_state = WriteState::Failed;
return Poll::Ready(Err(e)); return Poll::Ready(Err(e));
} }
} }
*this.write_state = WriteState::FooterSent; *this.write_state = WriteState::FooterSent;
} }
// Footer sent => just poll the inner sink for closure. // Footer sent => just poll the inner sink for flush.
WriteState::FooterSent => break, WriteState::FooterSent => {
ready!(this.inner.as_mut().poll_flush(cx)?);
break;
}
WriteState::Failed => unreachable!(), // caught by check_ok() WriteState::Failed => unreachable!(), // caught by check_ok()
} }
} }