This commit is contained in:
Astro 2017-06-14 01:55:56 +02:00
parent 1d9790a1d8
commit df423e5047
7 changed files with 43 additions and 21 deletions

View file

@ -13,3 +13,4 @@ native-tls = "*"
tokio-tls = "*"
sasl = "*"
rustc-serialize = "*"
jid = "*"

View file

@ -1,13 +1,19 @@
extern crate futures;
extern crate tokio_core;
extern crate tokio_xmpp;
extern crate jid;
use std::str::FromStr;
use tokio_core::reactor::Core;
use futures::{Future, Stream};
use tokio_xmpp::TcpClient;
use tokio_xmpp::xmpp_codec::Packet;
use jid::Jid;
fn main() {
let jid = Jid::from_str("astrobot@example.net").expect("JID");
let password = "".to_owned();
use std::net::ToSocketAddrs;
let addr = "[2a01:4f8:a0:33d0::5]:5222"
.to_socket_addrs().unwrap()
@ -15,6 +21,7 @@ fn main() {
let mut core = Core::new().unwrap();
let client = TcpClient::connect(
jid.clone(),
&addr,
&core.handle()
).map_err(|e| format!("{}", e)
@ -25,7 +32,8 @@ fn main() {
panic!("No STARTTLS")
}
}).and_then(|stream| {
stream.auth("astrobot", "").expect("auth")
let username = jid.node.as_ref().unwrap().to_owned();
stream.auth(username, password).expect("auth")
}).and_then(|stream| {
stream.for_each(|event| {
match event {

View file

@ -8,6 +8,7 @@ extern crate native_tls;
extern crate tokio_tls;
extern crate sasl;
extern crate rustc_serialize as serialize;
extern crate jid;
pub mod xmpp_codec;

View file

@ -6,6 +6,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tls::*;
use native_tls::TlsConnector;
use xml;
use jid::Jid;
use xmpp_codec::*;
use xmpp_stream::*;
@ -16,7 +17,7 @@ pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls";
pub struct StartTlsClient<S: AsyncRead + AsyncWrite> {
state: StartTlsClientState<S>,
domain: String,
jid: Jid,
}
enum StartTlsClientState<S: AsyncRead + AsyncWrite> {
@ -30,9 +31,7 @@ enum StartTlsClientState<S: AsyncRead + AsyncWrite> {
impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
/// Waits for <stream:features>
pub fn from_stream(xmpp_stream: XMPPStream<S>) -> Self {
let domain = xmpp_stream.stream_attrs.get("from")
.map(|s| s.to_owned())
.unwrap_or_else(|| String::new());
let jid = xmpp_stream.jid.clone();
let nonza = xml::Element::new(
"starttls".to_owned(), Some(NS_XMPP_TLS.to_owned()),
@ -44,7 +43,7 @@ impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
StartTlsClient {
state: StartTlsClientState::SendStartTls(send),
domain,
jid,
}
}
}
@ -77,10 +76,10 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
if stanza.name == "proceed" =>
{
println!("* proceed *");
let stream = xmpp_stream.into_inner();
let stream = xmpp_stream.stream.into_inner();
let connect = TlsConnector::builder().unwrap()
.build().unwrap()
.connect_async(&self.domain, stream);
.connect_async(&self.jid.domain, stream);
let new_state = StartTlsClientState::StartingTls(connect);
retry = true;
(new_state, Ok(Async::NotReady))
@ -98,7 +97,7 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
match connect.poll() {
Ok(Async::Ready(tls_stream)) => {
println!("Got a TLS stream!");
let start = XMPPStream::from_stream(tls_stream, self.domain.clone());
let start = XMPPStream::from_stream(tls_stream, self.jid.clone());
let new_state = StartTlsClientState::Start(start);
retry = true;
(new_state, Ok(Async::NotReady))

View file

@ -4,6 +4,7 @@ use std::collections::HashMap;
use futures::*;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed;
use jid::Jid;
use xmpp_codec::*;
use xmpp_stream::*;
@ -12,6 +13,7 @@ const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
pub struct StreamStart<S: AsyncWrite> {
state: StreamStartState<S>,
jid: Jid,
}
enum StreamStartState<S: AsyncWrite> {
@ -22,8 +24,8 @@ enum StreamStartState<S: AsyncWrite> {
}
impl<S: AsyncWrite> StreamStart<S> {
pub fn from_stream(stream: Framed<S, XMPPCodec>, to: String) -> Self {
let attrs = [("to".to_owned(), to),
pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid) -> Self {
let attrs = [("to".to_owned(), jid.domain.clone()),
("version".to_owned(), "1.0".to_owned()),
("xmlns".to_owned(), "jabber:client".to_owned()),
("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
@ -32,6 +34,7 @@ impl<S: AsyncWrite> StreamStart<S> {
StreamStart {
state: StreamStartState::SendStart(send),
jid,
}
}
}
@ -75,7 +78,8 @@ impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
if stanza.name == "features"
&& stanza.ns == Some(NS_XMPP_STREAM.to_owned()) {
(StreamStartState::Invalid, Ok(Async::Ready(XMPPStream::new(stream, stream_attrs, stanza))))
let stream = XMPPStream::new(self.jid.clone(), stream, stream_attrs, stanza);
(StreamStartState::Invalid, Ok(Async::Ready(stream)))
} else {
(StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady))
},

View file

@ -3,12 +3,14 @@ use std::io::Error;
use futures::{Future, Poll, Async};
use tokio_core::reactor::Handle;
use tokio_core::net::{TcpStream, TcpStreamNew};
use jid::Jid;
use xmpp_stream::*;
use stream_start::StreamStart;
pub struct TcpClient {
state: TcpClientState,
jid: Jid,
}
enum TcpClientState {
@ -18,10 +20,11 @@ enum TcpClientState {
}
impl TcpClient {
pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self {
pub fn connect(jid: Jid, addr: &SocketAddr, handle: &Handle) -> Self {
let tcp_stream_new = TcpStream::connect(addr, handle);
TcpClient {
state: TcpClientState::Connecting(tcp_stream_new),
jid,
}
}
}
@ -34,7 +37,7 @@ impl Future for TcpClient {
let (new_state, result) = match self.state {
TcpClientState::Connecting(ref mut tcp_stream_new) => {
let tcp_stream = try_ready!(tcp_stream_new.poll());
let start = XMPPStream::from_stream(tcp_stream, "spaceboyz.net".to_owned());
let start = XMPPStream::from_stream(tcp_stream, self.jid.clone());
let new_state = TcpClientState::Start(start);
(new_state, Ok(Async::NotReady))
},

View file

@ -5,6 +5,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed;
use xml;
use sasl::common::Credentials;
use jid::Jid;
use xmpp_codec::*;
use stream_start::*;
@ -14,15 +15,23 @@ use client_auth::ClientAuth;
pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
pub struct XMPPStream<S> {
pub jid: Jid,
pub stream: Framed<S, XMPPCodec>,
pub stream_attrs: HashMap<String, String>,
pub stream_features: xml::Element,
}
impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
pub fn from_stream(stream: S, to: String) -> StreamStart<S> {
pub fn new(jid: Jid,
stream: Framed<S, XMPPCodec>,
stream_attrs: HashMap<String, String>,
stream_features: xml::Element) -> Self {
XMPPStream { jid, stream, stream_attrs, stream_features }
}
pub fn from_stream(stream: S, jid: Jid) -> StreamStart<S> {
let xmpp_stream = AsyncRead::framed(stream, XMPPCodec::new());
StreamStart::from_stream(xmpp_stream, to)
StreamStart::from_stream(xmpp_stream, jid)
}
pub fn into_inner(self) -> S {
@ -30,10 +39,7 @@ impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
}
pub fn restart(self) -> StreamStart<S> {
let to = self.stream_attrs.get("from")
.map(|s| s.to_owned())
.unwrap_or_else(|| "".to_owned());
Self::from_stream(self.into_inner(), to.clone())
Self::from_stream(self.stream.into_inner(), self.jid)
}
pub fn can_starttls(&self) -> bool {
@ -46,7 +52,7 @@ impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
StartTlsClient::from_stream(self)
}
pub fn auth(self, username: &str, password: &str) -> Result<ClientAuth<S>, String> {
pub fn auth(self, username: String, password: String) -> Result<ClientAuth<S>, String> {
let creds = Credentials::default()
.with_username(username)
.with_password(password);