From 514a5e2ecdbebf553e7432dcce4282d100db7e3b Mon Sep 17 00:00:00 2001 From: Per Buer Date: Sun, 26 Nov 2023 21:51:06 +0100 Subject: [PATCH] initialization code ok. tests in place. --- actor/deadletter.go | 13 +++-- actor/deadletter_test.go | 101 +++++++++++++++++++++++++++++++-------- actor/engine.go | 36 ++++++++++---- actor/event_stream.go | 5 +- actor/registry.go | 11 +++-- 5 files changed, 123 insertions(+), 43 deletions(-) diff --git a/actor/deadletter.go b/actor/deadletter.go index 068430c..b54d3dd 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -21,6 +21,8 @@ func newDeadLetter() Receiver { } } +// Receive implements the Receiver interface, handling the deadletter messages. +// Todo: this will grow and grow. Maybe we want a limit on this? func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: @@ -40,13 +42,10 @@ func (d *deadLetter) Receive(ctx *Context) { if msg.Flush { d.msgs = d.msgs[:0] } - default: + case *DeadLetterEvent: d.logger.Warnw("deadletter arrived", "msg", msg, "sender", ctx.Sender()) - dl := DeadLetterEvent{ - Target: nil, // todo: how to get the target? - Message: msg, - Sender: ctx.Sender(), - } - d.msgs = append(d.msgs, &dl) + d.msgs = append(d.msgs, msg) + default: + d.logger.Errorw("unknown message arrived", "msg", msg) } } diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 9cd20bf..b1d843a 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -10,22 +10,41 @@ import ( "time" ) -type testActor struct { -} -type testMessage struct { - data string -} +// TestDeadLetterDefault tests the default deadletter handling. +// It will spawn a new actor, kill it, send a message to it and then check if the deadletter +// received the message. +func TestDeadLetterDefault(t *testing.T) { + lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) + e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh))) + a1 := e.Spawn(newTestActor, "a1") + assert.NotNil(t, a1) + dl := e.Registry.getByID("deadletter") + assert.NotNil(t, dl) // should be registered by default + e.Poison(a1).Wait() // poison the a1 actor + e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue + time.Sleep(time.Millisecond) // a flush would be nice here + // flushes the deadletter queue, and returns the messages: + resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*10).Result() + assert.Nil(t, err) + assert.NotNil(t, resp) + respDeadLetters, ok := resp.([]*DeadLetterEvent) + assert.True(t, ok) // should be a slice of deadletter events + assert.Equal(t, 1, len(respDeadLetters)) // should be one deadletter event + ev, ok := respDeadLetters[0].Message.(testMessage) + assert.True(t, ok) // should be a test message + assert.Equal(t, "bar", ev.data) -func newTestActor() Receiver { - return testActor{} -} -func (t testActor) Receive(ctx *Context) { - // do nothing } -func TestDeadLetterDefault(t *testing.T) { +// TestDeadLetterCustom tests the custom deadletter handling. +// It will spawn a new actor, kill it, send a message to it and then check if the deadletter +// received the message. +// It is using the custom deadletter receiver below. +func TestDeadLetterCustom(t *testing.T) { lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) - e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh))) + e := NewEngine( + EngineOptLogger(log.NewLogger("[engine]", lh)), + EngineOptDeadletter(newCustomDeadLetter)) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) dl := e.Registry.getByID("deadletter") @@ -35,12 +54,56 @@ func TestDeadLetterDefault(t *testing.T) { // should be in deadletter fmt.Println("==== sending message via a1 to deadletter ====") e.Send(a1, testMessage{"bar"}) - time.Sleep(time.Millisecond * 50) - resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*1000).Result() - assert.Nil(t, err) - assert.NotNil(t, resp) - resp2, ok := resp.([]*DeadLetterEvent) - assert.True(t, ok) - assert.Equal(t, 1, len(resp2)) + time.Sleep(time.Millisecond) // a flush would be nice here :-) + resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*10).Result() + assert.Nil(t, err) // no error from the request + assert.NotNil(t, resp) // we should get a response to our request + respDeadLetters, ok := resp.([]*DeadLetterEvent) + assert.True(t, ok) // got a slice of deadletter events + assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event + ev, ok := respDeadLetters[0].Message.(testMessage) + assert.True(t, ok) // should be our test message + assert.Equal(t, "bar", ev.data) +} + +type testActor struct{} +type testMessage struct { + data string +} + +func newTestActor() Receiver { + return testActor{} +} +func (t testActor) Receive(ctx *Context) { + // do nothing +} + +// customDeadLetter is a custom deadletter actor / receiver +type customDeadLetter struct { + deadLetters []*DeadLetterEvent +} + +func newCustomDeadLetter() Receiver { + return &customDeadLetter{ + deadLetters: make([]*DeadLetterEvent, 0), + } +} +// Receive implements the Receiver interface. This is a OK example of an actor that +// that deals with deadletters. It will store the deadletters in a slice. +func (c *customDeadLetter) Receive(ctx *Context) { + switch ctx.Message().(type) { + case *DeadLetterFlush: + c.deadLetters = c.deadLetters[:0] + case *DeadLetterFetch: + ctx.Respond(c.deadLetters) + case *DeadLetterEvent: + slog.Warn("received deadletter event") + msg, ok := ctx.Message().(*DeadLetterEvent) + if !ok { + slog.Error("failed to cast deadletter event") + return + } + c.deadLetters = append(c.deadLetters, msg) + } } diff --git a/actor/engine.go b/actor/engine.go index bb3481f..d5a124c 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -36,12 +36,13 @@ type Engine struct { // You can pass an optional logger through func NewEngine(opts ...func(*Engine)) *Engine { e := &Engine{} + e.address = LocalLookupAddr + e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter + e.EventStream = NewEventStream() // for _, o := range opts { o(e) } - e.EventStream = NewEventStream(e.logger) - e.address = LocalLookupAddr - e.Registry = newRegistry(e) + // if no deadletter is registered, we will register the default deadletter from deadletter.go if e.deadLetter == nil { e.logger.Debugw("no deadletter receiver set, registering default") @@ -53,6 +54,9 @@ func NewEngine(opts ...func(*Engine)) *Engine { func EngineOptLogger(logger log.Logger) func(*Engine) { return func(e *Engine) { e.logger = logger + // This is a bit hacky, but we need to set the logger for the eventstream + // which cannot be set in the constructor since the logger is not set yet. + e.EventStream.logger = logger.SubLogger("[eventStream]") } } @@ -63,9 +67,9 @@ func EngineOptPidSeparator(sep string) func(*Engine) { } } -func EngineOptDeadletter(deadletter *PID) func(*Engine) { +func EngineOptDeadletter(d Producer) func(*Engine) { return func(e *Engine) { - e.deadLetter = deadletter + e.deadLetter = e.Spawn(d, "deadletter") } } @@ -203,9 +207,16 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup { } _wg.Add(1) proc := e.Registry.get(pid) - if proc != nil { - e.SendLocal(pid, poisonPill{_wg}, nil) + if proc == nil { + // send a deadletter message + e.Send(e.deadLetter, &DeadLetterEvent{ + Target: pid, + Message: poisonPill{_wg}, + Sender: nil, + }) + return _wg } + e.SendLocal(pid, poisonPill{_wg}, nil) return _wg } @@ -214,11 +225,16 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup { // process registered, the function will panic. func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) { proc := e.Registry.get(pid) - if proc != nil { - proc.Send(pid, msg, sender) + if proc == nil { + // send a deadletter message + e.Send(e.deadLetter, &DeadLetterEvent{ + Target: pid, + Message: msg, + Sender: sender, + }) return } - panic("no way to handle message (didn't find deadletter)") + proc.Send(pid, msg, sender) } func (e *Engine) isLocalMessage(pid *PID) bool { diff --git a/actor/event_stream.go b/actor/event_stream.go index 9b16898..7d0b8b4 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -20,10 +20,9 @@ type EventStream struct { logger log.Logger } -func NewEventStream(l log.Logger) *EventStream { +func NewEventStream() *EventStream { return &EventStream{ - subs: make(map[*EventSub]EventStreamFunc), - logger: l.SubLogger("[eventStream]"), + subs: make(map[*EventSub]EventStreamFunc), } } diff --git a/actor/registry.go b/actor/registry.go index 56a1c4a..4fc9388 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -35,10 +35,13 @@ func (r *Registry) get(pid *PID) Processer { if proc, ok := r.lookup[pid.ID]; ok { return proc } - if proc, ok := r.lookup["deadletter"]; ok { - return proc - } - panic("no deadletter registered") + /* + if proc, ok := r.lookup["deadletter"]; ok { + return proc + } + + */ + return nil // didn't find the processer } func (r *Registry) getByID(id string) Processer {