From 482bf779551b48d82a3c54f5f8a40ba93401eab2 Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 4 Jun 2017 02:05:08 +0200 Subject: [PATCH] tidy up --- Cargo.toml | 2 ++ src/lib.rs | 4 ++-- src/tcp.rs | 19 +++++++---------- src/xmpp_codec.rs | 53 ++++++++++++++++++++++------------------------- 4 files changed, 36 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2a22e0bd..23d59db4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,6 @@ authors = ["Astro "] [dependencies] futures = "*" tokio-core = "*" +tokio-io = "*" +bytes = "*" RustyXML = "*" diff --git a/src/lib.rs b/src/lib.rs index 02a3295a..20ded2ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ #[macro_use] extern crate futures; extern crate tokio_core; +extern crate tokio_io; +extern crate bytes; extern crate xml; -extern crate rustls; -extern crate tokio_rustls; mod xmpp_codec; diff --git a/src/tcp.rs b/src/tcp.rs index fc0d6654..01b9db0f 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,16 +1,12 @@ use std::fmt; use std::net::SocketAddr; -use std::net::ToSocketAddrs; -use std::sync::Arc; use std::io::{Error, ErrorKind}; -use futures::{Future, BoxFuture, Sink, Poll, Async}; -use futures::stream::{Stream, iter}; +use futures::{Future, Sink, Poll, Async}; +use futures::stream::Stream; use futures::sink; use tokio_core::reactor::Handle; -use tokio_core::io::Io; +use tokio_io::AsyncRead; use tokio_core::net::{TcpStream, TcpStreamNew}; -use rustls::ClientConfig; -use tokio_rustls::ClientConfigExt; use super::{XMPPStream, XMPPCodec, Packet}; @@ -25,7 +21,6 @@ enum TcpClientState { SendStart(sink::Send>), RecvStart(Option>), Established, - Invalid, } impl fmt::Debug for TcpClientState { @@ -35,9 +30,9 @@ impl fmt::Debug for TcpClientState { TcpClientState::SendStart(_) => "SendStart", TcpClientState::RecvStart(_) => "RecvStart", TcpClientState::Established => "Established", - TcpClientState::Invalid => "Invalid", }; - write!(fmt, "{}", s) + try!(write!(fmt, "{}", s)); + Ok(()) } } @@ -58,7 +53,7 @@ impl Future for TcpClient { let (new_state, result) = match self.state { TcpClientState::Connecting(ref mut tcp_stream_new) => { let tcp_stream = try_ready!(tcp_stream_new.poll()); - let xmpp_stream = tcp_stream.framed(XMPPCodec::new()); + let xmpp_stream = AsyncRead::framed(tcp_stream, XMPPCodec::new()); let send = xmpp_stream.send(Packet::StreamStart); let new_state = TcpClientState::SendStart(send); (new_state, Ok(Async::NotReady)) @@ -82,7 +77,7 @@ impl Future for TcpClient { let new_state = TcpClientState::Established; (new_state, Ok(Async::Ready(xmpp_stream))) }, - TcpClientState::Established | TcpClientState::Invalid => + TcpClientState::Established => unreachable!(), }; diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs index dc7aeb7e..28c0dfbd 100644 --- a/src/xmpp_codec.rs +++ b/src/xmpp_codec.rs @@ -1,9 +1,11 @@ use std; +use std::fmt::Write; use std::str::from_utf8; use std::io::{Error, ErrorKind}; use std::collections::HashMap; -use tokio_core::io::{Codec, EasyBuf, Framed}; +use tokio_io::codec::{Framed, Encoder, Decoder}; use xml; +use bytes::*; const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/"; const NS_STREAMS: &'static str = "http://etherx.jabber.org/streams"; @@ -67,22 +69,20 @@ impl XMPPCodec { } } -impl Codec for XMPPCodec { - type In = Packet; - type Out = Packet; +impl Decoder for XMPPCodec { + type Item = Packet; + type Error = Error; - fn decode(&mut self, buf: &mut EasyBuf) -> Result, Error> { + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { println!("XMPPCodec.decode {:?}", buf.len()); - let buf_len = buf.len(); - let chunk = buf.drain_to(buf_len); - match from_utf8(chunk.as_slice()) { + match from_utf8(buf.take().as_ref()) { Ok(s) => self.parser.feed_str(s), Err(e) => return Err(Error::new(ErrorKind::InvalidInput, e)), } - let mut new_root = None; + let mut new_root: Option = None; let mut result = None; for event in &mut self.parser { match self.root { @@ -128,29 +128,26 @@ impl Codec for XMPPCodec { Ok(result) } - fn encode(&mut self, msg: Self::Out, buf: &mut Vec) -> Result<(), Error> { - match msg { + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Error> { + self.decode(buf) + } +} + +impl Encoder for XMPPCodec { + type Item = Packet; + type Error = Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + match item { Packet::StreamStart => { - let mut write = |s: &str| { - buf.extend_from_slice(s.as_bytes()); - }; - - write("\n"); - write("\n"); - - Ok(()) + write!(dst, + "\n +\n", + NS_CLIENT, NS_STREAMS) + .map_err(|_| Error::from(ErrorKind::WriteZero)) }, // TODO: Implement all _ => Ok(()) } } - - fn decode_eof(&mut self, _buf: &mut EasyBuf) -> Result { - Err(Error::from(ErrorKind::UnexpectedEof)) - } }