xmlstream: split initiation reset in two phases

This commit is contained in:
Jonas Schäfer 2024-08-10 12:41:10 +02:00
parent c723897326
commit 2931df22db
3 changed files with 34 additions and 24 deletions

View file

@ -8,6 +8,8 @@ use core::pin::Pin;
use std::borrow::Cow; use std::borrow::Cow;
use std::io; use std::io;
use futures::SinkExt;
use tokio::io::{AsyncBufRead, AsyncWrite}; use tokio::io::{AsyncBufRead, AsyncWrite};
use xmpp_parsers::stream_features::StreamFeatures; use xmpp_parsers::stream_features::StreamFeatures;
@ -19,6 +21,27 @@ use super::{
XmlStream, XmlStream,
}; };
/// Type state for an initiator stream which has not yet sent its stream
/// header.
///
/// To continue stream setup, call [`send_header`][`Self::send_header`].
pub struct InitiatingStream<Io>(pub(super) RawXmlStream<Io>);
impl<Io: AsyncBufRead + AsyncWrite + Unpin> InitiatingStream<Io> {
/// Send the stream header.
pub async fn send_header(
self,
header: StreamHeader<'_>,
) -> io::Result<PendingFeaturesRecv<Io>> {
let Self(mut stream) = self;
header.send(Pin::new(&mut stream)).await?;
stream.flush().await?;
let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
Ok(PendingFeaturesRecv { stream, header })
}
}
/// Type state for an initiator stream which has sent and received the stream /// Type state for an initiator stream which has sent and received the stream
/// header. /// header.
/// ///

View file

@ -41,7 +41,7 @@ use core::pin::Pin;
use core::task::{Context, Poll}; use core::task::{Context, Poll};
use std::io; use std::io;
use futures::{ready, Sink, SinkExt, Stream}; use futures::{ready, Sink, Stream};
use tokio::io::{AsyncBufRead, AsyncWrite}; use tokio::io::{AsyncBufRead, AsyncWrite};
@ -54,7 +54,7 @@ mod responder;
mod tests; mod tests;
use self::common::{RawXmlStream, ReadXsoError, ReadXsoState, StreamHeader}; use self::common::{RawXmlStream, ReadXsoError, ReadXsoState, StreamHeader};
pub use self::initiator::PendingFeaturesRecv; pub use self::initiator::{InitiatingStream, PendingFeaturesRecv};
pub use self::responder::{AcceptedStream, PendingFeaturesSend}; pub use self::responder::{AcceptedStream, PendingFeaturesSend};
/// Initiate a new stream /// Initiate a new stream
@ -70,16 +70,8 @@ pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
stream_ns: &'static str, stream_ns: &'static str,
stream_header: StreamHeader<'_>, stream_header: StreamHeader<'_>,
) -> Result<PendingFeaturesRecv<Io>, io::Error> { ) -> Result<PendingFeaturesRecv<Io>, io::Error> {
let mut raw_stream = RawXmlStream::new(io, stream_ns); let stream = InitiatingStream(RawXmlStream::new(io, stream_ns));
stream_header.send(Pin::new(&mut raw_stream)).await?; stream.send_header(stream_header).await
raw_stream.flush().await?;
let header = StreamHeader::recv(Pin::new(&mut raw_stream)).await?;
Ok(PendingFeaturesRecv {
stream: raw_stream,
header,
})
} }
/// Accept a new XML stream as responder /// Accept a new XML stream as responder
@ -194,8 +186,8 @@ impl<Io: AsyncBufRead, T: FromXml + AsXml> XmlStream<Io, T> {
impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> { impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> {
/// Initiate a stream reset /// Initiate a stream reset
/// ///
/// The `header` is the new stream header which is sent to the remote /// To actually send the stream header, call
/// party. /// [`send_header`][`InitiatingStream::send_header`] on the result.
/// ///
/// # Panics /// # Panics
/// ///
@ -205,18 +197,12 @@ impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T>
/// ///
/// In addition, attempting to reset a stream which has been closed by /// In addition, attempting to reset a stream which has been closed by
/// either side or which has had an I/O error will also cause a panic. /// either side or which has had an I/O error will also cause a panic.
pub async fn initiate_reset( pub fn initiate_reset(self) -> InitiatingStream<Io> {
self,
header: StreamHeader<'_>,
) -> io::Result<PendingFeaturesRecv<Io>> {
self.assert_retypable(); self.assert_retypable();
let mut stream = self.inner; let mut stream = self.inner;
Pin::new(&mut stream).reset_state(); Pin::new(&mut stream).reset_state();
header.send(Pin::new(&mut stream)).await?; InitiatingStream(stream)
stream.flush().await?;
let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
Ok(PendingFeaturesRecv { stream, header })
} }
/// Anticipate a new stream header sent by the remote party. /// Anticipate a new stream header sent by the remote party.

View file

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this // License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use xmpp_parsers::stream_features::StreamFeatures; use xmpp_parsers::stream_features::StreamFeatures;
@ -185,7 +185,8 @@ async fn test_exchange_data_stream_reset_and_shutdown() {
other => panic!("unexpected stream message: {:?}", other), other => panic!("unexpected stream message: {:?}", other),
} }
let stream = stream let stream = stream
.initiate_reset(StreamHeader { .initiate_reset()
.send_header(StreamHeader {
from: Some("client".into()), from: Some("client".into()),
to: Some("server".into()), to: Some("server".into()),
id: Some("client-id".into()), id: Some("client-id".into()),