@@ -316,6 +316,7 @@ pub(crate) mod test_util {
316
316
Cast <( String , PortRef <String >) >,
317
317
Cast <GetRank >,
318
318
Cast <Error >,
319
+ GetRank ,
319
320
Relay ,
320
321
IndexedErasedUnbound <Cast <( String , PortRef <String >) >>,
321
322
IndexedErasedUnbound <Cast <GetRank >>,
@@ -362,6 +363,20 @@ pub(crate) mod test_util {
362
363
}
363
364
}
364
365
366
+ #[ async_trait]
367
+ impl Handler < GetRank > for TestActor {
368
+ async fn handle (
369
+ & mut self ,
370
+ this : & Instance < Self > ,
371
+ GetRank ( ok, reply) : GetRank ,
372
+ ) -> Result < ( ) , anyhow:: Error > {
373
+ let rank = this. self_id ( ) . rank ( ) ;
374
+ reply. send ( this, rank) ?;
375
+ anyhow:: ensure!( ok, "intentional error!" ) ; // If `!ok` exit with `Err()`.
376
+ Ok ( ( ) )
377
+ }
378
+ }
379
+
365
380
#[ async_trait]
366
381
impl Handler < Cast < ( String , PortRef < String > ) > > for TestActor {
367
382
async fn handle (
@@ -662,6 +677,60 @@ mod tests {
662
677
) ;
663
678
assert ! ( events. next( ) . await . is_none( ) ) ;
664
679
}
680
+
681
+ #[ tracing_test:: traced_test]
682
+ #[ tokio:: test]
683
+ async fn test_behaviors_on_failed_send ( ) {
684
+ use hyperactor:: ActorId ;
685
+ use hyperactor:: ProcId ;
686
+ use hyperactor:: WorldId ;
687
+
688
+ let alloc = LocalAllocator
689
+ . allocate ( AllocSpec {
690
+ shape : shape ! { replica = 1 } ,
691
+ constraints : Default :: default ( ) ,
692
+ } )
693
+ . await
694
+ . unwrap ( ) ;
695
+
696
+ let stop = alloc. stopper ( ) ;
697
+ let name = alloc. name ( ) . to_string ( ) ;
698
+ let mesh = ProcMesh :: allocate ( alloc) . await . unwrap ( ) ;
699
+
700
+ let actor_mesh = mesh. spawn :: < TestActor > ( "foo" , & ( ) ) . await . unwrap ( ) ;
701
+
702
+ let ( reply_handle, mut reply_receiver) = actor_mesh. open_port ( ) ;
703
+
704
+ // Send a message to an actor that exists.
705
+ let foo: ActorRef < TestActor > =
706
+ ActorRef :: attest ( ActorId ( ProcId ( WorldId ( name. clone ( ) ) , 0 ) , "foo" . into ( ) , 0 ) ) ;
707
+ foo. send ( mesh. client ( ) , GetRank ( true , reply_handle. bind ( ) ) )
708
+ . unwrap ( ) ;
709
+ // This is ok.
710
+ let rank = reply_receiver. recv ( ) . await . unwrap ( ) ;
711
+ assert_eq ! ( rank, 0 ) ;
712
+
713
+ // Send a message to an actor that doesn't exist but the proc does.
714
+ let _bar: ActorRef < TestActor > =
715
+ ActorRef :: attest ( ActorId ( ProcId ( WorldId ( name. clone ( ) ) , 0 ) , "foo" . into ( ) , 1 ) ) ;
716
+ // Uncomment for infinite hang.
717
+ /*
718
+ // Message gets logged.
719
+ // delivery error: address not routable: no mailbox for actor _1ku59VQkwPSM[0].foo[1] registered in muxer
720
+ bar.send(mesh.client(), GetRank(true, reply_handle.bind())).unwrap();
721
+ */
722
+
723
+ // Send a message to an actor on a proc that doesn't exist.
724
+ let _baz: ActorRef < TestActor > =
725
+ ActorRef :: attest ( ActorId ( ProcId ( WorldId ( name. clone ( ) ) , 1 ) , "foo" . into ( ) , 0 ) ) ;
726
+ // Uncomment for stack overflow.
727
+ /*
728
+ baz.send(mesh.client(), GetRank(true, reply_handle.bind())).unwrap();
729
+ */
730
+
731
+ // Stop the mesh.
732
+ stop ( ) ;
733
+ }
665
734
} // mod local
666
735
667
736
mod process {
0 commit comments