diff --git a/src/client/mod.rs b/src/client/mod.rs index a33dade9..9033b83b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -164,10 +164,24 @@ impl Stream for Client { Ok(Async::Ready(Some(Event::Disconnected))) } Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { + // Receive stanza self.state = ClientState::Connected(stream); Ok(Async::Ready(Some(Event::Stanza(stanza)))) } - Ok(Async::NotReady) | Ok(Async::Ready(_)) => { + Ok(Async::Ready(Some(Packet::Text(_)))) => { + // Ignore text between stanzas + Ok(Async::NotReady) + } + Ok(Async::Ready(Some(Packet::StreamStart(_)))) => { + // + Err(ProtocolError::InvalidStreamStart.into()) + } + Ok(Async::Ready(Some(Packet::StreamEnd))) => { + // End of stream: + Ok(Async::Ready(None)) + } + Ok(Async::NotReady) => { + // Try again later self.state = ClientState::Connected(stream); Ok(Async::NotReady) } @@ -212,8 +226,8 @@ impl Sink for Client { /// This closes the inner TCP stream. /// /// To synchronize your shutdown with the server side, you should - /// first send `Packet::StreamEnd` and wait it to be sent back - /// before closing the connection. + /// first send `Packet::StreamEnd` and wait for the end of the + /// incoming stream before closing the connection. fn close(&mut self) -> Poll<(), Self::SinkError> { match self.state { ClientState::Connected(ref mut stream) => diff --git a/src/error.rs b/src/error.rs index 8ffa1b7a..375c5178 100644 --- a/src/error.rs +++ b/src/error.rs @@ -88,6 +88,8 @@ pub enum ProtocolError { NoStreamId, /// Encountered an unexpected XML token InvalidToken, + /// Unexpected (shouldn't occur) + InvalidStreamStart, } /// Authentication error diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs index d8e5310a..82eda4a6 100644 --- a/src/xmpp_codec.rs +++ b/src/xmpp_codec.rs @@ -378,6 +378,25 @@ mod tests { }); } + #[test] + fn test_stream_end() { + let mut c = XMPPCodec::new(); + let mut b = BytesMut::with_capacity(1024); + b.put(r""); + let r = c.decode(&mut b); + assert!(match r { + Ok(Some(Packet::StreamStart(_))) => true, + _ => false, + }); + b.clear(); + b.put(r""); + let r = c.decode(&mut b); + assert!(match r { + Ok(Some(Packet::StreamEnd)) => true, + _ => false, + }); + } + #[test] fn test_truncated_stanza() { let mut c = XMPPCodec::new();