mirror of
https://gitlab.com/xmpp-rs/xmpp-rs.git
synced 2024-07-12 22:21:53 +00:00
Adapt new event system for component
This commit is contained in:
parent
917b14b5d2
commit
2ee23c1c05
3 changed files with 54 additions and 54 deletions
|
@ -2,7 +2,6 @@ extern crate xmpp;
|
|||
|
||||
use xmpp::jid::Jid;
|
||||
use xmpp::component::ComponentBuilder;
|
||||
use xmpp::plugins::messaging::{MessagingPlugin, MessageEvent};
|
||||
|
||||
use std::env;
|
||||
|
||||
|
@ -17,11 +16,5 @@ fn main() {
|
|||
.port(port)
|
||||
.connect()
|
||||
.unwrap();
|
||||
component.register_plugin(MessagingPlugin::new());
|
||||
loop {
|
||||
let event = component.next_event().unwrap();
|
||||
if let Some(evt) = event.downcast::<MessageEvent>() {
|
||||
println!("{:?}", evt);
|
||||
}
|
||||
}
|
||||
component.main().unwrap();
|
||||
}
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use xml;
|
||||
use jid::Jid;
|
||||
use transport::{Transport, PlainTransport};
|
||||
use error::Error;
|
||||
use ns;
|
||||
use plugin::{Plugin, PluginProxyBinding};
|
||||
use event::AbstractEvent;
|
||||
use plugin::{Plugin, PluginInit, PluginProxyBinding};
|
||||
use event::{Dispatcher, ReceiveElement};
|
||||
use connection::{Connection, Component2S};
|
||||
use sha_1::{Sha1, Digest};
|
||||
|
||||
|
@ -12,7 +13,11 @@ use minidom::Element;
|
|||
use xml::reader::XmlEvent as ReaderEvent;
|
||||
|
||||
use std::fmt::Write;
|
||||
use std::sync::mpsc::{Receiver, channel};
|
||||
use std::sync::{Mutex, Arc};
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use std::any::TypeId;
|
||||
|
||||
/// A builder for `Component`s.
|
||||
pub struct ComponentBuilder {
|
||||
|
@ -56,15 +61,14 @@ impl ComponentBuilder {
|
|||
let host = &self.host.unwrap_or(self.jid.domain.clone());
|
||||
let mut transport = PlainTransport::connect(host, self.port)?;
|
||||
Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?;
|
||||
let (sender_out, sender_in) = channel();
|
||||
let (dispatcher_out, dispatcher_in) = channel();
|
||||
let dispatcher = Arc::new(Mutex::new(Dispatcher::new()));
|
||||
let transport = Arc::new(Mutex::new(transport));
|
||||
let mut component = Component {
|
||||
jid: self.jid,
|
||||
transport: transport,
|
||||
plugins: Vec::new(),
|
||||
binding: PluginProxyBinding::new(sender_out, dispatcher_out),
|
||||
sender_in: sender_in,
|
||||
dispatcher_in: dispatcher_in,
|
||||
plugins: HashMap::new(),
|
||||
binding: PluginProxyBinding::new(dispatcher.clone()),
|
||||
dispatcher: dispatcher,
|
||||
};
|
||||
component.connect(self.secret)?;
|
||||
Ok(component)
|
||||
|
@ -74,11 +78,10 @@ impl ComponentBuilder {
|
|||
/// An XMPP component.
|
||||
pub struct Component {
|
||||
jid: Jid,
|
||||
transport: PlainTransport,
|
||||
plugins: Vec<Box<Plugin>>,
|
||||
transport: Arc<Mutex<PlainTransport>>,
|
||||
plugins: HashMap<TypeId, Arc<Box<Plugin>>>,
|
||||
binding: PluginProxyBinding,
|
||||
sender_in: Receiver<Element>,
|
||||
dispatcher_in: Receiver<AbstractEvent>,
|
||||
dispatcher: Arc<Mutex<Dispatcher>>,
|
||||
}
|
||||
|
||||
impl Component {
|
||||
|
@ -88,53 +91,57 @@ impl Component {
|
|||
}
|
||||
|
||||
/// Registers a plugin.
|
||||
pub fn register_plugin<P: Plugin + 'static>(&mut self, mut plugin: P) {
|
||||
plugin.bind(self.binding.clone());
|
||||
self.plugins.push(Box::new(plugin));
|
||||
pub fn register_plugin<P: Plugin + PluginInit + 'static>(&mut self, mut plugin: P) {
|
||||
let binding = self.binding.clone();
|
||||
plugin.bind(binding);
|
||||
let p = Arc::new(Box::new(plugin) as Box<Plugin>);
|
||||
{
|
||||
let mut disp = self.dispatcher.lock().unwrap();
|
||||
P::init(&mut disp, p.clone());
|
||||
}
|
||||
if self.plugins.insert(TypeId::of::<P>(), p).is_some() {
|
||||
panic!("registering a plugin that's already registered");
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the plugin given by the type parameter, if it exists, else panics.
|
||||
pub fn plugin<P: Plugin>(&self) -> &P {
|
||||
for plugin in &self.plugins {
|
||||
let any = plugin.as_any();
|
||||
if let Some(ret) = any.downcast_ref::<P>() {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
panic!("plugin does not exist!");
|
||||
self.plugins.get(&TypeId::of::<P>())
|
||||
.expect("the requested plugin was not registered")
|
||||
.as_any()
|
||||
.downcast_ref::<P>()
|
||||
.expect("plugin downcast failure (should not happen!!)")
|
||||
}
|
||||
|
||||
/// Returns the next event and flush the send queue.
|
||||
pub fn next_event(&mut self) -> Result<AbstractEvent, Error> {
|
||||
self.flush_send_queue()?;
|
||||
pub fn main(&mut self) -> Result<(), Error> {
|
||||
self.dispatcher.lock().unwrap().flush_all();
|
||||
loop {
|
||||
if let Ok(evt) = self.dispatcher_in.try_recv() {
|
||||
return Ok(evt);
|
||||
let elem = self.read_element()?;
|
||||
{
|
||||
let mut disp = self.dispatcher.lock().unwrap();
|
||||
disp.dispatch(ReceiveElement(elem));
|
||||
disp.flush_all();
|
||||
}
|
||||
let elem = self.transport.read_element()?;
|
||||
for plugin in self.plugins.iter_mut() {
|
||||
plugin.handle(&elem);
|
||||
// TODO: handle plugin return
|
||||
}
|
||||
self.flush_send_queue()?;
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes the send queue, sending all queued up stanzas.
|
||||
pub fn flush_send_queue(&mut self) -> Result<(), Error> { // TODO: not sure how great of an
|
||||
// idea it is to flush in this
|
||||
// manner…
|
||||
while let Ok(elem) = self.sender_in.try_recv() {
|
||||
self.transport.write_element(&elem)?;
|
||||
fn read_element(&self) -> Result<Element, Error> {
|
||||
self.transport.lock().unwrap().read_element()
|
||||
}
|
||||
Ok(())
|
||||
|
||||
fn write_element(&self, elem: &Element) -> Result<(), Error> {
|
||||
self.transport.lock().unwrap().write_element(elem)
|
||||
}
|
||||
|
||||
fn read_event(&self) -> Result<xml::reader::XmlEvent, Error> {
|
||||
self.transport.lock().unwrap().read_event()
|
||||
}
|
||||
|
||||
fn connect(&mut self, secret: String) -> Result<(), Error> {
|
||||
// TODO: this is very ugly
|
||||
let mut sid = String::new();
|
||||
loop {
|
||||
let e = self.transport.read_event()?;
|
||||
let e = self.read_event()?;
|
||||
match e {
|
||||
ReaderEvent::StartElement { attributes, .. } => {
|
||||
for attribute in attributes {
|
||||
|
@ -158,9 +165,9 @@ impl Component {
|
|||
.ns(ns::COMPONENT_ACCEPT)
|
||||
.build();
|
||||
elem.append_text_node(handshake);
|
||||
self.transport.write_element(&elem)?;
|
||||
self.write_element(&elem)?;
|
||||
loop {
|
||||
let n = self.transport.read_element()?;
|
||||
let n = self.read_element()?;
|
||||
if n.is("handshake", ns::COMPONENT_ACCEPT) {
|
||||
return Ok(());
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ pub mod ns;
|
|||
pub mod transport;
|
||||
pub mod error;
|
||||
pub mod client;
|
||||
//pub mod component;
|
||||
pub mod component;
|
||||
pub mod plugin;
|
||||
#[macro_use] pub mod plugin_macro;
|
||||
pub mod event;
|
||||
|
|
Loading…
Reference in a new issue