From d6e83ced30593dec6c18ec300a4d974aa3332f7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 5 Aug 2024 18:53:37 +0200 Subject: [PATCH] Update processor SDK, add middleware (#1742) * update processor SDK - apply processor middleware to builtin processors - use config.Config for configuration map - take into account middleware parameters in tests * organize imports * processor changes - update processor SDK - set SchemaService for builtin processors - add support for "time" in field.convert processor * go generate * rename config.Config parameter names * use proper logger for builtin processors * update generator and processor-sdk * update processor-sdk --- go.mod | 4 +- go.sum | 8 +- pkg/conduit/runtime.go | 2 +- pkg/foundation/ctxutil/processorid.go | 52 +++++++++++ pkg/foundation/ctxutil/processorid_test.go | 92 +++++++++++++++++++ pkg/orchestrator/orchestrator_test.go | 9 +- .../builtin/impl/avro/config_test.go | 25 ++--- .../processor/builtin/impl/avro/decode.go | 9 +- .../builtin/impl/avro/decode_examples_test.go | 3 +- .../builtin/impl/avro/decode_test.go | 5 +- .../processor/builtin/impl/avro/encode.go | 9 +- .../builtin/impl/avro/encode_examples_test.go | 5 +- .../builtin/impl/avro/encode_test.go | 17 ++-- .../processor/builtin/impl/base64/decode.go | 5 +- .../impl/base64/decode_examples_test.go | 3 +- .../builtin/impl/base64/decode_test.go | 5 +- .../processor/builtin/impl/base64/encode.go | 5 +- .../impl/base64/encode_examples_test.go | 5 +- .../builtin/impl/base64/encode_test.go | 5 +- .../builtin/impl/custom/javascript.go | 5 +- .../impl/custom/javascript_examples_test.go | 3 +- .../builtin/impl/custom/javascript_test.go | 19 +--- pkg/plugin/processor/builtin/impl/error.go | 3 +- .../builtin/impl/error_examples_test.go | 3 +- .../processor/builtin/impl/error_test.go | 9 +- .../processor/builtin/impl/field/convert.go | 35 ++++--- .../impl/field/convert_examples_test.go | 7 +- .../builtin/impl/field/convert_paramgen.go | 4 +- .../builtin/impl/field/convert_test.go | 13 +-- .../processor/builtin/impl/field/exclude.go | 5 +- .../impl/field/exclude_examples_test.go | 5 +- .../builtin/impl/field/exclude_test.go | 13 +-- .../processor/builtin/impl/field/rename.go | 5 +- .../impl/field/rename_examples_test.go | 3 +- .../builtin/impl/field/rename_test.go | 13 +-- .../processor/builtin/impl/field/set.go | 5 +- .../builtin/impl/field/set_examples_test.go | 7 +- .../processor/builtin/impl/field/set_test.go | 25 ++--- .../builtin/impl/filter_examples_test.go | 3 +- .../processor/builtin/impl/json/decode.go | 5 +- .../builtin/impl/json/decode_examples_test.go | 5 +- .../builtin/impl/json/decode_test.go | 37 ++++---- .../processor/builtin/impl/json/encode.go | 5 +- .../builtin/impl/json/encode_examples_test.go | 5 +- .../builtin/impl/json/encode_test.go | 27 +++--- .../processor/builtin/impl/unwrap/debezium.go | 12 +-- .../impl/unwrap/debezium_examples_test.go | 3 +- .../builtin/impl/unwrap/debezium_test.go | 21 +++-- .../builtin/impl/unwrap/kafka_connect.go | 5 +- .../unwrap/kafka_connect_examples_test.go | 3 +- .../builtin/impl/unwrap/kafka_connect_test.go | 15 +-- .../processor/builtin/impl/unwrap/opencdc.go | 5 +- .../impl/unwrap/opencdc_examples_test.go | 3 +- .../builtin/impl/unwrap/opencdc_test.go | 35 +++---- .../processor/builtin/impl/webhook/http.go | 5 +- .../impl/webhook/http_examples_test.go | 5 +- .../builtin/impl/webhook/http_test.go | 63 ++++++------- .../builtin/internal/exampleutil/example.go | 3 +- .../exampleutil/specs/field.convert.json | 4 +- pkg/plugin/processor/builtin/middleware.go | 56 +++++++++++ pkg/plugin/processor/builtin/registry.go | 46 +++++----- pkg/plugin/processor/builtin/registry_test.go | 2 +- pkg/plugin/processor/mock/processor.go | 45 ++++++++- pkg/plugin/processor/standalone/processor.go | 3 +- .../processor/standalone/processor_test.go | 3 +- .../processor/standalone/registry_test.go | 7 +- .../processor/standalone/standalone_test.go | 41 ++++++--- .../test/wasm_processors/chaos/processor.go | 2 +- .../specify_error/processor.go | 17 +--- pkg/provisioning/service_test.go | 9 +- 70 files changed, 614 insertions(+), 336 deletions(-) create mode 100644 pkg/foundation/ctxutil/processorid.go create mode 100644 pkg/foundation/ctxutil/processorid_test.go create mode 100644 pkg/plugin/processor/builtin/middleware.go diff --git a/go.mod b/go.mod index 2487dea3e..d0691f737 100644 --- a/go.mod +++ b/go.mod @@ -10,14 +10,14 @@ require ( github.com/bufbuild/buf v1.35.1 github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58 github.com/conduitio/conduit-connector-file v0.6.1-0.20240802092632-99d8e67ba629 - github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 + github.com/conduitio/conduit-connector-generator v0.6.1-0.20240731150050-422a9e3cbd8f github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240802092956-d92779b95e5b github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4 github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240802103310-fd4ab945b1ac github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90 github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6 - github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 + github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240805163917-9b6b1e83ebde github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 github.com/conduitio/yaml/v3 v3.3.0 github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d diff --git a/go.sum b/go.sum index 3ece9be9f..94dad9a2f 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58 h1:bv6 github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58/go.mod h1:QYVlSvfOG4AB9tFz9NS++ToinF1yuGG0D0CgzwLG4k0= github.com/conduitio/conduit-connector-file v0.6.1-0.20240802092632-99d8e67ba629 h1:q5rAlI2f5bD7ZG0TYzPeLnu6/BkF4GbgHH6177NcA+8= github.com/conduitio/conduit-connector-file v0.6.1-0.20240802092632-99d8e67ba629/go.mod h1:xHC0eA+/mtfoRePx1Ns+xNYvr34hcvctb+0MNPT+ngI= -github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58= -github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2/go.mod h1:2+n1SGAJ/+TpJC9YvxTUWHKt9e0Tm12JcqFJ8OndhX4= +github.com/conduitio/conduit-connector-generator v0.6.1-0.20240731150050-422a9e3cbd8f h1:8IpdNsz3lLtN4KWdZGJESTjfSZz3UEj+YiuVQOSw5Xs= +github.com/conduitio/conduit-connector-generator v0.6.1-0.20240731150050-422a9e3cbd8f/go.mod h1:28hV3b3aErncncQn3ihDjiUMhKXGG8u/7VSEmKVIi4Q= github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240802092956-d92779b95e5b h1:dvCqshxRFhaQLg/mnbDccuvM/6dQdj1h9fAoo+CG8SA= github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240802092956-d92779b95e5b/go.mod h1:AZ6CPfCVsD+a66FFdd3zcbS3T3Sv8KSM0+CpAFBDK7U= github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4 h1:MrfxObXRj7Psz981MNNi6wUj+diu7nuq+/Wl3MqXMpk= @@ -229,8 +229,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90 h github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90/go.mod h1:Cg0rM0NJdIO+CQZWyXtikUVxVainUUq0MqQr8sWnO8E= github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6 h1:yX2SddKRmb1gMi94umalIKF+8+hipknGAEFNXz0+B+E= github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6/go.mod h1:R2V+ZXCFIeIIv8xxZHsi6NGntgsimsEbhQg4ezhna+0= -github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 h1:TCHq3L/LS9Ngo2a4h39vSNpXXisTox1l5uXdwPxxtNM= -github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71/go.mod h1:n6VqVO07olTlvIUSHf2kZcU8cgu2jGmuO6bFrQST2v8= +github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240805163917-9b6b1e83ebde h1:fsHFTJ7XeFromOcg2Z6ueyDOmhDw8KV/XTNU2I59jQ0= +github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240805163917-9b6b1e83ebde/go.mod h1:kQ+7bUREM+F2L/yPPM/GF8z2lqqmpx8Su+ioyE1uO8Q= github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 h1:/OBjxI1JjE3AmifouogZ2KvlhGJ9tQGk4X7UxwjHo1o= github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588/go.mod h1:G5t9W5Z5Mn0nW1TNnIQ1al4piqRXjc1R7HjdHgGFCx4= github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 500060fb2..25ba34b4a 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -179,7 +179,7 @@ func createServices(r *Runtime) error { procPluginService := proc_plugin.NewPluginService( r.logger, - proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors), + proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService), standaloneReg, ) diff --git a/pkg/foundation/ctxutil/processorid.go b/pkg/foundation/ctxutil/processorid.go new file mode 100644 index 000000000..6f9f85235 --- /dev/null +++ b/pkg/foundation/ctxutil/processorid.go @@ -0,0 +1,52 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ctxutil + +import ( + "context" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/rs/zerolog" +) + +// processorIDCtxKey is used as the key when saving the processor ID in a context. +type processorIDCtxKey struct{} + +// ContextWithProcessorID wraps ctx and returns a context that contains processor ID. +func ContextWithProcessorID(ctx context.Context, processorID string) context.Context { + return context.WithValue(ctx, processorIDCtxKey{}, processorID) +} + +// ProcessorIDFromContext fetches the record processor ID from the context. If the +// context does not contain a processor ID it returns nil. +func ProcessorIDFromContext(ctx context.Context) string { + processorID := ctx.Value(processorIDCtxKey{}) + if processorID != nil { + return processorID.(string) + } + return "" +} + +// ProcessorIDLogCtxHook fetches the record processor ID from the context and if it +// exists it adds the processorID to the log output. +type ProcessorIDLogCtxHook struct{} + +// Run executes the log hook. +func (h ProcessorIDLogCtxHook) Run(e *zerolog.Event, _ zerolog.Level, _ string) { + p := ProcessorIDFromContext(e.GetCtx()) + if p != "" { + e.Str(log.ProcessorIDField, p) + } +} diff --git a/pkg/foundation/ctxutil/processorid_test.go b/pkg/foundation/ctxutil/processorid_test.go new file mode 100644 index 000000000..7b7a54c55 --- /dev/null +++ b/pkg/foundation/ctxutil/processorid_test.go @@ -0,0 +1,92 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ctxutil + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/google/uuid" + "github.com/matryer/is" + "github.com/rs/zerolog" +) + +func TestContextWithProcessorID_Success(t *testing.T) { + is := is.New(t) + + ctx := context.Background() + processorID := uuid.NewString() + + ctx = ContextWithProcessorID(ctx, processorID) + got := ProcessorIDFromContext(ctx) + + is.Equal(processorID, got) +} + +func TestContextWithProcessorID_Twice(t *testing.T) { + is := is.New(t) + + ctx := context.Background() + processorID := uuid.NewString() + + ctx = ContextWithProcessorID(ctx, "existing processor ID") + ctx = ContextWithProcessorID(ctx, processorID) + got := ProcessorIDFromContext(ctx) + + is.Equal(processorID, got) +} + +func TestProcessorIDFromContext_Empty(t *testing.T) { + is := is.New(t) + + ctx := context.Background() + got := ProcessorIDFromContext(ctx) + + is.Equal("", got) +} + +func TestProcessorIDLogCtxHook_Success(t *testing.T) { + is := is.New(t) + + ctx := context.Background() + processorID := uuid.NewString() + + ctx = ContextWithProcessorID(ctx, processorID) + + var logOutput bytes.Buffer + logger := zerolog.New(&logOutput) + e := logger.Info().Ctx(ctx) + ProcessorIDLogCtxHook{}.Run(e, zerolog.InfoLevel, "") + e.Send() + + is.Equal(fmt.Sprintf(`{"level":"info","%s":"%s"}`, log.ProcessorIDField, processorID)+"\n", logOutput.String()) +} + +func TestProcessorIDLogCtxHook_EmptyCtx(t *testing.T) { + is := is.New(t) + + ctx := context.Background() + + var logOutput bytes.Buffer + logger := zerolog.New(&logOutput) + e := logger.Info().Ctx(ctx) + ProcessorIDLogCtxHook{}.Run(e, zerolog.InfoLevel, "") + e.Send() + + is.Equal(`{"level":"info"}`+"\n", logOutput.String()) +} diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index ce7351813..eb61a646f 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -35,6 +35,7 @@ import ( conn_standalone "github.com/conduitio/conduit/pkg/plugin/connector/standalone" proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor" proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin" + "github.com/conduitio/conduit/pkg/plugin/processor/procutils" "github.com/conduitio/conduit/pkg/processor" "github.com/google/go-cmp/cmp" "github.com/matryer/is" @@ -74,19 +75,21 @@ func TestPipelineSimple(t *testing.T) { schemaRegistry, err := schemaregistry.NewSchemaRegistry(db) is.NoErr(err) authManager := connutils.NewAuthManager() - schemaService := connutils.NewSchemaService(logger, schemaRegistry, authManager) + connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, authManager) connPluginService := conn_plugin.NewPluginService( logger, - conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, schemaService), + conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, connSchemaService), conn_standalone.NewRegistry(logger, ""), authManager, ) connPluginService.Init(ctx, "conn-utils-token:12345") + procSchemaService := procutils.NewSchemaService(logger, schemaRegistry) + procPluginService := proc_plugin.NewPluginService( logger, - proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors), + proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService), nil, ) diff --git a/pkg/plugin/processor/builtin/impl/avro/config_test.go b/pkg/plugin/processor/builtin/impl/avro/config_test.go index 969150207..0cdc2dbfa 100644 --- a/pkg/plugin/processor/builtin/impl/avro/config_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/config_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -27,13 +28,13 @@ import ( func TestConfig_Parse(t *testing.T) { testCases := []struct { name string - input map[string]string + input config.Config want encodeConfig wantErr error }{ { name: "preRegistered", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "preRegistered", "schema.preRegistered.subject": "testsubject", @@ -53,7 +54,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "preRegistered without version", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "preRegistered", "schema.preRegistered.subject": "testsubject", @@ -62,7 +63,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "preRegistered without subject", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "preRegistered", "schema.preRegistered.version": "123", @@ -71,7 +72,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "autoRegister", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -87,7 +88,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "autoRegister without subject", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", }, @@ -95,7 +96,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "non-default target field", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -112,7 +113,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "valid auth", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -134,7 +135,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "auth -- no username", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -144,7 +145,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "auth -- no password", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -154,7 +155,7 @@ func TestConfig_Parse(t *testing.T) { }, { name: "tls: missing client cert and key", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -166,7 +167,7 @@ missing field: tls.client.key`), }, { name: "valid tls", - input: map[string]string{ + input: config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", diff --git a/pkg/plugin/processor/builtin/impl/avro/decode.go b/pkg/plugin/processor/builtin/impl/avro/decode.go index c748890e5..762e62fcd 100644 --- a/pkg/plugin/processor/builtin/impl/avro/decode.go +++ b/pkg/plugin/processor/builtin/impl/avro/decode.go @@ -21,6 +21,7 @@ import ( "context" "crypto/tls" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -49,9 +50,9 @@ type decodeConfig struct { fieldResolver sdk.ReferenceResolver } -func parseDecodeConfig(ctx context.Context, m map[string]string) (decodeConfig, error) { +func parseDecodeConfig(ctx context.Context, c config.Config) (decodeConfig, error) { cfg := decodeConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters()) if err != nil { return decodeConfig{}, err } @@ -126,8 +127,8 @@ This processor is the counterpart to [` + "`avro.encode`" + `](/docs/processors/ }, nil } -func (p *decodeProcessor) Configure(ctx context.Context, m map[string]string) error { - cfg, err := parseDecodeConfig(ctx, m) +func (p *decodeProcessor) Configure(ctx context.Context, c config.Config) error { + cfg, err := parseDecodeConfig(ctx, c) if err != nil { return cerrors.Errorf("invalid config: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/avro/decode_examples_test.go b/pkg/plugin/processor/builtin/impl/avro/decode_examples_test.go index 34f7a0bcf..d5b097331 100644 --- a/pkg/plugin/processor/builtin/impl/avro/decode_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/decode_examples_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -74,7 +75,7 @@ In this example we use the following schema: ] } ` + "```", - Config: map[string]string{ + Config: config.Config{ "url": url, "field": ".Key", }, diff --git a/pkg/plugin/processor/builtin/impl/avro/decode_test.go b/pkg/plugin/processor/builtin/impl/avro/decode_test.go index faacd39cd..a7c47a018 100644 --- a/pkg/plugin/processor/builtin/impl/avro/decode_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/decode_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -52,7 +53,7 @@ func TestDecodeProcessor_Process_RawData_CustomField(t *testing.T) { is := is.New(t) ctx := context.Background() - config := map[string]string{ + cfg := config.Config{ "url": "http://localhost", "field": ".Payload.Before.something", } @@ -70,7 +71,7 @@ func TestDecodeProcessor_Process_RawData_CustomField(t *testing.T) { want.Payload.Before.(opencdc.StructuredData)["something"] = decodedVal underTest := NewDecodeProcessor(log.Nop()) - err := underTest.Configure(ctx, config) + err := underTest.Configure(ctx, cfg) is.NoErr(err) // skipping Open(), so we can inject a mock encoder diff --git a/pkg/plugin/processor/builtin/impl/avro/encode.go b/pkg/plugin/processor/builtin/impl/avro/encode.go index 1639459ac..cbddcdbb7 100644 --- a/pkg/plugin/processor/builtin/impl/avro/encode.go +++ b/pkg/plugin/processor/builtin/impl/avro/encode.go @@ -21,6 +21,7 @@ import ( "context" "crypto/tls" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -71,9 +72,9 @@ func (c encodeConfig) ClientOptions() []sr.ClientOpt { return clientOpts } -func parseEncodeConfig(ctx context.Context, m map[string]string) (encodeConfig, error) { +func parseEncodeConfig(ctx context.Context, c config.Config) (encodeConfig, error) { cfg := encodeConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters()) if err != nil { return encodeConfig{}, err } @@ -146,8 +147,8 @@ This processor is the counterpart to [` + "`avro.decode`" + `](/docs/processors/ }, nil } -func (p *encodeProcessor) Configure(ctx context.Context, m map[string]string) error { - cfg, err := parseEncodeConfig(ctx, m) +func (p *encodeProcessor) Configure(ctx context.Context, c config.Config) error { + cfg, err := parseEncodeConfig(ctx, c) if err != nil { return cerrors.Errorf("invalid config: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/avro/encode_examples_test.go b/pkg/plugin/processor/builtin/impl/avro/encode_examples_test.go index ddfb87c2a..dc0416372 100644 --- a/pkg/plugin/processor/builtin/impl/avro/encode_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/encode_examples_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -42,7 +43,7 @@ func ExampleEncodeProcessor_autoRegister() { with the ` + "`autoRegister`" + ` schema strategy. The processor encodes the record's ` + "`.Payload.After`" + ` field using the schema that is extracted from the data and registered on the fly under the subject ` + "`example-autoRegister`" + `.`, - Config: map[string]string{ + Config: config.Config{ "url": url, "schema.strategy": "autoRegister", "schema.autoRegister.subject": "example-autoRegister", @@ -154,7 +155,7 @@ schema has to be manually pre-registered. In this example we use the following s ` + "```" + ` The processor encodes the record's` + "`.Key`" + ` field using the above schema.`, - Config: map[string]string{ + Config: config.Config{ "url": url, "schema.strategy": "preRegistered", "schema.preRegistered.subject": "example-preRegistered", diff --git a/pkg/plugin/processor/builtin/impl/avro/encode_test.go b/pkg/plugin/processor/builtin/impl/avro/encode_test.go index b4d2e26b8..759c4a532 100644 --- a/pkg/plugin/processor/builtin/impl/avro/encode_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/encode_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -31,7 +32,7 @@ func TestEncodeProcessor_Process_StructuredData(t *testing.T) { is := is.New(t) ctx := context.Background() - config := map[string]string{ + cfg := config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -51,7 +52,7 @@ func TestEncodeProcessor_Process_StructuredData(t *testing.T) { want.Payload.After = opencdc.RawData("encoded") underTest := NewEncodeProcessor(log.Nop()) - err := underTest.Configure(ctx, config) + err := underTest.Configure(ctx, cfg) is.NoErr(err) // skipping Open(), so we can inject a mock encoder @@ -70,7 +71,7 @@ func TestEncodeProcessor_Process_RawData(t *testing.T) { is := is.New(t) ctx := context.Background() - config := map[string]string{ + cfg := config.Config{ "url": "http://localhost", "schema.strategy": "autoRegister", "schema.autoRegister.subject": "testsubject", @@ -88,7 +89,7 @@ func TestEncodeProcessor_Process_RawData(t *testing.T) { want.Payload.After = opencdc.RawData("encoded") underTest := NewEncodeProcessor(log.Nop()) - err := underTest.Configure(ctx, config) + err := underTest.Configure(ctx, cfg) is.NoErr(err) // skipping Open(), so we can inject a mock encoder @@ -127,7 +128,7 @@ func TestEncodeProcessor_Process_RawData_CustomField(t *testing.T) { is := is.New(t) ctx := context.Background() - config := map[string]string{ + cfg := config.Config{ "url": "http://localhost", "field": ".Payload.Before.something", "schema.strategy": "autoRegister", @@ -147,7 +148,7 @@ func TestEncodeProcessor_Process_RawData_CustomField(t *testing.T) { want.Payload.Before.(opencdc.StructuredData)["something"] = encodedValue underTest := NewEncodeProcessor(log.Nop()) - err := underTest.Configure(ctx, config) + err := underTest.Configure(ctx, cfg) is.NoErr(err) // skipping Open(), so we can inject a mock encoder @@ -225,7 +226,7 @@ func TestEncodeProcessor_Process_EmptyPayloadField(t *testing.T) { is := is.New(t) ctx := context.Background() - config := map[string]string{ + cfg := config.Config{ "url": "http://localhost", "field": tc.field, "schema.strategy": "autoRegister", @@ -237,7 +238,7 @@ func TestEncodeProcessor_Process_EmptyPayloadField(t *testing.T) { want.Payload.Before = tc.wantPayloadBefore underTest := NewEncodeProcessor(log.Nop()) - err := underTest.Configure(ctx, config) + err := underTest.Configure(ctx, cfg) is.NoErr(err) // skipping Open(), so we can inject a mock encoder diff --git a/pkg/plugin/processor/builtin/impl/base64/decode.go b/pkg/plugin/processor/builtin/impl/base64/decode.go index f18579b9a..40ee9a9b7 100644 --- a/pkg/plugin/processor/builtin/impl/base64/decode.go +++ b/pkg/plugin/processor/builtin/impl/base64/decode.go @@ -20,6 +20,7 @@ import ( "context" "encoding/base64" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -57,8 +58,8 @@ result in the target field. It is not allowed to decode the ` + "`.Position`" + }, nil } -func (p *decodeProcessor) Configure(ctx context.Context, m map[string]string) error { - err := sdk.ParseConfig(ctx, m, &p.config, p.config.Parameters()) +func (p *decodeProcessor) Configure(ctx context.Context, c config.Config) error { + err := sdk.ParseConfig(ctx, c, &p.config, p.config.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/base64/decode_examples_test.go b/pkg/plugin/processor/builtin/impl/base64/decode_examples_test.go index a57099885..b89f3d3c1 100644 --- a/pkg/plugin/processor/builtin/impl/base64/decode_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/base64/decode_examples_test.go @@ -15,6 +15,7 @@ package base64 import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -30,7 +31,7 @@ func ExampleDecodeProcessor() { ` + "`.Payload.After`" + `. Note that the result is a string, so if you want to further process the result (e.g. parse the string as JSON), you need to chain other processors (e.g. [` + "`json.decode`" + `](/docs/processors/builtin/json.decode)).`, - Config: map[string]string{ + Config: config.Config{ "field": ".Payload.After.foo", }, Have: opencdc.Record{ diff --git a/pkg/plugin/processor/builtin/impl/base64/decode_test.go b/pkg/plugin/processor/builtin/impl/base64/decode_test.go index f474e08cf..828da5382 100644 --- a/pkg/plugin/processor/builtin/impl/base64/decode_test.go +++ b/pkg/plugin/processor/builtin/impl/base64/decode_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -63,7 +64,7 @@ func TestDecodeProcessor_Success(t *testing.T) { t.Run(tc.name, func(t *testing.T) { is := is.New(t) proc := NewDecodeProcessor(log.Nop()) - err := proc.Configure(ctx, map[string]string{"field": tc.field}) + err := proc.Configure(ctx, config.Config{"field": tc.field}) is.NoErr(err) got := proc.Process(ctx, []opencdc.Record{tc.record}) is.Equal(1, len(got)) @@ -133,7 +134,7 @@ func TestDecodeProcessor_Fail(t *testing.T) { t.Run(tc.name, func(t *testing.T) { is := is.New(t) proc := NewDecodeProcessor(log.Nop()) - err := proc.Configure(ctx, map[string]string{"field": tc.field}) + err := proc.Configure(ctx, config.Config{"field": tc.field}) is.NoErr(err) got := proc.Process(ctx, []opencdc.Record{tc.record}) is.Equal(1, len(got)) diff --git a/pkg/plugin/processor/builtin/impl/base64/encode.go b/pkg/plugin/processor/builtin/impl/base64/encode.go index cb9b3f1e9..dbcbdfb6b 100644 --- a/pkg/plugin/processor/builtin/impl/base64/encode.go +++ b/pkg/plugin/processor/builtin/impl/base64/encode.go @@ -21,6 +21,7 @@ import ( "encoding/base64" "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -60,8 +61,8 @@ assign its value.`, }, nil } -func (p *encodeProcessor) Configure(ctx context.Context, m map[string]string) error { - err := sdk.ParseConfig(ctx, m, &p.config, p.config.Parameters()) +func (p *encodeProcessor) Configure(ctx context.Context, c config.Config) error { + err := sdk.ParseConfig(ctx, c, &p.config, p.config.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/base64/encode_examples_test.go b/pkg/plugin/processor/builtin/impl/base64/encode_examples_test.go index e1e0fecb9..d1e339202 100644 --- a/pkg/plugin/processor/builtin/impl/base64/encode_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/base64/encode_examples_test.go @@ -15,6 +15,7 @@ package base64 import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -28,7 +29,7 @@ func ExampleEncodeProcessor_rawData() { Summary: "Encode record key to base64", Description: `TThis example takes a record containing raw data in ` + "`.Key`" + ` and converts it into a base64 encoded string.`, - Config: map[string]string{ + Config: config.Config{ "field": ".Key", }, Have: opencdc.Record{ @@ -84,7 +85,7 @@ func ExampleBase64EncodeProcessor_stringField() { Summary: "Encode nested value to base64", Description: `This example takes a record containing a string in ` + "`.Payload.Before.foo`" + ` and converts it into a base64 encoded string.`, - Config: map[string]string{ + Config: config.Config{ "field": ".Payload.After.foo", }, Have: opencdc.Record{ diff --git a/pkg/plugin/processor/builtin/impl/base64/encode_test.go b/pkg/plugin/processor/builtin/impl/base64/encode_test.go index e5fd37a8f..b7d671900 100644 --- a/pkg/plugin/processor/builtin/impl/base64/encode_test.go +++ b/pkg/plugin/processor/builtin/impl/base64/encode_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -120,7 +121,7 @@ func TestEncodeProcessor_Success(t *testing.T) { t.Run(tc.name, func(t *testing.T) { is := is.New(t) proc := NewEncodeProcessor(log.Nop()) - err := proc.Configure(ctx, map[string]string{"field": tc.field}) + err := proc.Configure(ctx, config.Config{"field": tc.field}) is.NoErr(err) got := proc.Process(ctx, []opencdc.Record{tc.record}) is.Equal(1, len(got)) @@ -163,7 +164,7 @@ func TestEncodeProcessor_Fail(t *testing.T) { t.Run(tc.name, func(t *testing.T) { is := is.New(t) proc := NewEncodeProcessor(log.Nop()) - err := proc.Configure(ctx, map[string]string{"field": tc.field}) + err := proc.Configure(ctx, config.Config{"field": tc.field}) is.NoErr(err) got := proc.Process(ctx, []opencdc.Record{tc.record}) is.Equal(1, len(got)) diff --git a/pkg/plugin/processor/builtin/impl/custom/javascript.go b/pkg/plugin/processor/builtin/impl/custom/javascript.go index ee62f4b1d..996d8c65f 100644 --- a/pkg/plugin/processor/builtin/impl/custom/javascript.go +++ b/pkg/plugin/processor/builtin/impl/custom/javascript.go @@ -21,6 +21,7 @@ import ( "os" "sync" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -100,9 +101,9 @@ To find out what's possible with the JS processor, also refer to the documentati }, nil } -func (p *javascriptProcessor) Configure(ctx context.Context, m map[string]string) error { +func (p *javascriptProcessor) Configure(ctx context.Context, c config.Config) error { cfg := javascriptConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters()) if err != nil { return cerrors.Errorf("failed parsing configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/custom/javascript_examples_test.go b/pkg/plugin/processor/builtin/impl/custom/javascript_examples_test.go index db9c31fd8..2eb066e8d 100644 --- a/pkg/plugin/processor/builtin/impl/custom/javascript_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/custom/javascript_examples_test.go @@ -15,6 +15,7 @@ package custom import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,7 +30,7 @@ func ExampleJavascriptProcessor() { Summary: "Modify a record's metadata and payload using JavaScript", Description: "In this example we use the `custom.javascript` processor to add a metadata key " + "to the input record. It also prepends \"hello, \" to `.Payload.After`.", - Config: map[string]string{ + Config: config.Config{ "script": `function process(rec) { rec.Metadata["processed"] = "true"; let existing = String.fromCharCode.apply(String, rec.Payload.After); diff --git a/pkg/plugin/processor/builtin/impl/custom/javascript_test.go b/pkg/plugin/processor/builtin/impl/custom/javascript_test.go index 5350b94a7..201fd422f 100644 --- a/pkg/plugin/processor/builtin/impl/custom/javascript_test.go +++ b/pkg/plugin/processor/builtin/impl/custom/javascript_test.go @@ -20,6 +20,7 @@ import ( "strings" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -40,7 +41,7 @@ func TestJSProcessor_Logger(t *testing.T) { underTest := NewJavascriptProcessor(logger) err := underTest.Configure( ctx, - map[string]string{ + config.Config{ "script": ` function process(r) { logger.Info().Msg("Hello"); @@ -66,7 +67,7 @@ func TestJSProcessor_MissingEntrypoint(t *testing.T) { underTest := NewJavascriptProcessor(log.Nop()) err := underTest.Configure( ctx, - map[string]string{"script": `function something() { logger.Debug("no entrypoint"); }`}, + config.Config{"script": `function something() { logger.Debug("no entrypoint"); }`}, ) is.NoErr(err) @@ -407,12 +408,7 @@ func TestJSProcessor_BrokenJSCode(t *testing.T) { src := `function {` p := NewJavascriptProcessor(log.Test(t)) - err := p.Configure( - ctx, - map[string]string{ - "script": src, - }, - ) + err := p.Configure(ctx, config.Config{"script": src}) is.NoErr(err) // expected no error when configuration the JS processor err = p.Open(ctx) @@ -461,12 +457,7 @@ func newTestJavaScriptProc(t *testing.T, src string) sdk.Processor { ctx := context.Background() p := NewJavascriptProcessor(log.Test(t)) - err := p.Configure( - ctx, - map[string]string{ - "script": src, - }, - ) + err := p.Configure(ctx, config.Config{"script": src}) is.NoErr(err) // expected no error when configuration the JS processor err = p.Open(ctx) is.NoErr(err) // expected no error when opening the JS processor diff --git a/pkg/plugin/processor/builtin/impl/error.go b/pkg/plugin/processor/builtin/impl/error.go index 1aefd234a..be3f49b5b 100644 --- a/pkg/plugin/processor/builtin/impl/error.go +++ b/pkg/plugin/processor/builtin/impl/error.go @@ -22,6 +22,7 @@ import ( "text/template" "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -61,7 +62,7 @@ to this processor, otherwise all records will trigger an error.`, }, nil } -func (p *errorProcessor) Configure(ctx context.Context, cfg map[string]string) error { +func (p *errorProcessor) Configure(ctx context.Context, cfg config.Config) error { err := sdk.ParseConfig(ctx, cfg, &p.config, p.config.Parameters()) if err != nil { return cerrors.Errorf("failed parsing configuration: %w", err) diff --git a/pkg/plugin/processor/builtin/impl/error_examples_test.go b/pkg/plugin/processor/builtin/impl/error_examples_test.go index 444f69842..27d1234bb 100644 --- a/pkg/plugin/processor/builtin/impl/error_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/error_examples_test.go @@ -15,6 +15,7 @@ package impl import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -30,7 +31,7 @@ func ExampleErrorProcessor() { Summary: `Error record with custom error message`, Description: `This example shows how to configure the error processor to return a custom error message for a record using a Go template.`, - Config: map[string]string{ + Config: config.Config{ "message": "custom error message with data from record: {{.Metadata.foo}}", }, Have: opencdc.Record{ diff --git a/pkg/plugin/processor/builtin/impl/error_test.go b/pkg/plugin/processor/builtin/impl/error_test.go index 5703cef7a..21b49e0df 100644 --- a/pkg/plugin/processor/builtin/impl/error_test.go +++ b/pkg/plugin/processor/builtin/impl/error_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -27,7 +28,7 @@ import ( func TestError_EmptyConfig(t *testing.T) { is := is.New(t) proc := NewErrorProcessor(log.Nop()) - cfg := map[string]string{} + cfg := config.Config{} ctx := context.Background() records := []opencdc.Record{ { @@ -70,15 +71,15 @@ func TestError_ErrorMessage(t *testing.T) { } testCases := []struct { name string - cfg map[string]string + cfg config.Config wantErrMessages []string }{{ name: "static error message", - cfg: map[string]string{"message": "static error message"}, + cfg: config.Config{"message": "static error message"}, wantErrMessages: []string{"static error message", "static error message"}, }, { name: "template error message", - cfg: map[string]string{"message": "error message: {{.Metadata.foo}}"}, + cfg: config.Config{"message": "error message: {{.Metadata.foo}}"}, wantErrMessages: []string{"error message: rec 1", "error message: rec 2"}, }} diff --git a/pkg/plugin/processor/builtin/impl/field/convert.go b/pkg/plugin/processor/builtin/impl/field/convert.go index b49615ab1..670393178 100644 --- a/pkg/plugin/processor/builtin/impl/field/convert.go +++ b/pkg/plugin/processor/builtin/impl/field/convert.go @@ -20,7 +20,9 @@ import ( "context" "fmt" "strconv" + "time" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -45,8 +47,8 @@ type convertConfig struct { // // For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). Field string `json:"field" validate:"required,regex=^\\.(Payload|Key).*"` - // Type is the target field type after conversion, available options are: string, int, float, bool. - Type string `json:"type" validate:"required,inclusion=string|int|float|bool"` + // Type is the target field type after conversion, available options are: `string`, `int`, `float`, `bool`, `time`. + Type string `json:"type" validate:"required,inclusion=string|int|float|bool|time"` } func (p *convertProcessor) Specification() (sdk.Specification, error) { @@ -65,8 +67,8 @@ to parse it into structured data first.`, }, nil } -func (p *convertProcessor) Configure(ctx context.Context, m map[string]string) error { - err := sdk.ParseConfig(ctx, m, &p.config, convertConfig{}.Parameters()) +func (p *convertProcessor) Configure(ctx context.Context, c config.Config) error { + err := sdk.ParseConfig(ctx, c, &p.config, convertConfig{}.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } @@ -105,23 +107,20 @@ func (p *convertProcessor) stringToType(value, typ string) (any, error) { case "string": return value, nil case "int": - newVal, err := strconv.Atoi(value) - if err != nil { - return nil, err - } - return newVal, nil + return strconv.Atoi(value) case "float": - newVal, err := strconv.ParseFloat(value, 64) - if err != nil { - return nil, err - } - return newVal, nil + return strconv.ParseFloat(value, 64) case "bool": - newVal, err := strconv.ParseBool(value) - if err != nil { - return nil, err + return strconv.ParseBool(value) + case "time": + // see if it's a number + unixnano, err := strconv.Atoi(value) + if err == nil { + // it's a number, use it as a unix nanosecond timestamp + return time.Unix(0, int64(unixnano)), nil } - return newVal, nil + // try to parse it as a time string + return time.Parse(time.RFC3339Nano, value) default: return nil, cerrors.Errorf("undefined type %q", typ) } diff --git a/pkg/plugin/processor/builtin/impl/field/convert_examples_test.go b/pkg/plugin/processor/builtin/impl/field/convert_examples_test.go index eae6beb9c..f031b599b 100644 --- a/pkg/plugin/processor/builtin/impl/field/convert_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/field/convert_examples_test.go @@ -15,6 +15,7 @@ package field import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -28,7 +29,7 @@ func ExampleConvertProcessor_stringToInt() { exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `string` to `int`", Description: "This example takes the string in field `.Key.id` and changes its data type to `int`.", - Config: map[string]string{"field": ".Key.id", "type": "int"}, + Config: config.Config{"field": ".Key.id", "type": "int"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": "123"}, @@ -69,7 +70,7 @@ func ExampleConvertProcessor_intToBool() { exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `int` to `bool`", Description: "This example takes the `int` in field `.Payload.After.done` and changes its data type to `bool`.", - Config: map[string]string{"field": ".Payload.After.done", "type": "bool"}, + Config: config.Config{"field": ".Payload.After.done", "type": "bool"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": "123"}, @@ -110,7 +111,7 @@ func ExampleConvertProcessor_floatToString() { exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `float` to `string`", Description: "This example takes the `float` in field `.Key.id` and changes its data type to `string`.", - Config: map[string]string{"field": ".Key.id", "type": "string"}, + Config: config.Config{"field": ".Key.id", "type": "string"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": 123.345}, diff --git a/pkg/plugin/processor/builtin/impl/field/convert_paramgen.go b/pkg/plugin/processor/builtin/impl/field/convert_paramgen.go index 9ec880ef6..016baae3f 100644 --- a/pkg/plugin/processor/builtin/impl/field/convert_paramgen.go +++ b/pkg/plugin/processor/builtin/impl/field/convert_paramgen.go @@ -27,11 +27,11 @@ func (convertConfig) Parameters() map[string]config.Parameter { }, convertConfigType: { Default: "", - Description: "Type is the target field type after conversion, available options are: string, int, float, bool.", + Description: "Type is the target field type after conversion, available options are: `string`, `int`, `float`, `bool`, `time`.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, - config.ValidationInclusion{List: []string{"string", "int", "float", "bool"}}, + config.ValidationInclusion{List: []string{"string", "int", "float", "bool", "time"}}, }, }, } diff --git a/pkg/plugin/processor/builtin/impl/field/convert_test.go b/pkg/plugin/processor/builtin/impl/field/convert_test.go index 2892fced8..9b6957abb 100644 --- a/pkg/plugin/processor/builtin/impl/field/convert_test.go +++ b/pkg/plugin/processor/builtin/impl/field/convert_test.go @@ -19,6 +19,7 @@ import ( "strings" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -280,28 +281,28 @@ func TestConvertField_Configure(t *testing.T) { ctx := context.Background() testCases := []struct { name string - cfg map[string]string + cfg config.Config wantErr bool }{ { name: "valid config", - cfg: map[string]string{"field": ".Payload.After.foo", "type": "int"}, + cfg: config.Config{"field": ".Payload.After.foo", "type": "int"}, wantErr: false, }, { name: "invalid config, contains an invalid prefix for the field", - cfg: map[string]string{"field": ".Metadata.foo", "type": "int"}, + cfg: config.Config{"field": ".Metadata.foo", "type": "int"}, wantErr: true, }, { name: "invalid config, invalid prefix", - cfg: map[string]string{"field": "aPayload.foo", "type": "int"}, + cfg: config.Config{"field": "aPayload.foo", "type": "int"}, wantErr: true, }, { name: "invalid config, invalid type", - cfg: map[string]string{"field": ".Key.foo", "type": "map"}, + cfg: config.Config{"field": ".Key.foo", "type": "map"}, wantErr: true, }, { name: "missing param", - cfg: map[string]string{}, + cfg: config.Config{}, wantErr: true, }, } diff --git a/pkg/plugin/processor/builtin/impl/field/exclude.go b/pkg/plugin/processor/builtin/impl/field/exclude.go index a541710b3..dff3c7ddb 100644 --- a/pkg/plugin/processor/builtin/impl/field/exclude.go +++ b/pkg/plugin/processor/builtin/impl/field/exclude.go @@ -19,6 +19,7 @@ package field import ( "context" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -61,8 +62,8 @@ to parse it into structured data first.`, }, nil } -func (p *excludeProcessor) Configure(ctx context.Context, m map[string]string) error { - err := sdk.ParseConfig(ctx, m, &p.config, excludeConfig{}.Parameters()) +func (p *excludeProcessor) Configure(ctx context.Context, c config.Config) error { + err := sdk.ParseConfig(ctx, c, &p.config, excludeConfig{}.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/field/exclude_examples_test.go b/pkg/plugin/processor/builtin/impl/field/exclude_examples_test.go index fe76841d8..513af08d4 100644 --- a/pkg/plugin/processor/builtin/impl/field/exclude_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/field/exclude_examples_test.go @@ -15,6 +15,7 @@ package field import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -28,7 +29,7 @@ func ExampleExcludeProcessor_oneField() { exampleutil.RunExample(p, exampleutil.Example{ Summary: "Exclude all fields in payload", Description: "Excluding all fields in `.Payload` results in an empty payload.", - Config: map[string]string{"fields": ".Payload"}, + Config: config.Config{"fields": ".Payload"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, @@ -73,7 +74,7 @@ func ExampleExcludeProcessor_multipleFields() { Description: `It's possible to exclude multiple fields by providing a comma-separated list of fields. In this example, we exclude ` + "`.Metadata`" + `, ` + "`.Payload.After.foo`" + ` and ` + "`.Key.key1`" + `.`, - Config: map[string]string{"fields": ".Metadata,.Payload.After.foo,.Key.key1"}, + Config: config.Config{"fields": ".Metadata,.Payload.After.foo,.Key.key1"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"source": "s3"}, diff --git a/pkg/plugin/processor/builtin/impl/field/exclude_test.go b/pkg/plugin/processor/builtin/impl/field/exclude_test.go index 3649e77b3..dcdbef1d9 100644 --- a/pkg/plugin/processor/builtin/impl/field/exclude_test.go +++ b/pkg/plugin/processor/builtin/impl/field/exclude_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -27,7 +28,7 @@ import ( func TestExcludeFields_Process(t *testing.T) { is := is.New(t) proc := NewExcludeProcessor(log.Nop()) - cfg := map[string]string{"fields": ".Metadata,.Payload.After.foo"} + cfg := config.Config{"fields": ".Metadata,.Payload.After.foo"} ctx := context.Background() records := []opencdc.Record{ { @@ -60,24 +61,24 @@ func TestExcludeField_Configure(t *testing.T) { ctx := context.Background() testCases := []struct { name string - cfg map[string]string + cfg config.Config wantErr bool }{ { name: "valid config", - cfg: map[string]string{"fields": ".Metadata,.Payload"}, + cfg: config.Config{"fields": ".Metadata,.Payload"}, wantErr: false, }, { name: "missing parameter", - cfg: map[string]string{}, + cfg: config.Config{}, wantErr: true, }, { name: "cannot exclude .Operation", - cfg: map[string]string{"fields": ".Operation"}, + cfg: config.Config{"fields": ".Operation"}, wantErr: true, }, { name: "cannot exclude .Position", - cfg: map[string]string{"fields": ".Position"}, + cfg: config.Config{"fields": ".Position"}, wantErr: true, }, } diff --git a/pkg/plugin/processor/builtin/impl/field/rename.go b/pkg/plugin/processor/builtin/impl/field/rename.go index 89f7d448b..cfcb24e2f 100644 --- a/pkg/plugin/processor/builtin/impl/field/rename.go +++ b/pkg/plugin/processor/builtin/impl/field/rename.go @@ -20,6 +20,7 @@ import ( "context" "strings" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -66,7 +67,7 @@ to parse it into structured data first.`, }, nil } -func (p *renameProcessor) Configure(ctx context.Context, m map[string]string) error { +func (p *renameProcessor) Configure(ctx context.Context, c config.Config) error { var forbiddenFields = []string{ internal.MetadataReference, internal.PayloadReference, @@ -78,7 +79,7 @@ func (p *renameProcessor) Configure(ctx context.Context, m map[string]string) er } cfg := renameConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, renameConfig{}.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, renameConfig{}.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/field/rename_examples_test.go b/pkg/plugin/processor/builtin/impl/field/rename_examples_test.go index 90b53a733..d2c6b3080 100644 --- a/pkg/plugin/processor/builtin/impl/field/rename_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/field/rename_examples_test.go @@ -15,6 +15,7 @@ package field import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,7 +30,7 @@ func ExampleRenameProcessor_rename1() { Summary: `Rename multiple fields`, Description: `This example renames the fields in ` + "`.Metadata`" + ` and ` + "`.Payload.After`" + ` as specified in the ` + "`mapping`" + ` configuration parameter.`, - Config: map[string]string{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"}, + Config: config.Config{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, diff --git a/pkg/plugin/processor/builtin/impl/field/rename_test.go b/pkg/plugin/processor/builtin/impl/field/rename_test.go index 65e98ac3f..2290cfbc9 100644 --- a/pkg/plugin/processor/builtin/impl/field/rename_test.go +++ b/pkg/plugin/processor/builtin/impl/field/rename_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -28,7 +29,7 @@ func TestRenameField_Process(t *testing.T) { is := is.New(t) proc := NewRenameProcessor(log.Nop()) ctx := context.Background() - config := map[string]string{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"} + cfg := config.Config{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"} records := []opencdc.Record{ { Metadata: map[string]string{"key1": "val1", "key2": "val2"}, @@ -49,7 +50,7 @@ func TestRenameField_Process(t *testing.T) { }, }, } - err := proc.Configure(ctx, config) + err := proc.Configure(ctx, cfg) is.NoErr(err) output := proc.Process(context.Background(), records) is.True(len(output) == 1) @@ -61,20 +62,20 @@ func TestRenameField_Configure(t *testing.T) { ctx := context.Background() testCases := []struct { name string - cfg map[string]string + cfg config.Config wantErr bool }{ { name: "valid config", - cfg: map[string]string{"mapping": ".Payload.After.foo:bar"}, + cfg: config.Config{"mapping": ".Payload.After.foo:bar"}, wantErr: false, }, { name: "invalid config, contains a top-level reference", - cfg: map[string]string{"mapping": ".Metadata:foo,.Payload.After.foo:bar"}, + cfg: config.Config{"mapping": ".Metadata:foo,.Payload.After.foo:bar"}, wantErr: true, }, { name: "mapping param is missing", - cfg: map[string]string{}, + cfg: config.Config{}, wantErr: true, }, } diff --git a/pkg/plugin/processor/builtin/impl/field/set.go b/pkg/plugin/processor/builtin/impl/field/set.go index 29d51f4c2..ccdd4a3e2 100644 --- a/pkg/plugin/processor/builtin/impl/field/set.go +++ b/pkg/plugin/processor/builtin/impl/field/set.go @@ -22,6 +22,7 @@ import ( "text/template" "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -65,9 +66,9 @@ Note that this processor only runs on structured data, if the record contains ra }, nil } -func (p *setProcessor) Configure(ctx context.Context, m map[string]string) error { +func (p *setProcessor) Configure(ctx context.Context, c config.Config) error { cfg := setConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, setConfig{}.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, setConfig{}.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/field/set_examples_test.go b/pkg/plugin/processor/builtin/impl/field/set_examples_test.go index c1700b022..e225af376 100644 --- a/pkg/plugin/processor/builtin/impl/field/set_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/field/set_examples_test.go @@ -15,6 +15,7 @@ package field import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -28,7 +29,7 @@ func ExampleSetProcessor_setOperation() { exampleutil.RunExample(p, exampleutil.Example{ Summary: "Sets the record operation to `update`", Description: "This example sets the `.Operation` field to `update` for all records.", - Config: map[string]string{"field": ".Operation", "value": "update"}, + Config: config.Config{"field": ".Operation", "value": "update"}, Have: opencdc.Record{Operation: opencdc.OperationCreate}, Want: sdk.SingleRecord{Operation: opencdc.OperationUpdate}, }) @@ -59,7 +60,7 @@ func ExampleSetProcessor_addField() { Summary: `Add field`, Description: `This example adds a new field to the record. The field is added to ` + "`.Payload.After`" + ` and is set to ` + "`bar`" + `.`, - Config: map[string]string{"field": ".Payload.After.foo", "value": "bar"}, + Config: config.Config{"field": ".Payload.After.foo", "value": "bar"}, Have: opencdc.Record{Operation: opencdc.OperationSnapshot, Key: opencdc.StructuredData{"my-key": "id"}, }, @@ -99,7 +100,7 @@ func ExampleSetProcessor_template() { exampleutil.RunExample(p, exampleutil.Example{ Summary: `Set field using Go template`, Description: "This example sets the `.Payload.After.postgres` field to `true` if the `.Metadata.table` field contains `postgres`.", - Config: map[string]string{"field": ".Payload.After.postgres", "value": "{{ eq .Metadata.table \"postgres\" }}"}, + Config: config.Config{"field": ".Payload.After.postgres", "value": "{{ eq .Metadata.table \"postgres\" }}"}, Have: opencdc.Record{ Metadata: map[string]string{"table": "postgres"}, Operation: opencdc.OperationSnapshot, diff --git a/pkg/plugin/processor/builtin/impl/field/set_test.go b/pkg/plugin/processor/builtin/impl/field/set_test.go index 22f0db0b5..f86e12e27 100644 --- a/pkg/plugin/processor/builtin/impl/field/set_test.go +++ b/pkg/plugin/processor/builtin/impl/field/set_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -30,13 +31,13 @@ func TestSetField_Process(t *testing.T) { ctx := context.Background() testCases := []struct { name string - config map[string]string + config config.Config record opencdc.Record want sdk.SingleRecord }{ { name: "setting a metadata field", - config: map[string]string{"field": ".Metadata.table", "value": "postgres"}, + config: config.Config{"field": ".Metadata.table", "value": "postgres"}, record: opencdc.Record{ Metadata: map[string]string{"table": "my-table"}, }, @@ -46,7 +47,7 @@ func TestSetField_Process(t *testing.T) { }, { name: "setting a non existent field", - config: map[string]string{"field": ".Metadata.nonExistent", "value": "postgres"}, + config: config.Config{"field": ".Metadata.nonExistent", "value": "postgres"}, record: opencdc.Record{ Metadata: map[string]string{"table": "my-table"}, }, @@ -56,7 +57,7 @@ func TestSetField_Process(t *testing.T) { }, { name: "setting the operation field", - config: map[string]string{"field": ".Operation", "value": "delete"}, + config: config.Config{"field": ".Operation", "value": "delete"}, record: opencdc.Record{ Operation: opencdc.OperationCreate, }, @@ -65,7 +66,7 @@ func TestSetField_Process(t *testing.T) { }, }, { name: "setting the payload.after with a go template evaluated value", - config: map[string]string{"field": ".Payload.After.foo", "value": "{{ .Payload.After.baz }}"}, + config: config.Config{"field": ".Payload.After.foo", "value": "{{ .Payload.After.baz }}"}, record: opencdc.Record{ Payload: opencdc.Change{ Before: nil, @@ -102,33 +103,33 @@ func TestSetField_Configure(t *testing.T) { ctx := context.Background() testCases := []struct { name string - cfg map[string]string + cfg config.Config wantErr bool }{ { name: "valid config", - cfg: map[string]string{"field": ".Metadata", "value": "{{ .Payload.After.foo }}"}, + cfg: config.Config{"field": ".Metadata", "value": "{{ .Payload.After.foo }}"}, wantErr: false, }, { name: "invalid value template format", - cfg: map[string]string{"field": ".Metadata", "value": "{{ invalid }}"}, + cfg: config.Config{"field": ".Metadata", "value": "{{ invalid }}"}, wantErr: true, }, { name: "value param is missing", - cfg: map[string]string{"field": ".Metadata"}, + cfg: config.Config{"field": ".Metadata"}, wantErr: true, }, { name: "field param is missing", - cfg: map[string]string{"value": "sth"}, + cfg: config.Config{"value": "sth"}, wantErr: true, }, { name: "cannot set .Position", - cfg: map[string]string{"field": ".Position", "value": "newPos"}, + cfg: config.Config{"field": ".Position", "value": "newPos"}, wantErr: true, }, { name: "all params are missing", - cfg: map[string]string{}, + cfg: config.Config{}, wantErr: true, }, } diff --git a/pkg/plugin/processor/builtin/impl/filter_examples_test.go b/pkg/plugin/processor/builtin/impl/filter_examples_test.go index 01a561e5e..ae3361682 100644 --- a/pkg/plugin/processor/builtin/impl/filter_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/filter_examples_test.go @@ -15,6 +15,7 @@ package impl import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -27,7 +28,7 @@ func ExampleFilterProcessor() { exampleutil.RunExample(p, exampleutil.Example{ Summary: `Filter out the record`, - Config: map[string]string{}, + Config: config.Config{}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, diff --git a/pkg/plugin/processor/builtin/impl/json/decode.go b/pkg/plugin/processor/builtin/impl/json/decode.go index dc8d71f13..87fd58e39 100644 --- a/pkg/plugin/processor/builtin/impl/json/decode.go +++ b/pkg/plugin/processor/builtin/impl/json/decode.go @@ -19,6 +19,7 @@ package json import ( "context" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -60,9 +61,9 @@ This processor is only applicable to fields under ` + "`.Key`" + `, ` + "`.Paylo }, nil } -func (p *decodeProcessor) Configure(ctx context.Context, m map[string]string) error { +func (p *decodeProcessor) Configure(ctx context.Context, c config.Config) error { cfg := decodeConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, decodeConfig{}.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, decodeConfig{}.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/json/decode_examples_test.go b/pkg/plugin/processor/builtin/impl/json/decode_examples_test.go index 6406896ef..c44f7dd47 100644 --- a/pkg/plugin/processor/builtin/impl/json/decode_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/json/decode_examples_test.go @@ -15,6 +15,7 @@ package json import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,7 +30,7 @@ func ExampleDecodeProcessor_rawKey() { Summary: `Decode record key as JSON`, Description: `This example takes a record containing a raw JSON string in ` + "`.Key`" + ` and converts it into structured data.`, - Config: map[string]string{"field": ".Key"}, + Config: config.Config{"field": ".Key"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Key: opencdc.RawData(`{"after":{"data":4,"id":3}}`), @@ -73,7 +74,7 @@ func ExampleDecodeProcessor_rawPayloadField() { Summary: "Decode nested field as JSON", Description: `This example takes a record containing a raw JSON string in ` + "`.Payload.Before.foo`" + ` and converts it into a map.`, - Config: map[string]string{"field": ".Payload.Before.foo"}, + Config: config.Config{"field": ".Payload.Before.foo"}, Have: opencdc.Record{ Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ diff --git a/pkg/plugin/processor/builtin/impl/json/decode_test.go b/pkg/plugin/processor/builtin/impl/json/decode_test.go index aa8464654..4fa22da49 100644 --- a/pkg/plugin/processor/builtin/impl/json/decode_test.go +++ b/pkg/plugin/processor/builtin/impl/json/decode_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -32,13 +33,13 @@ func TestDecodeJSON_Process(t *testing.T) { ctx := context.Background() testCases := []struct { name string - config map[string]string + config config.Config record opencdc.Record want sdk.ProcessedRecord }{ { name: "raw key to structured", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, record: opencdc.Record{ Key: opencdc.RawData(`{"after":{"data":4,"id":3}}`), }, @@ -49,7 +50,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "raw payload.after.foo to structured", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ @@ -69,7 +70,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "slice payload.after.foo to structured", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ @@ -86,7 +87,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "string JSON value payload.after.foo to structured", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ @@ -103,7 +104,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "raw payload.before to structured", - config: map[string]string{"field": ".Payload.Before"}, + config: config.Config{"field": ".Payload.Before"}, record: opencdc.Record{ Payload: opencdc.Change{ Before: opencdc.RawData(`{"before":{"data":4},"foo":"bar"}`), @@ -119,7 +120,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "already structured data", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, record: opencdc.Record{ Key: opencdc.StructuredData{ "after": map[string]any{"data": float64(4), "id": float64(3)}, @@ -132,7 +133,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "empty raw data converted to empty structured data", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, record: opencdc.Record{ Key: opencdc.RawData(""), }, @@ -141,7 +142,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "nil value", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, record: opencdc.Record{ Payload: opencdc.Change{ After: nil, @@ -154,7 +155,7 @@ func TestDecodeJSON_Process(t *testing.T) { }, }, { name: "invalid json", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, record: opencdc.Record{ Key: opencdc.RawData(`"invalid":"json"`), }, @@ -178,42 +179,42 @@ func TestDecodeJSON_Configure(t *testing.T) { ctx := context.Background() testCases := []struct { name string - config map[string]string + config config.Config wantErr bool }{ { name: "valid field, key", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, wantErr: false, }, { name: "valid field, payload.after", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, wantErr: false, }, { name: "valid field, payload.before", - config: map[string]string{"field": ".Payload.Before"}, + config: config.Config{"field": ".Payload.Before"}, wantErr: false, }, { name: "valid field, payload.after.foo", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, wantErr: false, }, { name: "invalid config, missing param", - config: map[string]string{}, + config: config.Config{}, wantErr: true, }, { name: "invalid field .Metadata", - config: map[string]string{"field": ".Metadata"}, + config: config.Config{"field": ".Metadata"}, wantErr: true, }, { name: "invalid field, .Payload", - config: map[string]string{"field": ".Payload"}, + config: config.Config{"field": ".Payload"}, wantErr: true, }, } diff --git a/pkg/plugin/processor/builtin/impl/json/encode.go b/pkg/plugin/processor/builtin/impl/json/encode.go index 193ebd05e..e195a172f 100644 --- a/pkg/plugin/processor/builtin/impl/json/encode.go +++ b/pkg/plugin/processor/builtin/impl/json/encode.go @@ -19,6 +19,7 @@ package json import ( "context" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -59,9 +60,9 @@ This processor is only applicable to fields under ` + "`.Key`" + `, ` + "`.Paylo }, nil } -func (p *encodeProcessor) Configure(ctx context.Context, m map[string]string) error { +func (p *encodeProcessor) Configure(ctx context.Context, c config.Config) error { cfg := encodeConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, encodeConfig{}.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, encodeConfig{}.Parameters()) if err != nil { return cerrors.Errorf("failed to parse configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/json/encode_examples_test.go b/pkg/plugin/processor/builtin/impl/json/encode_examples_test.go index e2846f2fb..b4ca2df9d 100644 --- a/pkg/plugin/processor/builtin/impl/json/encode_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/json/encode_examples_test.go @@ -15,6 +15,7 @@ package json import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,7 +30,7 @@ func ExampleEncodeProcessor_structuredKey() { Summary: "Encode record key to JSON", Description: `This example takes a record containing structured data in ` + "`.Key`" + ` and converts it into a raw JSON string.`, - Config: map[string]string{"field": ".Key"}, + Config: config.Config{"field": ".Key"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Key: opencdc.StructuredData{ @@ -72,7 +73,7 @@ func ExampleEncodeProcessor_mapToJSON() { Summary: "Encode nested field to JSON", Description: `This example takes a record containing a map in ` + "`.Payload.Before.foo`" + ` and converts it into a raw JSON string.`, - Config: map[string]string{"field": ".Payload.Before.foo"}, + Config: config.Config{"field": ".Payload.Before.foo"}, Have: opencdc.Record{ Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ diff --git a/pkg/plugin/processor/builtin/impl/json/encode_test.go b/pkg/plugin/processor/builtin/impl/json/encode_test.go index 4989ff498..346eeb42f 100644 --- a/pkg/plugin/processor/builtin/impl/json/encode_test.go +++ b/pkg/plugin/processor/builtin/impl/json/encode_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -31,13 +32,13 @@ func TestEncode_Process(t *testing.T) { ctx := context.Background() testCases := []struct { name string - config map[string]string + config config.Config record opencdc.Record want sdk.ProcessedRecord }{ { name: "structured key to raw data", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, record: opencdc.Record{ Key: opencdc.StructuredData{ "after": map[string]any{"data": float64(4), "id": float64(3)}, @@ -48,7 +49,7 @@ func TestEncode_Process(t *testing.T) { }, }, { name: "encode payload.after.foo map", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ @@ -68,7 +69,7 @@ func TestEncode_Process(t *testing.T) { }, }, { name: "slice under payload.after to raw", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ @@ -83,7 +84,7 @@ func TestEncode_Process(t *testing.T) { }, }, { name: "encode int value", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ @@ -100,7 +101,7 @@ func TestEncode_Process(t *testing.T) { }, }, { name: "nil value", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, record: opencdc.Record{ Payload: opencdc.Change{ After: nil, @@ -130,37 +131,37 @@ func TestEncode_Configure(t *testing.T) { ctx := context.Background() testCases := []struct { name string - config map[string]string + config config.Config wantErr bool }{ { name: "valid field, key", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, wantErr: false, }, { name: "valid field, payload.after", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, wantErr: false, }, { name: "valid field, payload.after.foo", - config: map[string]string{"field": ".Payload.After.foo"}, + config: config.Config{"field": ".Payload.After.foo"}, wantErr: false, }, { name: "invalid config, missing param", - config: map[string]string{}, + config: config.Config{}, wantErr: true, }, { name: "invalid field .Metadata", - config: map[string]string{"field": ".Metadata"}, + config: config.Config{"field": ".Metadata"}, wantErr: true, }, { name: "invalid field, .Payload", - config: map[string]string{"field": ".Payload"}, + config: config.Config{"field": ".Payload"}, wantErr: true, }, } diff --git a/pkg/plugin/processor/builtin/impl/unwrap/debezium.go b/pkg/plugin/processor/builtin/impl/unwrap/debezium.go index e015e5c07..9088d3d6e 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/debezium.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/debezium.go @@ -79,17 +79,11 @@ In such cases, the Debezium record is set as the [OpenCDC record](https://condui }, nil } -func (d *debeziumProcessor) Configure(_ context.Context, m map[string]string) error { +func (d *debeziumProcessor) Configure(ctx context.Context, c config.Config) error { cfg := debeziumConfig{} - inputCfg := config.Config(m).Sanitize().ApplyDefaults(cfg.Parameters()) - err := inputCfg.Validate(cfg.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters()) if err != nil { - return cerrors.Errorf("invalid configuration: %w", err) - } - - err = inputCfg.DecodeInto(&cfg) - if err != nil { - return cerrors.Errorf("failed decoding configuration: %w", err) + return err } rr, err := sdk.NewReferenceResolver(cfg.Field) diff --git a/pkg/plugin/processor/builtin/impl/unwrap/debezium_examples_test.go b/pkg/plugin/processor/builtin/impl/unwrap/debezium_examples_test.go index d115267d5..87fc9c259 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/debezium_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/debezium_examples_test.go @@ -15,6 +15,7 @@ package unwrap import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,7 +30,7 @@ func ExampleDebeziumProcessor() { Summary: "Unwrap a Debezium record", Description: `This example how to unwrap a Debezium record from a field nested in a record's ` + "`.Payload.After`" + ` field. It additionally shows how the key is unwrapped, and the metadata merged.`, - Config: map[string]string{ + Config: config.Config{ "field": ".Payload.After.nested", }, Have: opencdc.Record{ diff --git a/pkg/plugin/processor/builtin/impl/unwrap/debezium_test.go b/pkg/plugin/processor/builtin/impl/unwrap/debezium_test.go index 0d3bcb226..3764fb263 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/debezium_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/debezium_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -30,23 +31,23 @@ import ( func TestDebeziumProcessor_Configure(t *testing.T) { testCases := []struct { name string - config map[string]string + config config.Config wantErr string }{ { name: "optional not provided", - config: map[string]string{}, + config: config.Config{}, wantErr: "", }, { name: "valid field (within .Payload)", - config: map[string]string{"field": ".Payload.After.something"}, + config: config.Config{"field": ".Payload.After.something"}, wantErr: "", }, { name: "invalid field", - config: map[string]string{"field": ".Key"}, - wantErr: `invalid configuration: error validating "field": ".Key" should match the regex "^.Payload": regex validation failed`, + config: config.Config{"field": ".Key"}, + wantErr: `config invalid: error validating "field": ".Key" should match the regex "^.Payload": regex validation failed`, }, } @@ -68,14 +69,14 @@ func TestDebeziumProcessor_Configure(t *testing.T) { func TestDebeziumProcessor_Process(t *testing.T) { testCases := []struct { name string - config map[string]string + config config.Config record opencdc.Record want sdk.ProcessedRecord wantErr string }{ { name: "raw payload", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, record: opencdc.Record{ Metadata: map[string]string{}, Key: opencdc.RawData(`{"payload":"27"}`), @@ -117,7 +118,7 @@ func TestDebeziumProcessor_Process(t *testing.T) { }, { name: "structured payload", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, record: opencdc.Record{ Metadata: map[string]string{ "conduit.version": "v0.4.0", @@ -162,7 +163,7 @@ func TestDebeziumProcessor_Process(t *testing.T) { }, { name: "structured data, payload missing", - config: map[string]string{"field": ".Payload.After"}, + config: config.Config{"field": ".Payload.After"}, record: opencdc.Record{ Metadata: map[string]string{ "conduit.version": "v0.4.0", @@ -185,7 +186,7 @@ func TestDebeziumProcessor_Process(t *testing.T) { }, { name: "custom field, structured payload", - config: map[string]string{"field": ".Payload.After[\"debezium_event\"]"}, + config: config.Config{"field": ".Payload.After[\"debezium_event\"]"}, record: opencdc.Record{ Metadata: map[string]string{ "conduit.version": "v0.4.0", diff --git a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go index ee7eb2871..792466c09 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -60,9 +61,9 @@ In such cases, the Debezium record is set as the [OpenCDC record](https://condui }, nil } -func (u *kafkaConnectProcessor) Configure(ctx context.Context, m map[string]string) error { +func (u *kafkaConnectProcessor) Configure(ctx context.Context, c config.Config) error { cfg := kafkaConnectConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters()) if err != nil { return err } diff --git a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_examples_test.go b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_examples_test.go index f6819abd6..48dd34114 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_examples_test.go @@ -15,6 +15,7 @@ package unwrap import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -33,7 +34,7 @@ The Kafka Connect record is serialized as a JSON string in the ` + "`.Payload.Af The Kafka Connect record's payload will replace the [OpenCDC record](https://conduit.io/docs/features/opencdc-record)'s payload. We also see how the key is unwrapped too. In this case, the key comes in as structured data.`, - Config: map[string]string{}, + Config: config.Config{}, Have: opencdc.Record{ Position: opencdc.Position("test position"), Operation: opencdc.OperationCreate, diff --git a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_test.go b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_test.go index 23d4dc215..753083efa 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,22 +30,22 @@ import ( func TestKafkaConnectProcessor_Configure(t *testing.T) { testCases := []struct { name string - config map[string]string + config config.Config wantErr string }{ { name: "optional parameter not provided", - config: map[string]string{}, + config: config.Config{}, wantErr: "", }, { name: "valid field (within .Payload)", - config: map[string]string{"field": ".Payload.After.something"}, + config: config.Config{"field": ".Payload.After.something"}, wantErr: "", }, { name: "invalid field", - config: map[string]string{"field": ".Key"}, + config: config.Config{"field": ".Key"}, wantErr: `config invalid: error validating "field": ".Key" should match the regex "^.Payload": regex validation failed`, }, } @@ -66,13 +67,13 @@ func TestKafkaConnectProcessor_Configure(t *testing.T) { func TestKafkaConnectProcessor_Process(t *testing.T) { testCases := []struct { name string - config map[string]string + config config.Config record opencdc.Record want sdk.ProcessedRecord }{ { name: "structured payload", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Metadata: map[string]string{}, Payload: opencdc.Change{ @@ -103,7 +104,7 @@ func TestKafkaConnectProcessor_Process(t *testing.T) { }, { name: "raw payload", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Position: opencdc.Position("test position"), Operation: opencdc.OperationSnapshot, diff --git a/pkg/plugin/processor/builtin/impl/unwrap/opencdc.go b/pkg/plugin/processor/builtin/impl/unwrap/opencdc.go index 814ff0abf..64bd3226d 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/opencdc.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/opencdc.go @@ -21,6 +21,7 @@ import ( "encoding/base64" "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -61,9 +62,9 @@ Note: if the wrapped [OpenCDC record](https://conduit.io/docs/features/opencdc-r }, nil } -func (u *openCDCProcessor) Configure(ctx context.Context, m map[string]string) error { +func (u *openCDCProcessor) Configure(ctx context.Context, c config.Config) error { cfg := openCDCConfig{} - err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters()) + err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters()) if err != nil { return cerrors.Errorf("failed parsing configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/unwrap/opencdc_examples_test.go b/pkg/plugin/processor/builtin/impl/unwrap/opencdc_examples_test.go index 2cebbfb4b..d055512b3 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/opencdc_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/opencdc_examples_test.go @@ -15,6 +15,7 @@ package unwrap import ( + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -29,7 +30,7 @@ func ExampleOpenCDCProcessor() { Summary: "Unwrap an [OpenCDC record](https://conduit.io/docs/features/opencdc-record)", Description: "In this example we use the `unwrap.opencdc` processor to unwrap the [OpenCDC record](https://conduit.io/docs/features/opencdc-record) found in the " + "record's `.Payload.After` field.", - Config: map[string]string{}, + Config: config.Config{}, Have: opencdc.Record{ Position: opencdc.Position("wrapping position"), Key: opencdc.RawData("wrapping key"), diff --git a/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go b/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go index b0b98bb61..421b2a04f 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/opencdc_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -135,17 +136,17 @@ const RecordCreate = `{ func TestUnwrapOpenCDC_Configure(t *testing.T) { testCases := []struct { name string - in map[string]string + config config.Config wantErr string }{ { name: "invalid field", - in: map[string]string{"field": ".Payload.Something"}, + config: config.Config{"field": ".Payload.Something"}, wantErr: `invalid reference: invalid reference ".Payload.Something": unexpected field "Something": cannot resolve reference`, }, { name: "valid field", - in: map[string]string{"field": ".Payload.Before"}, + config: config.Config{"field": ".Payload.Before"}, wantErr: "", }, } @@ -155,7 +156,7 @@ func TestUnwrapOpenCDC_Configure(t *testing.T) { is := is.New(t) underTest := NewOpenCDCProcessor(log.Test(t)) - gotErr := underTest.Configure(context.Background(), tc.in) + gotErr := underTest.Configure(context.Background(), tc.config) if tc.wantErr == "" { is.NoErr(gotErr) } else { @@ -171,11 +172,11 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { name string record opencdc.Record want sdk.ProcessedRecord - config map[string]string + config config.Config }{ { name: "create with structured data and no payload after", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key"), Operation: opencdc.OperationCreate, @@ -187,7 +188,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "create with an invalid operation", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -213,7 +214,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "create with an invalid metadata", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -238,7 +239,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "create with an invalid key", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -263,7 +264,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "create with an invalid payload", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -277,7 +278,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "create with structured data", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key"), Operation: opencdc.OperationCreate, @@ -338,7 +339,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "create with raw data", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -378,7 +379,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "delete with before and with raw data", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -418,7 +419,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "delete without before and with raw data", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -452,7 +453,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "update with before and with raw data", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -498,7 +499,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "update without before and with raw data", - config: map[string]string{}, + config: config.Config{}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, @@ -538,7 +539,7 @@ func TestUnwrapOpenCDC_Process(t *testing.T) { }, { name: "update without before and with raw data", - config: map[string]string{"field": ".Payload.After.nested"}, + config: config.Config{"field": ".Payload.After.nested"}, record: opencdc.Record{ Key: opencdc.RawData("one-key-raw-data"), Operation: opencdc.OperationCreate, diff --git a/pkg/plugin/processor/builtin/impl/webhook/http.go b/pkg/plugin/processor/builtin/impl/webhook/http.go index 5d94d63ce..5a82ec7ba 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http.go @@ -28,6 +28,7 @@ import ( "time" "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -137,8 +138,8 @@ The processor will retry the request according to the backoff configuration.`, }, nil } -func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) error { - err := sdk.ParseConfig(ctx, m, &p.config, p.config.Parameters()) +func (p *httpProcessor) Configure(ctx context.Context, c config.Config) error { + err := sdk.ParseConfig(ctx, c, &p.config, p.config.Parameters()) if err != nil { return cerrors.Errorf("failed parsing configuration: %w", err) } diff --git a/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go b/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go index 6c6c36548..0aa03e5f7 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go @@ -22,6 +22,7 @@ import ( "net/http/httptest" "strings" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" conduit_log "github.com/conduitio/conduit/pkg/foundation/log" @@ -44,7 +45,7 @@ HTTP server that replies back with a greeting. The record's ` + "`.Payload.After`" + ` is overwritten with the response. Additionally, the example shows how to set a request header and how to store the value of the HTTP response's code in the metadata field ` + "`http_status`" + `.`, - Config: map[string]string{ + Config: config.Config{ "request.url": srv.URL, "request.body": `{{ printf "%s" .Payload.After }}`, "response.status": `.Metadata["http_status"]`, @@ -105,7 +106,7 @@ This example shows how to use the HTTP processor to use a record's ` + "`.Payloa send it to a dummy HTTP server, and get a greeting with the name back. The response will be written under the record's ` + "`.Payload.After.response`.", - Config: map[string]string{ + Config: config.Config{ "request.url": srv.URL + "/{{.Payload.After.name}}", "response.body": ".Payload.After.response", }, diff --git a/pkg/plugin/processor/builtin/impl/webhook/http_test.go b/pkg/plugin/processor/builtin/impl/webhook/http_test.go index ac551453d..70afd4450 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http_test.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http_test.go @@ -21,6 +21,7 @@ import ( "net/http/httptest" "testing" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" @@ -32,31 +33,31 @@ import ( func TestHTTPProcessor_Configure(t *testing.T) { tests := []struct { name string - config map[string]string + config config.Config wantErr string }{ { name: "empty config returns error", - config: map[string]string{}, + config: config.Config{}, wantErr: `failed parsing configuration: config invalid: error validating "request.url": required parameter is not provided`, }, { name: "empty url returns error", - config: map[string]string{ + config: config.Config{ "request.url": "", }, wantErr: `failed parsing configuration: config invalid: error validating "request.url": required parameter is not provided`, }, { name: "invalid url returns error", - config: map[string]string{ + config: config.Config{ "request.url": ":not/a/valid/url", }, wantErr: "configuration check failed: parse \":not/a/valid/url\": missing protocol scheme", }, { name: "invalid method returns error", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "request.method": ":foo", }, @@ -64,7 +65,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "invalid backoffRetry.count returns error", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "backoffRetry.count": "not-a-number", }, @@ -72,7 +73,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "invalid backoffRetry.min returns error", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "backoffRetry.count": "1", "backoffRetry.min": "not-a-duration", @@ -81,7 +82,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "invalid backoffRetry.max returns error", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "backoffRetry.count": "1", "backoffRetry.max": "not-a-duration", @@ -90,7 +91,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "invalid backoffRetry.factor returns error", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "backoffRetry.count": "1", "backoffRetry.factor": "not-a-number", @@ -99,28 +100,28 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "valid url returns processor", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", }, wantErr: "", }, { name: "valid url template returns processor", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com/{{.Payload.After}}", }, wantErr: "", }, { name: "invalid url template with a hyphen", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com/{{.Payload.After.my-key}}", }, wantErr: "error while parsing the URL template: template: :1: bad character U+002D '-'", }, { name: "valid url and method returns processor", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "request.method": "GET", }, @@ -128,7 +129,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "valid url, method and backoff retry config returns processor", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "request.contentType": "application/json", "backoffRetry.count": "1", @@ -140,7 +141,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "content-type header", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "headers.content-type": "application/json", }, @@ -148,7 +149,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "invalid: content-type header and request.contentType", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "request.contentType": "application/json", "headers.content-type": "application/json", @@ -157,7 +158,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "invalid: same value of response.body and response.status", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "response.body": ".Payload.After", "response.status": ".Payload.After", @@ -166,7 +167,7 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, { name: "valid response.body and response.status", - config: map[string]string{ + config: config.Config{ "request.url": "http://example.com", "response.body": ".Payload.After", "response.status": `.Metadata["response.status"]`, @@ -195,7 +196,7 @@ func TestHTTPProcessor_Success(t *testing.T) { tests := []struct { name string - config map[string]string + config config.Config status int record opencdc.Record wantBody string @@ -203,7 +204,7 @@ func TestHTTPProcessor_Success(t *testing.T) { }{ { name: "structured data", - config: map[string]string{ + config: config.Config{ "request.method": "POST", "request.body": "{{ toJson . }}", }, @@ -228,7 +229,7 @@ func TestHTTPProcessor_Success(t *testing.T) { }, { name: "raw data", - config: map[string]string{ + config: config.Config{ "request.method": "GET", "request.body": "{{ toJson . }}", }, @@ -249,7 +250,7 @@ func TestHTTPProcessor_Success(t *testing.T) { }, { name: "custom field for response body and status", - config: map[string]string{ + config: config.Config{ "response.body": ".Payload.After.body", "response.status": ".Payload.After.status", "request.method": "POST", @@ -278,7 +279,7 @@ func TestHTTPProcessor_Success(t *testing.T) { }, { name: "request body: custom field, structured", - config: map[string]string{ + config: config.Config{ "request.body": "{{ toJson . }}", "response.body": ".Payload.After.httpResponse", "request.method": "POST", @@ -311,7 +312,7 @@ func TestHTTPProcessor_Success(t *testing.T) { }, { name: "request body: custom field, raw data", - config: map[string]string{ + config: config.Config{ "request.body": `{{ printf "%s" .Payload.Before }}`, "response.body": ".Payload.After.httpResponse", "request.method": "POST", @@ -338,7 +339,7 @@ func TestHTTPProcessor_Success(t *testing.T) { }, { name: "request body: static", - config: map[string]string{ + config: config.Config{ "request.body": `foo`, "response.body": ".Payload.After.httpResponse", "request.method": "POST", @@ -460,12 +461,12 @@ func TestHTTPProcessor_URLTemplate(t *testing.T) { })) defer srv.Close() - config := map[string]string{ + cfg := config.Config{ // attach the path template to the URL "request.url": srv.URL + tc.pathTmpl, } underTest := NewHTTPProcessor(log.Test(t)) - err := underTest.Configure(context.Background(), config) + err := underTest.Configure(context.Background(), cfg) is.NoErr(err) got := underTest.Process(context.Background(), []opencdc.Record{tc.record}) @@ -508,7 +509,7 @@ func TestHTTPProcessor_RetrySuccess(t *testing.T) { })) defer srv.Close() - config := map[string]string{ + cfg := config.Config{ "request.url": srv.URL, "backoffRetry.count": "4", "backoffRetry.min": "5ms", @@ -518,7 +519,7 @@ func TestHTTPProcessor_RetrySuccess(t *testing.T) { } underTest := NewHTTPProcessor(log.Test(t)) - err := underTest.Configure(context.Background(), config) + err := underTest.Configure(context.Background(), cfg) is.NoErr(err) got := underTest.Process(context.Background(), rec) @@ -545,7 +546,7 @@ func TestHTTPProcessor_RetryFail(t *testing.T) { })) defer srv.Close() - config := map[string]string{ + cfg := config.Config{ "request.url": srv.URL, "backoffRetry.count": "5", "backoffRetry.min": "5ms", @@ -554,7 +555,7 @@ func TestHTTPProcessor_RetryFail(t *testing.T) { } underTest := NewHTTPProcessor(log.Test(t)) - err := underTest.Configure(context.Background(), config) + err := underTest.Configure(context.Background(), cfg) is.NoErr(err) got := underTest.Process( diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/example.go b/pkg/plugin/processor/builtin/internal/exampleutil/example.go index 6f74fe40e..b47477c23 100644 --- a/pkg/plugin/processor/builtin/internal/exampleutil/example.go +++ b/pkg/plugin/processor/builtin/internal/exampleutil/example.go @@ -20,6 +20,7 @@ import ( "fmt" "log" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" @@ -43,7 +44,7 @@ type Example struct { Order int `json:"-"` Summary string `json:"summary"` Description string `json:"description"` - Config map[string]string `json:"config"` + Config config.Config `json:"config"` Have opencdc.Record `json:"have"` Want sdk.ProcessedRecord `json:"want"` } diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/specs/field.convert.json b/pkg/plugin/processor/builtin/internal/exampleutil/specs/field.convert.json index c7657a462..55758e6ff 100644 --- a/pkg/plugin/processor/builtin/internal/exampleutil/specs/field.convert.json +++ b/pkg/plugin/processor/builtin/internal/exampleutil/specs/field.convert.json @@ -23,7 +23,7 @@ }, "type": { "default": "", - "description": "Type is the target field type after conversion, available options are: string, int, float, bool.", + "description": "Type is the target field type after conversion, available options are: `string`, `int`, `float`, `bool`, `time`.", "type": "string", "validations": [ { @@ -32,7 +32,7 @@ }, { "type": "inclusion", - "value": "string,int,float,bool" + "value": "string,int,float,bool,time" } ] } diff --git a/pkg/plugin/processor/builtin/middleware.go b/pkg/plugin/processor/builtin/middleware.go new file mode 100644 index 000000000..5a10039c6 --- /dev/null +++ b/pkg/plugin/processor/builtin/middleware.go @@ -0,0 +1,56 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builtin + +import ( + "context" + + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/ctxutil" +) + +type processorWithID struct { + sdk.Processor + id string +} + +func newProcessorWithID(processor sdk.Processor, id string) *processorWithID { + return &processorWithID{ + Processor: processor, + id: id, + } +} + +func (p *processorWithID) Configure(ctx context.Context, cfg config.Config) error { + ctx = ctxutil.ContextWithProcessorID(ctx, p.id) + return p.Processor.Configure(ctx, cfg) +} + +func (p *processorWithID) Open(ctx context.Context) error { + ctx = ctxutil.ContextWithProcessorID(ctx, p.id) + return p.Processor.Open(ctx) +} + +func (p *processorWithID) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord { + ctx = ctxutil.ContextWithProcessorID(ctx, p.id) + return p.Processor.Process(ctx, records) +} + +func (p *processorWithID) Teardown(ctx context.Context) error { + ctx = ctxutil.ContextWithProcessorID(ctx, p.id) + return p.Processor.Teardown(ctx) +} diff --git a/pkg/plugin/processor/builtin/registry.go b/pkg/plugin/processor/builtin/registry.go index ee631a9b4..75adea2b6 100644 --- a/pkg/plugin/processor/builtin/registry.go +++ b/pkg/plugin/processor/builtin/registry.go @@ -16,11 +16,13 @@ package builtin import ( "context" - "reflect" "runtime/debug" sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit-processor-sdk/pprocutils" + "github.com/conduitio/conduit-processor-sdk/schema" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/ctxutil" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl" @@ -69,7 +71,17 @@ type blueprint struct { type ProcessorPluginConstructor func(log.CtxLogger) sdk.Processor -func NewRegistry(logger log.CtxLogger, constructors map[string]ProcessorPluginConstructor) *Registry { +func NewRegistry( + logger log.CtxLogger, + constructors map[string]ProcessorPluginConstructor, + schemaService pprocutils.SchemaService, +) *Registry { + // set schema service and logger for builtin processors + schema.SchemaService = schemaService + pprocutils.Logger = logger.WithComponent("processor"). + ZerologWithComponent(). + Hook(ctxutil.ProcessorIDLogCtxHook{}) + logger = logger.WithComponent("plugin.processor.builtin.Registry") buildInfo, ok := debug.ReadBuildInfo() if !ok { @@ -86,25 +98,6 @@ func NewRegistry(logger log.CtxLogger, constructors map[string]ProcessorPluginCo return r } -func NewProcessorPluginConstructor(processorPlugin sdk.Processor) ProcessorPluginConstructor { - procType := reflect.TypeOf(processorPlugin) - for procType.Kind() != reflect.Struct { - procType.Elem() - } - - f := func(logger log.CtxLogger) sdk.Processor { - // TODO create processor plugin wrapper that injects logger into context - // before forwarding the call to the plugin - newProcValue := reflect.New(procType) - return newProcValue.Interface().(sdk.Processor) - } - - // try out f, to catch any panic early - f(log.CtxLogger{}) - - return f -} - func loadPlugins(buildInfo *debug.BuildInfo, constructors map[string]ProcessorPluginConstructor) map[string]map[string]blueprint { plugins := make(map[string]map[string]blueprint, len(constructors)) for moduleName, constructor := range constructors { @@ -171,7 +164,7 @@ func newFullName(pluginName, pluginVersion string) plugin.FullName { return plugin.NewFullName(plugin.PluginTypeBuiltin, pluginName, pluginVersion) } -func (r *Registry) NewProcessor(_ context.Context, fullName plugin.FullName, _ string) (sdk.Processor, error) { +func (r *Registry) NewProcessor(_ context.Context, fullName plugin.FullName, id string) (sdk.Processor, error) { versionMap, ok := r.plugins[fullName.PluginName()] if !ok { return nil, plugin.ErrPluginNotFound @@ -185,7 +178,14 @@ func (r *Registry) NewProcessor(_ context.Context, fullName plugin.FullName, _ s return nil, cerrors.Errorf("could not find builtin plugin %q, only found versions %v: %w", fullName, availableVersions, plugin.ErrPluginNotFound) } - return b.constructor(r.logger), nil + p := b.constructor(r.logger) + + // apply default middleware + p = sdk.ProcessorWithMiddleware(p, sdk.DefaultProcessorMiddleware(p.MiddlewareOptions()...)...) + // attach processor ID for logs + p = newProcessorWithID(p, id) + + return p, nil } func (r *Registry) List() map[plugin.FullName]sdk.Specification { diff --git a/pkg/plugin/processor/builtin/registry_test.go b/pkg/plugin/processor/builtin/registry_test.go index 3f9c55218..e92292953 100644 --- a/pkg/plugin/processor/builtin/registry_test.go +++ b/pkg/plugin/processor/builtin/registry_test.go @@ -43,7 +43,7 @@ func TestRegistry_List(t *testing.T) { "builtin:test-processor@v0.1.2": procSpec, } - reg := NewRegistry(logger, map[string]ProcessorPluginConstructor{procSpec.Name: procConstructor}) + reg := NewRegistry(logger, map[string]ProcessorPluginConstructor{procSpec.Name: procConstructor}, nil) got := reg.List() is.Equal(got, wantList) diff --git a/pkg/plugin/processor/mock/processor.go b/pkg/plugin/processor/mock/processor.go index 5f9bd4d18..df1b83606 100644 --- a/pkg/plugin/processor/mock/processor.go +++ b/pkg/plugin/processor/mock/processor.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + config "github.com/conduitio/conduit-commons/config" opencdc "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" gomock "go.uber.org/mock/gomock" @@ -43,7 +44,7 @@ func (m *Processor) EXPECT() *ProcessorMockRecorder { } // Configure mocks base method. -func (m *Processor) Configure(arg0 context.Context, arg1 map[string]string) error { +func (m *Processor) Configure(arg0 context.Context, arg1 config.Config) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Configure", arg0, arg1) ret0, _ := ret[0].(error) @@ -69,13 +70,51 @@ func (c *ProcessorConfigureCall) Return(arg0 error) *ProcessorConfigureCall { } // Do rewrite *gomock.Call.Do -func (c *ProcessorConfigureCall) Do(f func(context.Context, map[string]string) error) *ProcessorConfigureCall { +func (c *ProcessorConfigureCall) Do(f func(context.Context, config.Config) error) *ProcessorConfigureCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ProcessorConfigureCall) DoAndReturn(f func(context.Context, map[string]string) error) *ProcessorConfigureCall { +func (c *ProcessorConfigureCall) DoAndReturn(f func(context.Context, config.Config) error) *ProcessorConfigureCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// MiddlewareOptions mocks base method. +func (m *Processor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MiddlewareOptions") + ret0, _ := ret[0].([]sdk.ProcessorMiddlewareOption) + return ret0 +} + +// MiddlewareOptions indicates an expected call of MiddlewareOptions. +func (mr *ProcessorMockRecorder) MiddlewareOptions() *ProcessorMiddlewareOptionsCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MiddlewareOptions", reflect.TypeOf((*Processor)(nil).MiddlewareOptions)) + return &ProcessorMiddlewareOptionsCall{Call: call} +} + +// ProcessorMiddlewareOptionsCall wrap *gomock.Call +type ProcessorMiddlewareOptionsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *ProcessorMiddlewareOptionsCall) Return(arg0 []sdk.ProcessorMiddlewareOption) *ProcessorMiddlewareOptionsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *ProcessorMiddlewareOptionsCall) Do(f func() []sdk.ProcessorMiddlewareOption) *ProcessorMiddlewareOptionsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *ProcessorMiddlewareOptionsCall) DoAndReturn(f func() []sdk.ProcessorMiddlewareOption) *ProcessorMiddlewareOptionsCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/plugin/processor/standalone/processor.go b/pkg/plugin/processor/standalone/processor.go index aab7e8ef2..2122452fa 100644 --- a/pkg/plugin/processor/standalone/processor.go +++ b/pkg/plugin/processor/standalone/processor.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit-processor-sdk/pprocutils" @@ -198,7 +199,7 @@ func (p *wasmProcessor) Specification() (sdk.Specification, error) { } } -func (p *wasmProcessor) Configure(ctx context.Context, config map[string]string) error { +func (p *wasmProcessor) Configure(ctx context.Context, config config.Config) error { req := &processorv1.CommandRequest{ Request: &processorv1.CommandRequest_Configure{ Configure: &processorv1.Configure_Request{ diff --git a/pkg/plugin/processor/standalone/processor_test.go b/pkg/plugin/processor/standalone/processor_test.go index c135e196b..fad3c499e 100644 --- a/pkg/plugin/processor/standalone/processor_test.go +++ b/pkg/plugin/processor/standalone/processor_test.go @@ -25,6 +25,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" + "github.com/google/go-cmp/cmp" "github.com/matryer/is" ) @@ -40,7 +41,7 @@ func TestWASMProcessor_Specification_Success(t *testing.T) { is.NoErr(err) wantSpec := ChaosProcessorSpecifications() - is.Equal(gotSpec, wantSpec) + is.Equal("", cmp.Diff(gotSpec, wantSpec)) is.NoErr(underTest.Teardown(ctx)) } diff --git a/pkg/plugin/processor/standalone/registry_test.go b/pkg/plugin/processor/standalone/registry_test.go index d505c7e13..2ca347681 100644 --- a/pkg/plugin/processor/standalone/registry_test.go +++ b/pkg/plugin/processor/standalone/registry_test.go @@ -27,6 +27,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" + "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/matryer/is" ) @@ -43,7 +44,7 @@ func TestRegistry_List(t *testing.T) { want := ChaosProcessorSpecifications() - is.Equal(got, want) + is.Equal("", cmp.Diff(got, want)) } func TestRegistry_MalformedProcessor(t *testing.T) { @@ -84,7 +85,7 @@ func TestRegistry_ChaosProcessor(t *testing.T) { is.True(ok) want := ChaosProcessorSpecifications() - is.Equal(got, want) + is.Equal("", cmp.Diff(got, want)) }) t.Run("NewProcessor", func(t *testing.T) { @@ -97,7 +98,7 @@ func TestRegistry_ChaosProcessor(t *testing.T) { is.NoErr(err) want := ChaosProcessorSpecifications() - is.Equal(got, want) + is.Equal("", cmp.Diff(got, want)) is.NoErr(p.Teardown(ctx)) }) diff --git a/pkg/plugin/processor/standalone/standalone_test.go b/pkg/plugin/processor/standalone/standalone_test.go index 440f60eee..5b2fbca03 100644 --- a/pkg/plugin/processor/standalone/standalone_test.go +++ b/pkg/plugin/processor/standalone/standalone_test.go @@ -17,6 +17,7 @@ package standalone import ( "context" "fmt" + "maps" "os" "os/exec" "testing" @@ -25,6 +26,7 @@ import ( "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/csync" sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/stealthrocket/wazergo" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" @@ -119,25 +121,36 @@ func ChaosProcessorSpecifications() sdk.Specification { config.ValidationInclusion{List: []string{"success", "error", "panic"}}, }, } + + dummyProcessor := sdk.NewProcessorFunc(sdk.Specification{}, nil) + spec, err := sdk.ProcessorWithMiddleware(dummyProcessor, sdk.DefaultProcessorMiddleware()...).Specification() + if err != nil { + panic(cerrors.Errorf("failed to get specifications for middleware: %w", err)) + } + + chaosParams := map[string]config.Parameter{ + "configure": param, + "open": param, + "process.prefix": { + Default: "", + Type: config.ParameterTypeString, + Description: "prefix to be added to the payload's after", + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + "process": param, + "teardown": param, + } + // add parameters from middleware + maps.Copy(chaosParams, spec.Parameters) + return sdk.Specification{ Name: "chaos-processor", Summary: "chaos processor summary", Description: "chaos processor description", Version: "v1.3.5", Author: "Meroxa, Inc.", - Parameters: map[string]config.Parameter{ - "configure": param, - "open": param, - "process.prefix": { - Default: "", - Type: config.ParameterTypeString, - Description: "prefix to be added to the payload's after", - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - "process": param, - "teardown": param, - }, + Parameters: chaosParams, } } diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go index d50850d2a..15c8eea79 100644 --- a/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go +++ b/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go @@ -70,7 +70,7 @@ func (p *chaosProcessor) Specification() (sdk.Specification, error) { }, nil } -func (p *chaosProcessor) Configure(ctx context.Context, cfg map[string]string) error { +func (p *chaosProcessor) Configure(ctx context.Context, cfg config.Config) error { p.cfg = cfg return p.methodBehavior(ctx, "configure") diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go index 056873472..37f54aac9 100644 --- a/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go +++ b/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go @@ -36,21 +36,6 @@ func (p *testProcessor) Specification() (sdk.Specification, error) { return sdk.Specification{}, errors.New("boom") } -func (p *testProcessor) Configure(context.Context, map[string]string) error { - // TODO implement me - panic("implement me") -} - -func (p *testProcessor) Open(context.Context) error { - // TODO implement me - panic("implement me") -} - func (p *testProcessor) Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord { - // TODO implement me - panic("implement me") -} - -func (p *testProcessor) Teardown(context.Context) error { - return nil + panic("shouldn't be called") } diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 945a101e2..812fd5d29 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -33,6 +33,7 @@ import ( "github.com/conduitio/conduit/pkg/plugin/connector/standalone" proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor" proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin" + "github.com/conduitio/conduit/pkg/plugin/processor/procutils" "github.com/conduitio/conduit/pkg/processor" p1 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines1" p2 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines2" @@ -492,19 +493,21 @@ func TestService_IntegrationTestServices(t *testing.T) { schemaRegistry, err := schemaregistry.NewSchemaRegistry(db) is.NoErr(err) - schemaService := connutils.NewSchemaService(logger, schemaRegistry, tokenService) + connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, tokenService) connPluginService := conn_plugin.NewPluginService( logger, - builtin.NewRegistry(logger, builtin.DefaultBuiltinConnectors, schemaService), + builtin.NewRegistry(logger, builtin.DefaultBuiltinConnectors, connSchemaService), standalone.NewRegistry(logger, ""), tokenService, ) connPluginService.Init(ctx, "conn-utils-token:12345") + procSchemaService := procutils.NewSchemaService(logger, schemaRegistry) + procPluginService := proc_plugin.NewPluginService( logger, - proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors), + proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService), nil, )