diff --git a/examples/component.rs b/examples/component.rs new file mode 100644 index 00000000..3e04af96 --- /dev/null +++ b/examples/component.rs @@ -0,0 +1,27 @@ +extern crate xmpp; + +use xmpp::jid::Jid; +use xmpp::component::ComponentBuilder; +use xmpp::plugins::messaging::{MessagingPlugin, MessageEvent}; + +use std::env; + +fn main() { + let jid: Jid = env::var("JID").unwrap().parse().unwrap(); + let pass = env::var("PASS").unwrap(); + let host = env::var("HOST").unwrap(); + let port: u16 = env::var("PORT").unwrap().parse().unwrap(); + let mut component = ComponentBuilder::new(jid.clone()) + .password(pass) + .host(host) + .port(port) + .connect() + .unwrap(); + component.register_plugin(MessagingPlugin::new()); + loop { + let event = component.next_event().unwrap(); + if let Some(evt) = event.downcast::() { + println!("{:?}", evt); + } + } +} diff --git a/src/component.rs b/src/component.rs new file mode 100644 index 00000000..530a2e30 --- /dev/null +++ b/src/component.rs @@ -0,0 +1,168 @@ +use jid::Jid; +use transport::{Transport, PlainTransport}; +use error::Error; +use ns; +use plugin::{Plugin, PluginProxyBinding}; +use event::AbstractEvent; +use connection::{Connection, Component2S}; +use openssl::hash::{hash, MessageDigest}; + +use minidom::Element; + +use xml::reader::XmlEvent as ReaderEvent; + +use std::sync::mpsc::{Receiver, channel}; + +/// A builder for `Component`s. +pub struct ComponentBuilder { + jid: Jid, + secret: String, + host: Option, + port: u16, +} + +impl ComponentBuilder { + /// Creates a new builder for an XMPP component that will connect to `jid` with default parameters. + pub fn new(jid: Jid) -> ComponentBuilder { + ComponentBuilder { + jid: jid, + secret: "".to_owned(), + host: None, + port: 5347, + } + } + + /// Sets the host to connect to. + pub fn host(mut self, host: String) -> ComponentBuilder { + self.host = Some(host); + self + } + + /// Sets the port to connect to. + pub fn port(mut self, port: u16) -> ComponentBuilder { + self.port = port; + self + } + + /// Sets the password to use. + pub fn password>(mut self, password: P) -> ComponentBuilder { + self.secret = password.into(); + self + } + + /// Connects to the server and returns a `Component` when succesful. + pub fn connect(self) -> Result { + let host = &self.host.unwrap_or(self.jid.domain.clone()); + let mut transport = PlainTransport::connect(host, self.port)?; + Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?; + let (sender_out, sender_in) = channel(); + let (dispatcher_out, dispatcher_in) = channel(); + let mut component = Component { + jid: self.jid, + transport: transport, + plugins: Vec::new(), + binding: PluginProxyBinding::new(sender_out, dispatcher_out), + sender_in: sender_in, + dispatcher_in: dispatcher_in, + }; + component.connect(self.secret)?; + Ok(component) + } +} + +/// An XMPP component. +pub struct Component { + jid: Jid, + transport: PlainTransport, + plugins: Vec>, + binding: PluginProxyBinding, + sender_in: Receiver, + dispatcher_in: Receiver, +} + +impl Component { + /// Returns a reference to the `Jid` associated with this `Component`. + pub fn jid(&self) -> &Jid { + &self.jid + } + + /// Registers a plugin. + pub fn register_plugin(&mut self, mut plugin: P) { + plugin.bind(self.binding.clone()); + self.plugins.push(Box::new(plugin)); + } + + /// Returns the plugin given by the type parameter, if it exists, else panics. + pub fn plugin(&self) -> &P { + for plugin in &self.plugins { + let any = plugin.as_any(); + if let Some(ret) = any.downcast_ref::

() { + return ret; + } + } + panic!("plugin does not exist!"); + } + + /// Returns the next event and flush the send queue. + pub fn next_event(&mut self) -> Result { + self.flush_send_queue()?; + loop { + if let Ok(evt) = self.dispatcher_in.try_recv() { + return Ok(evt); + } + let elem = self.transport.read_element()?; + for plugin in self.plugins.iter_mut() { + plugin.handle(&elem); + // TODO: handle plugin return + } + self.flush_send_queue()?; + } + } + + /// Flushes the send queue, sending all queued up stanzas. + pub fn flush_send_queue(&mut self) -> Result<(), Error> { // TODO: not sure how great of an + // idea it is to flush in this + // manner… + while let Ok(elem) = self.sender_in.try_recv() { + self.transport.write_element(&elem)?; + } + Ok(()) + } + + fn connect(&mut self, secret: String) -> Result<(), Error> { + // TODO: this is very ugly + let mut sid = String::new(); + loop { + let e = self.transport.read_event()?; + match e { + ReaderEvent::StartElement { attributes, .. } => { + for attribute in attributes { + if attribute.name.namespace == None && attribute.name.local_name == "id" { + sid = attribute.value; + } + } + break; + }, + _ => (), + } + } + let concatenated = format!("{}{}", sid, secret); + let hash = hash(MessageDigest::sha1(), concatenated.as_bytes())?; + let mut handshake = String::new(); + for byte in hash { + // TODO: probably terrible perfs! + handshake = format!("{}{:x}", handshake, byte); + } + let mut elem = Element::builder("handshake") + .ns(ns::COMPONENT_ACCEPT) + .build(); + elem.append_text_node(handshake); + self.transport.write_element(&elem)?; + loop { + let n = self.transport.read_element()?; + if n.is("handshake", ns::COMPONENT_ACCEPT) { + return Ok(()); + } + } + } +} diff --git a/src/connection.rs b/src/connection.rs index 76bef64e..4d639b4e 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -36,3 +36,26 @@ impl Connection for C2S { Ok(()) } } + +pub struct Component2S; + +impl Connection for Component2S { + type InitError = Error; + type CloseError = Error; + + fn namespace() -> &'static str { ns::COMPONENT_ACCEPT } + + fn init(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> { + transport.write_event(WriterEvent::start_element("stream:stream") + .attr("to", domain) + .attr("id", id) + .default_ns(ns::COMPONENT_ACCEPT) + .ns("stream", ns::STREAM))?; + Ok(()) + } + + fn close(transport: &mut T) -> Result<(), Error> { + transport.write_event(WriterEvent::end_element())?; + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index d9f54698..9f96d034 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod ns; pub mod transport; pub mod error; pub mod client; +pub mod component; pub mod plugin; pub mod event; pub mod plugins; diff --git a/src/ns.rs b/src/ns.rs index 24d27025..4b1a3edf 100644 --- a/src/ns.rs +++ b/src/ns.rs @@ -1,6 +1,7 @@ //! Provides constants for namespaces. pub const CLIENT: &'static str = "jabber:client"; +pub const COMPONENT_ACCEPT: &'static str = "jabber:component:accept"; pub const STREAM: &'static str = "http://etherx.jabber.org/streams"; pub const TLS: &'static str = "urn:ietf:params:xml:ns:xmpp-tls"; pub const SASL: &'static str = "urn:ietf:params:xml:ns:xmpp-sasl"; diff --git a/src/transport.rs b/src/transport.rs index 114ea1b2..d67c3ccf 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -2,7 +2,7 @@ use std::io::prelude::*; -use std::net::TcpStream; +use std::net::{TcpStream, Shutdown}; use xml::reader::{EventReader, XmlEvent as XmlReaderEvent}; use xml::writer::{EventWriter, XmlEvent as XmlWriterEvent, EmitterConfig}; @@ -45,6 +45,83 @@ pub trait Transport { } } +/// A plain text transport, completely unencrypted. +pub struct PlainTransport { + inner: Arc>, // TODO: this feels rather ugly + reader: EventReader>, // TODO: especially feels ugly because + // this read would keep the lock + // held very long (potentially) + writer: EventWriter>, +} + +impl Transport for PlainTransport { + fn write_event<'a, E: Into>>(&mut self, event: E) -> Result<(), Error> { + Ok(self.writer.write(event)?) + } + + fn read_event(&mut self) -> Result { + Ok(self.reader.next()?) + } + + fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> { + println!("SENT: {:?}", element); + Ok(element.write_to(&mut self.writer)?) + } + + fn read_element(&mut self) -> Result { + let element = minidom::Element::from_reader(&mut self.reader)?; + println!("RECV: {:?}", element); + Ok(element) + } + + fn reset_stream(&mut self) { + let locked_io = LockedIO::from(self.inner.clone()); + self.reader = EventReader::new(locked_io.clone()); + self.writer = EventWriter::new_with_config(locked_io, EmitterConfig { + line_separator: "".into(), + perform_indent: false, + normalize_empty_elements: false, + .. Default::default() + }); + } + + fn channel_bind(&self) -> ChannelBinding { + // TODO: channel binding + ChannelBinding::None + } +} + +impl PlainTransport { + /// Connects to a server without any encryption. + pub fn connect(host: &str, port: u16) -> Result { + let tcp_stream = TcpStream::connect((host, port))?; + let parser = EventReader::new(tcp_stream); + let parser_stream = parser.into_inner(); + let stream = Arc::new(Mutex::new(parser_stream)); + let locked_io = LockedIO::from(stream.clone()); + let reader = EventReader::new(locked_io.clone()); + let writer = EventWriter::new_with_config(locked_io, EmitterConfig { + line_separator: "".into(), + perform_indent: false, + normalize_empty_elements: false, + .. Default::default() + }); + Ok(PlainTransport { + inner: stream, + reader: reader, + writer: writer, + }) + } + + /// Closes the stream. + pub fn close(&mut self) { + self.inner.lock() + .unwrap() + .shutdown(Shutdown::Both) + .unwrap(); // TODO: safety, return value and such + } +} + /// A transport which uses STARTTLS. pub struct SslTransport { inner: Arc>>, // TODO: this feels rather ugly