Skip to content

Commit

Permalink
execute processor condition template (#1352)
Browse files Browse the repository at this point in the history
* add periods as allowed ids + fix http log

* execute processor conditions

* go mod tidy

* address reviews

* make the constructor private
  • Loading branch information
maha-hajja authored Feb 6, 2024
1 parent 700d6ef commit 442f8ee
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.1
require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1
github.com/Masterminds/semver/v3 v3.2.1
github.com/Masterminds/sprig/v3 v3.2.3
github.com/NYTimes/gziphandler v1.1.1
github.com/antchfx/jsonquery v1.3.3
github.com/bufbuild/buf v1.29.0
Expand Down Expand Up @@ -71,7 +72,6 @@ require (
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.1.0 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig/v3 v3.2.3 // indirect
github.com/Masterminds/squirrel v1.5.4 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/OpenPeeDeeP/depguard/v2 v2.1.0 // indirect
Expand Down
59 changes: 59 additions & 0 deletions pkg/processor/processor_condition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"bytes"
"strconv"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record"
)

// processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output.
type processorCondition struct {
condition string
tmpl *template.Template
}

// newProcessorCondition parses and returns the template, returns an error if template parsing failed.
func newProcessorCondition(condition string) (*processorCondition, error) {
// parse template
tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition)
if err != nil {
return nil, err
}
return &processorCondition{
condition: condition,
tmpl: tmpl,
}, nil
}

// Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error
// if output is not a boolean.
func (t *processorCondition) Evaluate(rec record.Record) (bool, error) {
var b bytes.Buffer
err := t.tmpl.Execute(&b, rec)
if err != nil {
return false, err
}
output, err := strconv.ParseBool(b.String())
if err != nil {
return false, cerrors.Errorf("error converting the condition go-template output to boolean, %w", err)
}
return output, nil
}
72 changes: 72 additions & 0 deletions pkg/processor/processor_condition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"testing"

"github.com/conduitio/conduit/pkg/record"
"github.com/matryer/is"
)

func Test_ProcessorCondition_InvalidTemplate(t *testing.T) {
is := is.New(t)
condition := `{{ Im not a valid template }}`
tmpl, err := newProcessorCondition(condition)
is.True(err != nil)
is.Equal(tmpl, nil)
}

func Test_ProcessorCondition_EvaluateTrue(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "val" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
res, err := tmpl.Evaluate(rec)
is.NoErr(err)
is.True(res)
}

func Test_ProcessorCondition_EvaluateFalse(t *testing.T) {
is := is.New(t)
condition := `{{ eq .Metadata.key "wrongVal" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
res, err := tmpl.Evaluate(rec)
is.NoErr(err)
is.True(res == false)
}

func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) {
is := is.New(t)
condition := `{{ printf "hi" }}`
rec := record.Record{
Position: record.Position("position-out"),
Metadata: record.Metadata{"key": "val"},
}
tmpl, err := newProcessorCondition(condition)
is.NoErr(err)
res, err := tmpl.Evaluate(rec)
is.True(err != nil)
is.True(res == false)
}

0 comments on commit 442f8ee

Please sign in to comment.