diff --git a/Cargo.toml b/Cargo.toml index 6410ca2..e80f7bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,10 @@ authors = ["Astro "] futures = "*" tokio-core = "0.1.7" tokio-io = "*" -bytes = "*" -RustyXML = "*" +bytes = "0.4.4" +xml5ever = "*" +tendril = "*" +minidom = { path = "../../programs/minidom-rs" } native-tls = "*" tokio-tls = "*" sasl = "*" diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index ee99250..8ba944e 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -2,13 +2,14 @@ extern crate futures; extern crate tokio_core; extern crate tokio_xmpp; extern crate jid; -extern crate xml; +extern crate minidom; use std::env::args; use std::process::exit; use tokio_core::reactor::Core; use futures::{Future, Stream, Sink, future}; use tokio_xmpp::Client; +use minidom::Element; fn main() { let args: Vec = args().collect(); @@ -47,14 +48,14 @@ fn main() { let presence = make_presence(); send(presence); } else if let Some(stanza) = event.as_stanza() { - if stanza.name == "message" && - stanza.get_attribute("type", None) != Some("error") { + if stanza.name() == "message" && + stanza.attr("type") != Some("error") { // This is a message we'll echo - let from = stanza.get_attribute("from", None); - let body = stanza.get_child("body", Some("jabber:client")) - .map(|el| el.content_str()); + let from = stanza.attr("from"); + let body = stanza.get_child("body", "jabber:client") + .map(|el| el.text()); - match (from.as_ref(), body) { + match (from, body) { (Some(from), Some(body)) => { let reply = make_reply(from, body); send(reply); @@ -78,24 +79,24 @@ fn main() { } // Construct a -fn make_presence() -> xml::Element { - let mut presence = xml::Element::new("presence".to_owned(), None, vec![]); - presence.tag(xml::Element::new("status".to_owned(), None, vec![])) - .text("chat".to_owned()); - presence.tag(xml::Element::new("show".to_owned(), None, vec![])) - .text("Echoing messages".to_owned()); - presence +fn make_presence() -> Element { + Element::builder("presence") + .append(Element::builder("status") + .append("chat") + .build()) + .append(Element::builder("show") + .append("Echoing messages") + .build()) + .build() } // Construct a chat -fn make_reply(to: &str, body: String) -> xml::Element { - let mut message = xml::Element::new( - "message".to_owned(), - None, - vec![("type".to_owned(), None, "chat".to_owned()), - ("to".to_owned(), None, to.to_owned())] - ); - message.tag(xml::Element::new("body".to_owned(), None, vec![])) - .text(body); - message +fn make_reply(to: &str, body: String) -> Element { + Element::builder("message") + .attr("type", "chat") + .attr("to", to) + .append(Element::builder("body") + .append(body) + .build()) + .build() } diff --git a/src/client/auth.rs b/src/client/auth.rs index c6aa79d..8716fea 100644 --- a/src/client/auth.rs +++ b/src/client/auth.rs @@ -2,7 +2,7 @@ use std::mem::replace; use futures::*; use futures::sink; use tokio_io::{AsyncRead, AsyncWrite}; -use xml; +use minidom::Element; use sasl::common::Credentials; use sasl::common::scram::*; use sasl::client::Mechanism; @@ -37,12 +37,13 @@ impl ClientAuth { ]; let mech_names: Vec = - match stream.stream_features.get_child("mechanisms", Some(NS_XMPP_SASL)) { + match stream.stream_features.get_child("mechanisms", NS_XMPP_SASL) { None => return Err("No auth mechanisms".to_owned()), Some(mechs) => - mechs.get_children("mechanism", Some(NS_XMPP_SASL)) - .map(|mech_el| mech_el.content_str()) + mechs.children() + .filter(|child| child.is("mechanism", NS_XMPP_SASL)) + .map(|mech_el| mech_el.text()) .collect(), }; println!("SASL mechanisms offered: {:?}", mech_names); @@ -58,7 +59,7 @@ impl ClientAuth { }; this.send( stream, - "auth", &[("mechanism".to_owned(), name)], + "auth", &[("mechanism", &name)], &initial ); return Ok(this); @@ -68,15 +69,13 @@ impl ClientAuth { Err("No supported SASL mechanism available".to_owned()) } - fn send(&mut self, stream: XMPPStream, nonza_name: &str, attrs: &[(String, String)], content: &[u8]) { - let mut nonza = xml::Element::new( - nonza_name.to_owned(), - Some(NS_XMPP_SASL.to_owned()), - attrs.iter() - .map(|&(ref name, ref value)| (name.clone(), None, value.clone())) - .collect() - ); - nonza.text(content.to_base64(base64::URL_SAFE)); + fn send(&mut self, stream: XMPPStream, nonza_name: &str, attrs: &[(&str, &str)], content: &[u8]) { + let nonza = Element::builder(nonza_name) + .ns(NS_XMPP_SASL); + let nonza = attrs.iter() + .fold(nonza, |nonza, &(name, value)| nonza.attr(name, value)) + .append(content.to_base64(base64::URL_SAFE)) + .build(); let send = stream.send(Packet::Stanza(nonza)); @@ -108,11 +107,11 @@ impl Future for ClientAuth { ClientAuthState::WaitRecv(mut stream) => match stream.poll() { Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.name == "challenge" - && stanza.ns == Some(NS_XMPP_SASL.to_owned()) => + if stanza.name() == "challenge" + && stanza.ns() == Some(NS_XMPP_SASL) => { let content = try!( - stanza.content_str() + stanza.text() .from_base64() .map_err(|e| format!("{}", e)) ); @@ -121,29 +120,24 @@ impl Future for ClientAuth { self.poll() }, Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.name == "success" - && stanza.ns == Some(NS_XMPP_SASL.to_owned()) => + if stanza.name() == "success" + && stanza.ns() == Some(NS_XMPP_SASL) => { let start = stream.restart(); self.state = ClientAuthState::Start(start); self.poll() }, Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.name == "failure" - && stanza.ns == Some(NS_XMPP_SASL.to_owned()) => + if stanza.name() == "failure" + && stanza.ns() == Some(NS_XMPP_SASL) => { let mut e = None; - for child in &stanza.children { - match child { - &xml::Xml::ElementNode(ref child) => { - e = Some(child.name.clone()); - break - }, - _ => (), - } + for child in stanza.children() { + e = Some(child.name().clone()); + break } - let e = e.unwrap_or_else(|| "Authentication failure".to_owned()); - Err(e) + let e = e.unwrap_or_else(|| "Authentication failure"); + Err(e.to_owned()) }, Ok(Async::Ready(event)) => { println!("ClientAuth ignore {:?}", event); diff --git a/src/client/bind.rs b/src/client/bind.rs index 45f68e6..6a59503 100644 --- a/src/client/bind.rs +++ b/src/client/bind.rs @@ -4,8 +4,8 @@ use std::str::FromStr; use futures::*; use futures::sink; use tokio_io::{AsyncRead, AsyncWrite}; -use xml; use jid::Jid; +use minidom::Element; use xmpp_codec::*; use xmpp_stream::*; @@ -25,7 +25,7 @@ impl ClientBind { /// the stream for anything else until the resource binding /// req/resp are done. pub fn new(stream: XMPPStream) -> Self { - match stream.stream_features.get_child("bind", Some(NS_XMPP_BIND)) { + match stream.stream_features.get_child("bind", NS_XMPP_BIND) { None => // No resource binding available, // return the (probably // usable) stream immediately @@ -39,31 +39,22 @@ impl ClientBind { } } -fn make_bind_request(resource: Option<&String>) -> xml::Element { - let mut iq = xml::Element::new( - "iq".to_owned(), - None, - vec![("type".to_owned(), None, "set".to_owned()), - ("id".to_owned(), None, BIND_REQ_ID.to_owned())] - ); - { - let bind_el = iq.tag( - xml::Element::new( - "bind".to_owned(), - Some(NS_XMPP_BIND.to_owned()), - vec![] - )); - resource.map(|resource| { - let resource_el = bind_el.tag( - xml::Element::new( - "resource".to_owned(), - Some(NS_XMPP_BIND.to_owned()), - vec![] - )); - resource_el.text(resource.clone()); - }); +fn make_bind_request(resource: Option<&String>) -> Element { + let iq = Element::builder("iq") + .attr("type", "set") + .attr("id", BIND_REQ_ID); + let mut bind_el = Element::builder("bind") + .ns(NS_XMPP_BIND); + match resource { + Some(resource) => { + let resource_el = Element::builder("resource") + .append(resource); + bind_el = bind_el.append(resource_el.build()); + }, + None => (), } - iq + iq.append(bind_el.build()) + .build() } impl Future for ClientBind { @@ -93,9 +84,9 @@ impl Future for ClientBind { ClientBind::WaitRecv(mut stream) => { match stream.poll() { Ok(Async::Ready(Some(Packet::Stanza(ref iq)))) - if iq.name == "iq" - && iq.get_attribute("id", None) == Some(BIND_REQ_ID) => { - match iq.get_attribute("type", None) { + if iq.name() == "iq" + && iq.attr("id") == Some(BIND_REQ_ID) => { + match iq.attr("type") { Some("result") => { get_bind_response_jid(&iq) .map(|jid| stream.jid = jid); @@ -123,13 +114,13 @@ impl Future for ClientBind { } } -fn get_bind_response_jid(iq: &xml::Element) -> Option { - iq.get_child("bind", Some(NS_XMPP_BIND)) +fn get_bind_response_jid(iq: &Element) -> Option { + iq.get_child("bind", NS_XMPP_BIND) .and_then(|bind_el| - bind_el.get_child("jid", Some(NS_XMPP_BIND)) + bind_el.get_child("jid", NS_XMPP_BIND) ) .and_then(|jid_el| - Jid::from_str(&jid_el.content_str()) + Jid::from_str(&jid_el.text()) .ok() ) } diff --git a/src/client/event.rs b/src/client/event.rs index e54cff4..7aba984 100644 --- a/src/client/event.rs +++ b/src/client/event.rs @@ -1,10 +1,10 @@ -use xml; +use minidom::Element; #[derive(Debug)] pub enum Event { Online, Disconnected, - Stanza(xml::Element), + Stanza(Element), } impl Event { @@ -17,12 +17,12 @@ impl Event { pub fn is_stanza(&self, name: &str) -> bool { match self { - &Event::Stanza(ref stanza) => stanza.name == name, + &Event::Stanza(ref stanza) => stanza.name() == name, _ => false, } } - pub fn as_stanza(&self) -> Option<&xml::Element> { + pub fn as_stanza(&self) -> Option<&Element> { match self { &Event::Stanza(ref stanza) => Some(stanza), _ => None, diff --git a/src/client/mod.rs b/src/client/mod.rs index d0be336..de0bdb6 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -6,8 +6,8 @@ use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; use futures::*; +use minidom::Element; use jid::{Jid, JidParseError}; -use xml; use sasl::common::{Credentials, ChannelBinding}; use super::xmpp_codec::Packet; @@ -76,7 +76,7 @@ impl Client { fn can_starttls(stream: &xmpp_stream::XMPPStream) -> bool { stream.stream_features - .get_child("starttls", Some(NS_XMPP_TLS)) + .get_child("starttls", NS_XMPP_TLS) .is_some() } @@ -151,7 +151,7 @@ impl Stream for Client { } impl Sink for Client { - type SinkItem = xml::Element; + type SinkItem = Element; type SinkError = String; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { diff --git a/src/lib.rs b/src/lib.rs index bbc5d2e..c150760 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,9 @@ extern crate futures; extern crate tokio_core; extern crate tokio_io; extern crate bytes; -extern crate xml; +extern crate xml5ever; +extern crate tendril; +extern crate minidom; extern crate native_tls; extern crate tokio_tls; extern crate sasl; diff --git a/src/starttls.rs b/src/starttls.rs index 79ee731..fe6c641 100644 --- a/src/starttls.rs +++ b/src/starttls.rs @@ -5,7 +5,7 @@ use futures::sink; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::*; use native_tls::TlsConnector; -use xml; +use minidom::Element; use jid::Jid; use xmpp_codec::*; @@ -34,10 +34,9 @@ impl StartTlsClient { pub fn from_stream(xmpp_stream: XMPPStream) -> Self { let jid = xmpp_stream.jid.clone(); - let nonza = xml::Element::new( - "starttls".to_owned(), Some(NS_XMPP_TLS.to_owned()), - vec![] - ); + let nonza = Element::builder("starttls") + .ns(NS_XMPP_TLS) + .build(); let packet = Packet::Stanza(nonza); let send = xmpp_stream.send(packet); @@ -72,7 +71,7 @@ impl Future for StartTlsClient { StartTlsClientState::AwaitProceed(mut xmpp_stream) => match xmpp_stream.poll() { Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.name == "proceed" => + if stanza.name() == "proceed" => { let stream = xmpp_stream.stream.into_inner(); let connect = TlsConnector::builder().unwrap() diff --git a/src/stream_start.rs b/src/stream_start.rs index 11b074e..bbccadd 100644 --- a/src/stream_start.rs +++ b/src/stream_start.rs @@ -76,8 +76,8 @@ impl Future for StreamStart { StreamStartState::RecvFeatures(mut stream, stream_attrs) => match stream.poll() { Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => - if stanza.name == "features" - && stanza.ns == Some(NS_XMPP_STREAM.to_owned()) { + if stanza.name() == "features" + && stanza.ns() == Some(NS_XMPP_STREAM) { let stream = XMPPStream::new(self.jid.clone(), stream, stream_attrs, stanza); (StreamStartState::Invalid, Ok(Async::Ready(stream))) } else { diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs index 1069372..e6a1bd2 100644 --- a/src/xmpp_codec.rs +++ b/src/xmpp_codec.rs @@ -1,69 +1,139 @@ use std; +use std::default::Default; +use std::iter::FromIterator; +use std::cell::RefCell; +use std::rc::Rc; use std::fmt::Write; use std::str::from_utf8; use std::io::{Error, ErrorKind}; use std::collections::HashMap; +use std::collections::vec_deque::VecDeque; use tokio_io::codec::{Encoder, Decoder}; -use xml; +use minidom::Element; +use xml5ever::tokenizer::{XmlTokenizer, TokenSink, Token, Tag, TagKind}; use bytes::*; -const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/"; - -pub type Attributes = HashMap<(String, Option), String>; - -struct XMPPRoot { - builder: xml::ElementBuilder, - pub attributes: Attributes, -} - -impl XMPPRoot { - fn new(root: xml::StartTag) -> Self { - let mut builder = xml::ElementBuilder::new(); - let mut attributes = HashMap::new(); - for (name_ns, value) in root.attributes { - match name_ns { - (ref name, None) if name == "xmlns" => - builder.set_default_ns(value), - (ref prefix, Some(ref ns)) if ns == NS_XMLNS => - builder.define_prefix(prefix.to_owned(), value), - _ => { - attributes.insert(name_ns, value); - }, - } - } - - XMPPRoot { - builder: builder, - attributes: attributes, - } - } - - fn handle_event(&mut self, event: Result) - -> Option> { - self.builder.handle_event(event) - } -} +// const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/"; #[derive(Debug)] pub enum Packet { Error(Box), StreamStart(HashMap), - Stanza(xml::Element), + Stanza(Element), Text(String), StreamEnd, } +struct ParserSink { + // Ready stanzas + queue: Rc>>, + // Parsing stack + stack: Vec, +} + +impl ParserSink { + pub fn new(queue: Rc>>) -> Self { + ParserSink { + queue, + stack: vec![], + } + } + + fn push_queue(&self, pkt: Packet) { + println!("push: {:?}", pkt); + self.queue.borrow_mut().push_back(pkt); + } + + fn handle_start_tag(&mut self, tag: Tag) { + let el = tag_to_element(&tag); + self.stack.push(el); + } + + fn handle_end_tag(&mut self) { + let el = self.stack.pop().unwrap(); + match self.stack.len() { + // + 0 => + self.push_queue(Packet::StreamEnd), + // + 1 => + self.push_queue(Packet::Stanza(el)), + len => { + let parent = &mut self.stack[len - 1]; + parent.append_child(el); + }, + } + } +} + +fn tag_to_element(tag: &Tag) -> Element { + let el_builder = Element::builder(tag.name.local.as_ref()) + .ns(tag.name.ns.as_ref()); + let el_builder = tag.attrs.iter().fold( + el_builder, + |el_builder, attr| el_builder.attr( + attr.name.local.as_ref(), + attr.value.as_ref() + ) + ); + el_builder.build() +} + +impl TokenSink for ParserSink { + fn process_token(&mut self, token: Token) { + println!("Token: {:?}", token); + match token { + Token::TagToken(tag) => match tag.kind { + TagKind::StartTag => + self.handle_start_tag(tag), + TagKind::EndTag => + self.handle_end_tag(), + TagKind::EmptyTag => { + self.handle_start_tag(tag); + self.handle_end_tag(); + }, + TagKind::ShortTag => + self.push_queue(Packet::Error(Box::new(Error::new(ErrorKind::InvalidInput, "ShortTag")))), + }, + Token::CharacterTokens(tendril) => + match self.stack.len() { + 0 | 1 => + self.push_queue(Packet::Text(tendril.into())), + len => { + let el = &mut self.stack[len - 1]; + el.append_text_node(tendril); + }, + }, + Token::EOFToken => + self.push_queue(Packet::StreamEnd), + Token::ParseError(s) => { + println!("ParseError: {:?}", s); + self.push_queue(Packet::Error(Box::new(Error::new(ErrorKind::InvalidInput, (*s).to_owned())))) + }, + _ => (), + } + } + + // fn end(&mut self) { + // } +} + pub struct XMPPCodec { - parser: xml::Parser, - root: Option, + parser: XmlTokenizer, + // For handling truncated utf8 buf: Vec, + queue: Rc>>, } impl XMPPCodec { pub fn new() -> Self { + let queue = Rc::new(RefCell::new((VecDeque::new()))); + let sink = ParserSink::new(queue.clone()); + // TODO: configure parser? + let parser = XmlTokenizer::new(sink, Default::default()); XMPPCodec { - parser: xml::Parser::new(), - root: None, + parser, + queue, buf: vec![], } } @@ -74,6 +144,7 @@ impl Decoder for XMPPCodec { type Error = Error; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + println!("decode {} bytes", buf.len()); let buf1: Box> = if self.buf.len() > 0 && buf.len() > 0 { let mut prefix = std::mem::replace(&mut self.buf, vec![]); @@ -87,7 +158,8 @@ impl Decoder for XMPPCodec { Ok(s) => { if s.len() > 0 { println!("<< {}", s); - self.parser.feed_str(s); + let tendril = FromIterator::from_iter(s.chars()); + self.parser.feed(tendril); } }, // Remedies for truncated utf8 @@ -110,57 +182,11 @@ impl Decoder for XMPPCodec { }, } - let mut new_root: Option = None; - let mut result = None; - for event in &mut self.parser { - match self.root { - None => { - // Expecting - match event { - Ok(xml::Event::ElementStart(start_tag)) => { - let mut attrs: HashMap = HashMap::new(); - for (&(ref name, _), value) in &start_tag.attributes { - attrs.insert(name.to_owned(), value.to_owned()); - } - result = Some(Packet::StreamStart(attrs)); - self.root = Some(XMPPRoot::new(start_tag)); - break - }, - Err(e) => { - result = Some(Packet::Error(Box::new(e))); - break - }, - _ => - (), - } - } - - Some(ref mut root) => { - match root.handle_event(event) { - None => (), - Some(Ok(stanza)) => { - // Emit the stanza - result = Some(Packet::Stanza(stanza)); - break - }, - Some(Err(e)) => { - result = Some(Packet::Error(Box::new(e))); - break - } - }; - }, - } - - match new_root.take() { - None => (), - Some(root) => self.root = Some(root), - } - } - + let result = self.queue.borrow_mut().pop_front(); Ok(result) } - fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Error> { + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { self.decode(buf) } } @@ -170,35 +196,56 @@ impl Encoder for XMPPCodec { type Error = Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + println!("encode {:?}", item); match item { Packet::StreamStart(start_attrs) => { let mut buf = String::new(); write!(buf, "\n").unwrap(); print!(">> {}", buf); write!(dst, "{}", buf) + .map_err(|_| Error::from(ErrorKind::InvalidInput)) }, Packet::Stanza(stanza) => { - println!(">> {}", stanza); - write!(dst, "{}", stanza) + println!(">> {:?}", stanza); + let mut root_ns = None; // TODO + stanza.write_to_inner(&mut dst.clone().writer(), &mut root_ns) + .map_err(|_| Error::from(ErrorKind::InvalidInput)) }, Packet::Text(text) => { - let escaped = xml::escape(&text); + let escaped = escape(&text); println!(">> {}", escaped); write!(dst, "{}", escaped) + .map_err(|_| Error::from(ErrorKind::InvalidInput)) }, // TODO: Implement all _ => Ok(()) } - .map_err(|_| Error::from(ErrorKind::InvalidInput)) } } +/// Copied from RustyXML for now +pub fn escape(input: &str) -> String { + let mut result = String::with_capacity(input.len()); + + for c in input.chars() { + match c { + '&' => result.push_str("&"), + '<' => result.push_str("<"), + '>' => result.push_str(">"), + '\'' => result.push_str("'"), + '"' => result.push_str("""), + o => result.push(o) + } + } + result +} + #[cfg(test)] mod tests { use super::*; @@ -240,8 +287,8 @@ mod tests { let r = c.decode(&mut b); assert!(match r { Ok(Some(Packet::Stanza(ref el))) - if el.name == "test" - && el.content_str() == "ß" + if el.name() == "test" + && el.text() == "ß" => true, _ => false, }); @@ -271,8 +318,8 @@ mod tests { let r = c.decode(&mut b); assert!(match r { Ok(Some(Packet::Stanza(ref el))) - if el.name == "test" - && el.content_str() == "ß" + if el.name() == "test" + && el.text() == "ß" => true, _ => false, }); diff --git a/src/xmpp_stream.rs b/src/xmpp_stream.rs index a09b5a9..932cbce 100644 --- a/src/xmpp_stream.rs +++ b/src/xmpp_stream.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; -use xml; +use minidom::Element; use jid::Jid; use xmpp_codec::*; @@ -14,14 +14,14 @@ pub struct XMPPStream { pub jid: Jid, pub stream: Framed, pub stream_attrs: HashMap, - pub stream_features: xml::Element, + pub stream_features: Element, } impl XMPPStream { pub fn new(jid: Jid, stream: Framed, stream_attrs: HashMap, - stream_features: xml::Element) -> Self { + stream_features: Element) -> Self { XMPPStream { jid, stream, stream_attrs, stream_features } }