From 8e5a7530ed666a957c03de8d0e9d6bddbab9815f Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 24 Mar 2022 01:49:00 +0100 Subject: [PATCH] tokio-xmpp: use new minidom tokenizer --- minidom/src/element.rs | 15 ++- minidom/src/error.rs | 11 -- minidom/src/tokenizer.rs | 8 +- minidom/src/tree_builder.rs | 17 ++- tokio-xmpp/Cargo.toml | 2 +- tokio-xmpp/src/error.rs | 51 ++----- tokio-xmpp/src/lib.rs | 2 +- tokio-xmpp/src/xmpp_codec.rs | 252 ++++++----------------------------- 8 files changed, 84 insertions(+), 274 deletions(-) diff --git a/minidom/src/element.rs b/minidom/src/element.rs index bb54233..36075b4 100644 --- a/minidom/src/element.rs +++ b/minidom/src/element.rs @@ -84,7 +84,8 @@ pub struct Element { /// This is only used when deserializing. If you have to use a custom prefix use /// `ElementBuilder::prefix`. pub(crate) prefix: Option, - prefixes: Prefixes, + /// Namespace declarations + pub prefixes: Prefixes, attributes: BTreeMap, children: Vec, } @@ -717,6 +718,18 @@ impl Element { })?; self.children.remove(idx).into_element() } + + /// Remove the leading nodes up to the first child element and + /// return it + pub fn unshift_child(&mut self) -> Option { + while self.children.len() > 0 { + if let Some(el) = self.children.remove(0).into_element() { + return Some(el); + } + } + + None + } } /// An iterator over references to child elements of an `Element`. diff --git a/minidom/src/error.rs b/minidom/src/error.rs index fb1496a..7db6502 100644 --- a/minidom/src/error.rs +++ b/minidom/src/error.rs @@ -23,9 +23,6 @@ pub enum Error { /// Error from the Tokenizer TokenizerError(crate::tokenizer::TokenizerError), - /// An UTF-8 conversion error. - Utf8Error(::std::str::Utf8Error), - /// An I/O error, from std::io. IoError(::std::io::Error), @@ -51,7 +48,6 @@ impl StdError for Error { match self { Error::XmlError(e) => Some(e), Error::TokenizerError(e) => Some(e), - Error::Utf8Error(e) => Some(e), Error::IoError(e) => Some(e), Error::EndOfDocument => None, Error::InvalidElementClosed => None, @@ -67,7 +63,6 @@ impl std::fmt::Display for Error { match self { Error::XmlError(e) => write!(fmt, "XML error: {}", e), Error::TokenizerError(e) => write!(fmt, "XML tokenizer error: {}", e), - Error::Utf8Error(e) => write!(fmt, "UTF-8 error: {}", e), Error::IoError(e) => write!(fmt, "IO error: {}", e), Error::EndOfDocument => { write!(fmt, "the end of the document has been reached prematurely") @@ -94,12 +89,6 @@ impl From for Error { } } -impl From<::std::str::Utf8Error> for Error { - fn from(err: ::std::str::Utf8Error) -> Error { - Error::Utf8Error(err) - } -} - impl From<::std::io::Error> for Error { fn from(err: ::std::io::Error) -> Error { Error::IoError(err) diff --git a/minidom/src/tokenizer.rs b/minidom/src/tokenizer.rs index d787484..e0f13d1 100644 --- a/minidom/src/tokenizer.rs +++ b/minidom/src/tokenizer.rs @@ -3,7 +3,7 @@ //! Streaming tokenizer (SAX parser) use bytes::BytesMut; -use super::Token; +use super::{Error, Token}; /// `Result::Err` type returned from `Tokenizer` pub type TokenizerError = nom::error::Error; @@ -32,7 +32,7 @@ impl Tokenizer { } /// Parse the next document fragment - pub fn pull(&mut self) -> Result, TokenizerError> { + pub fn pull(&mut self) -> Result, Error> { /// cannot return an error with location info that points to /// our buffer that we still want to mutate fn with_input_to_owned(e: nom::error::Error<&[u8]>) -> TokenizerError { @@ -50,9 +50,9 @@ impl Tokenizer { Result::Err(nom::Err::Incomplete(_)) => None, Result::Err(nom::Err::Error(e)) => - return Err(with_input_to_owned(e)), + return Err(with_input_to_owned(e).into()), Result::Err(nom::Err::Failure(e)) => - return Err(with_input_to_owned(e)), + return Err(with_input_to_owned(e).into()), } }; match result { Some((s_len, token)) => { diff --git a/minidom/src/tree_builder.rs b/minidom/src/tree_builder.rs index 406a600..6166bdf 100644 --- a/minidom/src/tree_builder.rs +++ b/minidom/src/tree_builder.rs @@ -32,12 +32,27 @@ impl TreeBuilder { self.stack.len() } + /// Get the top-most element from the stack but don't remove it + pub fn top(&mut self) -> Option<&Element> { + self.stack.last() + } + /// Pop the top-most element from the stack - pub fn pop(&mut self) -> Option { + fn pop(&mut self) -> Option { self.prefixes_stack.pop(); self.stack.pop() } + /// Unshift the first child of the top element + pub fn unshift_child(&mut self) -> Option { + let depth = self.stack.len(); + if depth > 0 { + self.stack[depth - 1].unshift_child() + } else { + None + } + } + /// Lookup XML namespace declaration for given prefix (or no prefix) fn lookup_prefix(&self, prefix: &Option) -> Option<&str> { for nss in self.prefixes_stack.iter().rev() { diff --git a/tokio-xmpp/Cargo.toml b/tokio-xmpp/Cargo.toml index a80f7a0..5e72f78 100644 --- a/tokio-xmpp/Cargo.toml +++ b/tokio-xmpp/Cargo.toml @@ -25,8 +25,8 @@ tokio-stream = { version = "0.1", features = [] } tokio-util = { version = "0.6", features = ["codec"] } trust-dns-proto = "0.20" trust-dns-resolver = "0.20" -xml5ever = "0.16" xmpp-parsers = "0.19" +minidom = "0.14" webpki-roots = { version = "0.22", optional = true } [build-dependencies] diff --git a/tokio-xmpp/src/error.rs b/tokio-xmpp/src/error.rs index 0efcd66..0818dab 100644 --- a/tokio-xmpp/src/error.rs +++ b/tokio-xmpp/src/error.rs @@ -5,7 +5,6 @@ use std::borrow::Cow; use std::error::Error as StdError; use std::fmt; use std::io::Error as IoError; -use std::str::Utf8Error; #[cfg(feature = "tls-rust")] use tokio_rustls::rustls::client::InvalidDnsNameError; #[cfg(feature = "tls-rust")] @@ -106,44 +105,6 @@ impl From for Error { } } -/// Causes for stream parsing errors -#[derive(Debug)] -pub enum ParserError { - /// Encoding error - Utf8(Utf8Error), - /// XML parse error - Parse(ParseError), - /// Illegal `` - ShortTag, - /// Required by `impl Decoder` - Io(IoError), -} - -impl fmt::Display for ParserError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - ParserError::Utf8(e) => write!(fmt, "UTF-8 error: {}", e), - ParserError::Parse(e) => write!(fmt, "parse error: {}", e), - ParserError::ShortTag => write!(fmt, "short tag"), - ParserError::Io(e) => write!(fmt, "IO error: {}", e), - } - } -} - -impl StdError for ParserError {} - -impl From for ParserError { - fn from(e: IoError) -> Self { - ParserError::Io(e) - } -} - -impl From for Error { - fn from(e: ParserError) -> Self { - ProtocolError::Parser(e).into() - } -} - /// XML parse error wrapper type #[derive(Debug)] pub struct ParseError(pub Cow<'static, str>); @@ -167,7 +128,7 @@ impl fmt::Display for ParseError { #[derive(Debug)] pub enum ProtocolError { /// XML parser error - Parser(ParserError), + Parser(minidom::Error), /// Error with expected stanza schema Parsers(ParsersError), /// No TLS available @@ -205,12 +166,18 @@ impl fmt::Display for ProtocolError { impl StdError for ProtocolError {} -impl From for ProtocolError { - fn from(e: ParserError) -> Self { +impl From for ProtocolError { + fn from(e: minidom::Error) -> Self { ProtocolError::Parser(e) } } +impl From for Error { + fn from(e: minidom::Error) -> Self { + ProtocolError::Parser(e).into() + } +} + impl From for ProtocolError { fn from(e: ParsersError) -> Self { ProtocolError::Parsers(e) diff --git a/tokio-xmpp/src/lib.rs b/tokio-xmpp/src/lib.rs index 05cfc55..aaf65b2 100644 --- a/tokio-xmpp/src/lib.rs +++ b/tokio-xmpp/src/lib.rs @@ -16,5 +16,5 @@ pub use client::{async_client::Client as AsyncClient, simple_client::Client as S mod component; pub use crate::component::Component; mod error; -pub use crate::error::{AuthError, ConnecterError, Error, ParseError, ParserError, ProtocolError}; +pub use crate::error::{AuthError, ConnecterError, Error, ParseError, ProtocolError}; pub use starttls::starttls; diff --git a/tokio-xmpp/src/xmpp_codec.rs b/tokio-xmpp/src/xmpp_codec.rs index 6c6b92c..fcd6b08 100644 --- a/tokio-xmpp/src/xmpp_codec.rs +++ b/tokio-xmpp/src/xmpp_codec.rs @@ -1,24 +1,16 @@ //! XML stream parser for XMPP -use crate::{ParseError, ParserError}; use bytes::{BufMut, BytesMut}; -use log::{debug, error}; +use log::debug; use std; -use std::borrow::Cow; -use std::collections::vec_deque::VecDeque; use std::collections::HashMap; use std::default::Default; use std::fmt::Write; use std::io; -use std::iter::FromIterator; -use std::str::from_utf8; -use std::sync::Arc; -use std::sync::Mutex; use tokio_util::codec::{Decoder, Encoder}; -use xml5ever::buffer_queue::BufferQueue; -use xml5ever::interface::Attribute; -use xml5ever::tokenizer::{Tag, TagKind, Token, TokenSink, XmlTokenizer}; use xmpp_parsers::Element; +use minidom::{Tokenizer, tree_builder::TreeBuilder}; +use crate::Error; /// Anything that can be sent or received on an XMPP/XML stream #[derive(Debug, Clone, PartialEq, Eq)] @@ -33,175 +25,24 @@ pub enum Packet { StreamEnd, } -type QueueItem = Result; - -/// Parser state -struct ParserSink { - // Ready stanzas, shared with XMPPCodec - queue: Arc>>, - // Parsing stack - stack: Vec, - ns_stack: Vec, String>>, -} - -impl ParserSink { - pub fn new(queue: Arc>>) -> Self { - ParserSink { - queue, - stack: vec![], - ns_stack: vec![], - } - } - - fn push_queue(&self, pkt: Packet) { - self.queue.lock().unwrap().push_back(Ok(pkt)); - } - - fn push_queue_error(&self, e: ParserError) { - self.queue.lock().unwrap().push_back(Err(e)); - } - - /// Lookup XML namespace declaration for given prefix (or no prefix) - fn lookup_ns(&self, prefix: &Option) -> Option<&str> { - for nss in self.ns_stack.iter().rev() { - if let Some(ns) = nss.get(prefix) { - return Some(ns); - } - } - - None - } - - fn handle_start_tag(&mut self, tag: Tag) { - let mut nss = HashMap::new(); - let is_prefix_xmlns = |attr: &Attribute| { - attr.name - .prefix - .as_ref() - .map(|prefix| prefix.eq_str_ignore_ascii_case("xmlns")) - .unwrap_or(false) - }; - for attr in &tag.attrs { - match attr.name.local.as_ref() { - "xmlns" => { - nss.insert(None, attr.value.as_ref().to_owned()); - } - prefix if is_prefix_xmlns(attr) => { - nss.insert(Some(prefix.to_owned()), attr.value.as_ref().to_owned()); - } - _ => (), - } - } - self.ns_stack.push(nss); - - let el = { - let el_ns = self - .lookup_ns(&tag.name.prefix.map(|prefix| prefix.as_ref().to_owned())) - .unwrap(); - let mut el_builder = Element::builder(tag.name.local.as_ref(), el_ns); - for attr in &tag.attrs { - match attr.name.local.as_ref() { - "xmlns" => (), - _ if is_prefix_xmlns(attr) => (), - _ => { - let attr_name = if let Some(ref prefix) = attr.name.prefix { - Cow::Owned(format!("{}:{}", prefix, attr.name.local)) - } else { - Cow::Borrowed(attr.name.local.as_ref()) - }; - el_builder = el_builder.attr(attr_name, attr.value.as_ref()); - } - } - } - el_builder.build() - }; - - if self.stack.is_empty() { - let attrs = HashMap::from_iter(tag.attrs.iter().map(|attr| { - ( - attr.name.local.as_ref().to_owned(), - attr.value.as_ref().to_owned(), - ) - })); - self.push_queue(Packet::StreamStart(attrs)); - } - - self.stack.push(el); - } - - fn handle_end_tag(&mut self) { - let el = self.stack.pop().unwrap(); - self.ns_stack.pop(); - - match self.stack.len() { - // - 0 => self.push_queue(Packet::StreamEnd), - // - 1 => self.push_queue(Packet::Stanza(el)), - len => { - let parent = &mut self.stack[len - 1]; - parent.append_child(el); - } - } - } -} - -impl TokenSink for ParserSink { - fn process_token(&mut self, token: Token) { - match token { - Token::TagToken(tag) => match tag.kind { - TagKind::StartTag => self.handle_start_tag(tag), - TagKind::EndTag => self.handle_end_tag(), - TagKind::EmptyTag => { - self.handle_start_tag(tag); - self.handle_end_tag(); - } - TagKind::ShortTag => self.push_queue_error(ParserError::ShortTag), - }, - Token::CharacterTokens(tendril) => match self.stack.len() { - 0 | 1 => self.push_queue(Packet::Text(tendril.into())), - len => { - let el = &mut self.stack[len - 1]; - el.append_text_node(tendril); - } - }, - Token::EOFToken => self.push_queue(Packet::StreamEnd), - Token::ParseError(s) => { - self.push_queue_error(ParserError::Parse(ParseError(s))); - } - _ => (), - } - } - - // fn end(&mut self) { - // } -} - /// Stateful encoder/decoder for a bytestream from/to XMPP `Packet` pub struct XMPPCodec { /// Outgoing ns: Option, /// Incoming - parser: XmlTokenizer, - /// For handling incoming truncated utf8 - // TODO: optimize using tendrils? - buf: Vec, - /// Shared with ParserSink - queue: Arc>>, + tokenizer: Tokenizer, + stanza_builder: TreeBuilder, } impl XMPPCodec { /// Constructor pub fn new() -> Self { - let queue = Arc::new(Mutex::new(VecDeque::new())); - let sink = ParserSink::new(queue.clone()); - // TODO: configure parser? - let parser = XmlTokenizer::new(sink, Default::default()); + let tokenizer = Tokenizer::new(); + let stanza_builder = TreeBuilder::new(); XMPPCodec { ns: None, - parser, - queue, - buf: vec![], + tokenizer, + stanza_builder, } } } @@ -214,57 +55,42 @@ impl Default for XMPPCodec { impl Decoder for XMPPCodec { type Item = Packet; - type Error = ParserError; + type Error = Error; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - let buf1: Box> = if !self.buf.is_empty() && !buf.is_empty() { - let mut prefix = std::mem::replace(&mut self.buf, vec![]); - prefix.extend_from_slice(&buf.split_to(buf.len())); - Box::new(prefix) - } else { - Box::new(buf.split_to(buf.len())) - }; - let buf1 = buf1.as_ref().as_ref(); - match from_utf8(buf1) { - Ok(s) => { - debug!("<< {:?}", s); - if !s.is_empty() { - let mut buffer_queue = BufferQueue::new(); - let tendril = FromIterator::from_iter(s.chars()); - buffer_queue.push_back(tendril); - self.parser.feed(&mut buffer_queue); + self.tokenizer.push(buf); + buf.clear(); + + while let Some(token) = self.tokenizer.pull()? { + let had_stream_root = self.stanza_builder.depth() > 0; + self.stanza_builder.process_token(token)?; + let has_stream_root = self.stanza_builder.depth() > 0; + + if ! had_stream_root && has_stream_root { + let root = self.stanza_builder.top().unwrap(); + let attrs = root.attrs() + .map(|(name, value)| (name.to_owned(), value.to_owned())) + .chain( + root.prefixes.declared_prefixes() + .iter() + .map(|(prefix, namespace)| ( + prefix.as_ref().map(|prefix| format!("xmlns:{}", prefix)) + .unwrap_or_else(|| "xmlns".to_owned()), + namespace.clone() + )) + ) + .collect(); + return Ok(Some(Packet::StreamStart(attrs))); + } else if self.stanza_builder.depth() == 1 { + if let Some(stanza) = self.stanza_builder.unshift_child() { + return Ok(Some(Packet::Stanza(stanza))); } - } - // Remedies for truncated utf8 - Err(e) if e.valid_up_to() >= buf1.len() - 3 => { - // Prepare all the valid data - let mut b = BytesMut::with_capacity(e.valid_up_to()); - b.put(&buf1[0..e.valid_up_to()]); - - // Retry - let result = self.decode(&mut b); - - // Keep the tail back in - self.buf.extend_from_slice(&buf1[e.valid_up_to()..]); - - return result; - } - Err(e) => { - error!( - "error {} at {}/{} in {:?}", - e, - e.valid_up_to(), - buf1.len(), - buf1 - ); - return Err(ParserError::Utf8(e)); + } else if let Some(_) = self.stanza_builder.root.take() { + return Ok(Some(Packet::StreamEnd)); } } - match self.queue.lock().unwrap().pop_front() { - None => Ok(None), - Some(result) => result.map(|pkt| Some(pkt)), - } + Ok(None) } fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> {