Skip to content

Commit

Permalink
🤟 plugin/event: fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
rjeczalik committed Jun 30, 2024
1 parent 23e6677 commit fccee46
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 18 deletions.
35 changes: 26 additions & 9 deletions pkg/plugin/builtin/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package event // import "hookt.dev/cmd/pkg/plugin/builtin/event"
import (
"context"
"log/slog"
"strconv"
"time"

"hookt.dev/cmd/pkg/check"
Expand All @@ -16,14 +17,14 @@ type Plugin struct {
wire.Config

p *proto.P
c map[chan proto.Message]struct{}
c map[chan proto.Message]string
mux chan proto.Message
stop chan (chan proto.Message)
}

func New(opts ...func(*Plugin)) *Plugin {
p := &Plugin{
c: make(map[chan proto.Message]struct{}),
c: make(map[chan proto.Message]string),
mux: make(chan proto.Message),
stop: make(chan chan proto.Message),
}
Expand Down Expand Up @@ -81,22 +82,28 @@ wire:
}

func (p *Plugin) process() {
type indexer interface {
Index() int
}

for {
select {
case c := <-p.stop:
delete(p.c, c)
close(c)
case msg := <-p.mux:
for c := range p.c {
c <- msg
}
go func() {
for c := range p.c {
c <- msg
}
}()
}
}
}

func (p *Plugin) Step(ctx context.Context) any {
c := make(chan proto.Message)
p.c[c] = struct{}{}
p.c[c] = trace.Get(ctx, "step")
it, _ := time.ParseDuration(p.Config.InactiveTimeout)
return &Step{
p: p,
Expand All @@ -118,6 +125,10 @@ func group(ctx context.Context, name string) context.Context {
}

func (s *Step) Run(ctx context.Context, c *check.S) error {
type indexer interface {
Index() int
}

slog.Debug("event: run",
"match", s.Match,
"pass", s.Pass,
Expand Down Expand Up @@ -156,9 +167,15 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
}
inactive.Reset(s.it)

ctxt := ctx

if i, ok := msg.(indexer); ok {
ctxt = trace.With(ctxt, "event-seq", strconv.Itoa(i.Index()))
}

obj := msg.Object()

match, err := match.Match(group(ctx, "match"), obj)
match, err := match.Match(group(ctxt, "match"), obj)
if err != nil {
return errors.New("failed to match on pattern: %w", err)
}
Expand All @@ -167,7 +184,7 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
continue
}

fail, err := fail.Match(group(ctx, "fail"), obj)
fail, err := fail.Match(group(ctxt, "fail"), obj)
if err != nil {
return errors.New("failed to match fail pattern: %w", err)
}
Expand All @@ -176,7 +193,7 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
return errors.New("failure pattern matched")
}

ok, err := pass.Match(group(ctx, "match"), obj)
ok, err := pass.Match(group(ctxt, "pass"), obj)
if err != nil {
return errors.New("failed to match ok pattern: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/plugin/builtin/inline/inline.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *Plugin) publish(f *os.File) {

dec := json.NewDecoder(f)

for {
for index := 0; ; index++ {
var raw json.RawMessage

err := dec.Decode(&raw)
Expand All @@ -97,7 +97,7 @@ func (p *Plugin) publish(f *os.File) {
"bytes", len(raw),
)

p.c <- protowire.MakeMessage(raw)
p.c <- &protowire.Message{P: raw, I: index}
case '[':
var msgs []json.RawMessage

Expand All @@ -109,12 +109,12 @@ func (p *Plugin) publish(f *os.File) {
return
}

for i := range msgs {
for i := 0; i < len(msgs); i, index = i+1, index+1 {
slog.Debug("inline: publish",
"bytes", len(msgs[i]),
)

p.c <- protowire.MakeMessage(msgs[i])
p.c <- &protowire.Message{P: msgs[i], I: index}
}
default:
err = errors.New("unexpected JSON input")
Expand Down
5 changes: 2 additions & 3 deletions pkg/proto/wire/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ type Job struct {
}

type Message struct {
I int `json:"index,omitempty"`
P []byte `json:"bytes,omitempty"`
}

func MakeMessage(p json.RawMessage) *Message {
return &Message{P: p}
}
func (m *Message) Index() int { return m.I }

func (m *Message) Bytes() []byte { return m.P }

Expand Down
13 changes: 11 additions & 2 deletions pkg/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,14 @@ func LogPattern() PatternTrace {
},
EqualMatch: func(ctx context.Context, want any, got any, ok bool) {
tags := append(attrs(ctx),
"want", fmt.Sprintf("%+[1]v (%[1]T)", want),
"got", fmt.Sprintf("%+[1]v (%[1]T)", got),
slog.Group("want",
"value", want,
"type", fmt.Sprintf("%T", want),
),
slog.Group("got",
"value", got,
"type", fmt.Sprintf("%T", got),
),
)
if !ok {
slog.Error("trace: EqualMatch", tags...)
Expand All @@ -128,6 +134,9 @@ func LogPattern() PatternTrace {
func attrs(ctx context.Context) []any {
var attrs []any

if seq := Get(ctx, "event-seq"); seq != "" {
attrs = append(attrs, "event-seq", seq)
}
if job := Get(ctx, "job"); job != "" {
attrs = append(attrs, "job", job)
}
Expand Down

0 comments on commit fccee46

Please sign in to comment.