diff --git a/Cargo.toml b/Cargo.toml index f348e860..87c6bc83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,18 +12,18 @@ keywords = ["xmpp", "tokio"] [dependencies] futures = "0.1" -tokio-core = "0.1" +tokio = "0.1" tokio-io = "0.1" tokio-codec = "0.1" bytes = "0.4.9" xml5ever = "0.12" minidom = "0.9" -# TODO: update to 0.2.0 -native-tls = "0.1" -tokio-tls = "0.1" +native-tls = "0.2" +tokio-tls = "0.2" sasl = "0.4" jid = { version = "0.5", features = ["minidom"] } -domain = "0.2" +trust-dns-resolver = "0.9.1" +trust-dns-proto = "0.4.0" xmpp-parsers = "0.11" idna = "0.1" try_from = "0.2" diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 462f7fe8..13822260 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -1,5 +1,5 @@ extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_xmpp; extern crate jid; extern crate minidom; @@ -9,8 +9,8 @@ extern crate try_from; use std::env::args; use std::process::exit; use try_from::TryFrom; -use tokio_core::reactor::Core; use futures::{Stream, Sink, future}; +use tokio::runtime::current_thread::Runtime; use tokio_xmpp::Client; use minidom::Element; use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; @@ -27,9 +27,9 @@ fn main() { let password = &args[2]; // tokio_core context - let mut core = Core::new().unwrap(); + let mut rt = Runtime::new().unwrap(); // Client instance - let client = Client::new(jid, password, core.handle()).unwrap(); + let client = Client::new(jid, password).unwrap(); // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. @@ -64,7 +64,7 @@ fn main() { }); // Start polling `done` - match core.run(done) { + match rt.block_on(done) { Ok(_) => (), Err(e) => { println!("Fatal: {}", e); diff --git a/examples/echo_component.rs b/examples/echo_component.rs index 745e0771..1239354a 100644 --- a/examples/echo_component.rs +++ b/examples/echo_component.rs @@ -1,5 +1,5 @@ extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_xmpp; extern crate jid; extern crate minidom; @@ -10,7 +10,7 @@ use std::env::args; use std::process::exit; use std::str::FromStr; use try_from::TryFrom; -use tokio_core::reactor::Core; +use tokio::runtime::current_thread::Runtime; use futures::{Stream, Sink, future}; use tokio_xmpp::Component; use minidom::Element; @@ -30,10 +30,10 @@ fn main() { let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16); // tokio_core context - let mut core = Core::new().unwrap(); + let mut rt = Runtime::new().unwrap(); // Component instance - println!("{} {} {} {} {:?}", jid, password, server, port, core.handle()); - let component = Component::new(jid, password, server, port, core.handle()).unwrap(); + println!("{} {} {} {}", jid, password, server, port); + let component = Component::new(jid, password, server, port).unwrap(); // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. @@ -70,7 +70,7 @@ fn main() { }); // Start polling `done` - match core.run(done) { + match rt.block_on(done) { Ok(_) => (), Err(e) => { println!("Fatal: {}", e); diff --git a/src/client/mod.rs b/src/client/mod.rs index b50e5392..e3a36528 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,6 @@ use std::mem::replace; use std::str::FromStr; -use std::error::Error as StdError; -use tokio_core::reactor::Handle; -use tokio_core::net::TcpStream; +use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done}; @@ -45,17 +43,17 @@ impl Client { /// /// Start polling the returned instance so that it will connect /// and yield events. - pub fn new(jid: &str, password: &str, handle: Handle) -> Result { + pub fn new(jid: &str, password: &str) -> Result { let jid = Jid::from_str(jid)?; let password = password.to_owned(); - let connect = Self::make_connect(jid.clone(), password.clone(), handle); + let connect = Self::make_connect(jid.clone(), password.clone()); Ok(Client { jid, state: ClientState::Connecting(Box::new(connect)), }) } - fn make_connect(jid: Jid, password: String, handle: Handle) -> impl Future { + fn make_connect(jid: Jid, password: String) -> impl Future { let username = jid.node.as_ref().unwrap().to_owned(); let jid1 = jid.clone(); let jid2 = jid.clone(); @@ -63,8 +61,8 @@ impl Client { done(idna::domain_to_ascii(&jid.domain)) .map_err(|_| Error::Idna) .and_then(|domain| - done(Connecter::from_lookup(handle, &domain, "_xmpp-client._tcp", 5222)) - .map_err(Error::Domain) + done(Connecter::from_lookup(&domain, "_xmpp-client._tcp", 5222)) + .map_err(Error::Connection) ) .and_then(|connecter| connecter @@ -149,7 +147,7 @@ impl Stream for Client { Ok(Async::NotReady) => (), Ok(Async::Ready(())) => (), Err(e) => - return Err(Error::Io(e)), + return Err(e.into()), }; // Poll stream @@ -178,7 +176,7 @@ impl Stream for Client { impl Sink for Client { type SinkItem = Element; - type SinkError = String; + type SinkError = Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { match self.state { @@ -192,7 +190,7 @@ impl Sink for Client { Ok(AsyncSink::Ready) }, Err(e) => - Err(e.description().to_owned()), + Err(e.into()), }, _ => Ok(AsyncSink::NotReady(item)), @@ -203,7 +201,7 @@ impl Sink for Client { match self.state { ClientState::Connected(ref mut stream) => stream.poll_complete() - .map_err(|e| e.description().to_owned()), + .map_err(|e| e.into()), _ => Ok(Async::Ready(())), } diff --git a/src/component/mod.rs b/src/component/mod.rs index 08ea6343..150ffd9a 100644 --- a/src/component/mod.rs +++ b/src/component/mod.rs @@ -3,9 +3,7 @@ //! allowed to use any user and resource identifiers in their stanzas. use std::mem::replace; use std::str::FromStr; -use std::error::Error as StdError; -use tokio_core::reactor::Handle; -use tokio_core::net::TcpStream; +use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done}; use minidom::Element; @@ -42,24 +40,23 @@ impl Component { /// /// Start polling the returned instance so that it will connect /// and yield events. - pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result { + pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result { let jid = Jid::from_str(jid)?; let password = password.to_owned(); - let connect = Self::make_connect(jid.clone(), password, server, port, handle); + let connect = Self::make_connect(jid.clone(), password, server, port); Ok(Component { jid, state: ComponentState::Connecting(Box::new(connect)), }) } - fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> impl Future { + fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> impl Future { let jid1 = jid.clone(); let password = password; - done(Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port)) - .map_err(Error::Domain) - .and_then(|connecter| connecter - .map_err(Error::Connection) - ).and_then(move |tcp_stream| { + done(Connecter::from_lookup(server, "_xmpp-component._tcp", port)) + .and_then(|connecter| connecter) + .map_err(Error::Connection) + .and_then(move |tcp_stream| { xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned()) }).and_then(move |xmpp_stream| { Self::auth(xmpp_stream, password).expect("auth") @@ -135,7 +132,7 @@ impl Stream for Component { impl Sink for Component { type SinkItem = Element; - type SinkError = String; + type SinkError = Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { match self.state { @@ -149,7 +146,7 @@ impl Sink for Component { Ok(AsyncSink::Ready) }, Err(e) => - Err(e.description().to_owned()), + Err(e.into()), }, _ => Ok(AsyncSink::NotReady(item)), @@ -160,7 +157,7 @@ impl Sink for Component { match &mut self.state { &mut ComponentState::Connected(ref mut stream) => stream.poll_complete() - .map_err(|e| e.description().to_owned()), + .map_err(|e| e.into()), _ => Ok(Async::Ready(())), } diff --git a/src/error.rs b/src/error.rs index 7384f92e..541bbbfd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,9 +3,10 @@ use std::error::Error as StdError; use std::str::Utf8Error; use std::borrow::Cow; use std::fmt; -use domain::resolv::error::Error as DNSError; -use domain::bits::name::FromStrError; use native_tls::Error as TlsError; +use trust_dns_resolver::error::ResolveError; +use trust_dns_proto::error::ProtoError; + use xmpp_parsers::error::Error as ParsersError; use xmpp_parsers::sasl::DefinedCondition as SaslDefinedCondition; @@ -16,7 +17,6 @@ pub enum Error { /// DNS label conversion error, no details available from module /// `idna` Idna, - Domain(FromStrError), Protocol(ProtocolError), Auth(AuthError), Tls(TlsError), @@ -90,5 +90,29 @@ pub enum AuthError { pub enum ConnecterError { NoSrv, AllFailed, - DNS(DNSError), + /// DNS name error + Domain(DomainError), + /// DNS resolution error + Dns(ProtoError), + /// DNS resolution error + Resolve(ResolveError), +} + +/// DNS name error wrapper type +#[derive(Debug)] +pub struct DomainError(pub String); + +impl StdError for DomainError { + fn description(&self) -> &str { + &self.0 + } + fn cause(&self) -> Option<&StdError> { + None + } +} + +impl fmt::Display for DomainError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } } diff --git a/src/happy_eyeballs.rs b/src/happy_eyeballs.rs index 49807932..0a5fd235 100644 --- a/src/happy_eyeballs.rs +++ b/src/happy_eyeballs.rs @@ -1,36 +1,44 @@ -use std::str::FromStr; -use std::collections::HashMap; -use std::net::SocketAddr; -use futures::{Future, Poll, Async, Stream}; -use tokio_core::reactor::Handle; -use tokio_core::net::{TcpStream, TcpStreamNew}; -use domain::resolv::Resolver; -use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream}; -use domain::bits::name::{DNameBuf, FromStrError}; +use std::mem; +use std::net::{SocketAddr, IpAddr}; +use std::collections::{BTreeMap, btree_map}; +use std::collections::VecDeque; +use futures::{Future, Poll, Async}; +use tokio::net::{ConnectFuture, TcpStream}; +use trust_dns_resolver::{IntoName, Name, ResolverFuture, error::ResolveError}; +use trust_dns_resolver::lookup::SrvLookupFuture; +use trust_dns_resolver::lookup_ip::LookupIpFuture; +use trust_dns_proto::rr::rdata::srv::SRV; use ConnecterError; pub struct Connecter { - handle: Handle, - resolver: Resolver, - lookup: Option, - srvs: Option, - connects: HashMap, + fallback_port: u16, + name: Name, + domain: Name, + resolver_future: Box + Send>, + resolver_opt: Option, + srv_lookup_opt: Option, + srvs_opt: Option>, + ip_lookup_opt: Option<(u16, LookupIpFuture)>, + ips_opt: Option<(u16, VecDeque)>, + connect_opt: Option, } impl Connecter { - pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result { - let domain = DNameBuf::from_str(domain)?; - let srv = DNameBuf::from_str(srv)?; - - let resolver = Resolver::new(&handle); - let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port); + pub fn from_lookup(domain: &str, srv: &str, fallback_port: u16) -> Result { + let resolver_future = ResolverFuture::from_system_conf()?; + let name = format!("{}.{}.", srv, domain).into_name()?; Ok(Connecter { - handle, - resolver, - lookup: Some(lookup), - srvs: None, - connects: HashMap::new(), + fallback_port, + name, + domain: domain.into_name()?, + resolver_future, + resolver_opt: None, + srv_lookup_opt: None, + srvs_opt: None, + ip_lookup_opt: None, + ips_opt: None, + connect_opt: None, }) } } @@ -40,69 +48,104 @@ impl Future for Connecter { type Error = ConnecterError; fn poll(&mut self) -> Poll { - match self.lookup.as_mut().map(|lookup| lookup.poll()) { - None | Some(Ok(Async::NotReady)) => (), - Some(Ok(Async::Ready(found_srvs))) => { - self.lookup = None; - match found_srvs { - Some(srvs) => - self.srvs = Some(srvs.to_stream(self.resolver.clone())), - None => - return Err(ConnecterError::NoSrv), - } - }, - Some(Err(e)) => - return Err(e.into()), + if self.resolver_opt.is_none() { + //println!("Poll resolver future"); + match self.resolver_future.poll() { + Ok(Async::Ready(resolver)) => + self.resolver_opt = Some(resolver), + Ok(Async::NotReady) => + return Ok(Async::NotReady), + Err(e) => + return Err(e.into()), + } } - match self.srvs.as_mut().map(|srv| srv.poll()) { - None | Some(Ok(Async::NotReady)) => (), - Some(Ok(Async::Ready(None))) => - self.srvs = None, - Some(Ok(Async::Ready(Some(srv_item)))) => { - let handle = &self.handle; - for addr in srv_item.to_socket_addrs() { - self.connects.entry(addr) - .or_insert_with(|| { - // println!("Connect to {}", addr); - TcpStream::connect(&addr, handle) - }); + if let Some(ref resolver) = self.resolver_opt { + if self.srvs_opt.is_none() { + if self.srv_lookup_opt.is_none() { + //println!("Lookup srv: {:?}", self.name); + self.srv_lookup_opt = Some(resolver.lookup_srv(&self.name)); } - }, - Some(Err(e)) => - return Err(e.into()), - } - let mut connected_stream = None; - self.connects.retain(|_, connect| { - if connected_stream.is_some() { - return false; + if let Some(ref mut srv_lookup) = self.srv_lookup_opt { + match srv_lookup.poll() { + Ok(Async::Ready(t)) => { + let mut srvs = BTreeMap::new(); + for srv in t.iter() { + srvs.insert(srv.priority(), srv.clone()); + } + srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone())); + self.srvs_opt = Some(srvs.into_iter()); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => { + //println!("Ignore SVR error: {:?}", e); + let mut srvs = BTreeMap::new(); + srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone())); + self.srvs_opt = Some(srvs.into_iter()); + }, + } + } } - match connect.poll() { - Ok(Async::NotReady) => true, - Ok(Async::Ready(tcp_stream)) => { - // Success! - connected_stream = Some(tcp_stream); - false - }, - Err(_e) => { - // println!("{}", _e); - false - }, - } - }); - if let Some(tcp_stream) = connected_stream { - return Ok(Async::Ready(tcp_stream)); - } + if self.connect_opt.is_none() { + if self.ips_opt.is_none() { + if self.ip_lookup_opt.is_none() { + if let Some(ref mut srvs) = self.srvs_opt { + if let Some((_, srv)) = srvs.next() { + //println!("Lookup ip: {:?}", srv); + self.ip_lookup_opt = Some((srv.port(), resolver.lookup_ip(srv.target()))); + } else { + return Err(ConnecterError::NoSrv); + } + } + } - if self.lookup.is_none() && - self.srvs.is_none() && - self.connects.is_empty() - { - return Err(ConnecterError::AllFailed); + if let Some((port, mut ip_lookup)) = mem::replace(&mut self.ip_lookup_opt, None) { + match ip_lookup.poll() { + Ok(Async::Ready(t)) => { + let mut ip_deque = VecDeque::new(); + ip_deque.extend(t.iter()); + //println!("IPs: {:?}", ip_deque); + self.ips_opt = Some((port, ip_deque)); + self.ip_lookup_opt = None; + }, + Ok(Async::NotReady) => { + self.ip_lookup_opt = Some((port, ip_lookup)); + return Ok(Async::NotReady) + }, + Err(_) => { + //println!("Ignore lookup error: {:?}", e); + self.ip_lookup_opt = None; + } + } + } + } + + if let Some((port, mut ip_deque)) = mem::replace(&mut self.ips_opt, None) { + if let Some(ip) = ip_deque.pop_front() { + //println!("Connect to {:?}:{}", ip, port); + self.connect_opt = Some(TcpStream::connect(&SocketAddr::new(ip, port))); + self.ips_opt = Some((port, ip_deque)); + } + } + } + + if let Some(mut connect_future) = mem::replace(&mut self.connect_opt, None) { + match connect_future.poll() { + Ok(Async::Ready(t)) => return Ok(Async::Ready(t)), + Ok(Async::NotReady) => { + self.connect_opt = Some(connect_future); + return Ok(Async::NotReady) + } + Err(_) => { + //println!("Ignore connect error: {:?}", e); + }, + } + } } Ok(Async::NotReady) } } + diff --git a/src/lib.rs b/src/lib.rs index 19a0da68..b23aefc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,7 @@ //! XMPP implemeentation with asynchronous I/O using Tokio. extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_io; extern crate tokio_codec; extern crate bytes; @@ -14,7 +14,8 @@ extern crate native_tls; extern crate tokio_tls; extern crate sasl; extern crate jid; -extern crate domain; +extern crate trust_dns_resolver; +extern crate trust_dns_proto; extern crate idna; extern crate xmpp_parsers; extern crate try_from; diff --git a/src/starttls.rs b/src/starttls.rs index 444a9cb8..5acfcb15 100644 --- a/src/starttls.rs +++ b/src/starttls.rs @@ -3,8 +3,8 @@ use futures::{Future, Sink, Poll, Async}; use futures::stream::Stream; use futures::sink; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tls::{TlsStream, TlsConnectorExt, ConnectAsync}; -use native_tls::TlsConnector; +use tokio_tls::{TlsStream, TlsConnector, Connect}; +use native_tls::TlsConnector as NativeTlsConnector; use minidom::Element; use jid::Jid; @@ -26,7 +26,7 @@ enum StartTlsClientState { Invalid, SendStartTls(sink::Send>), AwaitProceed(XMPPStream), - StartingTls(ConnectAsync), + StartingTls(Connect), } impl StartTlsClient { @@ -54,7 +54,7 @@ impl Future for StartTlsClient { fn poll(&mut self) -> Poll { let old_state = replace(&mut self.state, StartTlsClientState::Invalid); let mut retry = false; - + let (new_state, result) = match old_state { StartTlsClientState::SendStartTls(mut send) => match send.poll() { @@ -74,9 +74,9 @@ impl Future for StartTlsClient { if stanza.name() == "proceed" => { let stream = xmpp_stream.stream.into_inner(); - let connect = TlsConnector::builder().unwrap() - .build().unwrap() - .connect_async(&self.jid.domain, stream); + let connect = TlsConnector::from(NativeTlsConnector::builder() + .build().unwrap()) + .connect(&self.jid.domain, stream); let new_state = StartTlsClientState::StartingTls(connect); retry = true; (new_state, Ok(Async::NotReady))