2018-12-18 18:04:31 +00:00
|
|
|
|
use futures::{sink, Async, Future, Poll, Sink, Stream};
|
2017-06-13 23:55:56 +00:00
|
|
|
|
use jid::Jid;
|
2017-07-22 00:59:51 +00:00
|
|
|
|
use minidom::Element;
|
2018-12-18 18:04:31 +00:00
|
|
|
|
use std::mem::replace;
|
|
|
|
|
use tokio_codec::Framed;
|
|
|
|
|
use tokio_io::{AsyncRead, AsyncWrite};
|
2017-06-05 00:50:22 +00:00
|
|
|
|
|
2018-12-18 18:04:31 +00:00
|
|
|
|
use crate::xmpp_codec::{Packet, XMPPCodec};
|
2018-12-18 17:29:31 +00:00
|
|
|
|
use crate::xmpp_stream::XMPPStream;
|
|
|
|
|
use crate::{Error, ProtocolError};
|
2017-06-05 00:50:22 +00:00
|
|
|
|
|
|
|
|
|
/// Namespace URI of the `stream` prefix: sent as the `xmlns:stream` attribute
/// of the opening stream header and used to recognise the `features` stanza.
const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
|
|
|
|
|
|
|
|
|
|
pub struct StreamStart<S: AsyncWrite> {
|
|
|
|
|
state: StreamStartState<S>,
|
2017-06-13 23:55:56 +00:00
|
|
|
|
jid: Jid,
|
2017-07-18 23:02:45 +00:00
|
|
|
|
ns: String,
|
2017-06-05 00:50:22 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum StreamStartState<S: AsyncWrite> {
|
|
|
|
|
SendStart(sink::Send<Framed<S, XMPPCodec>>),
|
|
|
|
|
RecvStart(Framed<S, XMPPCodec>),
|
2017-07-18 23:02:45 +00:00
|
|
|
|
RecvFeatures(Framed<S, XMPPCodec>, String),
|
2017-06-05 00:50:22 +00:00
|
|
|
|
Invalid,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<S: AsyncWrite> StreamStart<S> {
|
2017-07-18 23:02:45 +00:00
|
|
|
|
pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
|
2018-12-18 18:04:31 +00:00
|
|
|
|
let attrs = [
|
|
|
|
|
("to".to_owned(), jid.domain.clone()),
|
|
|
|
|
("version".to_owned(), "1.0".to_owned()),
|
|
|
|
|
("xmlns".to_owned(), ns.clone()),
|
|
|
|
|
("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
|
|
|
|
|
]
|
|
|
|
|
.iter()
|
|
|
|
|
.cloned()
|
|
|
|
|
.collect();
|
2017-06-05 00:50:22 +00:00
|
|
|
|
let send = stream.send(Packet::StreamStart(attrs));
|
|
|
|
|
|
|
|
|
|
StreamStart {
|
|
|
|
|
state: StreamStartState::SendStart(send),
|
2017-06-13 23:55:56 +00:00
|
|
|
|
jid,
|
2017-07-18 23:02:45 +00:00
|
|
|
|
ns,
|
2017-06-05 00:50:22 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
|
|
|
|
|
type Item = XMPPStream<S>;
|
2018-09-06 15:46:06 +00:00
|
|
|
|
type Error = Error;
|
2017-06-05 00:50:22 +00:00
|
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
|
|
|
let old_state = replace(&mut self.state, StreamStartState::Invalid);
|
|
|
|
|
let mut retry = false;
|
|
|
|
|
|
|
|
|
|
let (new_state, result) = match old_state {
|
2018-12-18 18:04:31 +00:00
|
|
|
|
StreamStartState::SendStart(mut send) => match send.poll() {
|
|
|
|
|
Ok(Async::Ready(stream)) => {
|
|
|
|
|
retry = true;
|
|
|
|
|
(StreamStartState::RecvStart(stream), Ok(Async::NotReady))
|
|
|
|
|
}
|
|
|
|
|
Ok(Async::NotReady) => (StreamStartState::SendStart(send), Ok(Async::NotReady)),
|
|
|
|
|
Err(e) => (StreamStartState::Invalid, Err(e.into())),
|
|
|
|
|
},
|
|
|
|
|
StreamStartState::RecvStart(mut stream) => match stream.poll() {
|
|
|
|
|
Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
|
|
|
|
|
let stream_ns = stream_attrs
|
|
|
|
|
.get("xmlns")
|
|
|
|
|
.ok_or(ProtocolError::NoStreamNamespace)?
|
|
|
|
|
.clone();
|
|
|
|
|
if self.ns == "jabber:client" {
|
2017-06-05 00:50:22 +00:00
|
|
|
|
retry = true;
|
2018-12-18 18:04:31 +00:00
|
|
|
|
// TODO: skip RecvFeatures for version < 1.0
|
|
|
|
|
(
|
|
|
|
|
StreamStartState::RecvFeatures(stream, stream_ns),
|
|
|
|
|
Ok(Async::NotReady),
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
let id = stream_attrs
|
|
|
|
|
.get("id")
|
|
|
|
|
.ok_or(ProtocolError::NoStreamId)?
|
2018-09-06 21:57:42 +00:00
|
|
|
|
.clone();
|
2018-12-18 18:04:31 +00:00
|
|
|
|
// FIXME: huge hack, shouldn’t be an element!
|
|
|
|
|
let stream = XMPPStream::new(
|
|
|
|
|
self.jid.clone(),
|
|
|
|
|
stream,
|
|
|
|
|
self.ns.clone(),
|
|
|
|
|
Element::builder(id).build(),
|
|
|
|
|
);
|
|
|
|
|
(StreamStartState::Invalid, Ok(Async::Ready(stream)))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(Async::Ready(_)) => return Err(ProtocolError::InvalidToken.into()),
|
|
|
|
|
Ok(Async::NotReady) => (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
|
|
|
|
|
Err(e) => return Err(ProtocolError::from(e).into()),
|
|
|
|
|
},
|
|
|
|
|
StreamStartState::RecvFeatures(mut stream, stream_ns) => match stream.poll() {
|
|
|
|
|
Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
|
|
|
|
|
if stanza.is("features", NS_XMPP_STREAM) {
|
|
|
|
|
let stream =
|
|
|
|
|
XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
|
|
|
|
|
(StreamStartState::Invalid, Ok(Async::Ready(stream)))
|
|
|
|
|
} else {
|
|
|
|
|
(
|
|
|
|
|
StreamStartState::RecvFeatures(stream, stream_ns),
|
|
|
|
|
Ok(Async::NotReady),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(Async::Ready(_)) | Ok(Async::NotReady) => (
|
|
|
|
|
StreamStartState::RecvFeatures(stream, stream_ns),
|
|
|
|
|
Ok(Async::NotReady),
|
|
|
|
|
),
|
|
|
|
|
Err(e) => return Err(ProtocolError::from(e).into()),
|
|
|
|
|
},
|
|
|
|
|
StreamStartState::Invalid => unreachable!(),
|
2017-06-05 00:50:22 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.state = new_state;
|
|
|
|
|
if retry {
|
|
|
|
|
self.poll()
|
|
|
|
|
} else {
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|