diff --git a/examples/component.rs b/examples/component.rs index 3e04af96..97fc60a6 100644 --- a/examples/component.rs +++ b/examples/component.rs @@ -2,7 +2,6 @@ extern crate xmpp; use xmpp::jid::Jid; use xmpp::component::ComponentBuilder; -use xmpp::plugins::messaging::{MessagingPlugin, MessageEvent}; use std::env; @@ -17,11 +16,5 @@ fn main() { .port(port) .connect() .unwrap(); - component.register_plugin(MessagingPlugin::new()); - loop { - let event = component.next_event().unwrap(); - if let Some(evt) = event.downcast::() { - println!("{:?}", evt); - } - } + component.main().unwrap(); } diff --git a/src/component.rs b/src/component.rs index bd5b9ccd..b04916ec 100644 --- a/src/component.rs +++ b/src/component.rs @@ -1,9 +1,10 @@ +use xml; use jid::Jid; use transport::{Transport, PlainTransport}; use error::Error; use ns; -use plugin::{Plugin, PluginProxyBinding}; -use event::AbstractEvent; +use plugin::{Plugin, PluginInit, PluginProxyBinding}; +use event::{Dispatcher, ReceiveElement}; use connection::{Connection, Component2S}; use sha_1::{Sha1, Digest}; @@ -12,7 +13,11 @@ use minidom::Element; use xml::reader::XmlEvent as ReaderEvent; use std::fmt::Write; -use std::sync::mpsc::{Receiver, channel}; +use std::sync::{Mutex, Arc}; + +use std::collections::HashMap; + +use std::any::TypeId; /// A builder for `Component`s. pub struct ComponentBuilder { @@ -56,15 +61,14 @@ impl ComponentBuilder { let host = &self.host.unwrap_or(self.jid.domain.clone()); let mut transport = PlainTransport::connect(host, self.port)?; Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?; - let (sender_out, sender_in) = channel(); - let (dispatcher_out, dispatcher_in) = channel(); + let dispatcher = Arc::new(Mutex::new(Dispatcher::new())); + let transport = Arc::new(Mutex::new(transport)); let mut component = Component { jid: self.jid, transport: transport, - plugins: Vec::new(), - binding: PluginProxyBinding::new(sender_out, dispatcher_out), - sender_in: sender_in, - dispatcher_in: dispatcher_in, + plugins: HashMap::new(), + binding: PluginProxyBinding::new(dispatcher.clone()), + dispatcher: dispatcher, }; component.connect(self.secret)?; Ok(component) @@ -74,11 +78,10 @@ impl ComponentBuilder { /// An XMPP component. pub struct Component { jid: Jid, - transport: PlainTransport, - plugins: Vec>, + transport: Arc>, + plugins: HashMap>>, binding: PluginProxyBinding, - sender_in: Receiver, - dispatcher_in: Receiver, + dispatcher: Arc>, } impl Component { @@ -88,53 +91,57 @@ impl Component { } /// Registers a plugin. - pub fn register_plugin(&mut self, mut plugin: P) { - plugin.bind(self.binding.clone()); - self.plugins.push(Box::new(plugin)); + pub fn register_plugin(&mut self, mut plugin: P) { + let binding = self.binding.clone(); + plugin.bind(binding); + let p = Arc::new(Box::new(plugin) as Box); + { + let mut disp = self.dispatcher.lock().unwrap(); + P::init(&mut disp, p.clone()); + } + if self.plugins.insert(TypeId::of::

(), p).is_some() { + panic!("registering a plugin that's already registered"); + } } /// Returns the plugin given by the type parameter, if it exists, else panics. pub fn plugin(&self) -> &P { - for plugin in &self.plugins { - let any = plugin.as_any(); - if let Some(ret) = any.downcast_ref::

() { - return ret; - } - } - panic!("plugin does not exist!"); + self.plugins.get(&TypeId::of::

()) + .expect("the requested plugin was not registered") + .as_any() + .downcast_ref::

() + .expect("plugin downcast failure (should not happen!!)") } /// Returns the next event and flush the send queue. - pub fn next_event(&mut self) -> Result { - self.flush_send_queue()?; + pub fn main(&mut self) -> Result<(), Error> { + self.dispatcher.lock().unwrap().flush_all(); loop { - if let Ok(evt) = self.dispatcher_in.try_recv() { - return Ok(evt); + let elem = self.read_element()?; + { + let mut disp = self.dispatcher.lock().unwrap(); + disp.dispatch(ReceiveElement(elem)); + disp.flush_all(); } - let elem = self.transport.read_element()?; - for plugin in self.plugins.iter_mut() { - plugin.handle(&elem); - // TODO: handle plugin return - } - self.flush_send_queue()?; } } - /// Flushes the send queue, sending all queued up stanzas. - pub fn flush_send_queue(&mut self) -> Result<(), Error> { // TODO: not sure how great of an - // idea it is to flush in this - // manner… - while let Ok(elem) = self.sender_in.try_recv() { - self.transport.write_element(&elem)?; - } - Ok(()) + fn read_element(&self) -> Result { + self.transport.lock().unwrap().read_element() + } + + fn write_element(&self, elem: &Element) -> Result<(), Error> { + self.transport.lock().unwrap().write_element(elem) + } + + fn read_event(&self) -> Result { + self.transport.lock().unwrap().read_event() } fn connect(&mut self, secret: String) -> Result<(), Error> { - // TODO: this is very ugly let mut sid = String::new(); loop { - let e = self.transport.read_event()?; + let e = self.read_event()?; match e { ReaderEvent::StartElement { attributes, .. } => { for attribute in attributes { @@ -158,9 +165,9 @@ impl Component { .ns(ns::COMPONENT_ACCEPT) .build(); elem.append_text_node(handshake); - self.transport.write_element(&elem)?; + self.write_element(&elem)?; loop { - let n = self.transport.read_element()?; + let n = self.read_element()?; if n.is("handshake", ns::COMPONENT_ACCEPT) { return Ok(()); } diff --git a/src/lib.rs b/src/lib.rs index c3e933fc..7187c9f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ pub mod ns; pub mod transport; pub mod error; pub mod client; -//pub mod component; +pub mod component; pub mod plugin; #[macro_use] pub mod plugin_macro; pub mod event;