diff --git a/src/client/mod.rs b/src/client/mod.rs index 9033b83..025b439 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -35,6 +35,8 @@ enum ClientState { Disconnected, Connecting(Box>), Connected(XMPPStream), + ClosingSendEnd(futures::sink::Send), + ClosingClose(XMPPStream), } impl Client { @@ -188,6 +190,14 @@ impl Stream for Client { Err(e) => Err(e)?, } } + ClientState::ClosingSendEnd(_) => { + self.state = state; + Ok(Async::NotReady) + } + ClientState::ClosingClose(_) => { + self.state = state; + Ok(Async::NotReady) + } } } } @@ -219,20 +229,40 @@ impl Sink for Client { fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { match self.state { ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()), + ClientState::ClosingSendEnd(ref mut send) => { + match send.poll()? { + Async::NotReady => + Ok(Async::NotReady), + Async::Ready(stream) => { + self.state = ClientState::ClosingClose(stream); + self.poll_complete() + } + } + } + ClientState::ClosingClose(ref mut stream) => { + match stream.close()? { + Async::NotReady => + Ok(Async::NotReady), + Async::Ready(()) => { + self.state = ClientState::Disconnected; + Ok(Async::Ready(())) + } + } + } _ => Ok(Async::Ready(())), } } - /// This closes the inner TCP stream. - /// - /// To synchronize your shutdown with the server side, you should - /// first send `Packet::StreamEnd` and wait for the end of the - /// incoming stream before closing the connection. + /// Send ` and later close the inner TCP stream. fn close(&mut self) -> Poll<(), Self::SinkError> { - match self.state { - ClientState::Connected(ref mut stream) => - stream.close() - .map_err(|e| e.into()), + let state = replace(&mut self.state, ClientState::Disconnected); + + match state { + ClientState::Connected(stream) => { + let send = stream.send(Packet::StreamEnd); + self.state = ClientState::ClosingSendEnd(send); + self.poll_complete() + } _ => Ok(Async::Ready(())), }