Skip to content

Commit

Permalink
Fix race condition during actor startup sequence (#160)
Browse files Browse the repository at this point in the history
* fixed race condition on actor context when starting the actor

* added test case and fixed bug

* added extra protections against race conditions

* removed sleeps from Test_StartupMessages
  • Loading branch information
troygilman0 authored Aug 18, 2024
1 parent 393374f commit 0921477
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
17 changes: 12 additions & 5 deletions actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ const (
)

const (
idle int32 = iota
stopped int32 = iota
starting
idle
running
stopped
)

type Scheduler interface {
Expand Down Expand Up @@ -52,8 +53,9 @@ type Inbox struct {

func NewInbox(size int) *Inbox {
return &Inbox{
rb: ringbuffer.New[Envelope](int64(size)),
scheduler: NewScheduler(defaultThroughput),
rb: ringbuffer.New[Envelope](int64(size)),
scheduler: NewScheduler(defaultThroughput),
procStatus: stopped,
}
}

Expand Down Expand Up @@ -91,7 +93,12 @@ func (in *Inbox) run() {
}

func (in *Inbox) Start(proc Processer) {
in.proc = proc
// transition to "starting" and then "idle" to ensure no race condition on in.proc
if atomic.CompareAndSwapInt32(&in.procStatus, stopped, starting) {
in.proc = proc
atomic.SwapInt32(&in.procStatus, idle)
in.schedule()
}
}

func (in *Inbox) Stop() error {
Expand Down
3 changes: 2 additions & 1 deletion actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func newProcess(e *Engine, opts Opts) *process {
context: ctx,
mbuffer: nil,
}
p.inbox.Start(p)
return p
}

Expand Down Expand Up @@ -138,6 +137,8 @@ func (p *process) Start() {
p.Invoke(p.mbuffer)
p.mbuffer = nil
}

p.inbox.Start(p)
}

func (p *process) tryRestart(v any) {
Expand Down
69 changes: 69 additions & 0 deletions actor/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,72 @@ func Test_CleanTrace(t *testing.T) {
func panicWrapper() {
panic("foo")
}

func Test_StartupMessages(t *testing.T) {
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)

type testMsg struct{}

timeout := time.After(1 * time.Second)
msgCh := make(chan any, 10) // used to check the msg order
syncCh1 := make(chan struct{})
syncCh2 := make(chan struct{})

go func() {
e.SpawnFunc(func(c *Context) {
fmt.Printf("Got message type %T\n", c.Message())
switch c.Message().(type) {
case Initialized:
syncCh1 <- struct{}{}
// wait for testMsg to send
select {
case <-syncCh2:
case <-timeout:
t.Error("test timed out")
}
}
msgCh <- c.Message()
}, "foo", WithID("bar"))
}()

// wait for actor to initialize
select {
case <-syncCh1:
case <-timeout:
t.Error("test timed out")
return
}

pid := e.Registry.GetPID("foo", "bar")
e.Send(pid, testMsg{})
syncCh2 <- struct{}{}

// check that message order is as expected
select {
case msg := <-msgCh:
_, ok := msg.(Initialized)
require.True(t, ok)
case <-timeout:
t.Error("test timed out")
return
}

select {
case msg := <-msgCh:
_, ok := msg.(Started)
require.True(t, ok)
case <-timeout:
t.Error("test timed out")
return
}

select {
case msg := <-msgCh:
_, ok := msg.(testMsg)
require.True(t, ok)
case <-timeout:
t.Error("test timed out")
return
}
}

0 comments on commit 0921477

Please sign in to comment.