diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6d38814d..3038e39f 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: "pitkley/rust:nightly" +image: "scorpil/rust:nightly" before_script: - apt-get update -yqq diff --git a/Cargo.toml b/Cargo.toml index b721ff66..04aa98f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,12 @@ license = "LGPL-3.0+" gitlab = { repository = "lumi/xmpp-rs" } [dependencies] -quick-xml = "0.10.0" -xmpp-parsers = "0.9.0" +xml-rs = "0.4.1" +xmpp-parsers = "0.7.0" openssl = "0.9.12" base64 = "0.6.0" -minidom = "0.7.0" -jid = { version = "0.4", features = ["minidom"] } +minidom = "0.4.1" +jid = "0.2.1" sasl = "0.4.0" sha-1 = "0.4" chrono = "0.4.0" diff --git a/src/client.rs b/src/client.rs index b2a00537..bfab1746 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,4 @@ +use xml; use jid::Jid; use transport::{Transport, SslTransport}; use error::Error; @@ -16,7 +17,7 @@ use base64; use minidom::Element; -use quick_xml::events::Event as XmlEvent; +use xml::reader::XmlEvent as ReaderEvent; use std::sync::{Mutex, Arc}; @@ -155,6 +156,10 @@ impl Client { self.transport.lock().unwrap().write_element(elem) } + fn read_event(&self) -> Result { + self.transport.lock().unwrap().read_event() + } + fn connect(&mut self, mut credentials: SaslCredentials) -> Result<(), Error> { let features = self.wait_for_features()?; let ms = &features.sasl_mechanisms.ok_or(Error::SaslError(Some("no SASL mechanisms".to_owned())))?; @@ -264,10 +269,9 @@ impl Client { fn wait_for_features(&mut self) -> Result { // TODO: this is very ugly loop { - let mut transport = self.transport.lock().unwrap(); - let e = transport.read_event(); + let e = self.read_event()?; match e { - Ok(XmlEvent::Start { .. }) => { + ReaderEvent::StartElement { .. } => { break; }, _ => (), diff --git a/src/component.rs b/src/component.rs index be598230..00592ed2 100644 --- a/src/component.rs +++ b/src/component.rs @@ -1,3 +1,4 @@ +use xml; use jid::Jid; use transport::{Transport, PlainTransport}; use error::Error; @@ -9,9 +10,8 @@ use sha_1::{Sha1, Digest}; use minidom::Element; -use quick_xml::events::Event as XmlEvent; +use xml::reader::XmlEvent as ReaderEvent; -use std::str; use std::fmt::Write; use std::sync::{Mutex, Arc}; @@ -131,23 +131,19 @@ impl Component { self.transport.lock().unwrap().write_element(elem) } + fn read_event(&self) -> Result { + self.transport.lock().unwrap().read_event() + } + fn connect(&mut self, secret: String) -> Result<(), Error> { let mut sid = String::new(); loop { - let mut transport = self.transport.lock().unwrap(); - let e = transport.read_event()?; + let e = self.read_event()?; match e { - XmlEvent::Start(ref e) => { - for attr_result in e.attributes() { - match attr_result { - Ok(attr) => { - let name = str::from_utf8(attr.key)?; - let value = str::from_utf8(&attr.value)?; - if name == "id" { - sid = value.to_owned(); - } - }, - _ => panic!() + ReaderEvent::StartElement { attributes, .. } => { + for attribute in attributes { + if attribute.name.namespace == None && attribute.name.local_name == "id" { + sid = attribute.value; } } break; diff --git a/src/connection.rs b/src/connection.rs index 4b80cdc2..4d639b4e 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,7 +2,7 @@ use transport::Transport; use error::Error; use ns; -use quick_xml::events::{Event as WriterEvent, BytesStart, BytesEnd}; +use xml::writer::XmlEvent as WriterEvent; pub trait Connection { type InitError; @@ -23,20 +23,16 @@ impl Connection for C2S { fn namespace() -> &'static str { ns::CLIENT } fn init(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> { - let name = "stream:stream"; - let mut elem = BytesStart::borrowed(name.as_bytes(), name.len()); - elem.push_attribute(("to", domain)); - elem.push_attribute(("id", id)); - elem.push_attribute(("xmlns", ns::CLIENT)); - elem.push_attribute(("xmlns:stream", ns::STREAM)); - transport.write_event(WriterEvent::Start(elem))?; + transport.write_event(WriterEvent::start_element("stream:stream") + .attr("to", domain) + .attr("id", id) + .default_ns(ns::CLIENT) + .ns("stream", ns::STREAM))?; Ok(()) } fn close(transport: &mut T) -> Result<(), Error> { - let name = "stream:stream"; - let elem = BytesEnd::borrowed(name.as_bytes()); - transport.write_event(WriterEvent::End(elem))?; + transport.write_event(WriterEvent::end_element())?; Ok(()) } } @@ -50,20 +46,16 @@ impl Connection for Component2S { fn namespace() -> &'static str { ns::COMPONENT_ACCEPT } fn init(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> { - let name = "stream:stream"; - let mut elem = BytesStart::borrowed(name.as_bytes(), name.len()); - elem.push_attribute(("to", domain)); - elem.push_attribute(("id", id)); - elem.push_attribute(("xmlns", ns::COMPONENT_ACCEPT)); - elem.push_attribute(("xmlns:stream", ns::STREAM)); - transport.write_event(WriterEvent::Start(elem))?; + transport.write_event(WriterEvent::start_element("stream:stream") + .attr("to", domain) + .attr("id", id) + .default_ns(ns::COMPONENT_ACCEPT) + .ns("stream", ns::STREAM))?; Ok(()) } fn close(transport: &mut T) -> Result<(), Error> { - let name = "stream:stream"; - let elem = BytesEnd::borrowed(name.as_bytes()); - transport.write_event(WriterEvent::End(elem))?; + transport.write_event(WriterEvent::end_element())?; Ok(()) } } diff --git a/src/error.rs b/src/error.rs index 61ce9767..f27e94ed 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,12 +5,12 @@ use std::fmt::Error as FormatError; use std::io; use std::net::TcpStream; -use std::str::Utf8Error; use openssl::ssl::HandshakeError; use openssl::error::ErrorStack; -use quick_xml::errors::Error as XmlError; +use xml::reader::Error as XmlError; +use xml::writer::Error as EmitterError; use minidom::Error as MinidomError; @@ -22,6 +22,7 @@ use components::sasl_error::SaslError; #[derive(Debug)] pub enum Error { XmlError(XmlError), + EmitterError(EmitterError), IoError(io::Error), HandshakeError(HandshakeError), OpenSslErrorStack(ErrorStack), @@ -30,7 +31,6 @@ pub enum Error { SaslError(Option), XmppSaslError(SaslError), FormatError(FormatError), - Utf8Error(Utf8Error), StreamError, EndOfDocument, } @@ -41,6 +41,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: EmitterError) -> Error { + Error::EmitterError(err) + } +} + impl From for Error { fn from(err: io::Error) -> Error { Error::IoError(err) @@ -76,9 +82,3 @@ impl From for Error { Error::FormatError(err) } } - -impl From for Error { - fn from(err: Utf8Error) -> Error { - Error::Utf8Error(err) - } -} diff --git a/src/lib.rs b/src/lib.rs index c160b7fd..c022941d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -extern crate quick_xml; +extern crate xml; extern crate xmpp_parsers; extern crate openssl; extern crate minidom; diff --git a/src/plugins/ibb.rs b/src/plugins/ibb.rs index 428bdd37..a8738a27 100644 --- a/src/plugins/ibb.rs +++ b/src/plugins/ibb.rs @@ -10,7 +10,7 @@ use jid::Jid; use plugins::stanza::Iq; use plugins::disco::DiscoPlugin; use xmpp_parsers::iq::{IqType, IqSetPayload}; -use xmpp_parsers::ibb::{Open, Data, Close, Stanza}; +use xmpp_parsers::ibb::{IBB, Stanza}; use xmpp_parsers::stanza_error::{StanzaError, ErrorType, DefinedCondition}; use xmpp_parsers::ns; @@ -86,79 +86,74 @@ impl IbbPlugin { } } - fn handle_ibb_open(&self, from: Jid, open: Open) -> Result<(), StanzaError> { + fn handle_ibb(&self, from: Jid, ibb: IBB) -> Result<(), StanzaError> { let mut sessions = self.sessions.lock().unwrap(); - let Open { block_size, sid, stanza } = open; - match sessions.entry((from.clone(), sid.clone())) { - Entry::Vacant(_) => Ok(()), - Entry::Occupied(_) => Err(generate_error( - ErrorType::Cancel, - DefinedCondition::NotAcceptable, - "This session is already open." - )), - }?; - let session = Session { - stanza, - block_size, - cur_seq: 65535u16, - }; - sessions.insert((from, sid), session.clone()); - self.proxy.dispatch(IbbOpen { - session: session, - }); - Ok(()) - } - - fn handle_ibb_data(&self, from: Jid, data: Data) -> Result<(), StanzaError> { - let mut sessions = self.sessions.lock().unwrap(); - let Data { seq, sid, data } = data; - let entry = match sessions.entry((from, sid)) { - Entry::Occupied(entry) => Ok(entry), - Entry::Vacant(_) => Err(generate_error( - ErrorType::Cancel, - DefinedCondition::ItemNotFound, - "This session doesn’t exist." - )), - }?; - let session = entry.into_mut(); - if session.stanza != Stanza::Iq { - return Err(generate_error( - ErrorType::Cancel, - DefinedCondition::NotAcceptable, - "Wrong stanza type." - )) + match ibb { + IBB::Open { block_size, sid, stanza } => { + match sessions.entry((from.clone(), sid.clone())) { + Entry::Vacant(_) => Ok(()), + Entry::Occupied(_) => Err(generate_error( + ErrorType::Cancel, + DefinedCondition::NotAcceptable, + "This session is already open." + )), + }?; + let session = Session { + stanza, + block_size, + cur_seq: 65535u16, + }; + sessions.insert((from, sid), session.clone()); + self.proxy.dispatch(IbbOpen { + session: session, + }); + }, + IBB::Data { seq, sid, data } => { + let entry = match sessions.entry((from, sid)) { + Entry::Occupied(entry) => Ok(entry), + Entry::Vacant(_) => Err(generate_error( + ErrorType::Cancel, + DefinedCondition::ItemNotFound, + "This session doesn’t exist." + )), + }?; + let mut session = entry.into_mut(); + if session.stanza != Stanza::Iq { + return Err(generate_error( + ErrorType::Cancel, + DefinedCondition::NotAcceptable, + "Wrong stanza type." + )) + } + let cur_seq = session.cur_seq.wrapping_add(1); + if seq != cur_seq { + return Err(generate_error( + ErrorType::Cancel, + DefinedCondition::NotAcceptable, + "Wrong seq number." + )) + } + session.cur_seq = cur_seq; + self.proxy.dispatch(IbbData { + session: session.clone(), + data, + }); + }, + IBB::Close { sid } => { + let entry = match sessions.entry((from, sid)) { + Entry::Occupied(entry) => Ok(entry), + Entry::Vacant(_) => Err(generate_error( + ErrorType::Cancel, + DefinedCondition::ItemNotFound, + "This session doesn’t exist." + )), + }?; + let session = entry.remove(); + self.proxy.dispatch(IbbClose { + session, + }); + }, } - let cur_seq = session.cur_seq.wrapping_add(1); - if seq != cur_seq { - return Err(generate_error( - ErrorType::Cancel, - DefinedCondition::NotAcceptable, - "Wrong seq number." - )) - } - session.cur_seq = cur_seq; - self.proxy.dispatch(IbbData { - session: session.clone(), - data, - }); - Ok(()) - } - - fn handle_ibb_close(&self, from: Jid, close: Close) -> Result<(), StanzaError> { - let mut sessions = self.sessions.lock().unwrap(); - let Close { sid } = close; - let entry = match sessions.entry((from, sid)) { - Entry::Occupied(entry) => Ok(entry), - Entry::Vacant(_) => Err(generate_error( - ErrorType::Cancel, - DefinedCondition::ItemNotFound, - "This session doesn’t exist." - )), - }?; - let session = entry.remove(); - self.proxy.dispatch(IbbClose { - session, - }); Ok(()) } @@ -169,20 +164,8 @@ impl IbbPlugin { let id = iq.id.unwrap(); // TODO: use an intermediate plugin to parse this payload. let payload = match IqSetPayload::try_from(payload) { - Ok(IqSetPayload::IbbOpen(open)) => { - match self.handle_ibb_open(from.clone(), open) { - Ok(_) => IqType::Result(None), - Err(error) => IqType::Error(error), - } - }, - Ok(IqSetPayload::IbbData(data)) => { - match self.handle_ibb_data(from.clone(), data) { - Ok(_) => IqType::Result(None), - Err(error) => IqType::Error(error), - } - }, - Ok(IqSetPayload::IbbClose(close)) => { - match self.handle_ibb_close(from.clone(), close) { + Ok(IqSetPayload::IBB(ibb)) => { + match self.handle_ibb(from.clone(), ibb) { Ok(_) => IqType::Result(None), Err(error) => IqType::Error(error), } diff --git a/src/plugins/messaging.rs b/src/plugins/messaging.rs index 9da0d2a8..9b0d1d43 100644 --- a/src/plugins/messaging.rs +++ b/src/plugins/messaging.rs @@ -7,9 +7,9 @@ use error::Error; use jid::Jid; use plugins::stanza::Message; -use xmpp_parsers::message::{MessagePayload, MessageType, Body}; +use xmpp_parsers::message::{MessagePayload, MessageType}; use xmpp_parsers::chatstates::ChatState; -use xmpp_parsers::receipts::{Request, Received}; +use xmpp_parsers::receipts::Receipt; use xmpp_parsers::stanza_id::StanzaId; // TODO: use the id (maybe even stanza-id) to identify every message. @@ -70,7 +70,7 @@ impl MessagingPlugin { id: Some(self.proxy.gen_id()), bodies: { let mut bodies = BTreeMap::new(); - bodies.insert(String::new(), Body(body.to_owned())); + bodies.insert(String::new(), String::from(body)); bodies }, subjects: BTreeMap::new(), @@ -98,11 +98,11 @@ impl MessagingPlugin { chat_state: chat_state, }), // XEP-0184 - MessagePayload::ReceiptRequest(Request) => self.proxy.dispatch(ReceiptRequestEvent { + MessagePayload::Receipt(Receipt::Request) => self.proxy.dispatch(ReceiptRequestEvent { from: from.clone(), }), // XEP-0184 - MessagePayload::ReceiptReceived(Received {id}) => self.proxy.dispatch(ReceiptReceivedEvent { + MessagePayload::Receipt(Receipt::Received(id)) => self.proxy.dispatch(ReceiptReceivedEvent { from: from.clone(), id: id.unwrap(), }), @@ -118,9 +118,9 @@ impl MessagingPlugin { if message.bodies.contains_key("") { self.proxy.dispatch(MessageEvent { from: from, - body: message.bodies[""].clone().0, - subject: if message.subjects.contains_key("") { Some(message.subjects[""].clone().0) } else { None }, - thread: match message.thread.clone() { Some(thread) => Some(thread.0), None => None }, + body: message.bodies[""].clone(), + subject: if message.subjects.contains_key("") { Some(message.subjects[""].clone()) } else { None }, + thread: message.thread.clone(), }); } Propagation::Stop diff --git a/src/plugins/roster.rs b/src/plugins/roster.rs index f287c46d..487a90ff 100644 --- a/src/plugins/roster.rs +++ b/src/plugins/roster.rs @@ -77,7 +77,7 @@ impl RosterPlugin { // TODO: use a better error type. pub fn send_roster_set(&self, to: Option, item: Item) -> Result<(), String> { - if item.subscription != Subscription::Remove { + if item.subscription.is_some() && item.subscription != Some(Subscription::Remove) { return Err(String::from("Subscription must be either nothing or Remove.")); } let iq = Iq { @@ -117,10 +117,10 @@ impl RosterPlugin { let mut jids = self.jids.lock().unwrap(); let previous = jids.insert(item.jid.clone(), item.clone()); if previous.is_none() { - assert!(item.subscription != Subscription::Remove); + assert!(item.subscription != Some(Subscription::Remove)); self.proxy.dispatch(RosterPush::Added(item)); } else { - if item.subscription == Subscription::Remove { + if item.subscription == Some(Subscription::Remove) { self.proxy.dispatch(RosterPush::Removed(item)); } else { self.proxy.dispatch(RosterPush::Modified(item)); diff --git a/src/transport.rs b/src/transport.rs index 2f147422..527134a0 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,13 +1,11 @@ //! Provides transports for the xml streams. -use std::io::BufReader; use std::io::prelude::*; -use std::str; use std::net::{TcpStream, Shutdown}; -use quick_xml::reader::{Reader as EventReader}; -use quick_xml::events::Event; +use xml::reader::{EventReader, XmlEvent as XmlReaderEvent}; +use xml::writer::{EventWriter, XmlEvent as XmlWriterEvent, EmitterConfig}; use std::sync::{Arc, Mutex}; @@ -26,11 +24,11 @@ use sasl::common::ChannelBinding; /// A trait which transports are required to implement. pub trait Transport { - /// Writes a `quick_xml::events::Event` to the stream. - fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error>; + /// Writes an `xml::writer::XmlEvent` to the stream. + fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error>; - /// Reads a `quick_xml::events::Event` from the stream. - fn read_event(&mut self) -> Result; + /// Reads an `xml::reader::XmlEvent` from the stream. + fn read_event(&mut self) -> Result; /// Writes a `minidom::Element` to the stream. fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error>; @@ -50,21 +48,19 @@ pub trait Transport { /// A plain text transport, completely unencrypted. pub struct PlainTransport { inner: Arc>, // TODO: this feels rather ugly - // TODO: especially feels ugly because this read would keep the lock held very long - // (potentially) - reader: EventReader>>, - writer: LockedIO, - buf: Vec, + reader: EventReader>, // TODO: especially feels ugly because + // this read would keep the lock + // held very long (potentially) + writer: EventWriter>, } impl Transport for PlainTransport { - fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error> { - self.writer.write(&event.into())?; - Ok(()) + fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error> { + Ok(self.writer.write(event)?) } - fn read_event(&mut self) -> Result { - Ok(self.reader.read_event(&mut self.buf)?) + fn read_event(&mut self) -> Result { + Ok(self.reader.next()?) } fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> { @@ -78,8 +74,13 @@ impl Transport for PlainTransport { fn reset_stream(&mut self) { let locked_io = LockedIO::from(self.inner.clone()); - self.reader = EventReader::from_reader(BufReader::new(locked_io.clone())); - self.writer = locked_io; + self.reader = EventReader::new(locked_io.clone()); + self.writer = EventWriter::new_with_config(locked_io, EmitterConfig { + line_separator: "".into(), + perform_indent: false, + normalize_empty_elements: false, + .. Default::default() + }); } fn channel_bind(&self) -> ChannelBinding { @@ -92,16 +93,21 @@ impl PlainTransport { /// Connects to a server without any encryption. pub fn connect(host: &str, port: u16) -> Result { let tcp_stream = TcpStream::connect((host, port))?; - let stream = Arc::new(Mutex::new(tcp_stream)); + let parser = EventReader::new(tcp_stream); + let parser_stream = parser.into_inner(); + let stream = Arc::new(Mutex::new(parser_stream)); let locked_io = LockedIO::from(stream.clone()); - let reader = EventReader::from_reader(BufReader::new(locked_io.clone())); - let writer = locked_io; - + let reader = EventReader::new(locked_io.clone()); + let writer = EventWriter::new_with_config(locked_io, EmitterConfig { + line_separator: "".into(), + perform_indent: false, + normalize_empty_elements: false, + .. Default::default() + }); Ok(PlainTransport { inner: stream, reader: reader, writer: writer, - buf: Vec::new(), }) } @@ -117,21 +123,19 @@ impl PlainTransport { /// A transport which uses STARTTLS. pub struct SslTransport { inner: Arc>>, // TODO: this feels rather ugly - // TODO: especially feels ugly because this read would keep the lock held very long - // (potentially) - reader: EventReader>>>, - writer: LockedIO>, - buf: Vec, + reader: EventReader>>, // TODO: especially feels ugly because + // this read would keep the lock + // held very long (potentially) + writer: EventWriter>>, } impl Transport for SslTransport { - fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error> { - self.writer.write(&event.into())?; - Ok(()) + fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error> { + Ok(self.writer.write(event)?) } - fn read_event(&mut self) -> Result { - Ok(self.reader.read_event(&mut self.buf)?) + fn read_event(&mut self) -> Result { + Ok(self.reader.next()?) } fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> { @@ -144,8 +148,13 @@ impl Transport for SslTransport { fn reset_stream(&mut self) { let locked_io = LockedIO::from(self.inner.clone()); - self.reader = EventReader::from_reader(BufReader::new(locked_io.clone())); - self.writer = locked_io; + self.reader = EventReader::new(locked_io.clone()); + self.writer = EventWriter::new_with_config(locked_io, EmitterConfig { + line_separator: "".into(), + perform_indent: false, + normalize_empty_elements: false, + .. Default::default() + }); } fn channel_bind(&self) -> ChannelBinding { @@ -163,29 +172,23 @@ impl SslTransport { , ns::CLIENT, ns::STREAM, host)?; write!(stream, "" , ns::TLS)?; - { - let mut parser = EventReader::from_reader(BufReader::new(&stream)); - let mut buf = Vec::new(); - let ns_buf = Vec::new(); - loop { // TODO: possibly a timeout? - match parser.read_event(&mut buf)? { - Event::Start(ref e) => { - let (namespace, local_name) = parser.resolve_namespace(e.name(), &ns_buf); - let namespace = namespace.map(str::from_utf8); - let local_name = str::from_utf8(local_name)?; - - if let Some(ns) = namespace { - if ns == Ok(ns::TLS) && local_name == "proceed" { - break; - } else if ns == Ok(ns::STREAM) && local_name == "error" { - return Err(Error::StreamError); - } + let mut parser = EventReader::new(stream); + loop { // TODO: possibly a timeout? + match parser.next()? { + XmlReaderEvent::StartElement { name, .. } => { + if let Some(ns) = name.namespace { + if ns == ns::TLS && name.local_name == "proceed" { + break; } - }, - _ => (), - } + else if ns == ns::STREAM && name.local_name == "error" { + return Err(Error::StreamError); + } + } + }, + _ => {}, } } + let stream = parser.into_inner(); #[cfg(feature = "insecure")] let ssl_stream = { let mut ctx = SslContextBuilder::new(SslMethod::tls())?; @@ -200,14 +203,17 @@ impl SslTransport { }; let ssl_stream = Arc::new(Mutex::new(ssl_stream)); let locked_io = LockedIO::from(ssl_stream.clone()); - let reader = EventReader::from_reader(BufReader::new(locked_io.clone())); - let writer = locked_io; - + let reader = EventReader::new(locked_io.clone()); + let writer = EventWriter::new_with_config(locked_io, EmitterConfig { + line_separator: "".into(), + perform_indent: false, + normalize_empty_elements: false, + .. Default::default() + }); Ok(SslTransport { inner: ssl_stream, reader: reader, writer: writer, - buf: Vec::new(), }) }