Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP: call the conditional execution methods in the processors #1411

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/processor/processor_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package processor
import (
"bytes"
"strconv"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record"
)

// processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output.
Expand All @@ -32,6 +33,9 @@ type processorCondition struct {

// newProcessorCondition parses and returns the template, returns an error if template parsing failed.
func newProcessorCondition(condition string) (*processorCondition, error) {
if strings.Trim(condition, " ") == "" {
return nil, nil
}
// parse template
tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition)
if err != nil {
Expand All @@ -45,7 +49,7 @@ func newProcessorCondition(condition string) (*processorCondition, error) {

// Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error
// if output is not a boolean.
func (t *processorCondition) Evaluate(rec record.Record) (bool, error) {
func (t *processorCondition) Evaluate(rec opencdc.Record) (bool, error) {
var b bytes.Buffer
err := t.tmpl.Execute(&b, rec)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions pkg/processor/processor_condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package processor
import (
"testing"

"github.com/conduitio/conduit/pkg/record"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand All @@ -32,9 +32,9 @@ func Test_ProcessorCondition_InvalidTemplate(t *testing.T) {
func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "val" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand All @@ -46,9 +46,9 @@ func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "wrongVal" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand All @@ -60,9 +60,9 @@ func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) {
is := is.New(t)
condition := `{{ printf "hi" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
rec := opencdc.Record{
Position: opencdc.Position("position-out"),
Metadata: opencdc.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
Expand Down
72 changes: 71 additions & 1 deletion pkg/processor/runnable_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ import (
type RunnableProcessor struct {
*Instance
proc sdk.Processor
cond *processorCondition
}

func newRunnableProcessor(
proc sdk.Processor,
cond *processorCondition,
i *Instance,
) *RunnableProcessor {
return &RunnableProcessor{
Instance: i,
proc: proc,
cond: cond,
}
}

Expand All @@ -59,7 +62,74 @@ func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Recor
p.inInsp.Send(ctx, record.FromOpenCDC(inRec))
}

outRecs := p.proc.Process(ctx, records)
var outRecs []sdk.ProcessedRecord
if p.cond == nil {
outRecs = p.proc.Process(ctx, records)
} else {
// We need to first evaluate condition for each record.

// TODO reuse these slices or at least use a pool
// keptRecords are records that will be sent to the processor
keptRecords := make([]opencdc.Record, 0, len(records))
// passthroughRecordIndexes are indexes of records that are just passed
// through to the other side.
passthroughRecordIndexes := make([]int, 0, len(records))

var err error

for i, rec := range records {
var keep bool
keep, err = p.cond.Evaluate(rec)
if err != nil {
err = cerrors.Errorf("failed evaluating condition: %w", err)
break
}

if keep {
keptRecords = append(keptRecords, rec)
} else {
passthroughRecordIndexes = append(passthroughRecordIndexes, i)
}
}

if len(keptRecords) > 0 {
outRecs = p.proc.Process(ctx, keptRecords)
if len(outRecs) > len(keptRecords) {
return []sdk.ProcessedRecord{
sdk.ErrorRecord{Error: cerrors.New("processor returned more records than input")},
}
}
}
if err != nil {
outRecs = append(outRecs, sdk.ErrorRecord{Error: err})
}

// Add passthrough records back into the resultset and keep the
// original order of the records.
if len(passthroughRecordIndexes) == len(records) {
// Optimization for the case where no records are kept
outRecs = make([]sdk.ProcessedRecord, len(records))
for i, rec := range records {
outRecs[i] = sdk.SingleRecord(rec)
}
} else if len(passthroughRecordIndexes) > 0 {
tmp := make([]sdk.ProcessedRecord, len(outRecs)+len(passthroughRecordIndexes))
prevIndex := -1
for i, index := range passthroughRecordIndexes {
// TODO index-i can be out of bounds if the processor returns
// fewer records than the input.
copy(tmp[prevIndex+1:index], outRecs[prevIndex-i+1:index-i])
tmp[index] = sdk.SingleRecord(records[index])
prevIndex = index
}
// if the last index is not the last record, copy the rest
if passthroughRecordIndexes[len(passthroughRecordIndexes)-1] != len(tmp)-1 {
copy(tmp[prevIndex+1:], outRecs[prevIndex-len(passthroughRecordIndexes)+1:])
}
outRecs = tmp
}
}

for _, outRec := range outRecs {
singleRec, ok := outRec.(sdk.SingleRecord)
if ok {
Expand Down
Loading
Loading