From 288930bcd44351d9f9617f8a213a5ebd3e2e8a81 Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 4 Jun 2017 01:37:46 +0200 Subject: [PATCH] reorg --- examples/echo_bot.rs | 29 ++++++++++ src/lib.rs | 128 ++----------------------------------------- src/tcp.rs | 97 ++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 123 deletions(-) create mode 100644 examples/echo_bot.rs create mode 100644 src/tcp.rs diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs new file mode 100644 index 00000000..f1e4aff2 --- /dev/null +++ b/examples/echo_bot.rs @@ -0,0 +1,29 @@ +extern crate futures; +extern crate tokio_core; +extern crate tokio_xmpp; + +use tokio_core::reactor::Core; +use futures::{Future, Stream}; +use tokio_xmpp::{Packet, TcpClient}; + +fn main() { + use std::net::ToSocketAddrs; + let addr = "[2a01:4f8:a0:33d0::5]:5222" + .to_socket_addrs().unwrap() + .next().unwrap(); + + let mut core = Core::new().unwrap(); + let client = TcpClient::connect( + &addr, + &core.handle() + ).and_then(|stream| { + stream.for_each(|event| { + match event { + Packet::Stanza(el) => println!("<< {}", el), + _ => println!("!! {:?}", event), + } + Ok(()) + }) + }); + core.run(client).unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs index 043c81e7..02a3295a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,133 +2,15 @@ extern crate futures; extern crate tokio_core; extern crate xml; +extern crate rustls; +extern crate tokio_rustls; -use std::net::SocketAddr; -use std::net::ToSocketAddrs; -use std::sync::Arc; -use std::io::ErrorKind; -use futures::{Future, BoxFuture, Sink, Poll, Async}; -use futures::stream::{Stream, iter}; -use futures::future::result; -use tokio_core::reactor::Handle; -use tokio_core::io::Io; -use tokio_core::net::{TcpStream, TcpStreamNew}; mod xmpp_codec; -use xmpp_codec::*; +pub use xmpp_codec::*; +mod tcp; +pub use tcp::*; // type FullClient = sasl::Client> -#[derive(Debug)] -pub struct TcpClient { - state: TcpClientState, -} - -enum TcpClientState { - Connecting(TcpStreamNew), - SendStart(futures::sink::Send>), - RecvStart(Option>), - Established, - Invalid, -} - -impl std::fmt::Debug for TcpClientState { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - let s = match *self { - TcpClientState::Connecting(_) => "Connecting", - TcpClientState::SendStart(_) => "SendStart", - TcpClientState::RecvStart(_) => "RecvStart", - TcpClientState::Established => "Established", - TcpClientState::Invalid => "Invalid", - }; - write!(fmt, "{}", s) - } -} - -impl TcpClient { - pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self { - let tcp_stream_new = TcpStream::connect(addr, handle); - TcpClient { - state: TcpClientState::Connecting(tcp_stream_new), - } - } -} - -impl Future for TcpClient { - type Item = XMPPStream; - type Error = std::io::Error; - - fn poll(&mut self) -> Poll { - let (new_state, result) = match self.state { - TcpClientState::Connecting(ref mut tcp_stream_new) => { - let tcp_stream = try_ready!(tcp_stream_new.poll()); - let xmpp_stream = tcp_stream.framed(XMPPCodec::new()); - let send = xmpp_stream.send(Packet::StreamStart); - let new_state = TcpClientState::SendStart(send); - (new_state, Ok(Async::NotReady)) - }, - TcpClientState::SendStart(ref mut send) => { - let xmpp_stream = try_ready!(send.poll()); - let new_state = TcpClientState::RecvStart(Some(xmpp_stream)); - (new_state, Ok(Async::NotReady)) - }, - TcpClientState::RecvStart(ref mut opt_xmpp_stream) => { - let mut xmpp_stream = opt_xmpp_stream.take().unwrap(); - match xmpp_stream.poll() { - Ok(Async::Ready(Some(Packet::StreamStart))) => println!("Recv start!"), - Ok(Async::Ready(_)) => return Err(std::io::Error::from(ErrorKind::InvalidData)), - Ok(Async::NotReady) => { - *opt_xmpp_stream = Some(xmpp_stream); - return Ok(Async::NotReady); - }, - Err(e) => return Err(e) - }; - let new_state = TcpClientState::Established; - (new_state, Ok(Async::Ready(xmpp_stream))) - }, - TcpClientState::Established | TcpClientState::Invalid => - unreachable!(), - }; - - println!("Next state: {:?}", new_state); - self.state = new_state; - match result { - // by polling again, we register new future - Ok(Async::NotReady) => self.poll(), - result => result - } - } -} - -#[cfg(test)] -mod tests { - use tokio_core::reactor::Core; - use futures::{Future, Stream}; - use xmpp_codec::Packet; - - #[test] - fn it_works() { - use std::net::ToSocketAddrs; - let addr = "[2a01:4f8:a0:33d0::5]:5222" - .to_socket_addrs().unwrap() - .next().unwrap(); - - let mut core = Core::new().unwrap(); - let client = super::TcpClient::connect( - &addr, - &core.handle() - ).and_then(|stream| { - stream.for_each(|event| { - match event { - Packet::Stanza(el) => println!("<< {}", el), - _ => println!("!! {:?}", event), - } - Ok(()) - }) - }); - core.run(client).unwrap(); - } - - // TODO: test truncated utf8 -} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 00000000..fc0d6654 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,97 @@ +use std::fmt; +use std::net::SocketAddr; +use std::net::ToSocketAddrs; +use std::sync::Arc; +use std::io::{Error, ErrorKind}; +use futures::{Future, BoxFuture, Sink, Poll, Async}; +use futures::stream::{Stream, iter}; +use futures::sink; +use tokio_core::reactor::Handle; +use tokio_core::io::Io; +use tokio_core::net::{TcpStream, TcpStreamNew}; +use rustls::ClientConfig; +use tokio_rustls::ClientConfigExt; + +use super::{XMPPStream, XMPPCodec, Packet}; + + +#[derive(Debug)] +pub struct TcpClient { + state: TcpClientState, +} + +enum TcpClientState { + Connecting(TcpStreamNew), + SendStart(sink::Send>), + RecvStart(Option>), + Established, + Invalid, +} + +impl fmt::Debug for TcpClientState { + fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { + let s = match *self { + TcpClientState::Connecting(_) => "Connecting", + TcpClientState::SendStart(_) => "SendStart", + TcpClientState::RecvStart(_) => "RecvStart", + TcpClientState::Established => "Established", + TcpClientState::Invalid => "Invalid", + }; + write!(fmt, "{}", s) + } +} + +impl TcpClient { + pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self { + let tcp_stream_new = TcpStream::connect(addr, handle); + TcpClient { + state: TcpClientState::Connecting(tcp_stream_new), + } + } +} + +impl Future for TcpClient { + type Item = XMPPStream; + type Error = Error; + + fn poll(&mut self) -> Poll { + let (new_state, result) = match self.state { + TcpClientState::Connecting(ref mut tcp_stream_new) => { + let tcp_stream = try_ready!(tcp_stream_new.poll()); + let xmpp_stream = tcp_stream.framed(XMPPCodec::new()); + let send = xmpp_stream.send(Packet::StreamStart); + let new_state = TcpClientState::SendStart(send); + (new_state, Ok(Async::NotReady)) + }, + TcpClientState::SendStart(ref mut send) => { + let xmpp_stream = try_ready!(send.poll()); + let new_state = TcpClientState::RecvStart(Some(xmpp_stream)); + (new_state, Ok(Async::NotReady)) + }, + TcpClientState::RecvStart(ref mut opt_xmpp_stream) => { + let mut xmpp_stream = opt_xmpp_stream.take().unwrap(); + match xmpp_stream.poll() { + Ok(Async::Ready(Some(Packet::StreamStart))) => println!("Recv start!"), + Ok(Async::Ready(_)) => return Err(Error::from(ErrorKind::InvalidData)), + Ok(Async::NotReady) => { + *opt_xmpp_stream = Some(xmpp_stream); + return Ok(Async::NotReady); + }, + Err(e) => return Err(e) + }; + let new_state = TcpClientState::Established; + (new_state, Ok(Async::Ready(xmpp_stream))) + }, + TcpClientState::Established | TcpClientState::Invalid => + unreachable!(), + }; + + println!("Next state: {:?}", new_state); + self.state = new_state; + match result { + // by polling again, we register new future + Ok(Async::NotReady) => self.poll(), + result => result + } + } +}