Skip to content

Commit

Permalink
batched invoke
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Nov 29, 2023
1 parent 58f1195 commit d567219
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 10 deletions.
22 changes: 22 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ import (
"github.com/stretchr/testify/require"
)

func TestPoisonShouldFlush(t *testing.T) {
e := NewEngine()
pid := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
fmt.Println("started")
case Stopped:
fmt.Println("stopped")
}
}, "foo")

go func() {
for {
e.Send(pid, "themessage")
}
}()

time.Sleep(time.Second)
e.Poison(pid)
time.Sleep(time.Second * 2)
}

type tick struct{}
type tickReceiver struct {
ticks int
Expand Down
5 changes: 3 additions & 2 deletions actor/inbox.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"math"
"runtime"
"sync/atomic"

Expand Down Expand Up @@ -79,8 +80,8 @@ func (in *Inbox) run() {
}
i++

if msg, ok := in.rb.Pop(); ok {
in.proc.Invoke([]Envelope{msg})
if msgs, ok := in.rb.PopN(math.MaxInt); ok && len(msgs) > 0 {
in.proc.Invoke(msgs)
} else {
return
}
Expand Down
25 changes: 17 additions & 8 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func (p *process) Invoke(msgs []Envelope) {
nmsg = len(msgs)
// numbers of msgs that are processed.
nproc = 0
// TODO/FIXME: figure out why using nproc at the bottom
// of the function freezes tests.
processed = 0
)
defer func() {
// If we recovered, we buffer up all the messages that we could not process
Expand All @@ -82,17 +85,23 @@ func (p *process) Invoke(msgs []Envelope) {
nproc++
msg := msgs[i]
if pill, ok := msg.Msg.(poisonPill); ok {
fmt.Printf("Need to stop but still need to process %d messages out of %d\n", len(msgs)-processed, len(msgs))
p.cleanup(pill.wg)
return
}
p.context.message = msg.Msg
p.context.sender = msg.Sender
recv := p.context.receiver
if len(p.Opts.Middleware) > 0 {
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
} else {
recv.Receive(p.context)
}
p.invokeMsg(msg)
processed++
}
}

func (p *process) invokeMsg(msg Envelope) {
p.context.message = msg.Msg
p.context.sender = msg.Sender
recv := p.context.receiver
if len(p.Opts.Middleware) > 0 {
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
} else {
recv.Receive(p.context)
}
}

Expand Down

0 comments on commit d567219

Please sign in to comment.