use std::str::FromStr; use std::collections::HashMap; use std::net::SocketAddr; use futures::{Future, Poll, Async, Stream}; use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpStreamNew}; use domain::resolv::Resolver; use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream}; use domain::bits::name::{DNameBuf, FromStrError}; use ConnecterError; pub struct Connecter { handle: Handle, resolver: Resolver, lookup: Option, srvs: Option, connects: HashMap, } impl Connecter { pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result { let domain = DNameBuf::from_str(domain)?; let srv = DNameBuf::from_str(srv)?; let resolver = Resolver::new(&handle); let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port); Ok(Connecter { handle, resolver, lookup: Some(lookup), srvs: None, connects: HashMap::new(), }) } } impl Future for Connecter { type Item = TcpStream; type Error = ConnecterError; fn poll(&mut self) -> Poll { match self.lookup.as_mut().map(|lookup| lookup.poll()) { None | Some(Ok(Async::NotReady)) => (), Some(Ok(Async::Ready(found_srvs))) => { self.lookup = None; match found_srvs { Some(srvs) => self.srvs = Some(srvs.to_stream(self.resolver.clone())), None => return Err(ConnecterError::NoSrv), } }, Some(Err(e)) => return Err(e.into()), } match self.srvs.as_mut().map(|srv| srv.poll()) { None | Some(Ok(Async::NotReady)) => (), Some(Ok(Async::Ready(None))) => self.srvs = None, Some(Ok(Async::Ready(Some(srv_item)))) => { let handle = &self.handle; for addr in srv_item.to_socket_addrs() { self.connects.entry(addr) .or_insert_with(|| { // println!("Connect to {}", addr); TcpStream::connect(&addr, handle) }); } }, Some(Err(e)) => return Err(e.into()), } let mut connected_stream = None; self.connects.retain(|_, connect| { if connected_stream.is_some() { return false; } match connect.poll() { Ok(Async::NotReady) => true, Ok(Async::Ready(tcp_stream)) => { // Success! connected_stream = Some(tcp_stream); false }, Err(_e) => { // println!("{}", _e); false }, } }); if let Some(tcp_stream) = connected_stream { return Ok(Async::Ready(tcp_stream)); } if self.lookup.is_none() && self.srvs.is_none() && self.connects.is_empty() { return Err(ConnecterError::AllFailed); } Ok(Async::NotReady) } }