diff --git a/actor/inbox.go b/actor/inbox.go index 384ce24..5adb607 100644 --- a/actor/inbox.go +++ b/actor/inbox.go @@ -13,9 +13,10 @@ const ( ) const ( - idle int32 = iota + stopped int32 = iota + starting + idle running - stopped ) type Scheduler interface { @@ -52,8 +53,9 @@ type Inbox struct { func NewInbox(size int) *Inbox { return &Inbox{ - rb: ringbuffer.New[Envelope](int64(size)), - scheduler: NewScheduler(defaultThroughput), + rb: ringbuffer.New[Envelope](int64(size)), + scheduler: NewScheduler(defaultThroughput), + procStatus: stopped, } } @@ -91,7 +93,12 @@ func (in *Inbox) run() { } func (in *Inbox) Start(proc Processer) { - in.proc = proc + // transition to "starting" and then "idle" to ensure no race condition on in.proc + if atomic.CompareAndSwapInt32(&in.procStatus, stopped, starting) { + in.proc = proc + atomic.SwapInt32(&in.procStatus, idle) + in.schedule() + } } func (in *Inbox) Stop() error { diff --git a/actor/process.go b/actor/process.go index 65e6c9f..3de42b7 100644 --- a/actor/process.go +++ b/actor/process.go @@ -45,7 +45,6 @@ func newProcess(e *Engine, opts Opts) *process { context: ctx, mbuffer: nil, } - p.inbox.Start(p) return p } @@ -138,6 +137,8 @@ func (p *process) Start() { p.Invoke(p.mbuffer) p.mbuffer = nil } + + p.inbox.Start(p) } func (p *process) tryRestart(v any) { diff --git a/actor/process_test.go b/actor/process_test.go index 9256f3d..c4fcb9a 100644 --- a/actor/process_test.go +++ b/actor/process_test.go @@ -48,3 +48,72 @@ func Test_CleanTrace(t *testing.T) { func panicWrapper() { panic("foo") } + +func Test_StartupMessages(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + + type testMsg struct{} + + timeout := time.After(1 * time.Second) + msgCh := make(chan any, 10) // used to check the msg order + syncCh1 := make(chan struct{}) + syncCh2 := make(chan struct{}) + + go func() { + e.SpawnFunc(func(c *Context) { + fmt.Printf("Got message type %T\n", c.Message()) + switch c.Message().(type) { + case Initialized: + syncCh1 <- struct{}{} + // wait for testMsg to send + select { + case <-syncCh2: + case <-timeout: + t.Error("test timed out") + } + } + msgCh <- c.Message() + }, "foo", WithID("bar")) + }() + + // wait for actor to initialize + select { + case <-syncCh1: + case <-timeout: + t.Error("test timed out") + return + } + + pid := e.Registry.GetPID("foo", "bar") + e.Send(pid, testMsg{}) + syncCh2 <- struct{}{} + + // check that message order is as expected + select { + case msg := <-msgCh: + _, ok := msg.(Initialized) + require.True(t, ok) + case <-timeout: + t.Error("test timed out") + return + } + + select { + case msg := <-msgCh: + _, ok := msg.(Started) + require.True(t, ok) + case <-timeout: + t.Error("test timed out") + return + } + + select { + case msg := <-msgCh: + _, ok := msg.(testMsg) + require.True(t, ok) + case <-timeout: + t.Error("test timed out") + return + } +}