mirror of
https://gitlab.com/xmpp-rs/xmpp-rs.git
synced 2024-07-12 22:21:53 +00:00
tokio-xmpp: use new minidom tokenizer
This commit is contained in:
parent
c1e661dd61
commit
8e5a7530ed
8 changed files with 84 additions and 274 deletions
|
@ -84,7 +84,8 @@ pub struct Element {
|
|||
/// This is only used when deserializing. If you have to use a custom prefix use
|
||||
/// `ElementBuilder::prefix`.
|
||||
pub(crate) prefix: Option<Prefix>,
|
||||
prefixes: Prefixes,
|
||||
/// Namespace declarations
|
||||
pub prefixes: Prefixes,
|
||||
attributes: BTreeMap<String, String>,
|
||||
children: Vec<Node>,
|
||||
}
|
||||
|
@ -717,6 +718,18 @@ impl Element {
|
|||
})?;
|
||||
self.children.remove(idx).into_element()
|
||||
}
|
||||
|
||||
/// Remove the leading nodes up to the first child element and
|
||||
/// return it
|
||||
pub fn unshift_child(&mut self) -> Option<Element> {
|
||||
while self.children.len() > 0 {
|
||||
if let Some(el) = self.children.remove(0).into_element() {
|
||||
return Some(el);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over references to child elements of an `Element`.
|
||||
|
|
|
@ -23,9 +23,6 @@ pub enum Error {
|
|||
/// Error from the Tokenizer
|
||||
TokenizerError(crate::tokenizer::TokenizerError),
|
||||
|
||||
/// An UTF-8 conversion error.
|
||||
Utf8Error(::std::str::Utf8Error),
|
||||
|
||||
/// An I/O error, from std::io.
|
||||
IoError(::std::io::Error),
|
||||
|
||||
|
@ -51,7 +48,6 @@ impl StdError for Error {
|
|||
match self {
|
||||
Error::XmlError(e) => Some(e),
|
||||
Error::TokenizerError(e) => Some(e),
|
||||
Error::Utf8Error(e) => Some(e),
|
||||
Error::IoError(e) => Some(e),
|
||||
Error::EndOfDocument => None,
|
||||
Error::InvalidElementClosed => None,
|
||||
|
@ -67,7 +63,6 @@ impl std::fmt::Display for Error {
|
|||
match self {
|
||||
Error::XmlError(e) => write!(fmt, "XML error: {}", e),
|
||||
Error::TokenizerError(e) => write!(fmt, "XML tokenizer error: {}", e),
|
||||
Error::Utf8Error(e) => write!(fmt, "UTF-8 error: {}", e),
|
||||
Error::IoError(e) => write!(fmt, "IO error: {}", e),
|
||||
Error::EndOfDocument => {
|
||||
write!(fmt, "the end of the document has been reached prematurely")
|
||||
|
@ -94,12 +89,6 @@ impl From<crate::tokenizer::TokenizerError> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<::std::str::Utf8Error> for Error {
|
||||
fn from(err: ::std::str::Utf8Error) -> Error {
|
||||
Error::Utf8Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<::std::io::Error> for Error {
|
||||
fn from(err: ::std::io::Error) -> Error {
|
||||
Error::IoError(err)
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
//! Streaming tokenizer (SAX parser)
|
||||
|
||||
use bytes::BytesMut;
|
||||
use super::Token;
|
||||
use super::{Error, Token};
|
||||
|
||||
/// `Result::Err` type returned from `Tokenizer`
|
||||
pub type TokenizerError = nom::error::Error<String>;
|
||||
|
@ -32,7 +32,7 @@ impl Tokenizer {
|
|||
}
|
||||
|
||||
/// Parse the next document fragment
|
||||
pub fn pull(&mut self) -> Result<Option<Token>, TokenizerError> {
|
||||
pub fn pull(&mut self) -> Result<Option<Token>, Error> {
|
||||
/// cannot return an error with location info that points to
|
||||
/// our buffer that we still want to mutate
|
||||
fn with_input_to_owned(e: nom::error::Error<&[u8]>) -> TokenizerError {
|
||||
|
@ -50,9 +50,9 @@ impl Tokenizer {
|
|||
Result::Err(nom::Err::Incomplete(_)) =>
|
||||
None,
|
||||
Result::Err(nom::Err::Error(e)) =>
|
||||
return Err(with_input_to_owned(e)),
|
||||
return Err(with_input_to_owned(e).into()),
|
||||
Result::Err(nom::Err::Failure(e)) =>
|
||||
return Err(with_input_to_owned(e)),
|
||||
return Err(with_input_to_owned(e).into()),
|
||||
} };
|
||||
match result {
|
||||
Some((s_len, token)) => {
|
||||
|
|
|
@ -32,12 +32,27 @@ impl TreeBuilder {
|
|||
self.stack.len()
|
||||
}
|
||||
|
||||
/// Get the top-most element from the stack but don't remove it
|
||||
pub fn top(&mut self) -> Option<&Element> {
|
||||
self.stack.last()
|
||||
}
|
||||
|
||||
/// Pop the top-most element from the stack
|
||||
pub fn pop(&mut self) -> Option<Element> {
|
||||
fn pop(&mut self) -> Option<Element> {
|
||||
self.prefixes_stack.pop();
|
||||
self.stack.pop()
|
||||
}
|
||||
|
||||
/// Unshift the first child of the top element
|
||||
pub fn unshift_child(&mut self) -> Option<Element> {
|
||||
let depth = self.stack.len();
|
||||
if depth > 0 {
|
||||
self.stack[depth - 1].unshift_child()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Lookup XML namespace declaration for given prefix (or no prefix)
|
||||
fn lookup_prefix(&self, prefix: &Option<String>) -> Option<&str> {
|
||||
for nss in self.prefixes_stack.iter().rev() {
|
||||
|
|
|
@ -25,8 +25,8 @@ tokio-stream = { version = "0.1", features = [] }
|
|||
tokio-util = { version = "0.6", features = ["codec"] }
|
||||
trust-dns-proto = "0.20"
|
||||
trust-dns-resolver = "0.20"
|
||||
xml5ever = "0.16"
|
||||
xmpp-parsers = "0.19"
|
||||
minidom = "0.14"
|
||||
webpki-roots = { version = "0.22", optional = true }
|
||||
|
||||
[build-dependencies]
|
||||
|
|
|
@ -5,7 +5,6 @@ use std::borrow::Cow;
|
|||
use std::error::Error as StdError;
|
||||
use std::fmt;
|
||||
use std::io::Error as IoError;
|
||||
use std::str::Utf8Error;
|
||||
#[cfg(feature = "tls-rust")]
|
||||
use tokio_rustls::rustls::client::InvalidDnsNameError;
|
||||
#[cfg(feature = "tls-rust")]
|
||||
|
@ -106,44 +105,6 @@ impl From<InvalidDnsNameError> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
/// Causes for stream parsing errors
|
||||
#[derive(Debug)]
|
||||
pub enum ParserError {
|
||||
/// Encoding error
|
||||
Utf8(Utf8Error),
|
||||
/// XML parse error
|
||||
Parse(ParseError),
|
||||
/// Illegal `</>`
|
||||
ShortTag,
|
||||
/// Required by `impl Decoder`
|
||||
Io(IoError),
|
||||
}
|
||||
|
||||
impl fmt::Display for ParserError {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
ParserError::Utf8(e) => write!(fmt, "UTF-8 error: {}", e),
|
||||
ParserError::Parse(e) => write!(fmt, "parse error: {}", e),
|
||||
ParserError::ShortTag => write!(fmt, "short tag"),
|
||||
ParserError::Io(e) => write!(fmt, "IO error: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StdError for ParserError {}
|
||||
|
||||
impl From<IoError> for ParserError {
|
||||
fn from(e: IoError) -> Self {
|
||||
ParserError::Io(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ParserError> for Error {
|
||||
fn from(e: ParserError) -> Self {
|
||||
ProtocolError::Parser(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
/// XML parse error wrapper type
|
||||
#[derive(Debug)]
|
||||
pub struct ParseError(pub Cow<'static, str>);
|
||||
|
@ -167,7 +128,7 @@ impl fmt::Display for ParseError {
|
|||
#[derive(Debug)]
|
||||
pub enum ProtocolError {
|
||||
/// XML parser error
|
||||
Parser(ParserError),
|
||||
Parser(minidom::Error),
|
||||
/// Error with expected stanza schema
|
||||
Parsers(ParsersError),
|
||||
/// No TLS available
|
||||
|
@ -205,12 +166,18 @@ impl fmt::Display for ProtocolError {
|
|||
|
||||
impl StdError for ProtocolError {}
|
||||
|
||||
impl From<ParserError> for ProtocolError {
|
||||
fn from(e: ParserError) -> Self {
|
||||
impl From<minidom::Error> for ProtocolError {
|
||||
fn from(e: minidom::Error) -> Self {
|
||||
ProtocolError::Parser(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<minidom::Error> for Error {
|
||||
fn from(e: minidom::Error) -> Self {
|
||||
ProtocolError::Parser(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ParsersError> for ProtocolError {
|
||||
fn from(e: ParsersError) -> Self {
|
||||
ProtocolError::Parsers(e)
|
||||
|
|
|
@ -16,5 +16,5 @@ pub use client::{async_client::Client as AsyncClient, simple_client::Client as S
|
|||
mod component;
|
||||
pub use crate::component::Component;
|
||||
mod error;
|
||||
pub use crate::error::{AuthError, ConnecterError, Error, ParseError, ParserError, ProtocolError};
|
||||
pub use crate::error::{AuthError, ConnecterError, Error, ParseError, ProtocolError};
|
||||
pub use starttls::starttls;
|
||||
|
|
|
@ -1,24 +1,16 @@
|
|||
//! XML stream parser for XMPP
|
||||
|
||||
use crate::{ParseError, ParserError};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use log::{debug, error};
|
||||
use log::debug;
|
||||
use std;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::vec_deque::VecDeque;
|
||||
use std::collections::HashMap;
|
||||
use std::default::Default;
|
||||
use std::fmt::Write;
|
||||
use std::io;
|
||||
use std::iter::FromIterator;
|
||||
use std::str::from_utf8;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
use xml5ever::buffer_queue::BufferQueue;
|
||||
use xml5ever::interface::Attribute;
|
||||
use xml5ever::tokenizer::{Tag, TagKind, Token, TokenSink, XmlTokenizer};
|
||||
use xmpp_parsers::Element;
|
||||
use minidom::{Tokenizer, tree_builder::TreeBuilder};
|
||||
use crate::Error;
|
||||
|
||||
/// Anything that can be sent or received on an XMPP/XML stream
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
|
@ -33,175 +25,24 @@ pub enum Packet {
|
|||
StreamEnd,
|
||||
}
|
||||
|
||||
type QueueItem = Result<Packet, ParserError>;
|
||||
|
||||
/// Parser state
|
||||
struct ParserSink {
|
||||
// Ready stanzas, shared with XMPPCodec
|
||||
queue: Arc<Mutex<VecDeque<QueueItem>>>,
|
||||
// Parsing stack
|
||||
stack: Vec<Element>,
|
||||
ns_stack: Vec<HashMap<Option<String>, String>>,
|
||||
}
|
||||
|
||||
impl ParserSink {
|
||||
pub fn new(queue: Arc<Mutex<VecDeque<QueueItem>>>) -> Self {
|
||||
ParserSink {
|
||||
queue,
|
||||
stack: vec![],
|
||||
ns_stack: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
fn push_queue(&self, pkt: Packet) {
|
||||
self.queue.lock().unwrap().push_back(Ok(pkt));
|
||||
}
|
||||
|
||||
fn push_queue_error(&self, e: ParserError) {
|
||||
self.queue.lock().unwrap().push_back(Err(e));
|
||||
}
|
||||
|
||||
/// Lookup XML namespace declaration for given prefix (or no prefix)
|
||||
fn lookup_ns(&self, prefix: &Option<String>) -> Option<&str> {
|
||||
for nss in self.ns_stack.iter().rev() {
|
||||
if let Some(ns) = nss.get(prefix) {
|
||||
return Some(ns);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn handle_start_tag(&mut self, tag: Tag) {
|
||||
let mut nss = HashMap::new();
|
||||
let is_prefix_xmlns = |attr: &Attribute| {
|
||||
attr.name
|
||||
.prefix
|
||||
.as_ref()
|
||||
.map(|prefix| prefix.eq_str_ignore_ascii_case("xmlns"))
|
||||
.unwrap_or(false)
|
||||
};
|
||||
for attr in &tag.attrs {
|
||||
match attr.name.local.as_ref() {
|
||||
"xmlns" => {
|
||||
nss.insert(None, attr.value.as_ref().to_owned());
|
||||
}
|
||||
prefix if is_prefix_xmlns(attr) => {
|
||||
nss.insert(Some(prefix.to_owned()), attr.value.as_ref().to_owned());
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
self.ns_stack.push(nss);
|
||||
|
||||
let el = {
|
||||
let el_ns = self
|
||||
.lookup_ns(&tag.name.prefix.map(|prefix| prefix.as_ref().to_owned()))
|
||||
.unwrap();
|
||||
let mut el_builder = Element::builder(tag.name.local.as_ref(), el_ns);
|
||||
for attr in &tag.attrs {
|
||||
match attr.name.local.as_ref() {
|
||||
"xmlns" => (),
|
||||
_ if is_prefix_xmlns(attr) => (),
|
||||
_ => {
|
||||
let attr_name = if let Some(ref prefix) = attr.name.prefix {
|
||||
Cow::Owned(format!("{}:{}", prefix, attr.name.local))
|
||||
} else {
|
||||
Cow::Borrowed(attr.name.local.as_ref())
|
||||
};
|
||||
el_builder = el_builder.attr(attr_name, attr.value.as_ref());
|
||||
}
|
||||
}
|
||||
}
|
||||
el_builder.build()
|
||||
};
|
||||
|
||||
if self.stack.is_empty() {
|
||||
let attrs = HashMap::from_iter(tag.attrs.iter().map(|attr| {
|
||||
(
|
||||
attr.name.local.as_ref().to_owned(),
|
||||
attr.value.as_ref().to_owned(),
|
||||
)
|
||||
}));
|
||||
self.push_queue(Packet::StreamStart(attrs));
|
||||
}
|
||||
|
||||
self.stack.push(el);
|
||||
}
|
||||
|
||||
fn handle_end_tag(&mut self) {
|
||||
let el = self.stack.pop().unwrap();
|
||||
self.ns_stack.pop();
|
||||
|
||||
match self.stack.len() {
|
||||
// </stream:stream>
|
||||
0 => self.push_queue(Packet::StreamEnd),
|
||||
// </stanza>
|
||||
1 => self.push_queue(Packet::Stanza(el)),
|
||||
len => {
|
||||
let parent = &mut self.stack[len - 1];
|
||||
parent.append_child(el);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TokenSink for ParserSink {
|
||||
fn process_token(&mut self, token: Token) {
|
||||
match token {
|
||||
Token::TagToken(tag) => match tag.kind {
|
||||
TagKind::StartTag => self.handle_start_tag(tag),
|
||||
TagKind::EndTag => self.handle_end_tag(),
|
||||
TagKind::EmptyTag => {
|
||||
self.handle_start_tag(tag);
|
||||
self.handle_end_tag();
|
||||
}
|
||||
TagKind::ShortTag => self.push_queue_error(ParserError::ShortTag),
|
||||
},
|
||||
Token::CharacterTokens(tendril) => match self.stack.len() {
|
||||
0 | 1 => self.push_queue(Packet::Text(tendril.into())),
|
||||
len => {
|
||||
let el = &mut self.stack[len - 1];
|
||||
el.append_text_node(tendril);
|
||||
}
|
||||
},
|
||||
Token::EOFToken => self.push_queue(Packet::StreamEnd),
|
||||
Token::ParseError(s) => {
|
||||
self.push_queue_error(ParserError::Parse(ParseError(s)));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
// fn end(&mut self) {
|
||||
// }
|
||||
}
|
||||
|
||||
/// Stateful encoder/decoder for a bytestream from/to XMPP `Packet`
|
||||
pub struct XMPPCodec {
|
||||
/// Outgoing
|
||||
ns: Option<String>,
|
||||
/// Incoming
|
||||
parser: XmlTokenizer<ParserSink>,
|
||||
/// For handling incoming truncated utf8
|
||||
// TODO: optimize using tendrils?
|
||||
buf: Vec<u8>,
|
||||
/// Shared with ParserSink
|
||||
queue: Arc<Mutex<VecDeque<QueueItem>>>,
|
||||
tokenizer: Tokenizer,
|
||||
stanza_builder: TreeBuilder,
|
||||
}
|
||||
|
||||
impl XMPPCodec {
|
||||
/// Constructor
|
||||
pub fn new() -> Self {
|
||||
let queue = Arc::new(Mutex::new(VecDeque::new()));
|
||||
let sink = ParserSink::new(queue.clone());
|
||||
// TODO: configure parser?
|
||||
let parser = XmlTokenizer::new(sink, Default::default());
|
||||
let tokenizer = Tokenizer::new();
|
||||
let stanza_builder = TreeBuilder::new();
|
||||
XMPPCodec {
|
||||
ns: None,
|
||||
parser,
|
||||
queue,
|
||||
buf: vec![],
|
||||
tokenizer,
|
||||
stanza_builder,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -214,57 +55,42 @@ impl Default for XMPPCodec {
|
|||
|
||||
impl Decoder for XMPPCodec {
|
||||
type Item = Packet;
|
||||
type Error = ParserError;
|
||||
type Error = Error;
|
||||
|
||||
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
let buf1: Box<dyn AsRef<[u8]>> = if !self.buf.is_empty() && !buf.is_empty() {
|
||||
let mut prefix = std::mem::replace(&mut self.buf, vec![]);
|
||||
prefix.extend_from_slice(&buf.split_to(buf.len()));
|
||||
Box::new(prefix)
|
||||
} else {
|
||||
Box::new(buf.split_to(buf.len()))
|
||||
};
|
||||
let buf1 = buf1.as_ref().as_ref();
|
||||
match from_utf8(buf1) {
|
||||
Ok(s) => {
|
||||
debug!("<< {:?}", s);
|
||||
if !s.is_empty() {
|
||||
let mut buffer_queue = BufferQueue::new();
|
||||
let tendril = FromIterator::from_iter(s.chars());
|
||||
buffer_queue.push_back(tendril);
|
||||
self.parser.feed(&mut buffer_queue);
|
||||
}
|
||||
}
|
||||
// Remedies for truncated utf8
|
||||
Err(e) if e.valid_up_to() >= buf1.len() - 3 => {
|
||||
// Prepare all the valid data
|
||||
let mut b = BytesMut::with_capacity(e.valid_up_to());
|
||||
b.put(&buf1[0..e.valid_up_to()]);
|
||||
self.tokenizer.push(buf);
|
||||
buf.clear();
|
||||
|
||||
// Retry
|
||||
let result = self.decode(&mut b);
|
||||
while let Some(token) = self.tokenizer.pull()? {
|
||||
let had_stream_root = self.stanza_builder.depth() > 0;
|
||||
self.stanza_builder.process_token(token)?;
|
||||
let has_stream_root = self.stanza_builder.depth() > 0;
|
||||
|
||||
// Keep the tail back in
|
||||
self.buf.extend_from_slice(&buf1[e.valid_up_to()..]);
|
||||
|
||||
return result;
|
||||
if ! had_stream_root && has_stream_root {
|
||||
let root = self.stanza_builder.top().unwrap();
|
||||
let attrs = root.attrs()
|
||||
.map(|(name, value)| (name.to_owned(), value.to_owned()))
|
||||
.chain(
|
||||
root.prefixes.declared_prefixes()
|
||||
.iter()
|
||||
.map(|(prefix, namespace)| (
|
||||
prefix.as_ref().map(|prefix| format!("xmlns:{}", prefix))
|
||||
.unwrap_or_else(|| "xmlns".to_owned()),
|
||||
namespace.clone()
|
||||
))
|
||||
)
|
||||
.collect();
|
||||
return Ok(Some(Packet::StreamStart(attrs)));
|
||||
} else if self.stanza_builder.depth() == 1 {
|
||||
if let Some(stanza) = self.stanza_builder.unshift_child() {
|
||||
return Ok(Some(Packet::Stanza(stanza)));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"error {} at {}/{} in {:?}",
|
||||
e,
|
||||
e.valid_up_to(),
|
||||
buf1.len(),
|
||||
buf1
|
||||
);
|
||||
return Err(ParserError::Utf8(e));
|
||||
} else if let Some(_) = self.stanza_builder.root.take() {
|
||||
return Ok(Some(Packet::StreamEnd));
|
||||
}
|
||||
}
|
||||
|
||||
match self.queue.lock().unwrap().pop_front() {
|
||||
None => Ok(None),
|
||||
Some(result) => result.map(|pkt| Some(pkt)),
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
|
|
Loading…
Reference in a new issue