From d3039127ddb3628c510757bb4af9962be7ef7243 Mon Sep 17 00:00:00 2001 From: O01eg Date: Sat, 1 Sep 2018 22:59:02 +0300 Subject: [PATCH] Move from tokio-core to tokio. --- Cargo.toml | 10 +- examples/echo_bot.rs | 10 +- examples/echo_component.rs | 12 +-- src/client/mod.rs | 11 +- src/component/mod.rs | 11 +- src/happy_eyeballs.rs | 202 ++++++++++++++++++++++--------------- src/lib.rs | 5 +- src/starttls.rs | 14 +-- 8 files changed, 159 insertions(+), 116 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 02a706d..9c6c8d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,18 +12,18 @@ keywords = ["xmpp", "tokio"] [dependencies] futures = "0.1" -tokio-core = "0.1" +tokio = "0.1" tokio-io = "0.1" tokio-codec = "0.1" bytes = "0.4.9" xml5ever = "0.12" minidom = "0.9" -# TODO: update to 0.2.0 -native-tls = "0.1" -tokio-tls = "0.1" +native-tls = "0.2" +tokio-tls = "0.2" sasl = "0.4" jid = { version = "0.5", features = ["minidom"] } -domain = "0.2" +trust-dns-resolver = "0.9.1" +trust-dns-proto = "0.4.0" xmpp-parsers = "0.11" idna = "0.1" try_from = "0.2" diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 462f7fe..1382226 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -1,5 +1,5 @@ extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_xmpp; extern crate jid; extern crate minidom; @@ -9,8 +9,8 @@ extern crate try_from; use std::env::args; use std::process::exit; use try_from::TryFrom; -use tokio_core::reactor::Core; use futures::{Stream, Sink, future}; +use tokio::runtime::current_thread::Runtime; use tokio_xmpp::Client; use minidom::Element; use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; @@ -27,9 +27,9 @@ fn main() { let password = &args[2]; // tokio_core context - let mut core = Core::new().unwrap(); + let mut rt = Runtime::new().unwrap(); // Client instance - let client = Client::new(jid, password, core.handle()).unwrap(); + let client = Client::new(jid, password).unwrap(); // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. @@ -64,7 +64,7 @@ fn main() { }); // Start polling `done` - match core.run(done) { + match rt.block_on(done) { Ok(_) => (), Err(e) => { println!("Fatal: {}", e); diff --git a/examples/echo_component.rs b/examples/echo_component.rs index 745e077..1239354 100644 --- a/examples/echo_component.rs +++ b/examples/echo_component.rs @@ -1,5 +1,5 @@ extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_xmpp; extern crate jid; extern crate minidom; @@ -10,7 +10,7 @@ use std::env::args; use std::process::exit; use std::str::FromStr; use try_from::TryFrom; -use tokio_core::reactor::Core; +use tokio::runtime::current_thread::Runtime; use futures::{Stream, Sink, future}; use tokio_xmpp::Component; use minidom::Element; @@ -30,10 +30,10 @@ fn main() { let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16); // tokio_core context - let mut core = Core::new().unwrap(); + let mut rt = Runtime::new().unwrap(); // Component instance - println!("{} {} {} {} {:?}", jid, password, server, port, core.handle()); - let component = Component::new(jid, password, server, port, core.handle()).unwrap(); + println!("{} {} {} {}", jid, password, server, port); + let component = Component::new(jid, password, server, port).unwrap(); // Make the two interfaces for sending and receiving independent // of each other so we can move one into a closure. @@ -70,7 +70,7 @@ fn main() { }); // Start polling `done` - match core.run(done) { + match rt.block_on(done) { Ok(_) => (), Err(e) => { println!("Fatal: {}", e); diff --git a/src/client/mod.rs b/src/client/mod.rs index 44dc8d0..d415116 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,7 @@ use std::mem::replace; use std::str::FromStr; use std::error::Error; -use tokio_core::reactor::Handle; -use tokio_core::net::TcpStream; +use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; use futures::{future, Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; @@ -44,17 +43,17 @@ impl Client { /// /// Start polling the returned instance so that it will connect /// and yield events. - pub fn new(jid: &str, password: &str, handle: Handle) -> Result { + pub fn new(jid: &str, password: &str) -> Result { let jid = Jid::from_str(jid)?; let password = password.to_owned(); - let connect = Self::make_connect(jid.clone(), password.clone(), handle); + let connect = Self::make_connect(jid.clone(), password.clone()); Ok(Client { jid, state: ClientState::Connecting(connect), }) } - fn make_connect(jid: Jid, password: String, handle: Handle) -> Box> { + fn make_connect(jid: Jid, password: String) -> Box> { let username = jid.node.as_ref().unwrap().to_owned(); let jid1 = jid.clone(); let jid2 = jid.clone(); @@ -66,7 +65,7 @@ impl Client { return Box::new(future::err(format!("{:?}", e))), }; Box::new( - Connecter::from_lookup(handle, &domain, "_xmpp-client._tcp", 5222) + Connecter::from_lookup(&domain, "_xmpp-client._tcp", 5222) .expect("Connector::from_lookup") .and_then(move |tcp_stream| xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned()) diff --git a/src/component/mod.rs b/src/component/mod.rs index bf9a005..1cf8a39 100644 --- a/src/component/mod.rs +++ b/src/component/mod.rs @@ -4,8 +4,7 @@ use std::mem::replace; use std::str::FromStr; use std::error::Error; -use tokio_core::reactor::Handle; -use tokio_core::net::TcpStream; +use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; use minidom::Element; @@ -41,21 +40,21 @@ impl Component { /// /// Start polling the returned instance so that it will connect /// and yield events. - pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result { + pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result { let jid = Jid::from_str(jid)?; let password = password.to_owned(); - let connect = Self::make_connect(jid.clone(), password, server, port, handle); + let connect = Self::make_connect(jid.clone(), password, server, port); Ok(Component { jid, state: ComponentState::Connecting(connect), }) } - fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box> { + fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> Box> { let jid1 = jid.clone(); let password = password; Box::new( - Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port) + Connecter::from_lookup(server, "_xmpp-component._tcp", port) .expect("Connector::from_lookup") .and_then(move |tcp_stream| { xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned()) diff --git a/src/happy_eyeballs.rs b/src/happy_eyeballs.rs index 7b95f82..9edd2ca 100644 --- a/src/happy_eyeballs.rs +++ b/src/happy_eyeballs.rs @@ -1,37 +1,46 @@ -use std::str::FromStr; -use std::collections::HashMap; -use std::net::SocketAddr; -use futures::{Future, Poll, Async, Stream}; -use tokio_core::reactor::Handle; -use tokio_core::net::{TcpStream, TcpStreamNew}; -use domain::resolv::Resolver; -use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream}; -use domain::bits::DNameBuf; +use std::mem; +use std::net::{SocketAddr, IpAddr}; +use std::collections::{BTreeMap, btree_map}; +use std::collections::VecDeque; +use futures::{Future, Poll, Async}; +use tokio::net::{ConnectFuture, TcpStream}; +use trust_dns_resolver::{IntoName, Name, ResolverFuture, error::ResolveError}; +use trust_dns_resolver::lookup::SrvLookupFuture; +use trust_dns_resolver::lookup_ip::LookupIpFuture; +use trust_dns_proto::rr::rdata::srv::SRV; pub struct Connecter { - handle: Handle, - resolver: Resolver, - lookup: Option, - srvs: Option, - connects: HashMap, + fallback_port: u16, + name: Name, + domain: Name, + resolver_future: Box + Send>, + resolver_opt: Option, + srv_lookup_opt: Option, + srvs_opt: Option>, + ip_lookup_opt: Option<(u16, LookupIpFuture)>, + ips_opt: Option<(u16, VecDeque)>, + connect_opt: Option, } impl Connecter { - pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result { - let domain = DNameBuf::from_str(domain) - .map_err(|e| format!("{}", e))?; - let srv = DNameBuf::from_str(srv) - .map_err(|e| format!("{}", e))?; + pub fn from_lookup(domain: &str, srv: &str, fallback_port: u16) -> Result { + let resolver_future = ResolverFuture::from_system_conf() + .map_err(|e| format!("Configure resolver: {:?}", e))?; - let resolver = Resolver::new(&handle); - let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port); + let name = format!("{}.{}.", srv, domain).into_name() + .map_err(|e| format!("Parse service name: {:?}", e))?; Ok(Connecter { - handle, - resolver, - lookup: Some(lookup), - srvs: None, - connects: HashMap::new(), + fallback_port, + name, + domain: domain.into_name().map_err(|e| format!("Parse domain name: {:?}", e))?, + resolver_future, + resolver_opt: None, + srv_lookup_opt: None, + srvs_opt: None, + ip_lookup_opt: None, + ips_opt: None, + connect_opt: None, }) } } @@ -41,69 +50,104 @@ impl Future for Connecter { type Error = String; fn poll(&mut self) -> Poll { - match self.lookup.as_mut().map(|lookup| lookup.poll()) { - None | Some(Ok(Async::NotReady)) => (), - Some(Ok(Async::Ready(found_srvs))) => { - self.lookup = None; - match found_srvs { - Some(srvs) => - self.srvs = Some(srvs.to_stream(self.resolver.clone())), - None => - return Err("No SRV records".to_owned()), + if self.resolver_opt.is_none() { + //println!("Poll resolver future"); + match self.resolver_future.poll() { + Ok(Async::Ready(resolver)) => { + self.resolver_opt = Some(resolver); } - }, - Some(Err(e)) => - return Err(format!("{}", e)), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(format!("Cann't get resolver: {:?}", e)), + } } - match self.srvs.as_mut().map(|srv| srv.poll()) { - None | Some(Ok(Async::NotReady)) => (), - Some(Ok(Async::Ready(None))) => - self.srvs = None, - Some(Ok(Async::Ready(Some(srv_item)))) => { - let handle = &self.handle; - for addr in srv_item.to_socket_addrs() { - self.connects.entry(addr) - .or_insert_with(|| { - // println!("Connect to {}", addr); - TcpStream::connect(&addr, handle) - }); + if let Some(ref resolver) = self.resolver_opt { + if self.srvs_opt.is_none() { + if self.srv_lookup_opt.is_none() { + //println!("Lookup srv: {:?}", self.name); + self.srv_lookup_opt = Some(resolver.lookup_srv(&self.name)); } - }, - Some(Err(e)) => - return Err(format!("{}", e)), - } - let mut connected_stream = None; - self.connects.retain(|_, connect| { - if connected_stream.is_some() { - return false; + if let Some(ref mut srv_lookup) = self.srv_lookup_opt { + match srv_lookup.poll() { + Ok(Async::Ready(t)) => { + let mut srvs = BTreeMap::new(); + for srv in t.iter() { + srvs.insert(srv.priority(), srv.clone()); + } + srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone())); + self.srvs_opt = Some(srvs.into_iter()); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => { + //println!("Ignore SVR error: {:?}", e); + let mut srvs = BTreeMap::new(); + srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone())); + self.srvs_opt = Some(srvs.into_iter()); + }, + } + } } - match connect.poll() { - Ok(Async::NotReady) => true, - Ok(Async::Ready(tcp_stream)) => { - // Success! - connected_stream = Some(tcp_stream); - false - }, - Err(_e) => { - // println!("{}", _e); - false - }, - } - }); - if let Some(tcp_stream) = connected_stream { - return Ok(Async::Ready(tcp_stream)); - } + if self.connect_opt.is_none() { + if self.ips_opt.is_none() { + if self.ip_lookup_opt.is_none() { + if let Some(ref mut srvs) = self.srvs_opt { + if let Some((_, srv)) = srvs.next() { + //println!("Lookup ip: {:?}", srv); + self.ip_lookup_opt = Some((srv.port(), resolver.lookup_ip(srv.target()))); + } else { + return Err("Cann't connect".to_string()); + } + } + } + + if let Some((port, mut ip_lookup)) = mem::replace(&mut self.ip_lookup_opt, None) { + match ip_lookup.poll() { + Ok(Async::Ready(t)) => { + let mut ip_deque = VecDeque::new(); + ip_deque.extend(t.iter()); + //println!("IPs: {:?}", ip_deque); + self.ips_opt = Some((port, ip_deque)); + self.ip_lookup_opt = None; + }, + Ok(Async::NotReady) => { + self.ip_lookup_opt = Some((port, ip_lookup)); + return Ok(Async::NotReady) + }, + Err(_) => { + //println!("Ignore lookup error: {:?}", e); + self.ip_lookup_opt = None; + } + } + } + } + + if let Some((port, mut ip_deque)) = mem::replace(&mut self.ips_opt, None) { + if let Some(ip) = ip_deque.pop_front() { + //println!("Connect to {:?}:{}", ip, port); + self.connect_opt = Some(TcpStream::connect(&SocketAddr::new(ip, port))); + self.ips_opt = Some((port, ip_deque)); + } + } + } + + if let Some(mut connect_future) = mem::replace(&mut self.connect_opt, None) { + match connect_future.poll() { + Ok(Async::Ready(t)) => return Ok(Async::Ready(t)), + Ok(Async::NotReady) => { + self.connect_opt = Some(connect_future); + return Ok(Async::NotReady) + } + Err(_) => { + //println!("Ignore connect error: {:?}", e); + }, + } + } - if self.lookup.is_none() && - self.srvs.is_none() && - self.connects.is_empty() - { - return Err("All connection attempts failed".to_owned()); } Ok(Async::NotReady) } } + diff --git a/src/lib.rs b/src/lib.rs index ebba1f1..13f374d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,7 @@ //! XMPP implemeentation with asynchronous I/O using Tokio. extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_io; extern crate tokio_codec; extern crate bytes; @@ -14,7 +14,8 @@ extern crate native_tls; extern crate tokio_tls; extern crate sasl; extern crate jid; -extern crate domain; +extern crate trust_dns_resolver; +extern crate trust_dns_proto; extern crate idna; extern crate xmpp_parsers; extern crate try_from; diff --git a/src/starttls.rs b/src/starttls.rs index 6ac23ac..1d56a9c 100644 --- a/src/starttls.rs +++ b/src/starttls.rs @@ -3,8 +3,8 @@ use futures::{Future, Sink, Poll, Async}; use futures::stream::Stream; use futures::sink; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tls::{TlsStream, TlsConnectorExt, ConnectAsync}; -use native_tls::TlsConnector; +use tokio_tls::{TlsStream, TlsConnector, Connect}; +use native_tls::TlsConnector as NativeTlsConnector; use minidom::Element; use jid::Jid; @@ -25,7 +25,7 @@ enum StartTlsClientState { Invalid, SendStartTls(sink::Send>), AwaitProceed(XMPPStream), - StartingTls(ConnectAsync), + StartingTls(Connect), } impl StartTlsClient { @@ -53,7 +53,7 @@ impl Future for StartTlsClient { fn poll(&mut self) -> Poll { let old_state = replace(&mut self.state, StartTlsClientState::Invalid); let mut retry = false; - + let (new_state, result) = match old_state { StartTlsClientState::SendStartTls(mut send) => match send.poll() { @@ -73,9 +73,9 @@ impl Future for StartTlsClient { if stanza.name() == "proceed" => { let stream = xmpp_stream.stream.into_inner(); - let connect = TlsConnector::builder().unwrap() - .build().unwrap() - .connect_async(&self.jid.domain, stream); + let connect = TlsConnector::from(NativeTlsConnector::builder() + .build().unwrap()) + .connect(&self.jid.domain, stream); let new_state = StartTlsClientState::StartingTls(connect); retry = true; (new_state, Ok(Async::NotReady))