Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP: call the conditional execution methods in the processors #1411

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/processor/processor_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package processor
import (
"bytes"
"strconv"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record"
)

// processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output.
Expand All @@ -32,6 +33,9 @@ type processorCondition struct {

// newProcessorCondition parses and returns the template, returns an error if template parsing failed.
func newProcessorCondition(condition string) (*processorCondition, error) {
if strings.Trim(condition, " ") == "" {
return nil, nil
}
// parse template
tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition)
if err != nil {
Expand All @@ -45,7 +49,7 @@ func newProcessorCondition(condition string) (*processorCondition, error) {

// Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error
// if output is not a boolean.
func (t *processorCondition) Evaluate(rec record.Record) (bool, error) {
func (t *processorCondition) Evaluate(rec opencdc.Record) (bool, error) {
var b bytes.Buffer
err := t.tmpl.Execute(&b, rec)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions pkg/processor/processor_condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package processor
import (
"testing"

"github.com/conduitio/conduit/pkg/record"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand All @@ -32,9 +32,9 @@ func Test_ProcessorCondition_InvalidTemplate(t *testing.T) {
func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "val" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand All @@ -46,9 +46,9 @@ func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "wrongVal" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand All @@ -60,9 +60,9 @@ func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) {
is := is.New(t)
condition := `{{ printf "hi" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand Down
33 changes: 29 additions & 4 deletions pkg/processor/runnable_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ import (
type RunnableProcessor struct {
*Instance
proc sdk.Processor
cond *processorCondition
}

func newRunnableProcessor(
proc sdk.Processor,
cond *processorCondition,
i *Instance,
) *RunnableProcessor {
return &RunnableProcessor{
Instance: i,
proc: proc,
cond: cond,
}
}

Expand All @@ -59,19 +62,41 @@ func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Recor
p.inInsp.Send(ctx, record.FromOpenCDC(inRec))
}

outRecs := p.proc.Process(ctx, records)
for _, outRec := range outRecs {
singleRec, ok := outRec.(sdk.SingleRecord)
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, rec := range records {
keep, err := p.evaluateCondition(rec)
if err != nil {
return append(out, sdk.ErrorRecord{Error: cerrors.Errorf("failed evaluating condition: %w", err)})
}
if !keep {
out = append(out, sdk.SingleRecord(rec))
continue
}

proc := p.proc.Process(ctx, records)
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
singleRec, ok := proc[0].(sdk.SingleRecord)
if ok {
p.outInsp.Send(ctx, record.FromOpenCDC(opencdc.Record(singleRec)))
}
// NB: the processor node that is calling this RunnableProcessor
// will check if the number of processed records is equal
// to the number of input records.
out = append(out, proc...)
}

return outRecs
return out
}

func (p *RunnableProcessor) Teardown(ctx context.Context) error {
err := p.proc.Teardown(ctx)
p.running = false
return err
}

func (p *RunnableProcessor) evaluateCondition(rec opencdc.Record) (bool, error) {
if p.cond == nil {
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
return true, nil
}

return p.cond.Evaluate(rec)
}
155 changes: 114 additions & 41 deletions pkg/processor/runnable_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
proc.EXPECT().Open(gomock.Any()).Return(tc.openErr)
}

underTest := newRunnableProcessor(proc, inst)
underTest := newRunnableProcessor(proc, nil, inst)
err := underTest.Open(ctx)
if tc.wantErr == nil {
is.NoErr(err)
Expand Down Expand Up @@ -128,7 +128,7 @@
proc := mock.NewProcessor(gomock.NewController(t))
proc.EXPECT().Teardown(gomock.Any()).Return(tc.teardownErr)

underTest := newRunnableProcessor(proc, inst)
underTest := newRunnableProcessor(proc, nil, inst)
err := underTest.Teardown(ctx)
if tc.wantErr == nil {
is.NoErr(err)
Expand All @@ -139,20 +139,10 @@
}
}

func TestRunnableProcessor_Process(t *testing.T) {
func TestRunnableProcessor_ProcessedRecordsInspected(t *testing.T) {
is := is.New(t)
ctx := context.Background()
inst := &Instance{
Config: Config{
Settings: map[string]string{
"foo": "bar",
},
Workers: 123,
},

inInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
}
inst := newTestInstance()
recsIn := []opencdc.Record{
{
Key: opencdc.RawData("test key in"),
Expand All @@ -166,7 +156,7 @@
proc := mock.NewProcessor(gomock.NewController(t))
proc.EXPECT().Process(gomock.Any(), recsIn).Return(recsOut)

underTest := newRunnableProcessor(proc, inst)
underTest := newRunnableProcessor(proc, nil, inst)
inSession := underTest.inInsp.NewSession(ctx, "id-in")
outSession := underTest.outInsp.NewSession(ctx, "id-out")

Expand All @@ -184,20 +174,10 @@
is.Equal(recsOut[0], sdk.SingleRecord(rec.ToOpenCDC()))
}

func TestRunnableProcessor_Process_Filter(t *testing.T) {
func TestRunnableProcessor_FilteredRecordsNotInspected(t *testing.T) {
is := is.New(t)
ctx := context.Background()
inst := &Instance{
Config: Config{
Settings: map[string]string{
"foo": "bar",
},
Workers: 123,
},

inInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
}
inst := newTestInstance()
recsIn := []opencdc.Record{
{
Key: opencdc.RawData("test key in"),
Expand All @@ -207,7 +187,7 @@
proc := mock.NewProcessor(gomock.NewController(t))
proc.EXPECT().Process(gomock.Any(), recsIn).Return([]sdk.ProcessedRecord{sdk.FilterRecord{}})

underTest := newRunnableProcessor(proc, inst)
underTest := newRunnableProcessor(proc, nil, inst)
inSession := underTest.inInsp.NewSession(ctx, "id-in")
outSession := underTest.outInsp.NewSession(ctx, "id-out")

Expand All @@ -224,20 +204,10 @@
is.True(cerrors.Is(err, context.DeadlineExceeded))
}

func TestRunnableProcessor_Process_Error(t *testing.T) {
func TestRunnableProcessor_ErrorRecordsNotInspected(t *testing.T) {
is := is.New(t)
ctx := context.Background()
inst := &Instance{
Config: Config{
Settings: map[string]string{
"foo": "bar",
},
Workers: 123,
},

inInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
}
inst := newTestInstance()
recsIn := []opencdc.Record{
{
Key: opencdc.RawData("test key in"),
Expand All @@ -247,7 +217,7 @@
proc := mock.NewProcessor(gomock.NewController(t))
proc.EXPECT().Process(gomock.Any(), recsIn).Return([]sdk.ProcessedRecord{sdk.ErrorRecord{}})

underTest := newRunnableProcessor(proc, inst)
underTest := newRunnableProcessor(proc, nil, inst)
inSession := underTest.inInsp.NewSession(ctx, "id-in")
outSession := underTest.outInsp.NewSession(ctx, "id-out")

Expand All @@ -263,3 +233,106 @@
is.True(!gotRec)
is.True(cerrors.Is(err, context.DeadlineExceeded))
}

func TestRunnableProcessor_Process_ConditionNotMatching(t *testing.T) {
is := is.New(t)
ctx := context.Background()
inst := newTestInstance()
recsIn := []opencdc.Record{
{
Metadata: opencdc.Metadata{"key": "something"},
},
}

proc := mock.NewProcessor(gomock.NewController(t))

condition, err := newProcessorCondition(`{{ eq .Metadata.key "val" }}`)
is.NoErr(err)
underTest := newRunnableProcessor(
proc,
condition,
inst,
)

recsOut := underTest.Process(ctx, recsIn)
defer underTest.Close()

is.Equal([]sdk.ProcessedRecord{sdk.SingleRecord(recsIn[0])}, recsOut)
}

func TestRunnableProcessor_Process_ConditionMatching(t *testing.T) {
is := is.New(t)
ctx := context.Background()
inst := newTestInstance()
recsIn := []opencdc.Record{
{
Metadata: opencdc.Metadata{"key": "val"},
},
}

wantRecs := []sdk.ProcessedRecord{sdk.SingleRecord{Key: opencdc.RawData(`a key`)}}
proc := mock.NewProcessor(gomock.NewController(t))
proc.EXPECT().Process(ctx, recsIn).Return(wantRecs)

condition, err := newProcessorCondition(`{{ eq .Metadata.key "val" }}`)
is.NoErr(err)
underTest := newRunnableProcessor(
proc,
condition,
inst,
)

gotRecs := underTest.Process(ctx, recsIn)
defer underTest.Close()

is.Equal(wantRecs, gotRecs)
}

func TestRunnableProcessor_Process_ConditionError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
inst := newTestInstance()
recsIn := []opencdc.Record{
{
Metadata: opencdc.Metadata{"key": "val"},
},
}

proc := mock.NewProcessor(gomock.NewController(t))

condition, err := newProcessorCondition("junk")
is.NoErr(err)
underTest := newRunnableProcessor(
proc,
condition,
inst,
)

gotRecs := underTest.Process(ctx, recsIn)
defer underTest.Close()

is.Equal(1, len(gotRecs))
gotRec, gotErr := gotRecs[0].(sdk.ErrorRecord)
is.True(gotErr)
is.Equal(
"failed evaluating condition: error converting the condition go-template output to boolean, "+
"strconv.ParseBool: parsing \"junk\": invalid syntax: strconv.ParseBool: parsing \"junk\": "+
"invalid syntax",
gotRec.Error.Error(),
)

}

Check failure on line 324 in pkg/processor/runnable_processor_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unnecessary trailing newline (whitespace)

func newTestInstance() *Instance {
return &Instance{
Config: Config{
Settings: map[string]string{
"foo": "bar",
},
Workers: 123,
},

inInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
}
}
6 changes: 5 additions & 1 deletion pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,13 @@ func (s *Service) MakeRunnableProcessor(ctx context.Context, i *Instance) (*Runn
if err != nil {
return nil, err
}
cond, err := newProcessorCondition(i.Condition)
if err != nil {
return nil, cerrors.Errorf("invalid condition: %w", err)
}

i.running = true
return newRunnableProcessor(p, i), nil
return newRunnableProcessor(p, cond, i), nil
}

// Create will create a new processor instance.
Expand Down
Loading