diff --git a/go.mod b/go.mod index 9f342dbed..1c40b24b3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21.1 require ( buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1 github.com/Masterminds/semver/v3 v3.2.1 + github.com/Masterminds/sprig/v3 v3.2.3 github.com/NYTimes/gziphandler v1.1.1 github.com/antchfx/jsonquery v1.3.3 github.com/bufbuild/buf v1.29.0 @@ -71,7 +72,6 @@ require ( github.com/GaijinEntertainment/go-exhaustruct/v3 v3.1.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect - github.com/Masterminds/sprig/v3 v3.2.3 // indirect github.com/Masterminds/squirrel v1.5.4 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/OpenPeeDeeP/depguard/v2 v2.1.0 // indirect diff --git a/pkg/processor/processor_condition.go b/pkg/processor/processor_condition.go new file mode 100644 index 000000000..88baa0258 --- /dev/null +++ b/pkg/processor/processor_condition.go @@ -0,0 +1,59 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "bytes" + "strconv" + "text/template" + + "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/record" +) + +// processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output. +type processorCondition struct { + condition string + tmpl *template.Template +} + +// newProcessorCondition parses and returns the template, returns an error if template parsing failed. +func newProcessorCondition(condition string) (*processorCondition, error) { + // parse template + tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition) + if err != nil { + return nil, err + } + return &processorCondition{ + condition: condition, + tmpl: tmpl, + }, nil +} + +// Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error +// if output is not a boolean. +func (t *processorCondition) Evaluate(rec record.Record) (bool, error) { + var b bytes.Buffer + err := t.tmpl.Execute(&b, rec) + if err != nil { + return false, err + } + output, err := strconv.ParseBool(b.String()) + if err != nil { + return false, cerrors.Errorf("error converting the condition go-template output to boolean, %w", err) + } + return output, nil +} diff --git a/pkg/processor/processor_condition_test.go b/pkg/processor/processor_condition_test.go new file mode 100644 index 000000000..fb13f25c6 --- /dev/null +++ b/pkg/processor/processor_condition_test.go @@ -0,0 +1,72 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "testing" + + "github.com/conduitio/conduit/pkg/record" + "github.com/matryer/is" +) + +func Test_ProcessorCondition_InvalidTemplate(t *testing.T) { + is := is.New(t) + condition := `{{ Im not a valid template }}` + tmpl, err := newProcessorCondition(condition) + is.True(err != nil) + is.Equal(tmpl, nil) +} + +func Test_ProcessorCondition_EvaluateTrue(t *testing.T) { + is := is.New(t) + condition := `{{ eq .Metadata.key "val" }}` + rec := record.Record{ + Position: record.Position("position-out"), + Metadata: record.Metadata{"key": "val"}, + } + tmpl, err := newProcessorCondition(condition) + is.NoErr(err) + res, err := tmpl.Evaluate(rec) + is.NoErr(err) + is.True(res) +} + +func Test_ProcessorCondition_EvaluateFalse(t *testing.T) { + is := is.New(t) + condition := `{{ eq .Metadata.key "wrongVal" }}` + rec := record.Record{ + Position: record.Position("position-out"), + Metadata: record.Metadata{"key": "val"}, + } + tmpl, err := newProcessorCondition(condition) + is.NoErr(err) + res, err := tmpl.Evaluate(rec) + is.NoErr(err) + is.True(res == false) +} + +func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) { + is := is.New(t) + condition := `{{ printf "hi" }}` + rec := record.Record{ + Position: record.Position("position-out"), + Metadata: record.Metadata{"key": "val"}, + } + tmpl, err := newProcessorCondition(condition) + is.NoErr(err) + res, err := tmpl.Evaluate(rec) + is.True(err != nil) + is.True(res == false) +}