Revert "Merge branch 'update-deps' into 'master'"

This reverts merge request !32
This commit is contained in:
Maxime Buquet 2017-12-31 13:56:17 +00:00
parent 359f6bb4aa
commit 8c4bb1b810
11 changed files with 194 additions and 213 deletions

View file

@ -1,4 +1,4 @@
image: "pitkley/rust:nightly" image: "scorpil/rust:nightly"
before_script: before_script:
- apt-get update -yqq - apt-get update -yqq

View file

@ -15,12 +15,12 @@ license = "LGPL-3.0+"
gitlab = { repository = "lumi/xmpp-rs" } gitlab = { repository = "lumi/xmpp-rs" }
[dependencies] [dependencies]
quick-xml = "0.10.0" xml-rs = "0.4.1"
xmpp-parsers = "0.9.0" xmpp-parsers = "0.7.0"
openssl = "0.9.12" openssl = "0.9.12"
base64 = "0.6.0" base64 = "0.6.0"
minidom = "0.7.0" minidom = "0.4.1"
jid = { version = "0.4", features = ["minidom"] } jid = "0.2.1"
sasl = "0.4.0" sasl = "0.4.0"
sha-1 = "0.4" sha-1 = "0.4"
chrono = "0.4.0" chrono = "0.4.0"

View file

@ -1,3 +1,4 @@
use xml;
use jid::Jid; use jid::Jid;
use transport::{Transport, SslTransport}; use transport::{Transport, SslTransport};
use error::Error; use error::Error;
@ -16,7 +17,7 @@ use base64;
use minidom::Element; use minidom::Element;
use quick_xml::events::Event as XmlEvent; use xml::reader::XmlEvent as ReaderEvent;
use std::sync::{Mutex, Arc}; use std::sync::{Mutex, Arc};
@ -155,6 +156,10 @@ impl Client {
self.transport.lock().unwrap().write_element(elem) self.transport.lock().unwrap().write_element(elem)
} }
fn read_event(&self) -> Result<xml::reader::XmlEvent, Error> {
self.transport.lock().unwrap().read_event()
}
fn connect(&mut self, mut credentials: SaslCredentials) -> Result<(), Error> { fn connect(&mut self, mut credentials: SaslCredentials) -> Result<(), Error> {
let features = self.wait_for_features()?; let features = self.wait_for_features()?;
let ms = &features.sasl_mechanisms.ok_or(Error::SaslError(Some("no SASL mechanisms".to_owned())))?; let ms = &features.sasl_mechanisms.ok_or(Error::SaslError(Some("no SASL mechanisms".to_owned())))?;
@ -264,10 +269,9 @@ impl Client {
fn wait_for_features(&mut self) -> Result<StreamFeatures, Error> { fn wait_for_features(&mut self) -> Result<StreamFeatures, Error> {
// TODO: this is very ugly // TODO: this is very ugly
loop { loop {
let mut transport = self.transport.lock().unwrap(); let e = self.read_event()?;
let e = transport.read_event();
match e { match e {
Ok(XmlEvent::Start { .. }) => { ReaderEvent::StartElement { .. } => {
break; break;
}, },
_ => (), _ => (),

View file

@ -1,3 +1,4 @@
use xml;
use jid::Jid; use jid::Jid;
use transport::{Transport, PlainTransport}; use transport::{Transport, PlainTransport};
use error::Error; use error::Error;
@ -9,9 +10,8 @@ use sha_1::{Sha1, Digest};
use minidom::Element; use minidom::Element;
use quick_xml::events::Event as XmlEvent; use xml::reader::XmlEvent as ReaderEvent;
use std::str;
use std::fmt::Write; use std::fmt::Write;
use std::sync::{Mutex, Arc}; use std::sync::{Mutex, Arc};
@ -131,23 +131,19 @@ impl Component {
self.transport.lock().unwrap().write_element(elem) self.transport.lock().unwrap().write_element(elem)
} }
fn read_event(&self) -> Result<xml::reader::XmlEvent, Error> {
self.transport.lock().unwrap().read_event()
}
fn connect(&mut self, secret: String) -> Result<(), Error> { fn connect(&mut self, secret: String) -> Result<(), Error> {
let mut sid = String::new(); let mut sid = String::new();
loop { loop {
let mut transport = self.transport.lock().unwrap(); let e = self.read_event()?;
let e = transport.read_event()?;
match e { match e {
XmlEvent::Start(ref e) => { ReaderEvent::StartElement { attributes, .. } => {
for attr_result in e.attributes() { for attribute in attributes {
match attr_result { if attribute.name.namespace == None && attribute.name.local_name == "id" {
Ok(attr) => { sid = attribute.value;
let name = str::from_utf8(attr.key)?;
let value = str::from_utf8(&attr.value)?;
if name == "id" {
sid = value.to_owned();
}
},
_ => panic!()
} }
} }
break; break;

View file

@ -2,7 +2,7 @@ use transport::Transport;
use error::Error; use error::Error;
use ns; use ns;
use quick_xml::events::{Event as WriterEvent, BytesStart, BytesEnd}; use xml::writer::XmlEvent as WriterEvent;
pub trait Connection { pub trait Connection {
type InitError; type InitError;
@ -23,20 +23,16 @@ impl Connection for C2S {
fn namespace() -> &'static str { ns::CLIENT } fn namespace() -> &'static str { ns::CLIENT }
fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> { fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> {
let name = "stream:stream"; transport.write_event(WriterEvent::start_element("stream:stream")
let mut elem = BytesStart::borrowed(name.as_bytes(), name.len()); .attr("to", domain)
elem.push_attribute(("to", domain)); .attr("id", id)
elem.push_attribute(("id", id)); .default_ns(ns::CLIENT)
elem.push_attribute(("xmlns", ns::CLIENT)); .ns("stream", ns::STREAM))?;
elem.push_attribute(("xmlns:stream", ns::STREAM));
transport.write_event(WriterEvent::Start(elem))?;
Ok(()) Ok(())
} }
fn close<T: Transport>(transport: &mut T) -> Result<(), Error> { fn close<T: Transport>(transport: &mut T) -> Result<(), Error> {
let name = "stream:stream"; transport.write_event(WriterEvent::end_element())?;
let elem = BytesEnd::borrowed(name.as_bytes());
transport.write_event(WriterEvent::End(elem))?;
Ok(()) Ok(())
} }
} }
@ -50,20 +46,16 @@ impl Connection for Component2S {
fn namespace() -> &'static str { ns::COMPONENT_ACCEPT } fn namespace() -> &'static str { ns::COMPONENT_ACCEPT }
fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> { fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> {
let name = "stream:stream"; transport.write_event(WriterEvent::start_element("stream:stream")
let mut elem = BytesStart::borrowed(name.as_bytes(), name.len()); .attr("to", domain)
elem.push_attribute(("to", domain)); .attr("id", id)
elem.push_attribute(("id", id)); .default_ns(ns::COMPONENT_ACCEPT)
elem.push_attribute(("xmlns", ns::COMPONENT_ACCEPT)); .ns("stream", ns::STREAM))?;
elem.push_attribute(("xmlns:stream", ns::STREAM));
transport.write_event(WriterEvent::Start(elem))?;
Ok(()) Ok(())
} }
fn close<T: Transport>(transport: &mut T) -> Result<(), Error> { fn close<T: Transport>(transport: &mut T) -> Result<(), Error> {
let name = "stream:stream"; transport.write_event(WriterEvent::end_element())?;
let elem = BytesEnd::borrowed(name.as_bytes());
transport.write_event(WriterEvent::End(elem))?;
Ok(()) Ok(())
} }
} }

View file

@ -5,12 +5,12 @@ use std::fmt::Error as FormatError;
use std::io; use std::io;
use std::net::TcpStream; use std::net::TcpStream;
use std::str::Utf8Error;
use openssl::ssl::HandshakeError; use openssl::ssl::HandshakeError;
use openssl::error::ErrorStack; use openssl::error::ErrorStack;
use quick_xml::errors::Error as XmlError; use xml::reader::Error as XmlError;
use xml::writer::Error as EmitterError;
use minidom::Error as MinidomError; use minidom::Error as MinidomError;
@ -22,6 +22,7 @@ use components::sasl_error::SaslError;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
XmlError(XmlError), XmlError(XmlError),
EmitterError(EmitterError),
IoError(io::Error), IoError(io::Error),
HandshakeError(HandshakeError<TcpStream>), HandshakeError(HandshakeError<TcpStream>),
OpenSslErrorStack(ErrorStack), OpenSslErrorStack(ErrorStack),
@ -30,7 +31,6 @@ pub enum Error {
SaslError(Option<String>), SaslError(Option<String>),
XmppSaslError(SaslError), XmppSaslError(SaslError),
FormatError(FormatError), FormatError(FormatError),
Utf8Error(Utf8Error),
StreamError, StreamError,
EndOfDocument, EndOfDocument,
} }
@ -41,6 +41,12 @@ impl From<XmlError> for Error {
} }
} }
impl From<EmitterError> for Error {
fn from(err: EmitterError) -> Error {
Error::EmitterError(err)
}
}
impl From<io::Error> for Error { impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { fn from(err: io::Error) -> Error {
Error::IoError(err) Error::IoError(err)
@ -76,9 +82,3 @@ impl From<FormatError> for Error {
Error::FormatError(err) Error::FormatError(err)
} }
} }
impl From<Utf8Error> for Error {
fn from(err: Utf8Error) -> Error {
Error::Utf8Error(err)
}
}

View file

@ -1,4 +1,4 @@
extern crate quick_xml; extern crate xml;
extern crate xmpp_parsers; extern crate xmpp_parsers;
extern crate openssl; extern crate openssl;
extern crate minidom; extern crate minidom;

View file

@ -10,7 +10,7 @@ use jid::Jid;
use plugins::stanza::Iq; use plugins::stanza::Iq;
use plugins::disco::DiscoPlugin; use plugins::disco::DiscoPlugin;
use xmpp_parsers::iq::{IqType, IqSetPayload}; use xmpp_parsers::iq::{IqType, IqSetPayload};
use xmpp_parsers::ibb::{Open, Data, Close, Stanza}; use xmpp_parsers::ibb::{IBB, Stanza};
use xmpp_parsers::stanza_error::{StanzaError, ErrorType, DefinedCondition}; use xmpp_parsers::stanza_error::{StanzaError, ErrorType, DefinedCondition};
use xmpp_parsers::ns; use xmpp_parsers::ns;
@ -86,79 +86,74 @@ impl IbbPlugin {
} }
} }
fn handle_ibb_open(&self, from: Jid, open: Open) -> Result<(), StanzaError> { fn handle_ibb(&self, from: Jid, ibb: IBB) -> Result<(), StanzaError> {
let mut sessions = self.sessions.lock().unwrap(); let mut sessions = self.sessions.lock().unwrap();
let Open { block_size, sid, stanza } = open; match ibb {
match sessions.entry((from.clone(), sid.clone())) { IBB::Open { block_size, sid, stanza } => {
Entry::Vacant(_) => Ok(()), match sessions.entry((from.clone(), sid.clone())) {
Entry::Occupied(_) => Err(generate_error( Entry::Vacant(_) => Ok(()),
ErrorType::Cancel, Entry::Occupied(_) => Err(generate_error(
DefinedCondition::NotAcceptable, ErrorType::Cancel,
"This session is already open." DefinedCondition::NotAcceptable,
)), "This session is already open."
}?; )),
let session = Session { }?;
stanza, let session = Session {
block_size, stanza,
cur_seq: 65535u16, block_size,
}; cur_seq: 65535u16,
sessions.insert((from, sid), session.clone()); };
self.proxy.dispatch(IbbOpen { sessions.insert((from, sid), session.clone());
session: session, self.proxy.dispatch(IbbOpen {
}); session: session,
Ok(()) });
} },
IBB::Data { seq, sid, data } => {
fn handle_ibb_data(&self, from: Jid, data: Data) -> Result<(), StanzaError> { let entry = match sessions.entry((from, sid)) {
let mut sessions = self.sessions.lock().unwrap(); Entry::Occupied(entry) => Ok(entry),
let Data { seq, sid, data } = data; Entry::Vacant(_) => Err(generate_error(
let entry = match sessions.entry((from, sid)) { ErrorType::Cancel,
Entry::Occupied(entry) => Ok(entry), DefinedCondition::ItemNotFound,
Entry::Vacant(_) => Err(generate_error( "This session doesnt exist."
ErrorType::Cancel, )),
DefinedCondition::ItemNotFound, }?;
"This session doesnt exist." let mut session = entry.into_mut();
)), if session.stanza != Stanza::Iq {
}?; return Err(generate_error(
let session = entry.into_mut(); ErrorType::Cancel,
if session.stanza != Stanza::Iq { DefinedCondition::NotAcceptable,
return Err(generate_error( "Wrong stanza type."
ErrorType::Cancel, ))
DefinedCondition::NotAcceptable, }
"Wrong stanza type." let cur_seq = session.cur_seq.wrapping_add(1);
)) if seq != cur_seq {
return Err(generate_error(
ErrorType::Cancel,
DefinedCondition::NotAcceptable,
"Wrong seq number."
))
}
session.cur_seq = cur_seq;
self.proxy.dispatch(IbbData {
session: session.clone(),
data,
});
},
IBB::Close { sid } => {
let entry = match sessions.entry((from, sid)) {
Entry::Occupied(entry) => Ok(entry),
Entry::Vacant(_) => Err(generate_error(
ErrorType::Cancel,
DefinedCondition::ItemNotFound,
"This session doesnt exist."
)),
}?;
let session = entry.remove();
self.proxy.dispatch(IbbClose {
session,
});
},
} }
let cur_seq = session.cur_seq.wrapping_add(1);
if seq != cur_seq {
return Err(generate_error(
ErrorType::Cancel,
DefinedCondition::NotAcceptable,
"Wrong seq number."
))
}
session.cur_seq = cur_seq;
self.proxy.dispatch(IbbData {
session: session.clone(),
data,
});
Ok(())
}
fn handle_ibb_close(&self, from: Jid, close: Close) -> Result<(), StanzaError> {
let mut sessions = self.sessions.lock().unwrap();
let Close { sid } = close;
let entry = match sessions.entry((from, sid)) {
Entry::Occupied(entry) => Ok(entry),
Entry::Vacant(_) => Err(generate_error(
ErrorType::Cancel,
DefinedCondition::ItemNotFound,
"This session doesnt exist."
)),
}?;
let session = entry.remove();
self.proxy.dispatch(IbbClose {
session,
});
Ok(()) Ok(())
} }
@ -169,20 +164,8 @@ impl IbbPlugin {
let id = iq.id.unwrap(); let id = iq.id.unwrap();
// TODO: use an intermediate plugin to parse this payload. // TODO: use an intermediate plugin to parse this payload.
let payload = match IqSetPayload::try_from(payload) { let payload = match IqSetPayload::try_from(payload) {
Ok(IqSetPayload::IbbOpen(open)) => { Ok(IqSetPayload::IBB(ibb)) => {
match self.handle_ibb_open(from.clone(), open) { match self.handle_ibb(from.clone(), ibb) {
Ok(_) => IqType::Result(None),
Err(error) => IqType::Error(error),
}
},
Ok(IqSetPayload::IbbData(data)) => {
match self.handle_ibb_data(from.clone(), data) {
Ok(_) => IqType::Result(None),
Err(error) => IqType::Error(error),
}
},
Ok(IqSetPayload::IbbClose(close)) => {
match self.handle_ibb_close(from.clone(), close) {
Ok(_) => IqType::Result(None), Ok(_) => IqType::Result(None),
Err(error) => IqType::Error(error), Err(error) => IqType::Error(error),
} }

View file

@ -7,9 +7,9 @@ use error::Error;
use jid::Jid; use jid::Jid;
use plugins::stanza::Message; use plugins::stanza::Message;
use xmpp_parsers::message::{MessagePayload, MessageType, Body}; use xmpp_parsers::message::{MessagePayload, MessageType};
use xmpp_parsers::chatstates::ChatState; use xmpp_parsers::chatstates::ChatState;
use xmpp_parsers::receipts::{Request, Received}; use xmpp_parsers::receipts::Receipt;
use xmpp_parsers::stanza_id::StanzaId; use xmpp_parsers::stanza_id::StanzaId;
// TODO: use the id (maybe even stanza-id) to identify every message. // TODO: use the id (maybe even stanza-id) to identify every message.
@ -70,7 +70,7 @@ impl MessagingPlugin {
id: Some(self.proxy.gen_id()), id: Some(self.proxy.gen_id()),
bodies: { bodies: {
let mut bodies = BTreeMap::new(); let mut bodies = BTreeMap::new();
bodies.insert(String::new(), Body(body.to_owned())); bodies.insert(String::new(), String::from(body));
bodies bodies
}, },
subjects: BTreeMap::new(), subjects: BTreeMap::new(),
@ -98,11 +98,11 @@ impl MessagingPlugin {
chat_state: chat_state, chat_state: chat_state,
}), }),
// XEP-0184 // XEP-0184
MessagePayload::ReceiptRequest(Request) => self.proxy.dispatch(ReceiptRequestEvent { MessagePayload::Receipt(Receipt::Request) => self.proxy.dispatch(ReceiptRequestEvent {
from: from.clone(), from: from.clone(),
}), }),
// XEP-0184 // XEP-0184
MessagePayload::ReceiptReceived(Received {id}) => self.proxy.dispatch(ReceiptReceivedEvent { MessagePayload::Receipt(Receipt::Received(id)) => self.proxy.dispatch(ReceiptReceivedEvent {
from: from.clone(), from: from.clone(),
id: id.unwrap(), id: id.unwrap(),
}), }),
@ -118,9 +118,9 @@ impl MessagingPlugin {
if message.bodies.contains_key("") { if message.bodies.contains_key("") {
self.proxy.dispatch(MessageEvent { self.proxy.dispatch(MessageEvent {
from: from, from: from,
body: message.bodies[""].clone().0, body: message.bodies[""].clone(),
subject: if message.subjects.contains_key("") { Some(message.subjects[""].clone().0) } else { None }, subject: if message.subjects.contains_key("") { Some(message.subjects[""].clone()) } else { None },
thread: match message.thread.clone() { Some(thread) => Some(thread.0), None => None }, thread: message.thread.clone(),
}); });
} }
Propagation::Stop Propagation::Stop

View file

@ -77,7 +77,7 @@ impl RosterPlugin {
// TODO: use a better error type. // TODO: use a better error type.
pub fn send_roster_set(&self, to: Option<Jid>, item: Item) -> Result<(), String> { pub fn send_roster_set(&self, to: Option<Jid>, item: Item) -> Result<(), String> {
if item.subscription != Subscription::Remove { if item.subscription.is_some() && item.subscription != Some(Subscription::Remove) {
return Err(String::from("Subscription must be either nothing or Remove.")); return Err(String::from("Subscription must be either nothing or Remove."));
} }
let iq = Iq { let iq = Iq {
@ -117,10 +117,10 @@ impl RosterPlugin {
let mut jids = self.jids.lock().unwrap(); let mut jids = self.jids.lock().unwrap();
let previous = jids.insert(item.jid.clone(), item.clone()); let previous = jids.insert(item.jid.clone(), item.clone());
if previous.is_none() { if previous.is_none() {
assert!(item.subscription != Subscription::Remove); assert!(item.subscription != Some(Subscription::Remove));
self.proxy.dispatch(RosterPush::Added(item)); self.proxy.dispatch(RosterPush::Added(item));
} else { } else {
if item.subscription == Subscription::Remove { if item.subscription == Some(Subscription::Remove) {
self.proxy.dispatch(RosterPush::Removed(item)); self.proxy.dispatch(RosterPush::Removed(item));
} else { } else {
self.proxy.dispatch(RosterPush::Modified(item)); self.proxy.dispatch(RosterPush::Modified(item));

View file

@ -1,13 +1,11 @@
//! Provides transports for the xml streams. //! Provides transports for the xml streams.
use std::io::BufReader;
use std::io::prelude::*; use std::io::prelude::*;
use std::str;
use std::net::{TcpStream, Shutdown}; use std::net::{TcpStream, Shutdown};
use quick_xml::reader::{Reader as EventReader}; use xml::reader::{EventReader, XmlEvent as XmlReaderEvent};
use quick_xml::events::Event; use xml::writer::{EventWriter, XmlEvent as XmlWriterEvent, EmitterConfig};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -26,11 +24,11 @@ use sasl::common::ChannelBinding;
/// A trait which transports are required to implement. /// A trait which transports are required to implement.
pub trait Transport { pub trait Transport {
/// Writes a `quick_xml::events::Event` to the stream. /// Writes an `xml::writer::XmlEvent` to the stream.
fn write_event<'a, E: Into<Event<'a>>>(&mut self, event: E) -> Result<(), Error>; fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error>;
/// Reads a `quick_xml::events::Event` from the stream. /// Reads an `xml::reader::XmlEvent` from the stream.
fn read_event(&mut self) -> Result<Event, Error>; fn read_event(&mut self) -> Result<XmlReaderEvent, Error>;
/// Writes a `minidom::Element` to the stream. /// Writes a `minidom::Element` to the stream.
fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error>; fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error>;
@ -50,21 +48,19 @@ pub trait Transport {
/// A plain text transport, completely unencrypted. /// A plain text transport, completely unencrypted.
pub struct PlainTransport { pub struct PlainTransport {
inner: Arc<Mutex<TcpStream>>, // TODO: this feels rather ugly inner: Arc<Mutex<TcpStream>>, // TODO: this feels rather ugly
// TODO: especially feels ugly because this read would keep the lock held very long reader: EventReader<LockedIO<TcpStream>>, // TODO: especially feels ugly because
// (potentially) // this read would keep the lock
reader: EventReader<BufReader<LockedIO<TcpStream>>>, // held very long (potentially)
writer: LockedIO<TcpStream>, writer: EventWriter<LockedIO<TcpStream>>,
buf: Vec<u8>,
} }
impl Transport for PlainTransport { impl Transport for PlainTransport {
fn write_event<'a, E: Into<Event<'a>>>(&mut self, event: E) -> Result<(), Error> { fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error> {
self.writer.write(&event.into())?; Ok(self.writer.write(event)?)
Ok(())
} }
fn read_event(&mut self) -> Result<Event, Error> { fn read_event(&mut self) -> Result<XmlReaderEvent, Error> {
Ok(self.reader.read_event(&mut self.buf)?) Ok(self.reader.next()?)
} }
fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> { fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> {
@ -78,8 +74,13 @@ impl Transport for PlainTransport {
fn reset_stream(&mut self) { fn reset_stream(&mut self) {
let locked_io = LockedIO::from(self.inner.clone()); let locked_io = LockedIO::from(self.inner.clone());
self.reader = EventReader::from_reader(BufReader::new(locked_io.clone())); self.reader = EventReader::new(locked_io.clone());
self.writer = locked_io; self.writer = EventWriter::new_with_config(locked_io, EmitterConfig {
line_separator: "".into(),
perform_indent: false,
normalize_empty_elements: false,
.. Default::default()
});
} }
fn channel_bind(&self) -> ChannelBinding { fn channel_bind(&self) -> ChannelBinding {
@ -92,16 +93,21 @@ impl PlainTransport {
/// Connects to a server without any encryption. /// Connects to a server without any encryption.
pub fn connect(host: &str, port: u16) -> Result<PlainTransport, Error> { pub fn connect(host: &str, port: u16) -> Result<PlainTransport, Error> {
let tcp_stream = TcpStream::connect((host, port))?; let tcp_stream = TcpStream::connect((host, port))?;
let stream = Arc::new(Mutex::new(tcp_stream)); let parser = EventReader::new(tcp_stream);
let parser_stream = parser.into_inner();
let stream = Arc::new(Mutex::new(parser_stream));
let locked_io = LockedIO::from(stream.clone()); let locked_io = LockedIO::from(stream.clone());
let reader = EventReader::from_reader(BufReader::new(locked_io.clone())); let reader = EventReader::new(locked_io.clone());
let writer = locked_io; let writer = EventWriter::new_with_config(locked_io, EmitterConfig {
line_separator: "".into(),
perform_indent: false,
normalize_empty_elements: false,
.. Default::default()
});
Ok(PlainTransport { Ok(PlainTransport {
inner: stream, inner: stream,
reader: reader, reader: reader,
writer: writer, writer: writer,
buf: Vec::new(),
}) })
} }
@ -117,21 +123,19 @@ impl PlainTransport {
/// A transport which uses STARTTLS. /// A transport which uses STARTTLS.
pub struct SslTransport { pub struct SslTransport {
inner: Arc<Mutex<SslStream<TcpStream>>>, // TODO: this feels rather ugly inner: Arc<Mutex<SslStream<TcpStream>>>, // TODO: this feels rather ugly
// TODO: especially feels ugly because this read would keep the lock held very long reader: EventReader<LockedIO<SslStream<TcpStream>>>, // TODO: especially feels ugly because
// (potentially) // this read would keep the lock
reader: EventReader<BufReader<LockedIO<SslStream<TcpStream>>>>, // held very long (potentially)
writer: LockedIO<SslStream<TcpStream>>, writer: EventWriter<LockedIO<SslStream<TcpStream>>>,
buf: Vec<u8>,
} }
impl Transport for SslTransport { impl Transport for SslTransport {
fn write_event<'a, E: Into<Event<'a>>>(&mut self, event: E) -> Result<(), Error> { fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error> {
self.writer.write(&event.into())?; Ok(self.writer.write(event)?)
Ok(())
} }
fn read_event(&mut self) -> Result<Event, Error> { fn read_event(&mut self) -> Result<XmlReaderEvent, Error> {
Ok(self.reader.read_event(&mut self.buf)?) Ok(self.reader.next()?)
} }
fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> { fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> {
@ -144,8 +148,13 @@ impl Transport for SslTransport {
fn reset_stream(&mut self) { fn reset_stream(&mut self) {
let locked_io = LockedIO::from(self.inner.clone()); let locked_io = LockedIO::from(self.inner.clone());
self.reader = EventReader::from_reader(BufReader::new(locked_io.clone())); self.reader = EventReader::new(locked_io.clone());
self.writer = locked_io; self.writer = EventWriter::new_with_config(locked_io, EmitterConfig {
line_separator: "".into(),
perform_indent: false,
normalize_empty_elements: false,
.. Default::default()
});
} }
fn channel_bind(&self) -> ChannelBinding { fn channel_bind(&self) -> ChannelBinding {
@ -163,29 +172,23 @@ impl SslTransport {
, ns::CLIENT, ns::STREAM, host)?; , ns::CLIENT, ns::STREAM, host)?;
write!(stream, "<starttls xmlns='{}'/>" write!(stream, "<starttls xmlns='{}'/>"
, ns::TLS)?; , ns::TLS)?;
{ let mut parser = EventReader::new(stream);
let mut parser = EventReader::from_reader(BufReader::new(&stream)); loop { // TODO: possibly a timeout?
let mut buf = Vec::new(); match parser.next()? {
let ns_buf = Vec::new(); XmlReaderEvent::StartElement { name, .. } => {
loop { // TODO: possibly a timeout? if let Some(ns) = name.namespace {
match parser.read_event(&mut buf)? { if ns == ns::TLS && name.local_name == "proceed" {
Event::Start(ref e) => { break;
let (namespace, local_name) = parser.resolve_namespace(e.name(), &ns_buf);
let namespace = namespace.map(str::from_utf8);
let local_name = str::from_utf8(local_name)?;
if let Some(ns) = namespace {
if ns == Ok(ns::TLS) && local_name == "proceed" {
break;
} else if ns == Ok(ns::STREAM) && local_name == "error" {
return Err(Error::StreamError);
}
} }
}, else if ns == ns::STREAM && name.local_name == "error" {
_ => (), return Err(Error::StreamError);
} }
}
},
_ => {},
} }
} }
let stream = parser.into_inner();
#[cfg(feature = "insecure")] #[cfg(feature = "insecure")]
let ssl_stream = { let ssl_stream = {
let mut ctx = SslContextBuilder::new(SslMethod::tls())?; let mut ctx = SslContextBuilder::new(SslMethod::tls())?;
@ -200,14 +203,17 @@ impl SslTransport {
}; };
let ssl_stream = Arc::new(Mutex::new(ssl_stream)); let ssl_stream = Arc::new(Mutex::new(ssl_stream));
let locked_io = LockedIO::from(ssl_stream.clone()); let locked_io = LockedIO::from(ssl_stream.clone());
let reader = EventReader::from_reader(BufReader::new(locked_io.clone())); let reader = EventReader::new(locked_io.clone());
let writer = locked_io; let writer = EventWriter::new_with_config(locked_io, EmitterConfig {
line_separator: "".into(),
perform_indent: false,
normalize_empty_elements: false,
.. Default::default()
});
Ok(SslTransport { Ok(SslTransport {
inner: ssl_stream, inner: ssl_stream,
reader: reader, reader: reader,
writer: writer, writer: writer,
buf: Vec::new(),
}) })
} }