Skip to content

Commit

Permalink
Update processor SDK, add middleware (#1742)
Browse files Browse the repository at this point in the history
* update processor SDK

- apply processor middleware to builtin processors
- use config.Config for configuration map
- take into account middleware parameters in tests

* organize imports

* processor changes

- update processor SDK
- set SchemaService for builtin processors
- add support for "time" in field.convert processor

* go generate

* rename config.Config parameter names

* use proper logger for builtin processors

* update generator and processor-sdk

* update processor-sdk
  • Loading branch information
lovromazgon authored Aug 5, 2024
1 parent a8d76dc commit d6e83ce
Show file tree
Hide file tree
Showing 70 changed files with 614 additions and 336 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ require (
github.com/bufbuild/buf v1.35.1
github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58
github.com/conduitio/conduit-connector-file v0.6.1-0.20240802092632-99d8e67ba629
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240731150050-422a9e3cbd8f
github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240802092956-d92779b95e5b
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240802103310-fd4ab945b1ac
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240805163917-9b6b1e83ebde
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588
github.com/conduitio/yaml/v3 v3.3.0
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58 h1:bv6
github.com/conduitio/conduit-commons v0.2.1-0.20240801113202-731b460a2c58/go.mod h1:QYVlSvfOG4AB9tFz9NS++ToinF1yuGG0D0CgzwLG4k0=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240802092632-99d8e67ba629 h1:q5rAlI2f5bD7ZG0TYzPeLnu6/BkF4GbgHH6177NcA+8=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240802092632-99d8e67ba629/go.mod h1:xHC0eA+/mtfoRePx1Ns+xNYvr34hcvctb+0MNPT+ngI=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2/go.mod h1:2+n1SGAJ/+TpJC9YvxTUWHKt9e0Tm12JcqFJ8OndhX4=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240731150050-422a9e3cbd8f h1:8IpdNsz3lLtN4KWdZGJESTjfSZz3UEj+YiuVQOSw5Xs=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240731150050-422a9e3cbd8f/go.mod h1:28hV3b3aErncncQn3ihDjiUMhKXGG8u/7VSEmKVIi4Q=
github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240802092956-d92779b95e5b h1:dvCqshxRFhaQLg/mnbDccuvM/6dQdj1h9fAoo+CG8SA=
github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240802092956-d92779b95e5b/go.mod h1:AZ6CPfCVsD+a66FFdd3zcbS3T3Sv8KSM0+CpAFBDK7U=
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4 h1:MrfxObXRj7Psz981MNNi6wUj+diu7nuq+/Wl3MqXMpk=
Expand All @@ -229,8 +229,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90 h
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90/go.mod h1:Cg0rM0NJdIO+CQZWyXtikUVxVainUUq0MqQr8sWnO8E=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6 h1:yX2SddKRmb1gMi94umalIKF+8+hipknGAEFNXz0+B+E=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6/go.mod h1:R2V+ZXCFIeIIv8xxZHsi6NGntgsimsEbhQg4ezhna+0=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 h1:TCHq3L/LS9Ngo2a4h39vSNpXXisTox1l5uXdwPxxtNM=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71/go.mod h1:n6VqVO07olTlvIUSHf2kZcU8cgu2jGmuO6bFrQST2v8=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240805163917-9b6b1e83ebde h1:fsHFTJ7XeFromOcg2Z6ueyDOmhDw8KV/XTNU2I59jQ0=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240805163917-9b6b1e83ebde/go.mod h1:kQ+7bUREM+F2L/yPPM/GF8z2lqqmpx8Su+ioyE1uO8Q=
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 h1:/OBjxI1JjE3AmifouogZ2KvlhGJ9tQGk4X7UxwjHo1o=
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588/go.mod h1:G5t9W5Z5Mn0nW1TNnIQ1al4piqRXjc1R7HjdHgGFCx4=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
Expand Down
2 changes: 1 addition & 1 deletion pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func createServices(r *Runtime) error {

procPluginService := proc_plugin.NewPluginService(
r.logger,
proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors),
proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService),
standaloneReg,
)

Expand Down
52 changes: 52 additions & 0 deletions pkg/foundation/ctxutil/processorid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ctxutil

import (
"context"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/rs/zerolog"
)

// processorIDCtxKey is used as the key when saving the processor ID in a context.
type processorIDCtxKey struct{}

// ContextWithProcessorID wraps ctx and returns a context that contains processor ID.
func ContextWithProcessorID(ctx context.Context, processorID string) context.Context {
return context.WithValue(ctx, processorIDCtxKey{}, processorID)
}

// ProcessorIDFromContext fetches the record processor ID from the context. If the
// context does not contain a processor ID it returns nil.
func ProcessorIDFromContext(ctx context.Context) string {
processorID := ctx.Value(processorIDCtxKey{})
if processorID != nil {
return processorID.(string)
}
return ""
}

// ProcessorIDLogCtxHook fetches the record processor ID from the context and if it
// exists it adds the processorID to the log output.
type ProcessorIDLogCtxHook struct{}

// Run executes the log hook.
func (h ProcessorIDLogCtxHook) Run(e *zerolog.Event, _ zerolog.Level, _ string) {
p := ProcessorIDFromContext(e.GetCtx())
if p != "" {
e.Str(log.ProcessorIDField, p)
}
}
92 changes: 92 additions & 0 deletions pkg/foundation/ctxutil/processorid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ctxutil

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/google/uuid"
"github.com/matryer/is"
"github.com/rs/zerolog"
)

func TestContextWithProcessorID_Success(t *testing.T) {
is := is.New(t)

ctx := context.Background()
processorID := uuid.NewString()

ctx = ContextWithProcessorID(ctx, processorID)
got := ProcessorIDFromContext(ctx)

is.Equal(processorID, got)
}

func TestContextWithProcessorID_Twice(t *testing.T) {
is := is.New(t)

ctx := context.Background()
processorID := uuid.NewString()

ctx = ContextWithProcessorID(ctx, "existing processor ID")
ctx = ContextWithProcessorID(ctx, processorID)
got := ProcessorIDFromContext(ctx)

is.Equal(processorID, got)
}

func TestProcessorIDFromContext_Empty(t *testing.T) {
is := is.New(t)

ctx := context.Background()
got := ProcessorIDFromContext(ctx)

is.Equal("", got)
}

func TestProcessorIDLogCtxHook_Success(t *testing.T) {
is := is.New(t)

ctx := context.Background()
processorID := uuid.NewString()

ctx = ContextWithProcessorID(ctx, processorID)

var logOutput bytes.Buffer
logger := zerolog.New(&logOutput)
e := logger.Info().Ctx(ctx)
ProcessorIDLogCtxHook{}.Run(e, zerolog.InfoLevel, "")
e.Send()

is.Equal(fmt.Sprintf(`{"level":"info","%s":"%s"}`, log.ProcessorIDField, processorID)+"\n", logOutput.String())
}

func TestProcessorIDLogCtxHook_EmptyCtx(t *testing.T) {
is := is.New(t)

ctx := context.Background()

var logOutput bytes.Buffer
logger := zerolog.New(&logOutput)
e := logger.Info().Ctx(ctx)
ProcessorIDLogCtxHook{}.Run(e, zerolog.InfoLevel, "")
e.Send()

is.Equal(`{"level":"info"}`+"\n", logOutput.String())
}
9 changes: 6 additions & 3 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
conn_standalone "github.com/conduitio/conduit/pkg/plugin/connector/standalone"
proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor"
proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
"github.com/conduitio/conduit/pkg/plugin/processor/procutils"
"github.com/conduitio/conduit/pkg/processor"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
Expand Down Expand Up @@ -74,19 +75,21 @@ func TestPipelineSimple(t *testing.T) {
schemaRegistry, err := schemaregistry.NewSchemaRegistry(db)
is.NoErr(err)
authManager := connutils.NewAuthManager()
schemaService := connutils.NewSchemaService(logger, schemaRegistry, authManager)
connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, authManager)

connPluginService := conn_plugin.NewPluginService(
logger,
conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, schemaService),
conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, connSchemaService),
conn_standalone.NewRegistry(logger, ""),
authManager,
)
connPluginService.Init(ctx, "conn-utils-token:12345")

procSchemaService := procutils.NewSchemaService(logger, schemaRegistry)

procPluginService := proc_plugin.NewPluginService(
logger,
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors),
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService),
nil,
)

Expand Down
25 changes: 13 additions & 12 deletions pkg/plugin/processor/builtin/impl/avro/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand All @@ -27,13 +28,13 @@ import (
func TestConfig_Parse(t *testing.T) {
testCases := []struct {
name string
input map[string]string
input config.Config
want encodeConfig
wantErr error
}{
{
name: "preRegistered",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "testsubject",
Expand All @@ -53,7 +54,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "preRegistered without version",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "testsubject",
Expand All @@ -62,7 +63,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "preRegistered without subject",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.version": "123",
Expand All @@ -71,7 +72,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "autoRegister",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -87,15 +88,15 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "autoRegister without subject",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
},
wantErr: cerrors.New("failed parsing schema strategy: subject required for schema strategy 'autoRegister'"),
},
{
name: "non-default target field",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -112,7 +113,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "valid auth",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -134,7 +135,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "auth -- no username",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -144,7 +145,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "auth -- no password",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -154,7 +155,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "tls: missing client cert and key",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -166,7 +167,7 @@ missing field: tls.client.key`),
},
{
name: "valid tls",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand Down
9 changes: 5 additions & 4 deletions pkg/plugin/processor/builtin/impl/avro/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"crypto/tls"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand Down Expand Up @@ -49,9 +50,9 @@ type decodeConfig struct {
fieldResolver sdk.ReferenceResolver
}

func parseDecodeConfig(ctx context.Context, m map[string]string) (decodeConfig, error) {
func parseDecodeConfig(ctx context.Context, c config.Config) (decodeConfig, error) {
cfg := decodeConfig{}
err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters())
err := sdk.ParseConfig(ctx, c, &cfg, cfg.Parameters())
if err != nil {
return decodeConfig{}, err
}
Expand Down Expand Up @@ -126,8 +127,8 @@ This processor is the counterpart to [` + "`avro.encode`" + `](/docs/processors/
}, nil
}

func (p *decodeProcessor) Configure(ctx context.Context, m map[string]string) error {
cfg, err := parseDecodeConfig(ctx, m)
func (p *decodeProcessor) Configure(ctx context.Context, c config.Config) error {
cfg, err := parseDecodeConfig(ctx, c)
if err != nil {
return cerrors.Errorf("invalid config: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand Down Expand Up @@ -74,7 +75,7 @@ In this example we use the following schema:
]
}
` + "```",
Config: map[string]string{
Config: config.Config{
"url": url,
"field": ".Key",
},
Expand Down
Loading

0 comments on commit d6e83ce

Please sign in to comment.