From 67c242674e7a737a18ff885f17378db48633d7cc Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 10 Apr 2022 02:56:57 +0200 Subject: [PATCH] tokio-xmpp: implement more rxml stuff --- parsers/src/time.rs | 2 +- tokio-xmpp/Cargo.toml | 1 + tokio-xmpp/src/xmpp_codec.rs | 20 ++++++++++++++++---- tokio-xmpp/src/xmpp_stream.rs | 4 ++-- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/parsers/src/time.rs b/parsers/src/time.rs index d97c941..3b468b4 100644 --- a/parsers/src/time.rs +++ b/parsers/src/time.rs @@ -46,7 +46,7 @@ impl TryFrom for TimeResult { check_no_children!(child, "tzo"); check_no_attributes!(child, "tzo"); // TODO: Add a FromStr implementation to FixedOffset to avoid this hack. - let fake_date = String::from("2019-04-22T11:38:00") + &child.text(); + let fake_date = format!("{}{}", "2019-04-22T11:38:00", child.text()); let date_time = DateTime::from_str(&fake_date)?; tzo = Some(date_time.timezone()); } else if child.is("utc", ns::TIME) { diff --git a/tokio-xmpp/Cargo.toml b/tokio-xmpp/Cargo.toml index 5e72f78..5ffa826 100644 --- a/tokio-xmpp/Cargo.toml +++ b/tokio-xmpp/Cargo.toml @@ -27,6 +27,7 @@ trust-dns-proto = "0.20" trust-dns-resolver = "0.20" xmpp-parsers = "0.19" minidom = "0.14" +rxml = { git = "https://github.com/horazont/rxml.git" } webpki-roots = { version = "0.22", optional = true } [build-dependencies] diff --git a/tokio-xmpp/src/xmpp_codec.rs b/tokio-xmpp/src/xmpp_codec.rs index b990897..5b65f42 100644 --- a/tokio-xmpp/src/xmpp_codec.rs +++ b/tokio-xmpp/src/xmpp_codec.rs @@ -9,7 +9,8 @@ use std::fmt::Write; use std::io; use tokio_util::codec::{Decoder, Encoder}; use xmpp_parsers::Element; -use minidom::{tokenize, tree_builder::TreeBuilder}; +use minidom::tree_builder::TreeBuilder; +use rxml::{EventRead, Lexer, PushDriver, RawParser}; use crate::Error; /// Anything that can be sent or received on an XMPP/XML stream @@ -30,6 +31,7 @@ pub struct XMPPCodec { /// Outgoing ns: Option, /// Incoming + driver: PushDriver<'static, RawParser>, stanza_builder: TreeBuilder, } @@ -37,8 +39,10 @@ impl XMPPCodec { /// Constructor pub fn new() -> Self { let stanza_builder = TreeBuilder::new(); + let driver = PushDriver::wrap(Lexer::new(), RawParser::new()); XMPPCodec { ns: None, + driver, stanza_builder, } } @@ -55,9 +59,13 @@ impl Decoder for XMPPCodec { type Error = Error; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - while let Some(token) = tokenize(buf)? { + // TODO: avoid the .to_owned + self.driver.feed(std::borrow::Cow::from(buf.as_ref().to_owned())); + buf.clear(); + + while let Some(token) = self.driver.read().map_err(|e| minidom::Error::from(e))? { let had_stream_root = self.stanza_builder.depth() > 0; - self.stanza_builder.process_token(token)?; + self.stanza_builder.process_event(token)?; let has_stream_root = self.stanza_builder.depth() > 0; if ! had_stream_root && has_stream_root { @@ -76,10 +84,14 @@ impl Decoder for XMPPCodec { .collect(); return Ok(Some(Packet::StreamStart(attrs))); } else if self.stanza_builder.depth() == 1 { + self.driver.release_temporaries(); + if let Some(stanza) = self.stanza_builder.unshift_child() { return Ok(Some(Packet::Stanza(stanza))); } } else if let Some(_) = self.stanza_builder.root.take() { + self.driver.release_temporaries(); + return Ok(Some(Packet::StreamEnd)); } } @@ -317,7 +329,7 @@ mod tests { block_on(framed.send(Packet::Stanza(stanza))).expect("send"); assert_eq!( framed.get_ref().get_ref(), - &("".to_owned() + &text + "") + &format!("{}", text) .as_bytes() ); } diff --git a/tokio-xmpp/src/xmpp_stream.rs b/tokio-xmpp/src/xmpp_stream.rs index 54b3821..1d474d7 100644 --- a/tokio-xmpp/src/xmpp_stream.rs +++ b/tokio-xmpp/src/xmpp_stream.rs @@ -54,7 +54,7 @@ impl XMPPStream { } /// Send a `` start tag - pub async fn start<'a>(stream: S, jid: Jid, ns: String) -> Result { + pub async fn start(stream: S, jid: Jid, ns: String) -> Result { let xmpp_stream = Framed::new(stream, XMPPCodec::new()); stream_start::start(xmpp_stream, jid, ns).await } @@ -65,7 +65,7 @@ impl XMPPStream { } /// Re-run `start()` - pub async fn restart<'a>(self) -> Result { + pub async fn restart(self) -> Result { let stream = self.stream.into_inner().unwrap().into_inner(); Self::start(stream, self.jid, self.ns).await }