tokio-xmpp: remove LocalSet from AsyncClient, making it Sync

This commit is contained in:
Astro 2022-03-24 03:26:30 +01:00
parent 9340dfe4ab
commit 81ffb2a9f1

View file

@ -6,7 +6,6 @@ use std::str::FromStr;
use std::task::Context; use std::task::Context;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::task::LocalSet;
#[cfg(feature = "tls-native")] #[cfg(feature = "tls-native")]
use tokio_native_tls::TlsStream; use tokio_native_tls::TlsStream;
#[cfg(feature = "tls-rust")] #[cfg(feature = "tls-rust")]
@ -58,7 +57,7 @@ type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
enum ClientState { enum ClientState {
Invalid, Invalid,
Disconnected, Disconnected,
Connecting(JoinHandle<Result<XMPPStream, Error>>, LocalSet), Connecting(JoinHandle<Result<XMPPStream, Error>>),
Connected(XMPPStream), Connected(XMPPStream),
} }
@ -80,15 +79,14 @@ impl Client {
/// Start a new client given that the JID is already parsed. /// Start a new client given that the JID is already parsed.
pub fn new_with_config(config: Config) -> Self { pub fn new_with_config(config: Config) -> Self {
let local = LocalSet::new(); let connect = tokio::spawn(Self::connect(
let connect = local.spawn_local(Self::connect(
config.server.clone(), config.server.clone(),
config.jid.clone(), config.jid.clone(),
config.password.clone(), config.password.clone(),
)); ));
let client = Client { let client = Client {
config, config,
state: ClientState::Connecting(connect, local), state: ClientState::Connecting(connect),
reconnect: false, reconnect: false,
}; };
client client
@ -196,18 +194,16 @@ impl Stream for Client {
ClientState::Invalid => panic!("Invalid client state"), ClientState::Invalid => panic!("Invalid client state"),
ClientState::Disconnected if self.reconnect => { ClientState::Disconnected if self.reconnect => {
// TODO: add timeout // TODO: add timeout
let mut local = LocalSet::new(); let connect = tokio::spawn(Self::connect(
let connect = local.spawn_local(Self::connect(
self.config.server.clone(), self.config.server.clone(),
self.config.jid.clone(), self.config.jid.clone(),
self.config.password.clone(), self.config.password.clone(),
)); ));
let _ = Pin::new(&mut local).poll(cx); self.state = ClientState::Connecting(connect);
self.state = ClientState::Connecting(connect, local);
self.poll_next(cx) self.poll_next(cx)
} }
ClientState::Disconnected => Poll::Ready(None), ClientState::Disconnected => Poll::Ready(None),
ClientState::Connecting(mut connect, mut local) => { ClientState::Connecting(mut connect) => {
match Pin::new(&mut connect).poll(cx) { match Pin::new(&mut connect).poll(cx) {
Poll::Ready(Ok(Ok(stream))) => { Poll::Ready(Ok(Ok(stream))) => {
let bound_jid = stream.jid.clone(); let bound_jid = stream.jid.clone();
@ -226,9 +222,7 @@ impl Stream for Client {
panic!("connect task: {}", e); panic!("connect task: {}", e);
} }
Poll::Pending => { Poll::Pending => {
let _ = Pin::new(&mut local).poll(cx); self.state = ClientState::Connecting(connect);
self.state = ClientState::Connecting(connect, local);
Poll::Pending Poll::Pending
} }
} }