Skip to content

Commit

Permalink
BP: call the conditional execution methods in the processors (#1411)
Browse files Browse the repository at this point in the history
* init

* simplify

* fix evaluation

* add support for batches in runnable processor

* batch test in runnable processor

* optimize case when all records are passed through in a processor

* make sure remaining records are copied over

* fix len check

* make sure processor doesn't return more records

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
hariso and lovromazgon authored Mar 7, 2024
1 parent 09be701 commit 28128ee
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 55 deletions.
8 changes: 6 additions & 2 deletions pkg/processor/processor_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package processor
import (
"bytes"
"strconv"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record"
)

// processorCondition parses Go templates, evaluates them for the provided records, and returns the boolean value of the output.
Expand All @@ -32,6 +33,9 @@ type processorCondition struct {

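// newProcessorCondition parses the condition as a Go template and returns it. It returns nil (and no error) if the
// condition is empty or consists only of spaces, and an error if template parsing failed.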
func newProcessorCondition(condition string) (*processorCondition, error) {
if strings.Trim(condition, " ") == "" {
return nil, nil
}
// parse template
tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition)
if err != nil {
Expand All @@ -45,7 +49,7 @@ func newProcessorCondition(condition string) (*processorCondition, error) {

// Evaluate executes the template for the provided record and parses the output into a boolean. It returns an error
// if the output is not a boolean.
func (t *processorCondition) Evaluate(rec record.Record) (bool, error) {
func (t *processorCondition) Evaluate(rec opencdc.Record) (bool, error) {
var b bytes.Buffer
err := t.tmpl.Execute(&b, rec)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions pkg/processor/processor_condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package processor
import (
"testing"

"github.com/conduitio/conduit/pkg/record"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand All @@ -32,9 +32,9 @@ func Test_ProcessorCondition_InvalidTemplate(t *testing.T) {
func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "val" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand All @@ -46,9 +46,9 @@ func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "wrongVal" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand All @@ -60,9 +60,9 @@ func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) {
is := is.New(t)
condition := `{{ printf "hi" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand Down
72 changes: 71 additions & 1 deletion pkg/processor/runnable_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ import (
// RunnableProcessor embeds an *Instance and couples it with the sdk.Processor
// that records are delegated to, plus an optional condition that decides which
// records are handed to that processor.
type RunnableProcessor struct {
*Instance
// proc is the processor that Process delegates records to.
proc sdk.Processor
// cond is evaluated once per record. When cond is nil every record is sent
// to proc; otherwise records for which it evaluates to false are passed
// through unprocessed.
cond *processorCondition
}

// newRunnableProcessor builds a RunnableProcessor for the instance i that
// delegates to proc. cond may be nil, in which case no condition is stored and
// Process sends every record to proc.
func newRunnableProcessor(
	proc sdk.Processor,
	cond *processorCondition,
	i *Instance,
) *RunnableProcessor {
	runnable := new(RunnableProcessor)
	runnable.Instance = i
	runnable.proc = proc
	runnable.cond = cond
	return runnable
}

Expand All @@ -59,7 +62,74 @@ func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Recor
p.inInsp.Send(ctx, record.FromOpenCDC(inRec))
}

outRecs := p.proc.Process(ctx, records)
var outRecs []sdk.ProcessedRecord
if p.cond == nil {
outRecs = p.proc.Process(ctx, records)
} else {
// We need to first evaluate condition for each record.

// TODO reuse these slices or at least use a pool
// keptRecords are records that will be sent to the processor
keptRecords := make([]opencdc.Record, 0, len(records))
// passthroughRecordIndexes are indexes of records that are just passed
// through to the other side.
passthroughRecordIndexes := make([]int, 0, len(records))

var err error

for i, rec := range records {
var keep bool
keep, err = p.cond.Evaluate(rec)
if err != nil {
err = cerrors.Errorf("failed evaluating condition: %w", err)
break
}

if keep {
keptRecords = append(keptRecords, rec)
} else {
passthroughRecordIndexes = append(passthroughRecordIndexes, i)
}
}

if len(keptRecords) > 0 {
outRecs = p.proc.Process(ctx, keptRecords)
if len(outRecs) > len(keptRecords) {
return []sdk.ProcessedRecord{
sdk.ErrorRecord{Error: cerrors.New("processor returned more records than input")},
}
}
}
if err != nil {
outRecs = append(outRecs, sdk.ErrorRecord{Error: err})
}

// Add passthrough records back into the resultset and keep the
// original order of the records.
if len(passthroughRecordIndexes) == len(records) {
// Optimization for the case where no records are kept
outRecs = make([]sdk.ProcessedRecord, len(records))
for i, rec := range records {
outRecs[i] = sdk.SingleRecord(rec)
}
} else if len(passthroughRecordIndexes) > 0 {
tmp := make([]sdk.ProcessedRecord, len(outRecs)+len(passthroughRecordIndexes))
prevIndex := -1
for i, index := range passthroughRecordIndexes {
// TODO index-i can be out of bounds if the processor returns
// fewer records than the input.
copy(tmp[prevIndex+1:index], outRecs[prevIndex-i+1:index-i])
tmp[index] = sdk.SingleRecord(records[index])
prevIndex = index
}
// if the last index is not the last record, copy the rest
if passthroughRecordIndexes[len(passthroughRecordIndexes)-1] != len(tmp)-1 {
copy(tmp[prevIndex+1:], outRecs[prevIndex-len(passthroughRecordIndexes)+1:])
}
outRecs = tmp
}
}

for _, outRec := range outRecs {
singleRec, ok := outRec.(sdk.SingleRecord)
if ok {
Expand Down
Loading

0 comments on commit 28128ee

Please sign in to comment.