From 005013f37ca8af711431e76852842b2bb32c83a3 Mon Sep 17 00:00:00 2001 From: Astro Date: Sat, 3 Jun 2017 01:58:31 +0200 Subject: [PATCH] this kinda works --- Cargo.toml | 6 +- src/lib.rs | 149 ++++++++++++++++++++++++++-------------------- src/xmpp_codec.rs | 5 +- 3 files changed, 90 insertions(+), 70 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4e643380..2a22e0bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" authors = ["Astro "] [dependencies] -futures = "0.1.6" -tokio-core = "0.1.1" -RustyXML = "0.1.1" +futures = "*" +tokio-core = "*" +RustyXML = "*" diff --git a/src/lib.rs b/src/lib.rs index e9162b02..f8e6453e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#[macro_use] extern crate futures; extern crate tokio_core; extern crate xml; @@ -6,12 +7,12 @@ use std::net::SocketAddr; use std::net::ToSocketAddrs; use std::sync::Arc; use std::io::ErrorKind; -use futures::{Future, BoxFuture, Sink, Poll}; +use futures::{Future, BoxFuture, Sink, Poll, Async}; use futures::stream::{Stream, iter}; use futures::future::result; use tokio_core::reactor::Handle; use tokio_core::io::Io; -use tokio_core::net::TcpStream; +use tokio_core::net::{TcpStream, TcpStreamNew}; mod xmpp_codec; use xmpp_codec::*; @@ -19,92 +20,108 @@ use xmpp_codec::*; // type FullClient = sasl::Client> -type Event = (); -type Error = std::io::Error; - -struct TCPStream { - source: Box>, - sink: Arc>>>, +#[derive(Debug)] +pub struct TcpClient { + state: TcpClientState, } -impl TCPStream { - pub fn connect(addr: &SocketAddr, handle: &Handle) -> BoxFuture, std::io::Error> { - TcpStream::connect(addr, handle) - .and_then(|stream| { - let (sink, source) = stream.framed(XMPPCodec::new()) - // .framed(UTF8Codec::new()) - .split(); - - sink.send(Packet::StreamStart) - .and_then(|sink| result(Ok((Arc::new(Box::new(sink)), source)))) - }) - .and_then(|(sink, source)| { - let sink1 = sink.clone(); - let source = source - .map(|items| iter(items.into_iter().map(Ok))) - .flatten() - .filter_map(move |pkt| Self::process_packet(pkt, &sink1)) - // .for_each(|ev| { - // match ev { - // Packet::Stanza - // _ => (), - // } - // Ok(println!("xmpp: {:?}", ev)) - // }) - // .boxed(); - ; - result(Ok(Arc::new(TCPStream { - source: Box::new(source), - sink: sink, - }))) - }).boxed() - //.map_err(|e| std::io::Error::new(ErrorKind::Other, e)); - } +enum TcpClientState { + Connecting(TcpStreamNew), + SendStart(futures::sink::Send>), + RecvStart(Option>), + Established, + Invalid, +} - fn process_packet(pkt: Packet, sink: &Arc) -> Option - where S: Sink { - - println!("pkt: {:?}", pkt); - None +impl std::fmt::Debug for TcpClientState { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + let s = match *self { + TcpClientState::Connecting(_) => "Connecting", + TcpClientState::SendStart(_) => "SendStart", + TcpClientState::RecvStart(_) => "RecvStart", + TcpClientState::Established => "Established", + TcpClientState::Invalid => "Invalid", + }; + write!(fmt, "{}", s) } } -struct ClientStream { - inner: TCPStream, +impl TcpClient { + pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self { + let tcp_stream_new = TcpStream::connect(addr, handle); + TcpClient { + state: TcpClientState::Connecting(tcp_stream_new), + } + } } -impl ClientStream { - pub fn connect(jid: &str, password: &str, handle: &Handle) -> Box> { - let addr = "[2a01:4f8:a0:33d0::5]:5222" - .to_socket_addrs().unwrap() - .next().unwrap(); - let stream = - TCPStream::connect(&addr, handle) - .and_then(|stream| { - Ok(ClientStream { - inner: stream - }) - }); - Box::new(stream) +impl Future for TcpClient { + type Item = XMPPStream; + type Error = std::io::Error; + + fn poll(&mut self) -> Poll { + let (new_state, result) = match self.state { + TcpClientState::Connecting(ref mut tcp_stream_new) => { + let tcp_stream = try_ready!(tcp_stream_new.poll()); + let xmpp_stream = tcp_stream.framed(XMPPCodec::new()); + let send = xmpp_stream.send(Packet::StreamStart); + let new_state = TcpClientState::SendStart(send); + (new_state, Ok(Async::NotReady)) + }, + TcpClientState::SendStart(ref mut send) => { + let xmpp_stream = try_ready!(send.poll()); + let new_state = TcpClientState::RecvStart(Some(xmpp_stream)); + (new_state, Ok(Async::NotReady)) + }, + TcpClientState::RecvStart(ref mut opt_xmpp_stream) => { + let mut xmpp_stream = opt_xmpp_stream.take().unwrap(); + match xmpp_stream.poll() { + Ok(Async::Ready(Some(events))) => println!("Recv start: {:?}", events), + Ok(Async::Ready(_)) => return Err(std::io::Error::from(ErrorKind::InvalidData)), + Ok(Async::NotReady) => { + *opt_xmpp_stream = Some(xmpp_stream); + return Ok(Async::NotReady); + }, + Err(e) => return Err(e) + }; + let new_state = TcpClientState::Established; + (new_state, Ok(Async::Ready(xmpp_stream))) + }, + TcpClientState::Established | TcpClientState::Invalid => + unreachable!(), + }; + + println!("Next state: {:?}", new_state); + self.state = new_state; + match result { + // by polling again, we register new future + Ok(Async::NotReady) => self.poll(), + result => result + } } } #[cfg(test)] mod tests { use tokio_core::reactor::Core; + use futures::{Future, Stream}; #[test] fn it_works() { + use std::net::ToSocketAddrs; + let addr = "[2a01:4f8:a0:33d0::5]:5222" + .to_socket_addrs().unwrap() + .next().unwrap(); + let mut core = Core::new().unwrap(); - let client = super::ClientStream::connect( - "astro@spaceboyz.net", - "...", + let client = super::TcpClient::connect( + &addr, &core.handle() ).and_then(|stream| { - stream.inner.source.boxed().for_each(|item| { + stream.for_each(|item| { Ok(println!("stream item: {:?}", item)) }) - }).boxed(); + }); core.run(client).unwrap(); } diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs index 84733931..e76dce59 100644 --- a/src/xmpp_codec.rs +++ b/src/xmpp_codec.rs @@ -2,7 +2,7 @@ use std; use std::str::from_utf8; use std::io::{Error, ErrorKind}; use std::collections::HashMap; -use tokio_core::io::{Codec, EasyBuf}; +use tokio_core::io::{Codec, EasyBuf, Framed}; use xml; const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/"; @@ -51,6 +51,8 @@ pub enum Packet { StreamEnd, } +pub type XMPPStream = Framed; + pub struct XMPPCodec { parser: xml::Parser, root: Option, @@ -70,6 +72,7 @@ impl Codec for XMPPCodec { type Out = Packet; fn decode(&mut self, buf: &mut EasyBuf) -> Result, Error> { + println!("XMPPCodec.decode {:?}", buf.len()); match from_utf8(buf.as_slice()) { Ok(s) => self.parser.feed_str(s),