From d21426cec62bd21d8019e6ac384435a4f4159e92 Mon Sep 17 00:00:00 2001 From: Maha Hajja Date: Thu, 7 Dec 2023 10:28:45 -0800 Subject: [PATCH 1/5] add periods as allowed ids + fix http log --- pkg/conduit/runtime.go | 2 +- pkg/connector/service.go | 2 +- pkg/connector/service_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 189093803..07b55155e 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -636,7 +636,7 @@ func (r *Runtime) serveHTTP( ) (net.Addr, error) { ln, err := net.Listen("tcp", srv.Addr) if err != nil { - return nil, cerrors.Errorf("failed to listen on address %q: %w", r.Config.API.GRPC.Address, err) + return nil, cerrors.Errorf("failed to listen on address %q: %w", r.Config.API.HTTP.Address, err) } t.Go(func() error { diff --git a/pkg/connector/service.go b/pkg/connector/service.go index 0e1433fec..d582cf652 100644 --- a/pkg/connector/service.go +++ b/pkg/connector/service.go @@ -27,7 +27,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/multierror" ) -var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:]*$`) +var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:.]*$`) // Service manages connectors. type Service struct { diff --git a/pkg/connector/service_test.go b/pkg/connector/service_test.go index fa13ef166..a0ee555f0 100644 --- a/pkg/connector/service_test.go +++ b/pkg/connector/service_test.go @@ -315,7 +315,7 @@ func TestService_Create_ValidateSuccess(t *testing.T) { }, }, { name: "valid connector ID", - connID: "Aa0-_", + connID: "Aa0-_.", data: Config{ Name: "test-connector", Settings: map[string]string{"foo": "bar"}, From 851d38cd4c9a1da61b56c288991a13fcc5e3a19d Mon Sep 17 00:00:00 2001 From: Maha Hajja Date: Fri, 26 Jan 2024 16:02:30 -0800 Subject: [PATCH 2/5] execute processor conditions --- pkg/processor/template_utils.go | 67 ++++++++++++++++++++++++++++ pkg/processor/template_utils_test.go | 64 ++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 pkg/processor/template_utils.go create mode 100644 pkg/processor/template_utils_test.go diff --git a/pkg/processor/template_utils.go b/pkg/processor/template_utils.go new file mode 100644 index 000000000..d328b588b --- /dev/null +++ b/pkg/processor/template_utils.go @@ -0,0 +1,67 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "bytes" + "fmt" + "strconv" + "text/template" + + "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit/pkg/record" +) + +// TemplateUtils provides methods to parse, execute go templates for provided records, and return the boolean value of the output. +type TemplateUtils struct { + condition string + rec record.Record + tmpl *template.Template +} + +func NewTemplateUtils(condition string, rec record.Record) *TemplateUtils { + return &TemplateUtils{ + condition: condition, + rec: rec, + } +} + +// parse parses the provided template with injecting `sprig` functions, returns an error if parsing failed. +func (t *TemplateUtils) parse() error { + tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(t.condition) + if err != nil { + return err + } + t.tmpl = tmpl + return nil +} + +// execute executes the template for the provided record, and parses the template output into a boolean, returns an error +// if output is not a boolean, or if template is nil (should be called after TemplateUtils.parse). +func (t *TemplateUtils) execute() (bool, error) { + if t.tmpl == nil { + return false, fmt.Errorf("template is nil, make sure to parse it first by calling TemplateUtils.parse()") + } + var b bytes.Buffer + err := t.tmpl.Execute(&b, t.rec) + if err != nil { + return false, err + } + output, err := strconv.ParseBool(b.String()) + if err != nil { + return false, fmt.Errorf("error converting the condition go-template output to boolean, %w", err) + } + return output, nil +} diff --git a/pkg/processor/template_utils_test.go b/pkg/processor/template_utils_test.go new file mode 100644 index 000000000..74585db93 --- /dev/null +++ b/pkg/processor/template_utils_test.go @@ -0,0 +1,64 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "testing" + + "github.com/conduitio/conduit/pkg/record" + "github.com/matryer/is" +) + +func Test_TemplateUtils_ExecuteTrue(t *testing.T) { + is := is.New(t) + condition := "{{ eq .Metadata.key \"val\" }}" + rec := record.Record{ + Position: record.Position("position-out"), + Metadata: record.Metadata{"key": "val"}, + } + tmpl := NewTemplateUtils(condition, rec) + is.NoErr(tmpl.parse()) + res, err := tmpl.execute() + is.NoErr(err) + is.True(res) +} + +func Test_TemplateUtils_ExecuteFalse(t *testing.T) { + is := is.New(t) + condition := "{{ eq .Metadata.key \"wrongVal\" }}" + rec := record.Record{ + Position: record.Position("position-out"), + Metadata: record.Metadata{"key": "val"}, + } + tmpl := NewTemplateUtils(condition, rec) + is.NoErr(tmpl.parse()) + res, err := tmpl.execute() + is.NoErr(err) + is.True(res == false) +} + +func Test_TemplateUtils_NonBooleanValue(t *testing.T) { + is := is.New(t) + condition := "{{ printf \"hi\" }}" + rec := record.Record{ + Position: record.Position("position-out"), + Metadata: record.Metadata{"key": "val"}, + } + tmpl := NewTemplateUtils(condition, rec) + is.NoErr(tmpl.parse()) + res, err := tmpl.execute() + is.True(err != nil) + is.True(res == false) +} From 4fe9dbfc36bc4ef8343cd4923568d3699a8153a2 Mon Sep 17 00:00:00 2001 From: Maha Hajja Date: Fri, 26 Jan 2024 16:10:45 -0800 Subject: [PATCH 3/5] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index bede2f22d..96d8fabe3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21.1 require ( buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1 github.com/Masterminds/semver/v3 v3.2.1 + github.com/Masterminds/sprig/v3 v3.2.3 github.com/NYTimes/gziphandler v1.1.1 github.com/antchfx/jsonquery v1.3.3 github.com/bufbuild/buf v1.29.0 @@ -71,7 +72,6 @@ require ( github.com/GaijinEntertainment/go-exhaustruct/v3 v3.1.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect - github.com/Masterminds/sprig/v3 v3.2.3 // indirect github.com/Masterminds/squirrel v1.5.4 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/OpenPeeDeeP/depguard/v2 v2.1.0 // indirect From c31760bae980e29dc0e9964ec4315ba6694d5ae5 Mon Sep 17 00:00:00 2001 From: Maha Hajja Date: Mon, 5 Feb 2024 13:37:52 -0800 Subject: [PATCH 4/5] address reviews --- pkg/processor/processor_condition.go | 59 ++++++++++++++++ ...ls_test.go => processor_condition_test.go} | 38 ++++++----- pkg/processor/template_utils.go | 67 ------------------- 3 files changed, 82 insertions(+), 82 deletions(-) create mode 100644 pkg/processor/processor_condition.go rename pkg/processor/{template_utils_test.go => processor_condition_test.go} (60%) delete mode 100644 pkg/processor/template_utils.go diff --git a/pkg/processor/processor_condition.go b/pkg/processor/processor_condition.go new file mode 100644 index 000000000..88321ef57 --- /dev/null +++ b/pkg/processor/processor_condition.go @@ -0,0 +1,59 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "bytes" + "strconv" + "text/template" + + "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/record" +) + +// processorCondition parse go templates, Evaluate them for provided records, and return the boolean value of the output. +type processorCondition struct { + condition string + tmpl *template.Template +} + +// NewProcessorCondition parses and returns the template, returns an error if template parsing failed. +func NewProcessorCondition(condition string) (*processorCondition, error) { + // parse template + tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition) + if err != nil { + return nil, err + } + return &processorCondition{ + condition: condition, + tmpl: tmpl, + }, nil +} + +// Evaluate executes the template for the provided record, and parses the output into a boolean, returns an error +// if output is not a boolean. +func (t *processorCondition) Evaluate(rec record.Record) (bool, error) { + var b bytes.Buffer + err := t.tmpl.Execute(&b, rec) + if err != nil { + return false, err + } + output, err := strconv.ParseBool(b.String()) + if err != nil { + return false, cerrors.Errorf("error converting the condition go-template output to boolean, %w", err) + } + return output, nil +} diff --git a/pkg/processor/template_utils_test.go b/pkg/processor/processor_condition_test.go similarity index 60% rename from pkg/processor/template_utils_test.go rename to pkg/processor/processor_condition_test.go index 74585db93..d2bf31deb 100644 --- a/pkg/processor/template_utils_test.go +++ b/pkg/processor/processor_condition_test.go @@ -21,44 +21,52 @@ import ( "github.com/matryer/is" ) -func Test_TemplateUtils_ExecuteTrue(t *testing.T) { +func Test_ProcessorCondition_InvalidTemplate(t *testing.T) { is := is.New(t) - condition := "{{ eq .Metadata.key \"val\" }}" + condition := `{{ Im not a valid template }}` + tmpl, err := NewProcessorCondition(condition) + is.True(err != nil) + is.Equal(tmpl, nil) +} + +func Test_ProcessorCondition_EvaluateTrue(t *testing.T) { + is := is.New(t) + condition := `{{ eq .Metadata.key "val" }}` rec := record.Record{ Position: record.Position("position-out"), Metadata: record.Metadata{"key": "val"}, } - tmpl := NewTemplateUtils(condition, rec) - is.NoErr(tmpl.parse()) - res, err := tmpl.execute() + tmpl, err := NewProcessorCondition(condition) + is.NoErr(err) + res, err := tmpl.Evaluate(rec) is.NoErr(err) is.True(res) } -func Test_TemplateUtils_ExecuteFalse(t *testing.T) { +func Test_ProcessorCondition_EvaluateFalse(t *testing.T) { is := is.New(t) - condition := "{{ eq .Metadata.key \"wrongVal\" }}" + condition := `{{ eq .Metadata.key "wrongVal" }}` rec := record.Record{ Position: record.Position("position-out"), Metadata: record.Metadata{"key": "val"}, } - tmpl := NewTemplateUtils(condition, rec) - is.NoErr(tmpl.parse()) - res, err := tmpl.execute() + tmpl, err := NewProcessorCondition(condition) + is.NoErr(err) + res, err := tmpl.Evaluate(rec) is.NoErr(err) is.True(res == false) } -func Test_TemplateUtils_NonBooleanValue(t *testing.T) { +func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) { is := is.New(t) - condition := "{{ printf \"hi\" }}" + condition := `{{ printf "hi" }}` rec := record.Record{ Position: record.Position("position-out"), Metadata: record.Metadata{"key": "val"}, } - tmpl := NewTemplateUtils(condition, rec) - is.NoErr(tmpl.parse()) - res, err := tmpl.execute() + tmpl, err := NewProcessorCondition(condition) + is.NoErr(err) + res, err := tmpl.Evaluate(rec) is.True(err != nil) is.True(res == false) } diff --git a/pkg/processor/template_utils.go b/pkg/processor/template_utils.go deleted file mode 100644 index d328b588b..000000000 --- a/pkg/processor/template_utils.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package processor - -import ( - "bytes" - "fmt" - "strconv" - "text/template" - - "github.com/Masterminds/sprig/v3" - "github.com/conduitio/conduit/pkg/record" -) - -// TemplateUtils provides methods to parse, execute go templates for provided records, and return the boolean value of the output. -type TemplateUtils struct { - condition string - rec record.Record - tmpl *template.Template -} - -func NewTemplateUtils(condition string, rec record.Record) *TemplateUtils { - return &TemplateUtils{ - condition: condition, - rec: rec, - } -} - -// parse parses the provided template with injecting `sprig` functions, returns an error if parsing failed. -func (t *TemplateUtils) parse() error { - tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(t.condition) - if err != nil { - return err - } - t.tmpl = tmpl - return nil -} - -// execute executes the template for the provided record, and parses the template output into a boolean, returns an error -// if output is not a boolean, or if template is nil (should be called after TemplateUtils.parse). -func (t *TemplateUtils) execute() (bool, error) { - if t.tmpl == nil { - return false, fmt.Errorf("template is nil, make sure to parse it first by calling TemplateUtils.parse()") - } - var b bytes.Buffer - err := t.tmpl.Execute(&b, t.rec) - if err != nil { - return false, err - } - output, err := strconv.ParseBool(b.String()) - if err != nil { - return false, fmt.Errorf("error converting the condition go-template output to boolean, %w", err) - } - return output, nil -} From c39812c42a74f2e336c14aa7428f16a5b30845cc Mon Sep 17 00:00:00 2001 From: Maha Hajja Date: Tue, 6 Feb 2024 08:21:07 -0800 Subject: [PATCH 5/5] make the constructor private --- pkg/processor/processor_condition.go | 4 ++-- pkg/processor/processor_condition_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/processor/processor_condition.go b/pkg/processor/processor_condition.go index 88321ef57..88baa0258 100644 --- a/pkg/processor/processor_condition.go +++ b/pkg/processor/processor_condition.go @@ -30,8 +30,8 @@ type processorCondition struct { tmpl *template.Template } -// NewProcessorCondition parses and returns the template, returns an error if template parsing failed. -func NewProcessorCondition(condition string) (*processorCondition, error) { +// newProcessorCondition parses and returns the template, returns an error if template parsing failed. +func newProcessorCondition(condition string) (*processorCondition, error) { // parse template tmpl, err := template.New("").Funcs(sprig.FuncMap()).Parse(condition) if err != nil { diff --git a/pkg/processor/processor_condition_test.go b/pkg/processor/processor_condition_test.go index d2bf31deb..fb13f25c6 100644 --- a/pkg/processor/processor_condition_test.go +++ b/pkg/processor/processor_condition_test.go @@ -24,7 +24,7 @@ import ( func Test_ProcessorCondition_InvalidTemplate(t *testing.T) { is := is.New(t) condition := `{{ Im not a valid template }}` - tmpl, err := NewProcessorCondition(condition) + tmpl, err := newProcessorCondition(condition) is.True(err != nil) is.Equal(tmpl, nil) } @@ -36,7 +36,7 @@ func Test_ProcessorCondition_EvaluateTrue(t *testing.T) { Position: record.Position("position-out"), Metadata: record.Metadata{"key": "val"}, } - tmpl, err := NewProcessorCondition(condition) + tmpl, err := newProcessorCondition(condition) is.NoErr(err) res, err := tmpl.Evaluate(rec) is.NoErr(err) @@ -50,7 +50,7 @@ func Test_ProcessorCondition_EvaluateFalse(t *testing.T) { Position: record.Position("position-out"), Metadata: record.Metadata{"key": "val"}, } - tmpl, err := NewProcessorCondition(condition) + tmpl, err := newProcessorCondition(condition) is.NoErr(err) res, err := tmpl.Evaluate(rec) is.NoErr(err) @@ -64,7 +64,7 @@ func Test_ProcessorCondition_NonBooleanOutput(t *testing.T) { Position: record.Position("position-out"), Metadata: record.Metadata{"key": "val"}, } - tmpl, err := NewProcessorCondition(condition) + tmpl, err := newProcessorCondition(condition) is.NoErr(err) res, err := tmpl.Evaluate(rec) is.True(err != nil)