Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execute processor condition template #1352

Merged
merged 11 commits into from
Feb 6, 2024
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.1
require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1
github.com/Masterminds/semver/v3 v3.2.1
github.com/Masterminds/sprig/v3 v3.2.3
github.com/NYTimes/gziphandler v1.1.1
github.com/antchfx/jsonquery v1.3.3
github.com/bufbuild/buf v1.29.0
Expand Down Expand Up @@ -71,7 +72,6 @@ require (
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.1.0 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig/v3 v3.2.3 // indirect
github.com/Masterminds/squirrel v1.5.4 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/OpenPeeDeeP/depguard/v2 v2.1.0 // indirect
Expand Down
59 changes: 59 additions & 0 deletions pkg/processor/processor_condition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"bytes"
"strconv"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record"
)

// processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output.
type processorCondition struct {
condition string
tmpl *template.Template
}

// newProcessorCondition parses and returns the template, returns an error if template parsing failed.
func newProcessorCondition(condition string) (*processorCondition, error) {
// parse template
tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition)
if err != nil {
return nil, err
}
return &processorCondition{
condition: condition,
tmpl: tmpl,
}, nil
}

// Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error
// if output is not a boolean.
func (t *processorCondition) Evaluate(rec record.Record) (bool, error) {
var b bytes.Buffer
err := t.tmpl.Execute(&b, rec)
if err != nil {
return false, err
}
output, err := strconv.ParseBool(b.String())
if err != nil {
return false, cerrors.Errorf("error converting the condition go-template output to boolean, %w", err)
}
return output, nil
}
72 changes: 72 additions & 0 deletions pkg/processor/processor_condition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"testing"

"github.com/conduitio/conduit/pkg/record"
"github.com/matryer/is"
)

func Test_ProcessorCondition_InvalidTemplate(t *testing.T) {
is := is.New(t)
condition := `{{ Im not a valid template }}`
tmpl, err := newProcessorCondition(condition)
is.True(err != nil)
is.Equal(tmpl, nil)
}

func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "val" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
res, err := tmpl.Evaluate(rec)
is.NoErr(err)
is.True(res)
}

func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "wrongVal" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
res, err := tmpl.Evaluate(rec)
is.NoErr(err)
is.True(res == false)
}

func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) {
is := is.New(t)
condition := `{{ printf "hi" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
res, err := tmpl.Evaluate(rec)
is.True(err != nil)
is.True(res == false)
}
Loading