From 514e7256ae58986bfee5bbd682cbecbadbe958b7 Mon Sep 17 00:00:00 2001 From: Hendrik Sollich Date: Tue, 19 Oct 2021 20:47:06 +0200 Subject: [PATCH 1/2] Add demonstrator for dangling callers --- examples/caller.rs | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 examples/caller.rs diff --git a/examples/caller.rs b/examples/caller.rs new file mode 100644 index 0000000..d00a690 --- /dev/null +++ b/examples/caller.rs @@ -0,0 +1,40 @@ +use xactor::*; + +/// Define `Ping` message +#[message(result = "usize")] +struct Ping(usize); + +/// Actor +struct MyActor { + count: usize, +} + +/// Declare actor and its context +impl Actor for MyActor {} + +/// Handler for `Ping` message +#[async_trait::async_trait] +impl Handler for MyActor { + async fn handle(&mut self, _ctx: &mut Context, msg: Ping) -> usize { + self.count += msg.0; + self.count + } +} + +#[xactor::main] +async fn main() -> Result<()> { + // start new actor + let addr = MyActor { count: 10 }.start().await?; + + let sender: Caller = addr.caller(); + + let res = sender.call(Ping(10)).await?; + println!("RESULT: {}", res == 20); + + std::mem::drop(addr); + + let res = sender.call(Ping(10)).await?; + println!("RESULT: {}", res == 20); + + Ok(()) +} From 7c266de89681e626699f50add836f407308afe5d Mon Sep 17 00:00:00 2001 From: Hendrik Sollich Date: Tue, 19 Oct 2021 21:09:37 +0200 Subject: [PATCH 2/2] add can_upgrade() method to Caller and Sender --- examples/caller.rs | 9 +++-- src/addr.rs | 97 +++++++++++++++++++++++++--------------------- src/caller.rs | 10 +++++ 3 files changed, 69 insertions(+), 47 deletions(-) diff --git a/examples/caller.rs b/examples/caller.rs index d00a690..4e2141c 100644 --- a/examples/caller.rs +++ b/examples/caller.rs @@ -26,14 +26,17 @@ async fn main() -> Result<()> { // start new actor let addr = MyActor { count: 10 }.start().await?; - let sender: Caller = addr.caller(); + let caller: Caller = addr.caller(); - let res = sender.call(Ping(10)).await?; + let res = caller.call(Ping(10)).await?; println!("RESULT: {}", res == 20); + + println!("caller can upgrade: {}", caller.can_upgrade()); std::mem::drop(addr); + println!("caller can upgrade: {}", caller.can_upgrade()); - let res = sender.call(Ping(10)).await?; + let res = caller.call(Ping(10)).await?; println!("RESULT: {}", res == 20); Ok(()) diff --git a/src/addr.rs b/src/addr.rs index a81c249..22a37a3 100644 --- a/src/addr.rs +++ b/src/addr.rs @@ -1,3 +1,4 @@ +use crate::caller::CallerFn; use crate::{Actor, ActorId, Caller, Context, Error, Handler, Message, Result, Sender}; use futures::channel::{mpsc, oneshot}; use futures::future::Shared; @@ -110,30 +111,35 @@ impl Addr { A: Handler, { let weak_tx = Arc::downgrade(&self.tx); + let caller_fn: Mutex> = Mutex::new(Box::new(move |msg| { + let weak_tx_option = weak_tx.upgrade(); + Box::pin(async move { + match weak_tx_option { + Some(tx) => { + let (oneshot_tx, oneshot_rx) = oneshot::channel(); + + mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec( + Box::new(move |actor, ctx| { + Box::pin(async move { + let res = Handler::handle(&mut *actor, ctx, msg).await; + let _ = oneshot_tx.send(res); + }) + }), + ))?; + Ok(oneshot_rx.await?) + } + None => Err(crate::error::anyhow!("Actor Dropped")), + } + }) + })); + + let weak_tx = Arc::downgrade(&self.tx); + let test_fn = Box::new(move || weak_tx.strong_count() > 0); Caller { - actor_id: self.actor_id.clone(), - caller_fn: Mutex::new(Box::new(move |msg| { - let weak_tx_option = weak_tx.upgrade(); - Box::pin(async move { - match weak_tx_option { - Some(tx) => { - let (oneshot_tx, oneshot_rx) = oneshot::channel(); - - mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec( - Box::new(move |actor, ctx| { - Box::pin(async move { - let res = Handler::handle(&mut *actor, ctx, msg).await; - let _ = oneshot_tx.send(res); - }) - }), - ))?; - Ok(oneshot_rx.await?) - } - None => Err(crate::error::anyhow!("Actor Dropped")), - } - }) - })), + actor_id: self.actor_id, + caller_fn, + test_fn, } } @@ -143,21 +149,27 @@ impl Addr { A: Handler, { let weak_tx = Arc::downgrade(&self.tx); + let sender_fn = Box::new(move |msg| match weak_tx.upgrade() { + Some(tx) => { + mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new( + move |actor, ctx| { + Box::pin(async move { + Handler::handle(&mut *actor, ctx, msg).await; + }) + }, + )))?; + Ok(()) + } + None => Ok(()), + }); + + let weak_tx = Arc::downgrade(&self.tx); + let test_fn = Box::new(move || weak_tx.strong_count() > 0); + Sender { - actor_id: self.actor_id.clone(), - sender_fn: Box::new(move |msg| match weak_tx.upgrade() { - Some(tx) => { - mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new( - move |actor, ctx| { - Box::pin(async move { - Handler::handle(&mut *actor, ctx, msg).await; - }) - }, - )))?; - Ok(()) - } - None => Ok(()), - }), + actor_id: self.actor_id, + sender_fn, + test_fn, } } @@ -191,14 +203,11 @@ impl Hash for WeakAddr { impl WeakAddr { pub fn upgrade(&self) -> Option> { - match self.tx.upgrade() { - Some(tx) => Some(Addr { - actor_id: self.actor_id, - tx, - rx_exit: self.rx_exit.clone(), - }), - None => None, - } + self.tx.upgrade().map(|tx| Addr { + actor_id: self.actor_id, + tx, + rx_exit: self.rx_exit.clone(), + }) } } diff --git a/src/caller.rs b/src/caller.rs index b70a03a..550b984 100644 --- a/src/caller.rs +++ b/src/caller.rs @@ -11,6 +11,8 @@ pub(crate) type CallerFn = Box CallerFuture + Send + 'static> pub(crate) type SenderFn = Box Result<()> + 'static + Send>; +pub(crate) type TestFn = Box bool + 'static + Send>; + /// Caller of a specific message type /// /// Like `Sender, Caller has a weak reference to the recipient of the message type, and so will not prevent an actor from stopping if all Addr's have been dropped elsewhere. @@ -18,12 +20,16 @@ pub(crate) type SenderFn = Box Result<()> + 'static + Send>; pub struct Caller { pub actor_id: ActorId, pub(crate) caller_fn: Mutex>, + pub(crate) test_fn: TestFn, } impl Caller { pub fn call(&self, msg: T) -> CallerFuture { (self.caller_fn.lock().unwrap())(msg) } + pub fn can_upgrade(&self) -> bool { + (self.test_fn)() + } } impl> PartialEq for Caller { @@ -46,12 +52,16 @@ impl> Hash for Caller { pub struct Sender { pub actor_id: ActorId, pub(crate) sender_fn: SenderFn, + pub(crate) test_fn: TestFn, } impl> Sender { pub fn send(&self, msg: T) -> Result<()> { (self.sender_fn)(msg) } + pub fn can_upgrade(&self) -> bool { + (self.test_fn)() + } } impl> PartialEq for Sender {