client: add more state to make close() send </stream:stream>

This commit is contained in:
Astro 2019-01-26 23:58:12 +01:00
parent 599e3be32e
commit 6379f91e02

View file

@ -35,6 +35,8 @@ enum ClientState {
Disconnected, Disconnected,
Connecting(Box<Future<Item = XMPPStream, Error = Error>>), Connecting(Box<Future<Item = XMPPStream, Error = Error>>),
Connected(XMPPStream), Connected(XMPPStream),
ClosingSendEnd(futures::sink::Send<XMPPStream>),
ClosingClose(XMPPStream),
} }
impl Client { impl Client {
@ -188,6 +190,14 @@ impl Stream for Client {
Err(e) => Err(e)?, Err(e) => Err(e)?,
} }
} }
ClientState::ClosingSendEnd(_) => {
self.state = state;
Ok(Async::NotReady)
}
ClientState::ClosingClose(_) => {
self.state = state;
Ok(Async::NotReady)
}
} }
} }
} }
@ -219,20 +229,40 @@ impl Sink for Client {
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
match self.state { match self.state {
ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()), ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
ClientState::ClosingSendEnd(ref mut send) => {
match send.poll()? {
Async::NotReady =>
Ok(Async::NotReady),
Async::Ready(stream) => {
self.state = ClientState::ClosingClose(stream);
self.poll_complete()
}
}
}
ClientState::ClosingClose(ref mut stream) => {
match stream.close()? {
Async::NotReady =>
Ok(Async::NotReady),
Async::Ready(()) => {
self.state = ClientState::Disconnected;
Ok(Async::Ready(()))
}
}
}
_ => Ok(Async::Ready(())), _ => Ok(Async::Ready(())),
} }
} }
/// This closes the inner TCP stream. /// Send `</stream:stream> and later close the inner TCP stream.
///
/// To synchronize your shutdown with the server side, you should
/// first send `Packet::StreamEnd` and wait for the end of the
/// incoming stream before closing the connection.
fn close(&mut self) -> Poll<(), Self::SinkError> { fn close(&mut self) -> Poll<(), Self::SinkError> {
match self.state { let state = replace(&mut self.state, ClientState::Disconnected);
ClientState::Connected(ref mut stream) =>
stream.close() match state {
.map_err(|e| e.into()), ClientState::Connected(stream) => {
let send = stream.send(Packet::StreamEnd);
self.state = ClientState::ClosingSendEnd(send);
self.poll_complete()
}
_ => _ =>
Ok(Async::Ready(())), Ok(Async::Ready(())),
} }