diff --git a/pkg/processor/processor_condition.go b/pkg/processor/processor_condition.go
index 88baa0258..ec9bd3444 100644
--- a/pkg/processor/processor_condition.go
+++ b/pkg/processor/processor_condition.go
@@ -17,11 +17,12 @@ package processor
 import (
 	"bytes"
 	"strconv"
+	"strings"
 	"text/template"
 
 	"github.com/Masterminds/sprig/v3"
+	"github.com/conduitio/conduit-commons/opencdc"
 	"github.com/conduitio/conduit/pkg/foundation/cerrors"
-	"github.com/conduitio/conduit/pkg/record"
 )
 
 // processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output.
@@ -32,6 +33,11 @@ type processorCondition struct {
 
 // newProcessorCondition parses and returns the template, returns an error if template parsing failed.
 func newProcessorCondition(condition string) (*processorCondition, error) {
+	// A blank condition (any kind of whitespace only) means that there is no
+	// condition. Return a nil condition, so that the caller can skip evaluation.
+	if strings.TrimSpace(condition) == "" {
+		return nil, nil
+	}
 	// parse template
 	tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition)
 	if err != nil {
@@ -45,7 +51,7 @@ func newProcessorCondition(condition string) (*processorCondition, error) {
 
 // Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error
 // if output is not a boolean.
-func (t *processorCondition) Evaluate(rec record.Record) (bool, error) {
+func (t *processorCondition) Evaluate(rec opencdc.Record) (bool, error) {
 	var b bytes.Buffer
 	err := t.tmpl.Execute(&b, rec)
 	if err != nil {
diff --git a/pkg/processor/processor_condition_test.go b/pkg/processor/processor_condition_test.go
index fb13f25c6..3bbbc2131 100644
--- a/pkg/processor/processor_condition_test.go
+++ b/pkg/processor/processor_condition_test.go
@@ -17,7 +17,7 @@ package processor
 import (
 	"testing"
 
-	"github.com/conduitio/conduit/pkg/record"
+	"github.com/conduitio/conduit-commons/opencdc"
 	"github.com/matryer/is"
 )
 
@@ -32,9 +32,9 @@ func Test_ProcessorCondition_InvalidTemplate(t *testing.T) {
 func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
 	is := is.New(t)
 	condition := `{{ eq .Metadata.key "val" }}`
-	rec := record.Record{
-		Position: record.Position("position-out"),
-		Metadata: record.Metadata{"key": "val"},
+	rec := opencdc.Record{
+		Position: opencdc.Position("position-out"),
+		Metadata: opencdc.Metadata{"key": "val"},
 	}
 	tmpl, err := newProcessorCondition(condition)
 	is.NoErr(err)
@@ -46,9 +46,9 @@ func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
 func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
 	is := is.New(t)
 	condition := `{{ eq .Metadata.key "wrongVal" }}`
-	rec := record.Record{
-		Position: record.Position("position-out"),
-		Metadata: record.Metadata{"key": "val"},
+	rec := opencdc.Record{
+		Position: opencdc.Position("position-out"),
+		Metadata: opencdc.Metadata{"key": "val"},
 	}
 	tmpl, err := newProcessorCondition(condition)
 	is.NoErr(err)
@@ -60,9 +60,9 @@ func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
 func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) {
 	is := is.New(t)
 	condition := `{{ printf "hi" }}`
-	rec := record.Record{
-		Position: record.Position("position-out"),
-		Metadata: record.Metadata{"key": "val"},
+	rec := opencdc.Record{
+		Position: opencdc.Position("position-out"),
+		Metadata: opencdc.Metadata{"key": "val"},
 	}
 	tmpl, err := newProcessorCondition(condition)
 	is.NoErr(err)
diff --git a/pkg/processor/runnable_processor.go b/pkg/processor/runnable_processor.go
index 397028032..5034fb5a0 100644
--- a/pkg/processor/runnable_processor.go
+++ b/pkg/processor/runnable_processor.go
@@ -28,15 +28,18 @@ import (
 type RunnableProcessor struct {
 	*Instance
 	proc sdk.Processor
+	cond *processorCondition
 }
 
 func newRunnableProcessor(
 	proc sdk.Processor,
+	cond *processorCondition,
 	i *Instance,
 ) *RunnableProcessor {
 	return &RunnableProcessor{
 		Instance: i,
 		proc:     proc,
+		cond:     cond,
 	}
 }
 
@@ -59,7 +62,86 @@ func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Recor
 		p.inInsp.Send(ctx, record.FromOpenCDC(inRec))
 	}
 
-	outRecs := p.proc.Process(ctx, records)
+	var outRecs []sdk.ProcessedRecord
+	if p.cond == nil {
+		// No condition is configured, so every record is processed.
+		outRecs = p.proc.Process(ctx, records)
+	} else {
+		// We need to first evaluate condition for each record.
+
+		// TODO reuse these slices or at least use a pool
+		// keptRecords are records that will be sent to the processor
+		keptRecords := make([]opencdc.Record, 0, len(records))
+		// passthroughRecordIndexes are indexes of records that are just passed
+		// through to the other side.
+		passthroughRecordIndexes := make([]int, 0, len(records))
+
+		var err error
+
+		for i, rec := range records {
+			var keep bool
+			keep, err = p.cond.Evaluate(rec)
+			if err != nil {
+				// Stop at the first record that can not be evaluated. Records
+				// before it are still handled, the error is reported after them.
+				err = cerrors.Errorf("failed evaluating condition: %w", err)
+				break
+			}
+
+			if keep {
+				keptRecords = append(keptRecords, rec)
+			} else {
+				passthroughRecordIndexes = append(passthroughRecordIndexes, i)
+			}
+		}
+
+		if len(keptRecords) > 0 {
+			outRecs = p.proc.Process(ctx, keptRecords)
+			if len(outRecs) > len(keptRecords) {
+				return []sdk.ProcessedRecord{
+					sdk.ErrorRecord{Error: cerrors.New("processor returned more records than input")},
+				}
+			}
+		}
+		if err != nil {
+			outRecs = append(outRecs, sdk.ErrorRecord{Error: err})
+		}
+
+		// Add passthrough records back into the resultset and keep the
+		// original order of the records.
+		if len(passthroughRecordIndexes) == len(records) {
+			// Optimization for the case where no records are kept
+			outRecs = make([]sdk.ProcessedRecord, len(records))
+			for i, rec := range records {
+				outRecs[i] = sdk.SingleRecord(rec)
+			}
+		} else if len(passthroughRecordIndexes) > 0 {
+			merged := make([]sdk.ProcessedRecord, 0, len(outRecs)+len(passthroughRecordIndexes))
+			// next is the index of the first element in outRecs that is not
+			// merged yet.
+			next := 0
+			for i, index := range passthroughRecordIndexes {
+				// index-i records were kept before this passthrough record,
+				// their results go in front of it. The processor may have
+				// returned fewer records than it received (only more is
+				// rejected above), so never read past the end of outRecs.
+				for ; next < index-i && next < len(outRecs); next++ {
+					merged = append(merged, outRecs[next])
+				}
+				if next < index-i {
+					// A result is missing. Stop here, so that passthrough
+					// records are not shifted into the place of missing results.
+					break
+				}
+				merged = append(merged, sdk.SingleRecord(records[index]))
+			}
+			// Append what is left: results of trailing kept records and the
+			// condition error, if there is one.
+			merged = append(merged, outRecs[next:]...)
+			outRecs = merged
+		}
+	}
+
 	for _, outRec := range outRecs {
 		singleRec, ok := outRec.(sdk.SingleRecord)
 		if ok {
diff --git a/pkg/processor/runnable_processor_test.go b/pkg/processor/runnable_processor_test.go
index 20bbd5182..5b057513f 100644
--- a/pkg/processor/runnable_processor_test.go
+++ b/pkg/processor/runnable_processor_test.go
@@ -82,7 +82,7 @@ func TestRunnableProcessor_Open(t *testing.T) {
 				proc.EXPECT().Open(gomock.Any()).Return(tc.openErr)
 			}
 
-			underTest := newRunnableProcessor(proc, inst)
+			underTest := newRunnableProcessor(proc, nil, inst)
 			err := underTest.Open(ctx)
 			if tc.wantErr == nil {
 				is.NoErr(err)
@@ -128,7 +128,7 @@ func TestRunnableProcessor_Teardown(t *testing.T) {
 			proc := mock.NewProcessor(gomock.NewController(t))
 			proc.EXPECT().Teardown(gomock.Any()).Return(tc.teardownErr)
 
-			underTest := newRunnableProcessor(proc, inst)
+			underTest := newRunnableProcessor(proc, nil, inst)
 			err := underTest.Teardown(ctx)
 			if tc.wantErr == nil {
 				is.NoErr(err)
@@ -139,20 +139,10 @@ func TestRunnableProcessor_Teardown(t *testing.T) {
 	}
 }
 
-func TestRunnableProcessor_Process(t *testing.T) {
+func TestRunnableProcessor_ProcessedRecordsInspected(t *testing.T) {
 	is := is.New(t)
 	ctx := context.Background()
-	inst := &Instance{
-		Config: Config{
-			Settings: map[string]string{
-				"foo": "bar",
-			},
-			Workers: 123,
-		},
-
-		inInsp:  inspector.New(log.Nop(), inspector.DefaultBufferSize),
-		outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
-	}
+	inst := newTestInstance()
 	recsIn := []opencdc.Record{
 		{
 			Key: opencdc.RawData("test key in"),
@@ -166,7 +156,7 @@ func TestRunnableProcessor_Process(t *testing.T) {
 	proc := mock.NewProcessor(gomock.NewController(t))
 	proc.EXPECT().Process(gomock.Any(), recsIn).Return(recsOut)
 
-	underTest := newRunnableProcessor(proc, inst)
+	underTest := newRunnableProcessor(proc, nil, inst)
 	inSession := underTest.inInsp.NewSession(ctx, "id-in")
 	outSession := underTest.outInsp.NewSession(ctx, "id-out")
 
@@ -184,20 +174,10 @@ func TestRunnableProcessor_Process(t *testing.T) {
 	is.Equal(recsOut[0], sdk.SingleRecord(rec.ToOpenCDC()))
 }
 
-func TestRunnableProcessor_Process_Filter(t *testing.T) {
+func TestRunnableProcessor_FilteredRecordsNotInspected(t *testing.T) {
 	is := is.New(t)
 	ctx := context.Background()
-	inst := &Instance{
-		Config: Config{
-			Settings: map[string]string{
-				"foo": "bar",
-			},
-			Workers: 123,
-		},
-
-		inInsp:  inspector.New(log.Nop(), inspector.DefaultBufferSize),
-		outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
-	}
+	inst := newTestInstance()
 	recsIn := []opencdc.Record{
 		{
 			Key: opencdc.RawData("test key in"),
@@ -207,7 +187,7 @@ func TestRunnableProcessor_Process_Filter(t *testing.T) {
 	proc := mock.NewProcessor(gomock.NewController(t))
 	proc.EXPECT().Process(gomock.Any(), recsIn).Return([]sdk.ProcessedRecord{sdk.FilterRecord{}})
 
-	underTest := newRunnableProcessor(proc, inst)
+	underTest := newRunnableProcessor(proc, nil, inst)
 	inSession := underTest.inInsp.NewSession(ctx, "id-in")
 	outSession := underTest.outInsp.NewSession(ctx, "id-out")
 
@@ -224,20 +204,10 @@ func TestRunnableProcessor_Process_Filter(t *testing.T) {
 	is.True(cerrors.Is(err, context.DeadlineExceeded))
 }
 
-func TestRunnableProcessor_Process_Error(t *testing.T) {
+func TestRunnableProcessor_ErrorRecordsNotInspected(t *testing.T) {
 	is := is.New(t)
 	ctx := context.Background()
-	inst := &Instance{
-		Config: Config{
-			Settings: map[string]string{
-				"foo": "bar",
-			},
-			Workers: 123,
-		},
-
-		inInsp:  inspector.New(log.Nop(), inspector.DefaultBufferSize),
-		outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
-	}
+	inst := newTestInstance()
 	recsIn := []opencdc.Record{
 		{
 			Key: opencdc.RawData("test key in"),
@@ -247,7 +217,7 @@ func TestRunnableProcessor_Process_Error(t *testing.T) {
 	proc := mock.NewProcessor(gomock.NewController(t))
 	proc.EXPECT().Process(gomock.Any(), recsIn).Return([]sdk.ProcessedRecord{sdk.ErrorRecord{}})
 
-	underTest := newRunnableProcessor(proc, inst)
+	underTest := newRunnableProcessor(proc, nil, inst)
 	inSession := underTest.inInsp.NewSession(ctx, "id-in")
 	outSession := underTest.outInsp.NewSession(ctx, "id-out")
 
@@ -263,3 +233,188 @@ func TestRunnableProcessor_Process_Error(t *testing.T) {
 	is.True(!gotRec)
 	is.True(cerrors.Is(err, context.DeadlineExceeded))
 }
+
+func TestRunnableProcessor_Process_ConditionNotMatching(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	inst := newTestInstance()
+	recsIn := []opencdc.Record{
+		{
+			Metadata: opencdc.Metadata{"key": "something"},
+		},
+	}
+
+	proc := mock.NewProcessor(gomock.NewController(t))
+
+	condition, err := newProcessorCondition(`{{ eq .Metadata.key "val" }}`)
+	is.NoErr(err)
+	underTest := newRunnableProcessor(
+		proc,
+		condition,
+		inst,
+	)
+
+	recsOut := underTest.Process(ctx, recsIn)
+	defer underTest.Close()
+
+	is.Equal([]sdk.ProcessedRecord{sdk.SingleRecord(recsIn[0])}, recsOut)
+}
+
+func TestRunnableProcessor_Process_ConditionMatching(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	inst := newTestInstance()
+	recsIn := []opencdc.Record{
+		{
+			Metadata: opencdc.Metadata{"key": "val"},
+		},
+	}
+
+	wantRecs := []sdk.ProcessedRecord{sdk.SingleRecord{Key: opencdc.RawData(`a key`)}}
+	proc := mock.NewProcessor(gomock.NewController(t))
+	proc.EXPECT().Process(ctx, recsIn).Return(wantRecs)
+
+	condition, err := newProcessorCondition(`{{ eq .Metadata.key "val" }}`)
+	is.NoErr(err)
+	underTest := newRunnableProcessor(
+		proc,
+		condition,
+		inst,
+	)
+
+	gotRecs := underTest.Process(ctx, recsIn)
+	defer underTest.Close()
+
+	is.Equal(wantRecs, gotRecs)
+}
+
+func TestRunnableProcessor_Process_ConditionError(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	inst := newTestInstance()
+	recsIn := []opencdc.Record{
+		{
+			Metadata: opencdc.Metadata{"key": "val"},
+		},
+	}
+
+	proc := mock.NewProcessor(gomock.NewController(t))
+
+	condition, err := newProcessorCondition("junk")
+	is.NoErr(err)
+	underTest := newRunnableProcessor(
+		proc,
+		condition,
+		inst,
+	)
+
+	gotRecs := underTest.Process(ctx, recsIn)
+	defer underTest.Close()
+
+	is.Equal(1, len(gotRecs))
+	gotRec, gotErr := gotRecs[0].(sdk.ErrorRecord)
+	is.True(gotErr)
+	is.Equal(
+		"failed evaluating condition: error converting the condition go-template output to boolean, "+
+			"strconv.ParseBool: parsing \"junk\": invalid syntax: strconv.ParseBool: parsing \"junk\": "+
+			"invalid syntax",
+		gotRec.Error.Error(),
+	)
+}
+
+func TestRunnableProcessor_Process_Batch(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	inst := newTestInstance()
+	recsIn := []opencdc.Record{
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "1"}},
+		{Metadata: opencdc.Metadata{"key": "yes", "rec": "2"}},
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "3"}},
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "4"}},
+		{Metadata: opencdc.Metadata{"key": "yes", "rec": "5"}},
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "6"}},
+		{Metadata: opencdc.Metadata{"key": "yes", "rec": "7"}},
+	}
+
+	wantRecs := []sdk.ProcessedRecord{
+		sdk.SingleRecord(recsIn[0]),
+		sdk.SingleRecord{Metadata: opencdc.Metadata{"key": "yes", "rec": "2", "processed": "true"}},
+		sdk.SingleRecord(recsIn[2]),
+		sdk.SingleRecord(recsIn[3]),
+		sdk.SingleRecord{Metadata: opencdc.Metadata{"key": "yes", "rec": "5", "processed": "true"}},
+		sdk.SingleRecord(recsIn[5]),
+		sdk.SingleRecord{Metadata: opencdc.Metadata{"key": "yes", "rec": "7", "processed": "true"}},
+	}
+	proc := mock.NewProcessor(gomock.NewController(t))
+	proc.EXPECT().Process(ctx, []opencdc.Record{recsIn[1], recsIn[4], recsIn[6]}).DoAndReturn(func(_ context.Context, recs []opencdc.Record) []sdk.ProcessedRecord {
+		out := make([]sdk.ProcessedRecord, 0, len(recs))
+		for _, rec := range recs {
+			rec.Metadata["processed"] = "true"
+			out = append(out, sdk.SingleRecord{Metadata: rec.Metadata})
+		}
+		return out
+	})
+
+	condition, err := newProcessorCondition(`{{ eq .Metadata.key "yes" }}`)
+	is.NoErr(err)
+	underTest := newRunnableProcessor(
+		proc,
+		condition,
+		inst,
+	)
+
+	gotRecs := underTest.Process(ctx, recsIn)
+	defer underTest.Close()
+
+	is.Equal(wantRecs, gotRecs)
+}
+
+func TestRunnableProcessor_Process_Batch_FewerRecordsReturned(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	inst := newTestInstance()
+	recsIn := []opencdc.Record{
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "1"}},
+		{Metadata: opencdc.Metadata{"key": "yes", "rec": "2"}},
+		{Metadata: opencdc.Metadata{"key": "yes", "rec": "3"}},
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "4"}},
+		{Metadata: opencdc.Metadata{"key": "no", "rec": "5"}},
+	}
+
+	// The processor fails on the first record it receives and returns nothing
+	// for the second one.
+	errRec := sdk.ErrorRecord{Error: cerrors.New("boom")}
+	wantRecs := []sdk.ProcessedRecord{
+		sdk.SingleRecord(recsIn[0]),
+		errRec,
+	}
+	proc := mock.NewProcessor(gomock.NewController(t))
+	proc.EXPECT().Process(ctx, []opencdc.Record{recsIn[1], recsIn[2]}).Return([]sdk.ProcessedRecord{errRec})
+
+	condition, err := newProcessorCondition(`{{ eq .Metadata.key "yes" }}`)
+	is.NoErr(err)
+	underTest := newRunnableProcessor(
+		proc,
+		condition,
+		inst,
+	)
+
+	gotRecs := underTest.Process(ctx, recsIn)
+	defer underTest.Close()
+
+	is.Equal(wantRecs, gotRecs)
+}
+
+func newTestInstance() *Instance {
+	return &Instance{
+		Config: Config{
+			Settings: map[string]string{
+				"foo": "bar",
+			},
+			Workers: 123,
+		},
+
+		inInsp:  inspector.New(log.Nop(), inspector.DefaultBufferSize),
+		outInsp: inspector.New(log.Nop(), inspector.DefaultBufferSize),
+	}
+}
diff --git a/pkg/processor/service.go b/pkg/processor/service.go
index d88dea81f..8c50a7638 100644
--- a/pkg/processor/service.go
+++ b/pkg/processor/service.go
@@ -100,9 +100,13 @@ func (s *Service) MakeRunnableProcessor(ctx context.Context, i *Instance) (*Runn
 	if err != nil {
 		return nil, err
 	}
+	cond, err := newProcessorCondition(i.Condition)
+	if err != nil {
+		return nil, cerrors.Errorf("invalid condition: %w", err)
+	}
 
 	i.running = true
-	return newRunnableProcessor(p, i), nil
+	return newRunnableProcessor(p, cond, i), nil
 }
 
 // Create will create a new processor instance.