Skip to content

Commit

Permalink
Error processor (#1598)
Browse files Browse the repository at this point in the history
* implement error processor

* regenerate error processor specs

* update conduit-processor-sdk

* fix specs

* update doc
  • Loading branch information
lovromazgon authored May 16, 2024
1 parent b45b05e commit a79386e
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.6.0
github.com/conduitio/conduit-connector-s3 v0.5.1
github.com/conduitio/conduit-connector-sdk v0.9.1
github.com/conduitio/conduit-processor-sdk v0.1.1
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj
github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ=
github.com/conduitio/conduit-connector-sdk v0.9.1 h1:DiMUn7udnjWvyaDsyeTZFHeYTEIdqUU6dqPunEEE3Kw=
github.com/conduitio/conduit-connector-sdk v0.9.1/go.mod h1:cNoofumgDlsaThkxkNYg7zab4AkmRZt1V711aO7guGU=
github.com/conduitio/conduit-processor-sdk v0.1.1 h1:C+5Z9pGKVTpdIf5QFNx4UxpvxuOylGRVkGidEpom7HQ=
github.com/conduitio/conduit-processor-sdk v0.1.1/go.mod h1:StkbqQX1WxTjr9LOy7zY+e3DAbEDVvozeamELdzFqck=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
Expand Down
3 changes: 1 addition & 2 deletions pkg/plugin/processor/builtin/impl/avro/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"context"
"testing"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand Down
3 changes: 1 addition & 2 deletions pkg/plugin/processor/builtin/impl/avro/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"context"
"testing"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand Down
101 changes: 101 additions & 0 deletions pkg/plugin/processor/builtin/impl/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate paramgen -output=error_paramgen.go errorConfig

package impl

import (
"context"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
)

type errorConfig struct {
// Error message to be returned. This can be a Go [template](https://pkg.go.dev/text/template)
// executed on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)
// being processed.
Message string `json:"message" default:"error processor triggered"`
}

type errorProcessor struct {
sdk.UnimplementedProcessor

config errorConfig
errorMessageTmpl *template.Template
}

func NewErrorProcessor(log.CtxLogger) sdk.Processor {
return &errorProcessor{}
}

func (p *errorProcessor) Specification() (sdk.Specification, error) {
return sdk.Specification{
Name: "error",
Summary: "Returns an error for all records that get passed to the processor.",
Description: `Any time a record is passed to this processor it returns an error,
which results in the record being sent to the DLQ if it's configured, or the pipeline stopping.
**Important:** Make sure to add a [condition](https://conduit.io/docs/processors/conditions)
to this processor, otherwise all records will trigger an error.`,
Version: "v0.1.0",
Author: "Meroxa, Inc.",
Parameters: p.config.Parameters(),
}, nil
}

func (p *errorProcessor) Configure(ctx context.Context, cfg map[string]string) error {
err := sdk.ParseConfig(ctx, cfg, &p.config, p.config.Parameters())
if err != nil {
return cerrors.Errorf("failed parsing configuration: %w", err)
}

if strings.Contains(p.config.Message, "{{") {
// create URL template
p.errorMessageTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.Message)
if err != nil {
return cerrors.Errorf("error while parsing the error message template: %w", err)
}
}

return nil
}

func (p *errorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, len(records))
for i := range records {
out[i] = sdk.ErrorRecord{
Error: cerrors.New(p.errorMessage(records[i])),
}
}
return out
}

func (p *errorProcessor) errorMessage(record opencdc.Record) string {
if p.errorMessageTmpl == nil {
return p.config.Message
}

var buf strings.Builder
if err := p.errorMessageTmpl.Execute(&buf, record); err != nil {
return err.Error()
}
return buf.String()
}
47 changes: 47 additions & 0 deletions pkg/plugin/processor/builtin/impl/error_examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package impl

import (
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
)

//nolint:govet // we're using a more descriptive name of example
func ExampleErrorProcessor() {
p := NewErrorProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
Summary: `Error record with custom error message`,
Description: `This example shows how to configure the error processor to
return a custom error message for a record using a Go template.`,
Config: map[string]string{
"message": "custom error message with data from record: {{.Metadata.foo}}",
},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"foo": "bar"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
Want: sdk.ErrorRecord{
Error: cerrors.New("custom error message with data from record: bar"),
}})

// Output:
// processor returned error: custom error message with data from record: bar
}
19 changes: 19 additions & 0 deletions pkg/plugin/processor/builtin/impl/error_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 101 additions & 0 deletions pkg/plugin/processor/builtin/impl/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package impl

import (
"context"
"testing"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/matryer/is"
)

func TestError_EmptyConfig(t *testing.T) {
is := is.New(t)
proc := NewErrorProcessor(log.Nop())
cfg := map[string]string{}
ctx := context.Background()
records := []opencdc.Record{
{
Metadata: map[string]string{"key1": "val1"},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "bar",
},
},
},
{
Metadata: map[string]string{"key2": "val2"},
Payload: opencdc.Change{},
},
}
err := proc.Configure(ctx, cfg)
is.NoErr(err)

got := proc.Process(ctx, records)
is.Equal(len(got), 2)
for _, r := range got {
is.Equal(r.(sdk.ErrorRecord).Error.Error(), "error processor triggered")
}
}

func TestError_ErrorMessage(t *testing.T) {
records := []opencdc.Record{
{
Metadata: map[string]string{"foo": "rec 1"},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "bar",
},
},
},
{
Metadata: map[string]string{"foo": "rec 2"},
Payload: opencdc.Change{},
},
}
testCases := []struct {
name string
cfg map[string]string
wantErrMessages []string
}{{
name: "static error message",
cfg: map[string]string{"message": "static error message"},
wantErrMessages: []string{"static error message", "static error message"},
}, {
name: "template error message",
cfg: map[string]string{"message": "error message: {{.Metadata.foo}}"},
wantErrMessages: []string{"error message: rec 1", "error message: rec 2"},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
proc := NewErrorProcessor(log.Nop())
ctx := context.Background()

err := proc.Configure(ctx, tc.cfg)
is.NoErr(err)

got := proc.Process(ctx, records)
is.Equal(len(got), 2)
for i, r := range got {
is.Equal(r.(sdk.ErrorRecord).Error.Error(), tc.wantErrMessages[i])
}
})
}
}
5 changes: 2 additions & 3 deletions pkg/plugin/processor/builtin/impl/json/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
"context"
"testing"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"
"github.com/google/go-cmp/cmp"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
)

Expand Down
3 changes: 1 addition & 2 deletions pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
"context"
"testing"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
)
Expand Down
45 changes: 45 additions & 0 deletions pkg/plugin/processor/builtin/internal/exampleutil/specs/error.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"specification": {
"name": "error",
"summary": "Returns an error for all records that get passed to the processor.",
"description": "Any time a record is passed to this processor it returns an error,\nwhich results in the record being sent to the DLQ if it's configured, or the pipeline stopping.\n\n**Important:** Make sure to add a [condition](https://conduit.io/docs/processors/conditions)\nto this processor, otherwise all records will trigger an error.",
"version": "v0.1.0",
"author": "Meroxa, Inc.",
"parameters": {
"message": {
"default": "error processor triggered",
"description": "Error message to be returned. This can be a Go [template](https://pkg.go.dev/text/template)\nexecuted on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)\nbeing processed.",
"type": "string",
"validations": []
}
}
},
"examples": [
{
"summary": "Error record with custom error message",
"description": "This example shows how to configure the error processor to\nreturn a custom error message for a record using a Go template.",
"config": {
"message": "custom error message with data from record: {{.Metadata.foo}}"
},
"have": {
"position": null,
"operation": "create",
"metadata": {
"foo": "bar"
},
"key": null,
"payload": {
"before": {
"bar": "baz"
},
"after": {
"foo": "bar"
}
}
},
"want": {
"error": "custom error message with data from record: bar"
}
}
]
}
Loading

0 comments on commit a79386e

Please sign in to comment.