mirror of
https://gitlab.com/xmpp-rs/xmpp-rs.git
synced 2024-07-12 22:21:53 +00:00
tokio-xmpp: implement more rxml stuff
This commit is contained in:
parent
2e21911c47
commit
67c242674e
4 changed files with 20 additions and 7 deletions
|
@ -46,7 +46,7 @@ impl TryFrom<Element> for TimeResult {
|
|||
check_no_children!(child, "tzo");
|
||||
check_no_attributes!(child, "tzo");
|
||||
// TODO: Add a FromStr implementation to FixedOffset to avoid this hack.
|
||||
let fake_date = String::from("2019-04-22T11:38:00") + &child.text();
|
||||
let fake_date = format!("{}{}", "2019-04-22T11:38:00", child.text());
|
||||
let date_time = DateTime::from_str(&fake_date)?;
|
||||
tzo = Some(date_time.timezone());
|
||||
} else if child.is("utc", ns::TIME) {
|
||||
|
|
|
@ -27,6 +27,7 @@ trust-dns-proto = "0.20"
|
|||
trust-dns-resolver = "0.20"
|
||||
xmpp-parsers = "0.19"
|
||||
minidom = "0.14"
|
||||
rxml = { git = "https://github.com/horazont/rxml.git" }
|
||||
webpki-roots = { version = "0.22", optional = true }
|
||||
|
||||
[build-dependencies]
|
||||
|
|
|
@ -9,7 +9,8 @@ use std::fmt::Write;
|
|||
use std::io;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
use xmpp_parsers::Element;
|
||||
use minidom::{tokenize, tree_builder::TreeBuilder};
|
||||
use minidom::tree_builder::TreeBuilder;
|
||||
use rxml::{EventRead, Lexer, PushDriver, RawParser};
|
||||
use crate::Error;
|
||||
|
||||
/// Anything that can be sent or received on an XMPP/XML stream
|
||||
|
@ -30,6 +31,7 @@ pub struct XMPPCodec {
|
|||
/// Outgoing
|
||||
ns: Option<String>,
|
||||
/// Incoming
|
||||
driver: PushDriver<'static, RawParser>,
|
||||
stanza_builder: TreeBuilder,
|
||||
}
|
||||
|
||||
|
@ -37,8 +39,10 @@ impl XMPPCodec {
|
|||
/// Constructor
|
||||
pub fn new() -> Self {
|
||||
let stanza_builder = TreeBuilder::new();
|
||||
let driver = PushDriver::wrap(Lexer::new(), RawParser::new());
|
||||
XMPPCodec {
|
||||
ns: None,
|
||||
driver,
|
||||
stanza_builder,
|
||||
}
|
||||
}
|
||||
|
@ -55,9 +59,13 @@ impl Decoder for XMPPCodec {
|
|||
type Error = Error;
|
||||
|
||||
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
while let Some(token) = tokenize(buf)? {
|
||||
// TODO: avoid the .to_owned
|
||||
self.driver.feed(std::borrow::Cow::from(buf.as_ref().to_owned()));
|
||||
buf.clear();
|
||||
|
||||
while let Some(token) = self.driver.read().map_err(|e| minidom::Error::from(e))? {
|
||||
let had_stream_root = self.stanza_builder.depth() > 0;
|
||||
self.stanza_builder.process_token(token)?;
|
||||
self.stanza_builder.process_event(token)?;
|
||||
let has_stream_root = self.stanza_builder.depth() > 0;
|
||||
|
||||
if ! had_stream_root && has_stream_root {
|
||||
|
@ -76,10 +84,14 @@ impl Decoder for XMPPCodec {
|
|||
.collect();
|
||||
return Ok(Some(Packet::StreamStart(attrs)));
|
||||
} else if self.stanza_builder.depth() == 1 {
|
||||
self.driver.release_temporaries();
|
||||
|
||||
if let Some(stanza) = self.stanza_builder.unshift_child() {
|
||||
return Ok(Some(Packet::Stanza(stanza)));
|
||||
}
|
||||
} else if let Some(_) = self.stanza_builder.root.take() {
|
||||
self.driver.release_temporaries();
|
||||
|
||||
return Ok(Some(Packet::StreamEnd));
|
||||
}
|
||||
}
|
||||
|
@ -317,7 +329,7 @@ mod tests {
|
|||
block_on(framed.send(Packet::Stanza(stanza))).expect("send");
|
||||
assert_eq!(
|
||||
framed.get_ref().get_ref(),
|
||||
&("<message xmlns=\"jabber:client\"><body>".to_owned() + &text + "</body></message>")
|
||||
&format!("<message xmlns=\"jabber:client\"><body>{}</body></message>", text)
|
||||
.as_bytes()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
|
|||
}
|
||||
|
||||
/// Send a `<stream:stream>` start tag
|
||||
pub async fn start<'a>(stream: S, jid: Jid, ns: String) -> Result<Self, Error> {
|
||||
pub async fn start(stream: S, jid: Jid, ns: String) -> Result<Self, Error> {
|
||||
let xmpp_stream = Framed::new(stream, XMPPCodec::new());
|
||||
stream_start::start(xmpp_stream, jid, ns).await
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
|
|||
}
|
||||
|
||||
/// Re-run `start()`
|
||||
pub async fn restart<'a>(self) -> Result<Self, Error> {
|
||||
pub async fn restart(self) -> Result<Self, Error> {
|
||||
let stream = self.stream.into_inner().unwrap().into_inner();
|
||||
Self::start(stream, self.jid, self.ns).await
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue