diff --git a/pkg/plugin/builtin/event/event.go b/pkg/plugin/builtin/event/event.go index c1f3ded..872b7c8 100644 --- a/pkg/plugin/builtin/event/event.go +++ b/pkg/plugin/builtin/event/event.go @@ -3,6 +3,7 @@ package event // import "hookt.dev/cmd/pkg/plugin/builtin/event" import ( "context" "log/slog" + "maps" "strconv" "time" @@ -17,14 +18,14 @@ type Plugin struct { wire.Config p *proto.P - c map[chan proto.Message]string + c map[chan proto.Message]chan struct{} mux chan proto.Message stop chan (chan proto.Message) } func New(opts ...func(*Plugin)) *Plugin { p := &Plugin{ - c: make(map[chan proto.Message]string), + c: make(map[chan proto.Message]chan struct{}), mux: make(chan proto.Message), stop: make(chan chan proto.Message), } @@ -89,12 +90,17 @@ func (p *Plugin) process() { for { select { case c := <-p.stop: + done := p.c[c] + close(done) delete(p.c, c) - close(c) case msg := <-p.mux: + ch := maps.Clone(p.c) go func() { - for c := range p.c { - c <- msg + for c, done := range ch { + select { + case <-done: + case c <- msg: + } } }() } @@ -103,21 +109,24 @@ func (p *Plugin) process() { func (p *Plugin) Step(ctx context.Context) any { c := make(chan proto.Message) - p.c[c] = trace.Get(ctx, "step") + done := make(chan struct{}) + p.c[c] = done it, _ := time.ParseDuration(p.Config.InactiveTimeout) return &Step{ - p: p, - c: c, - it: nonempty(it, 1*time.Minute), + p: p, + c: c, + done: done, + it: nonempty(it, 1*time.Minute), } } type Step struct { wire.Step - p *Plugin - c chan proto.Message - it time.Duration + p *Plugin + c chan proto.Message + done chan struct{} + it time.Duration } func group(ctx context.Context, name string) context.Context { @@ -213,7 +222,12 @@ func (s *Step) Stop() { } func (s *Step) drain() { - for range s.c { + for { + select { + case <-s.c: + case <-s.done: + return + } } }