Skip to content

Commit

Permalink
🤟 plugin/event: fix data race
Browse files Browse the repository at this point in the history
  • Loading branch information
rjeczalik committed Jun 30, 2024
1 parent fccee46 commit 5c90647
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions pkg/plugin/builtin/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package event // import "hookt.dev/cmd/pkg/plugin/builtin/event"
import (
"context"
"log/slog"
"maps"
"strconv"
"time"

Expand All @@ -17,14 +18,14 @@ type Plugin struct {
wire.Config

p *proto.P
c map[chan proto.Message]string
c map[chan proto.Message]chan struct{}
mux chan proto.Message
stop chan (chan proto.Message)
}

func New(opts ...func(*Plugin)) *Plugin {
p := &Plugin{
c: make(map[chan proto.Message]string),
c: make(map[chan proto.Message]chan struct{}),
mux: make(chan proto.Message),
stop: make(chan chan proto.Message),
}
Expand Down Expand Up @@ -89,12 +90,17 @@ func (p *Plugin) process() {
for {
select {
case c := <-p.stop:
done := p.c[c]
close(done)
delete(p.c, c)
close(c)
case msg := <-p.mux:
ch := maps.Clone(p.c)
go func() {
for c := range p.c {
c <- msg
for c, done := range ch {
select {
case <-done:
case c <- msg:
}
}
}()
}
Expand All @@ -103,21 +109,24 @@ func (p *Plugin) process() {

func (p *Plugin) Step(ctx context.Context) any {
c := make(chan proto.Message)
p.c[c] = trace.Get(ctx, "step")
done := make(chan struct{})
p.c[c] = done
it, _ := time.ParseDuration(p.Config.InactiveTimeout)
return &Step{
p: p,
c: c,
it: nonempty(it, 1*time.Minute),
p: p,
c: c,
done: done,
it: nonempty(it, 1*time.Minute),
}
}

type Step struct {
wire.Step

p *Plugin
c chan proto.Message
it time.Duration
p *Plugin
c chan proto.Message
done chan struct{}
it time.Duration
}

func group(ctx context.Context, name string) context.Context {
Expand Down Expand Up @@ -213,7 +222,12 @@ func (s *Step) Stop() {
}

func (s *Step) drain() {
for range s.c {
for {
select {
case <-s.c:
case <-s.done:
return
}
}
}

Expand Down

0 comments on commit 5c90647

Please sign in to comment.