Skip to content

Commit

Permalink
initialization code ok. tests in place.
Browse files Browse the repository at this point in the history
  • Loading branch information
perbu committed Nov 26, 2023
1 parent 7d3f853 commit 514a5e2
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 43 deletions.
13 changes: 6 additions & 7 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func newDeadLetter() Receiver {
}
}

// Receive implements the Receiver interface, handling the deadletter messages.
// Todo: this will grow and grow. Maybe we want a limit on this?
func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case Started:
Expand All @@ -40,13 +42,10 @@ func (d *deadLetter) Receive(ctx *Context) {
if msg.Flush {
d.msgs = d.msgs[:0]
}
default:
case *DeadLetterEvent:
d.logger.Warnw("deadletter arrived", "msg", msg, "sender", ctx.Sender())
dl := DeadLetterEvent{
Target: nil, // todo: how to get the target?
Message: msg,
Sender: ctx.Sender(),
}
d.msgs = append(d.msgs, &dl)
d.msgs = append(d.msgs, msg)
default:
d.logger.Errorw("unknown message arrived", "msg", msg)
}
}
101 changes: 82 additions & 19 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,41 @@ import (
"time"
)

type testActor struct {
}
type testMessage struct {
data string
}
// TestDeadLetterDefault tests the default deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
func TestDeadLetterDefault(t *testing.T) {
lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug)
e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh)))
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond) // a flush would be nice here
// flushes the deadletter queue, and returns the messages:
resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err)
assert.NotNil(t, resp)
respDeadLetters, ok := resp.([]*DeadLetterEvent)
assert.True(t, ok) // should be a slice of deadletter events
assert.Equal(t, 1, len(respDeadLetters)) // should be one deadletter event
ev, ok := respDeadLetters[0].Message.(testMessage)
assert.True(t, ok) // should be a test message
assert.Equal(t, "bar", ev.data)

func newTestActor() Receiver {
return testActor{}
}
func (t testActor) Receive(ctx *Context) {
// do nothing
}

func TestDeadLetterDefault(t *testing.T) {
// TestDeadLetterCustom tests the custom deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
// It is using the custom deadletter receiver below.
func TestDeadLetterCustom(t *testing.T) {
lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug)
e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh)))
e := NewEngine(
EngineOptLogger(log.NewLogger("[engine]", lh)),
EngineOptDeadletter(newCustomDeadLetter))
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
Expand All @@ -35,12 +54,56 @@ func TestDeadLetterDefault(t *testing.T) {
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond * 50)
resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*1000).Result()
assert.Nil(t, err)
assert.NotNil(t, resp)
resp2, ok := resp.([]*DeadLetterEvent)
assert.True(t, ok)
assert.Equal(t, 1, len(resp2))
time.Sleep(time.Millisecond) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err) // no error from the request
assert.NotNil(t, resp) // we should get a response to our request
respDeadLetters, ok := resp.([]*DeadLetterEvent)
assert.True(t, ok) // got a slice of deadletter events
assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event
ev, ok := respDeadLetters[0].Message.(testMessage)
assert.True(t, ok) // should be our test message
assert.Equal(t, "bar", ev.data)
}

type testActor struct{}
type testMessage struct {
data string
}

func newTestActor() Receiver {
return testActor{}
}
func (t testActor) Receive(ctx *Context) {
// do nothing
}

// customDeadLetter is a custom deadletter actor / receiver
type customDeadLetter struct {
deadLetters []*DeadLetterEvent
}

func newCustomDeadLetter() Receiver {
return &customDeadLetter{
deadLetters: make([]*DeadLetterEvent, 0),
}
}

// Receive implements the Receiver interface. This is a OK example of an actor that
// that deals with deadletters. It will store the deadletters in a slice.
func (c *customDeadLetter) Receive(ctx *Context) {
switch ctx.Message().(type) {
case *DeadLetterFlush:
c.deadLetters = c.deadLetters[:0]
case *DeadLetterFetch:
ctx.Respond(c.deadLetters)
case *DeadLetterEvent:
slog.Warn("received deadletter event")
msg, ok := ctx.Message().(*DeadLetterEvent)
if !ok {
slog.Error("failed to cast deadletter event")
return
}
c.deadLetters = append(c.deadLetters, msg)
}
}
36 changes: 26 additions & 10 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ type Engine struct {
// You can pass an optional logger through
func NewEngine(opts ...func(*Engine)) *Engine {
e := &Engine{}
e.address = LocalLookupAddr
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.EventStream = NewEventStream() //
for _, o := range opts {
o(e)
}
e.EventStream = NewEventStream(e.logger)
e.address = LocalLookupAddr
e.Registry = newRegistry(e)

// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.logger.Debugw("no deadletter receiver set, registering default")
Expand All @@ -53,6 +54,9 @@ func NewEngine(opts ...func(*Engine)) *Engine {
func EngineOptLogger(logger log.Logger) func(*Engine) {
return func(e *Engine) {
e.logger = logger
// This is a bit hacky, but we need to set the logger for the eventstream
// which cannot be set in the constructor since the logger is not set yet.
e.EventStream.logger = logger.SubLogger("[eventStream]")
}
}

Expand All @@ -63,9 +67,9 @@ func EngineOptPidSeparator(sep string) func(*Engine) {
}
}

func EngineOptDeadletter(deadletter *PID) func(*Engine) {
func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = deadletter
e.deadLetter = e.Spawn(d, "deadletter")
}
}

Expand Down Expand Up @@ -203,9 +207,16 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
}
_wg.Add(1)
proc := e.Registry.get(pid)
if proc != nil {
e.SendLocal(pid, poisonPill{_wg}, nil)
if proc == nil {
// send a deadletter message
e.Send(e.deadLetter, &DeadLetterEvent{
Target: pid,
Message: poisonPill{_wg},
Sender: nil,
})
return _wg
}
e.SendLocal(pid, poisonPill{_wg}, nil)
return _wg
}

Expand All @@ -214,11 +225,16 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
// process registered, the function will panic.
func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc := e.Registry.get(pid)
if proc != nil {
proc.Send(pid, msg, sender)
if proc == nil {
// send a deadletter message
e.Send(e.deadLetter, &DeadLetterEvent{
Target: pid,
Message: msg,
Sender: sender,
})
return
}
panic("no way to handle message (didn't find deadletter)")
proc.Send(pid, msg, sender)
}

func (e *Engine) isLocalMessage(pid *PID) bool {
Expand Down
5 changes: 2 additions & 3 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ type EventStream struct {
logger log.Logger
}

func NewEventStream(l log.Logger) *EventStream {
func NewEventStream() *EventStream {
return &EventStream{
subs: make(map[*EventSub]EventStreamFunc),
logger: l.SubLogger("[eventStream]"),
subs: make(map[*EventSub]EventStreamFunc),
}
}

Expand Down
11 changes: 7 additions & 4 deletions actor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ func (r *Registry) get(pid *PID) Processer {
if proc, ok := r.lookup[pid.ID]; ok {
return proc
}
if proc, ok := r.lookup["deadletter"]; ok {
return proc
}
panic("no deadletter registered")
/*
if proc, ok := r.lookup["deadletter"]; ok {
return proc
}
*/
return nil // didn't find the processer
}

func (r *Registry) getByID(id string) Processer {
Expand Down

0 comments on commit 514a5e2

Please sign in to comment.