From 7b7f2866fc4fbc9724d6fc07fdfe75f5489b2456 Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 19 Jul 2017 01:02:45 +0200 Subject: [PATCH] move stream_start out of places --- src/client/mod.rs | 31 ++++++++++++--------- src/lib.rs | 3 -- src/starttls.rs | 22 ++------------- src/stream_start.rs | 30 +++++++++++--------- src/tcp.rs | 68 --------------------------------------------- src/xmpp_codec.rs | 4 +-- src/xmpp_stream.rs | 13 ++++----- 7 files changed, 46 insertions(+), 125 deletions(-) delete mode 100644 src/tcp.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index a45cede8..aca0fdd2 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -12,7 +12,6 @@ use sasl::common::{Credentials, ChannelBinding}; use super::xmpp_codec::Packet; use super::xmpp_stream; -use super::tcp::TcpClient; use super::starttls::{NS_XMPP_TLS, StartTlsClient}; use super::happy_eyeballs::Connecter; @@ -29,6 +28,7 @@ pub struct Client { } type XMPPStream = xmpp_stream::XMPPStream>; +const NS_JABBER_CLIENT: &str = "jabber:client"; enum ClientState { Invalid, @@ -50,26 +50,31 @@ impl Client { fn make_connect(jid: Jid, password: String, handle: Handle) -> Box> { let username = jid.node.as_ref().unwrap().to_owned(); + let jid1 = jid.clone(); + let jid2 = jid.clone(); let password = password; Box::new( Connecter::from_lookup(handle, &jid.domain, "_xmpp-client._tcp", 5222) .expect("Connector::from_lookup") - .and_then(|tcp_stream| - TcpClient::from_stream(jid, tcp_stream) + .and_then(move |tcp_stream| + xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned()) .map_err(|e| format!("{}", e)) - ).and_then(|stream| { - if Self::can_starttls(&stream) { - Self::starttls(stream) + ).and_then(|xmpp_stream| { + if Self::can_starttls(&xmpp_stream) { + Self::starttls(xmpp_stream) } else { panic!("No STARTTLS") } - }).and_then(move |stream| { - Self::auth(stream, username, password).expect("auth") - }).and_then(|stream| { - Self::bind(stream) - }).and_then(|stream| { - println!("Bound to {}", stream.jid); - Ok(stream) + }).and_then(|tls_stream| + XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()) + .map_err(|e| format!("{}", e)) + ).and_then(move |xmpp_stream| { + Self::auth(xmpp_stream, username, password).expect("auth") + }).and_then(|xmpp_stream| { + Self::bind(xmpp_stream) + }).and_then(|xmpp_stream| { + println!("Bound to {}", xmpp_stream.jid); + Ok(xmpp_stream) }) ) } diff --git a/src/lib.rs b/src/lib.rs index 0cea1db2..348c03b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#[macro_use] extern crate futures; extern crate tokio_core; extern crate tokio_io; @@ -16,8 +15,6 @@ extern crate domain; pub mod xmpp_codec; pub mod xmpp_stream; mod stream_start; -mod tcp; -pub use tcp::TcpClient; mod starttls; pub use starttls::StartTlsClient; mod happy_eyeballs; diff --git a/src/starttls.rs b/src/starttls.rs index 91a7ea26..7338b55c 100644 --- a/src/starttls.rs +++ b/src/starttls.rs @@ -10,7 +10,6 @@ use jid::Jid; use xmpp_codec::Packet; use xmpp_stream::XMPPStream; -use stream_start::StreamStart; pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls"; @@ -26,7 +25,6 @@ enum StartTlsClientState { SendStartTls(sink::Send>), AwaitProceed(XMPPStream), StartingTls(ConnectAsync), - Start(StreamStart>), } impl StartTlsClient { @@ -48,7 +46,7 @@ impl StartTlsClient { } impl Future for StartTlsClient { - type Item = XMPPStream>; + type Item = TlsStream; type Error = String; fn poll(&mut self) -> Poll { @@ -92,24 +90,10 @@ impl Future for StartTlsClient { }, StartTlsClientState::StartingTls(mut connect) => match connect.poll() { - Ok(Async::Ready(tls_stream)) => { - println!("TLS stream established"); - let start = XMPPStream::from_stream(tls_stream, self.jid.clone()); - let new_state = StartTlsClientState::Start(start); - retry = true; - (new_state, Ok(Async::NotReady)) - }, + Ok(Async::Ready(tls_stream)) => + (StartTlsClientState::Invalid, Ok(Async::Ready(tls_stream))), Ok(Async::NotReady) => (StartTlsClientState::StartingTls(connect), Ok(Async::NotReady)), - Err(e) => - (StartTlsClientState::StartingTls(connect), Err(format!("{}", e))), - }, - StartTlsClientState::Start(mut start) => - match start.poll() { - Ok(Async::Ready(xmpp_stream)) => - (StartTlsClientState::Invalid, Ok(Async::Ready(xmpp_stream))), - Ok(Async::NotReady) => - (StartTlsClientState::Start(start), Ok(Async::NotReady)), Err(e) => (StartTlsClientState::Invalid, Err(format!("{}", e))), }, diff --git a/src/stream_start.rs b/src/stream_start.rs index 0b4fd9e7..b194b5bb 100644 --- a/src/stream_start.rs +++ b/src/stream_start.rs @@ -1,6 +1,5 @@ use std::mem::replace; use std::io::{Error, ErrorKind}; -use std::collections::HashMap; use futures::{Future, Async, Poll, Stream, sink, Sink}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; @@ -14,20 +13,21 @@ const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams"; pub struct StreamStart { state: StreamStartState, jid: Jid, + ns: String, } enum StreamStartState { SendStart(sink::Send>), RecvStart(Framed), - RecvFeatures(Framed, HashMap), + RecvFeatures(Framed, String), Invalid, } impl StreamStart { - pub fn from_stream(stream: Framed, jid: Jid) -> Self { + pub fn from_stream(stream: Framed, jid: Jid, ns: String) -> Self { let attrs = [("to".to_owned(), jid.domain.clone()), ("version".to_owned(), "1.0".to_owned()), - ("xmlns".to_owned(), "jabber:client".to_owned()), + ("xmlns".to_owned(), ns.clone()), ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()), ].iter().cloned().collect(); let send = stream.send(Packet::StreamStart(attrs)); @@ -35,6 +35,7 @@ impl StreamStart { StreamStart { state: StreamStartState::SendStart(send), jid, + ns, } } } @@ -63,8 +64,13 @@ impl Future for StreamStart { match stream.poll() { Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => { retry = true; + let stream_ns = match stream_attrs.get("xmlns") { + Some(ns) => ns.clone(), + None => + return Err(Error::from(ErrorKind::InvalidData)), + }; // TODO: skip RecvFeatures for version < 1.0 - (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady)) + (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)) }, Ok(Async::Ready(_)) => return Err(Error::from(ErrorKind::InvalidData)), @@ -73,22 +79,20 @@ impl Future for StreamStart { Err(e) => return Err(e), }, - StreamStartState::RecvFeatures(mut stream, stream_attrs) => + StreamStartState::RecvFeatures(mut stream, stream_ns) => match stream.poll() { Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => if stanza.name() == "features" && stanza.ns() == Some(NS_XMPP_STREAM) { - let stream = XMPPStream::new(self.jid.clone(), stream, stream_attrs, stanza); + let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza); (StreamStartState::Invalid, Ok(Async::Ready(stream))) } else { - (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady)) + (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)) }, - Ok(Async::Ready(item)) => { - println!("StreamStart skip {:?}", item); - (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady)) - }, + Ok(Async::Ready(item)) => + (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)), Ok(Async::NotReady) => - (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady)), + (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)), Err(e) => return Err(e), }, diff --git a/src/tcp.rs b/src/tcp.rs deleted file mode 100644 index 156fa4f5..00000000 --- a/src/tcp.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::net::SocketAddr; -use std::io::Error; -use futures::{Future, Poll, Async}; -use tokio_core::reactor::Handle; -use tokio_core::net::{TcpStream, TcpStreamNew}; -use jid::Jid; - -use xmpp_stream::XMPPStream; -use stream_start::StreamStart; - -pub struct TcpClient { - state: TcpClientState, - jid: Jid, -} - -enum TcpClientState { - Connecting(TcpStreamNew), - Start(StreamStart), - Established, -} - -impl TcpClient { - pub fn connect(jid: Jid, addr: &SocketAddr, handle: &Handle) -> Self { - let tcp_stream_new = TcpStream::connect(addr, handle); - TcpClient { - state: TcpClientState::Connecting(tcp_stream_new), - jid, - } - } - - pub fn from_stream(jid: Jid, tcp_stream: TcpStream) -> Self { - let start = XMPPStream::from_stream(tcp_stream, jid.clone()); - TcpClient { - state: TcpClientState::Start(start), - jid, - } - } -} - -impl Future for TcpClient { - type Item = XMPPStream; - type Error = Error; - - fn poll(&mut self) -> Poll { - let (new_state, result) = match self.state { - TcpClientState::Connecting(ref mut tcp_stream_new) => { - let tcp_stream = try_ready!(tcp_stream_new.poll()); - let start = XMPPStream::from_stream(tcp_stream, self.jid.clone()); - let new_state = TcpClientState::Start(start); - (new_state, Ok(Async::NotReady)) - }, - TcpClientState::Start(ref mut start) => { - let xmpp_stream = try_ready!(start.poll()); - let new_state = TcpClientState::Established; - (new_state, Ok(Async::Ready(xmpp_stream))) - }, - TcpClientState::Established => - unreachable!(), - }; - - self.state = new_state; - match result { - // by polling again, we register new future - Ok(Async::NotReady) => self.poll(), - result => result - } - } -} diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs index 7504dfff..6686ed2b 100644 --- a/src/xmpp_codec.rs +++ b/src/xmpp_codec.rs @@ -98,8 +98,8 @@ impl ParserSink { if self.stack.is_empty() { let attrs = HashMap::from_iter( - el.attrs() - .map(|(name, value)| (name.to_owned(), value.to_owned())) + tag.attrs.iter() + .map(|attr| (attr.name.local.as_ref().to_owned(), attr.value.as_ref().to_owned())) ); self.push_queue(Packet::StreamStart(attrs)); } diff --git a/src/xmpp_stream.rs b/src/xmpp_stream.rs index 9ff6916f..2b9c7aea 100644 --- a/src/xmpp_stream.rs +++ b/src/xmpp_stream.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use futures::{Poll, Stream, Sink, StartSend}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; @@ -13,21 +12,21 @@ pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams"; pub struct XMPPStream { pub jid: Jid, pub stream: Framed, - pub stream_attrs: HashMap, pub stream_features: Element, + pub ns: String, } impl XMPPStream { pub fn new(jid: Jid, stream: Framed, - stream_attrs: HashMap, + ns: String, stream_features: Element) -> Self { - XMPPStream { jid, stream, stream_attrs, stream_features } + XMPPStream { jid, stream, stream_features, ns } } - pub fn from_stream(stream: S, jid: Jid) -> StreamStart { + pub fn start(stream: S, jid: Jid, ns: String) -> StreamStart { let xmpp_stream = AsyncRead::framed(stream, XMPPCodec::new()); - StreamStart::from_stream(xmpp_stream, jid) + StreamStart::from_stream(xmpp_stream, jid, ns) } pub fn into_inner(self) -> S { @@ -35,7 +34,7 @@ impl XMPPStream { } pub fn restart(self) -> StreamStart { - Self::from_stream(self.stream.into_inner(), self.jid) + Self::start(self.stream.into_inner(), self.jid, self.ns) } }