From a79386e1221ff26c26834b20f1b6daa5226652e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 16 May 2024 18:24:24 +0200 Subject: [PATCH] Error processor (#1598) * implement error processor * regenerate error processor specs * update conduit-processor-sdk * fix specs * update doc --- go.mod | 2 +- go.sum | 4 +- .../builtin/impl/avro/decode_test.go | 3 +- .../builtin/impl/avro/encode_test.go | 3 +- pkg/plugin/processor/builtin/impl/error.go | 101 ++++++++++++++++++ .../builtin/impl/error_examples_test.go | 47 ++++++++ .../processor/builtin/impl/error_paramgen.go | 19 ++++ .../processor/builtin/impl/error_test.go | 101 ++++++++++++++++++ .../builtin/impl/json/encode_test.go | 5 +- .../builtin/impl/unwrap/opencdc_test.go | 3 +- .../internal/exampleutil/specs/error.json | 45 ++++++++ pkg/plugin/processor/builtin/registry.go | 1 + 12 files changed, 322 insertions(+), 12 deletions(-) create mode 100644 pkg/plugin/processor/builtin/impl/error.go create mode 100644 pkg/plugin/processor/builtin/impl/error_examples_test.go create mode 100644 pkg/plugin/processor/builtin/impl/error_paramgen.go create mode 100644 pkg/plugin/processor/builtin/impl/error_test.go create mode 100644 pkg/plugin/processor/builtin/internal/exampleutil/specs/error.json diff --git a/go.mod b/go.mod index f0929926d..392b204a8 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/conduitio/conduit-connector-protocol v0.6.0 github.com/conduitio/conduit-connector-s3 v0.5.1 github.com/conduitio/conduit-connector-sdk v0.9.1 - github.com/conduitio/conduit-processor-sdk v0.1.1 + github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd github.com/conduitio/yaml/v3 v3.3.0 github.com/dgraph-io/badger/v4 v4.2.0 github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d diff --git a/go.sum b/go.sum index ebb84e00e..ebd99b6c4 100644 --- a/go.sum +++ b/go.sum @@ -337,8 +337,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ= github.com/conduitio/conduit-connector-sdk v0.9.1 h1:DiMUn7udnjWvyaDsyeTZFHeYTEIdqUU6dqPunEEE3Kw= github.com/conduitio/conduit-connector-sdk v0.9.1/go.mod h1:cNoofumgDlsaThkxkNYg7zab4AkmRZt1V711aO7guGU= -github.com/conduitio/conduit-processor-sdk v0.1.1 h1:C+5Z9pGKVTpdIf5QFNx4UxpvxuOylGRVkGidEpom7HQ= -github.com/conduitio/conduit-processor-sdk v0.1.1/go.mod h1:StkbqQX1WxTjr9LOy7zY+e3DAbEDVvozeamELdzFqck= +github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y= +github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE= github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= diff --git a/pkg/plugin/processor/builtin/impl/avro/decode_test.go b/pkg/plugin/processor/builtin/impl/avro/decode_test.go index 94a0a1acb..faacd39cd 100644 --- a/pkg/plugin/processor/builtin/impl/avro/decode_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/decode_test.go @@ -18,11 +18,10 @@ import ( "context" "testing" - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" - "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" "github.com/google/go-cmp/cmp" "github.com/matryer/is" "go.uber.org/mock/gomock" diff --git a/pkg/plugin/processor/builtin/impl/avro/encode_test.go b/pkg/plugin/processor/builtin/impl/avro/encode_test.go index ece621292..2a6b06701 100644 --- a/pkg/plugin/processor/builtin/impl/avro/encode_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/encode_test.go @@ -18,11 +18,10 @@ import ( "context" "testing" - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" - "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" "github.com/google/go-cmp/cmp" "github.com/matryer/is" "go.uber.org/mock/gomock" diff --git a/pkg/plugin/processor/builtin/impl/error.go b/pkg/plugin/processor/builtin/impl/error.go new file mode 100644 index 000000000..1aefd234a --- /dev/null +++ b/pkg/plugin/processor/builtin/impl/error.go @@ -0,0 +1,101 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate paramgen -output=error_paramgen.go errorConfig + +package impl + +import ( + "context" + "strings" + "text/template" + + "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" +) + +type errorConfig struct { + // Error message to be returned. This can be a Go [template](https://pkg.go.dev/text/template) + // executed on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record) + // being processed. + Message string `json:"message" default:"error processor triggered"` +} + +type errorProcessor struct { + sdk.UnimplementedProcessor + + config errorConfig + errorMessageTmpl *template.Template +} + +func NewErrorProcessor(log.CtxLogger) sdk.Processor { + return &errorProcessor{} +} + +func (p *errorProcessor) Specification() (sdk.Specification, error) { + return sdk.Specification{ + Name: "error", + Summary: "Returns an error for all records that get passed to the processor.", + Description: `Any time a record is passed to this processor it returns an error, +which results in the record being sent to the DLQ if it's configured, or the pipeline stopping. + +**Important:** Make sure to add a [condition](https://conduit.io/docs/processors/conditions) +to this processor, otherwise all records will trigger an error.`, + Version: "v0.1.0", + Author: "Meroxa, Inc.", + Parameters: p.config.Parameters(), + }, nil +} + +func (p *errorProcessor) Configure(ctx context.Context, cfg map[string]string) error { + err := sdk.ParseConfig(ctx, cfg, &p.config, p.config.Parameters()) + if err != nil { + return cerrors.Errorf("failed parsing configuration: %w", err) + } + + if strings.Contains(p.config.Message, "{{") { + // create URL template + p.errorMessageTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.Message) + if err != nil { + return cerrors.Errorf("error while parsing the error message template: %w", err) + } + } + + return nil +} + +func (p *errorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord { + out := make([]sdk.ProcessedRecord, len(records)) + for i := range records { + out[i] = sdk.ErrorRecord{ + Error: cerrors.New(p.errorMessage(records[i])), + } + } + return out +} + +func (p *errorProcessor) errorMessage(record opencdc.Record) string { + if p.errorMessageTmpl == nil { + return p.config.Message + } + + var buf strings.Builder + if err := p.errorMessageTmpl.Execute(&buf, record); err != nil { + return err.Error() + } + return buf.String() +} diff --git a/pkg/plugin/processor/builtin/impl/error_examples_test.go b/pkg/plugin/processor/builtin/impl/error_examples_test.go new file mode 100644 index 000000000..444f69842 --- /dev/null +++ b/pkg/plugin/processor/builtin/impl/error_examples_test.go @@ -0,0 +1,47 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil" +) + +//nolint:govet // we're using a more descriptive name of example +func ExampleErrorProcessor() { + p := NewErrorProcessor(log.Nop()) + + exampleutil.RunExample(p, exampleutil.Example{ + Summary: `Error record with custom error message`, + Description: `This example shows how to configure the error processor to +return a custom error message for a record using a Go template.`, + Config: map[string]string{ + "message": "custom error message with data from record: {{.Metadata.foo}}", + }, + Have: opencdc.Record{ + Operation: opencdc.OperationCreate, + Metadata: map[string]string{"foo": "bar"}, + Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, + }, + Want: sdk.ErrorRecord{ + Error: cerrors.New("custom error message with data from record: bar"), + }}) + + // Output: + // processor returned error: custom error message with data from record: bar +} diff --git a/pkg/plugin/processor/builtin/impl/error_paramgen.go b/pkg/plugin/processor/builtin/impl/error_paramgen.go new file mode 100644 index 000000000..9e4351892 --- /dev/null +++ b/pkg/plugin/processor/builtin/impl/error_paramgen.go @@ -0,0 +1,19 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen + +package impl + +import ( + "github.com/conduitio/conduit-commons/config" +) + +func (errorConfig) Parameters() map[string]config.Parameter { + return map[string]config.Parameter{ + "message": { + Default: "error processor triggered", + Description: "Error message to be returned. This can be a Go [template](https://pkg.go.dev/text/template)\nexecuted on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)\nbeing processed.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + } +} diff --git a/pkg/plugin/processor/builtin/impl/error_test.go b/pkg/plugin/processor/builtin/impl/error_test.go new file mode 100644 index 000000000..5703cef7a --- /dev/null +++ b/pkg/plugin/processor/builtin/impl/error_test.go @@ -0,0 +1,101 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "context" + "testing" + + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/matryer/is" +) + +func TestError_EmptyConfig(t *testing.T) { + is := is.New(t) + proc := NewErrorProcessor(log.Nop()) + cfg := map[string]string{} + ctx := context.Background() + records := []opencdc.Record{ + { + Metadata: map[string]string{"key1": "val1"}, + Payload: opencdc.Change{ + After: opencdc.StructuredData{ + "foo": "bar", + }, + }, + }, + { + Metadata: map[string]string{"key2": "val2"}, + Payload: opencdc.Change{}, + }, + } + err := proc.Configure(ctx, cfg) + is.NoErr(err) + + got := proc.Process(ctx, records) + is.Equal(len(got), 2) + for _, r := range got { + is.Equal(r.(sdk.ErrorRecord).Error.Error(), "error processor triggered") + } +} + +func TestError_ErrorMessage(t *testing.T) { + records := []opencdc.Record{ + { + Metadata: map[string]string{"foo": "rec 1"}, + Payload: opencdc.Change{ + After: opencdc.StructuredData{ + "foo": "bar", + }, + }, + }, + { + Metadata: map[string]string{"foo": "rec 2"}, + Payload: opencdc.Change{}, + }, + } + testCases := []struct { + name string + cfg map[string]string + wantErrMessages []string + }{{ + name: "static error message", + cfg: map[string]string{"message": "static error message"}, + wantErrMessages: []string{"static error message", "static error message"}, + }, { + name: "template error message", + cfg: map[string]string{"message": "error message: {{.Metadata.foo}}"}, + wantErrMessages: []string{"error message: rec 1", "error message: rec 2"}, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + proc := NewErrorProcessor(log.Nop()) + ctx := context.Background() + + err := proc.Configure(ctx, tc.cfg) + is.NoErr(err) + + got := proc.Process(ctx, records) + is.Equal(len(got), 2) + for i, r := range got { + is.Equal(r.(sdk.ErrorRecord).Error.Error(), tc.wantErrMessages[i]) + } + }) + } +} diff --git a/pkg/plugin/processor/builtin/impl/json/encode_test.go b/pkg/plugin/processor/builtin/impl/json/encode_test.go index 884cfb7cb..4989ff498 100644 --- a/pkg/plugin/processor/builtin/impl/json/encode_test.go +++ b/pkg/plugin/processor/builtin/impl/json/encode_test.go @@ -18,12 +18,11 @@ import ( "context" "testing" - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" - "github.com/google/go-cmp/cmp" - "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" + "github.com/google/go-cmp/cmp" "github.com/matryer/is" ) diff --git a/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go b/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go index c04dd784e..b0b98bb61 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go @@ -18,12 +18,11 @@ import ( "context" "testing" - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" - "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" "github.com/google/go-cmp/cmp" "github.com/matryer/is" ) diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/specs/error.json b/pkg/plugin/processor/builtin/internal/exampleutil/specs/error.json new file mode 100644 index 000000000..179095137 --- /dev/null +++ b/pkg/plugin/processor/builtin/internal/exampleutil/specs/error.json @@ -0,0 +1,45 @@ +{ + "specification": { + "name": "error", + "summary": "Returns an error for all records that get passed to the processor.", + "description": "Any time a record is passed to this processor it returns an error,\nwhich results in the record being sent to the DLQ if it's configured, or the pipeline stopping.\n\n**Important:** Make sure to add a [condition](https://conduit.io/docs/processors/conditions)\nto this processor, otherwise all records will trigger an error.", + "version": "v0.1.0", + "author": "Meroxa, Inc.", + "parameters": { + "message": { + "default": "error processor triggered", + "description": "Error message to be returned. This can be a Go [template](https://pkg.go.dev/text/template)\nexecuted on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)\nbeing processed.", + "type": "string", + "validations": [] + } + } + }, + "examples": [ + { + "summary": "Error record with custom error message", + "description": "This example shows how to configure the error processor to\nreturn a custom error message for a record using a Go template.", + "config": { + "message": "custom error message with data from record: {{.Metadata.foo}}" + }, + "have": { + "position": null, + "operation": "create", + "metadata": { + "foo": "bar" + }, + "key": null, + "payload": { + "before": { + "bar": "baz" + }, + "after": { + "foo": "bar" + } + } + }, + "want": { + "error": "custom error message with data from record: bar" + } + } + ] +} diff --git a/pkg/plugin/processor/builtin/registry.go b/pkg/plugin/processor/builtin/registry.go index 03951fd43..6e4024b87 100644 --- a/pkg/plugin/processor/builtin/registry.go +++ b/pkg/plugin/processor/builtin/registry.go @@ -39,6 +39,7 @@ var DefaultBuiltinProcessors = map[string]ProcessorPluginConstructor{ "base64.decode": base64.NewDecodeProcessor, "base64.encode": base64.NewEncodeProcessor, "custom.javascript": custom.NewJavascriptProcessor, + "error": impl.NewErrorProcessor, "filter": impl.NewFilterProcessor, "field.convert": field.NewConvertProcessor, "field.exclude": field.NewExcludeProcessor,