diff --git a/backend/pkg/connector/interceptor/interceptor.go b/backend/pkg/connector/interceptor/interceptor.go index 71661115d..572af76b8 100644 --- a/backend/pkg/connector/interceptor/interceptor.go +++ b/backend/pkg/connector/interceptor/interceptor.go @@ -96,11 +96,13 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide { guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)), guide.NewMirrorHeartbeatGuide(opts...), guide.NewMongoSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMongoDBHook), - guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleMongoDBHook), - guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)), + guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectValidateToConsoleMongoDBHook), + guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook), + guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMongoDBHook)), guide.NewMongoSinkGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMongoDBHook), - guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleMongoDBHook), - guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)), + guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectValidateToConsoleMongoDBHook), + guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook), + guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMongoDBHook)), } } diff --git a/backend/pkg/connector/interceptor/mirror_hook.go b/backend/pkg/connector/interceptor/mirror_hook.go index f600ce8ac..8cdf8cef3 100644 --- a/backend/pkg/connector/interceptor/mirror_hook.go +++ b/backend/pkg/connector/interceptor/mirror_hook.go @@ -31,7 +31,7 @@ func ConsoleToKafkaConnectMirrorSourceHook(config map[string]any) map[string]any return config } -// KafkaConnectToConsoleMirrorSourceHook removed MirrorMaker source specific config options +// 
KafkaConnectToConsoleMirrorSourceHook removes MirrorMaker source specific config options func KafkaConnectToConsoleMirrorSourceHook(configs map[string]string) map[string]string { result := make(map[string]string) for key, value := range configs { diff --git a/backend/pkg/connector/interceptor/mongo_hook.go b/backend/pkg/connector/interceptor/mongo_hook.go index 0390dbaa0..44c3a630f 100644 --- a/backend/pkg/connector/interceptor/mongo_hook.go +++ b/backend/pkg/connector/interceptor/mongo_hook.go @@ -20,12 +20,37 @@ var ( // ConsoleToKafkaConnectMongoDBHook sets connection authentication, output format properties and post processor chain func ConsoleToKafkaConnectMongoDBHook(config map[string]any) map[string]any { setConnectionURI(config) - setFormatOutputStream(config) - setPostProcessorChain(config) + for _, field := range []string{"key", "value"} { + if config["output.format."+field] == nil { + if v := getFormatOutputString(config[field+".converter"]); v != "" { + config["output.format."+field] = v + } + } + } + config["post.processor.chain"] = getPostProcessorChain(config["post.processor.chain"], config["key.projection.type"], config["value.projection.type"], config["field.renamer.mapping"]) return config } +// KafkaConnectToConsoleMongoDBHook removes MongoDB specific config options +func KafkaConnectToConsoleMongoDBHook(config map[string]string) map[string]string { + result := make(map[string]string) + for key, value := range config { + if key == "connection.url" { + // skip + } else if key == "output.format.key" && value == getFormatOutputString(config["key.converter"]) { + // skip + } else if key == "output.format.value" && value == getFormatOutputString(config["value.converter"]) { + // skip + } else if key == "post.processor.chain" && value == getPostProcessorChain(nil, config["key.projection.type"], config["value.projection.type"], config["field.renamer.mapping"]) { + // skip + } else { + result[key] = value + } + } + + return result +} func 
setConnectionURI(config map[string]any) { if _, exists := config["connection.uri"]; !exists { if _, exists := config["connection.url"]; exists { @@ -62,63 +87,61 @@ func setConnectionURI(config map[string]any) { } } -func setFormatOutputStream(config map[string]any) { - for _, field := range []string{"key", "value"} { - if config["output.format."+field] == nil { - switch config[field+".converter"] { - case "org.apache.kafka.connect.json.JsonConverter", - "io.confluent.connect.avro.AvroConverter": - config["output.format."+field] = "schema" - case "org.apache.kafka.connect.storage.StringConverter": - config["output.format."+field] = "json" - case "org.apache.kafka.connect.converters.ByteArrayConverter": - config["output.format."+field] = "bson" - } - } +func getFormatOutputString(converter any) string { + switch converter { + case "org.apache.kafka.connect.json.JsonConverter", + "io.confluent.connect.avro.AvroConverter": + return "schema" + case "org.apache.kafka.connect.storage.StringConverter": + return "json" + case "org.apache.kafka.connect.converters.ByteArrayConverter": + return "bson" } + + return "" } -func setPostProcessorChain(config map[string]any) { - var postProcessorChain string - if config["post.processor.chain"] != nil { - postProcessorChain = config["post.processor.chain"].(string) +func getPostProcessorChain(postProcessorChain any, keyProjectionType any, valueProjectionType any, fieldRenamerMapping any) string { + var postProcessorChainResult string + if postProcessorChain != nil { + postProcessorChainResult = postProcessorChain.(string) } else { - postProcessorChain = "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder" + postProcessorChainResult = "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder" } - switch config["key.projection.type"] { + switch keyProjectionType { case "allowlist": - if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector") { - postProcessorChain += 
",com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector" + if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector") { + postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector" } case "blocklist": - if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector") { - postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector" + if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector") { + postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector" } } - switch config["value.projection.type"] { + switch valueProjectionType { case "allowlist": - if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.AllowListValueProjector") { - postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.AllowListValueProjector" + if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.AllowListValueProjector") { + postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.AllowListValueProjector" } case "blocklist": - if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector") { - postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.BlockListValueProjector" + if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector") { + postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.BlockListValueProjector" } } - if config["field.renamer.mapping"] != nil && config["field.renamer.mapping"] != "[]" { - if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping") { - postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping" + if 
fieldRenamerMapping != nil && fieldRenamerMapping != "" && fieldRenamerMapping != "[]" { + if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping") { + postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping" } } - config["post.processor.chain"] = postProcessorChain + return postProcessorChainResult } -// KafkaConnectToConsoleMongoDBHook adds connection fields: URL, username and password -func KafkaConnectToConsoleMongoDBHook(response model.ValidationResponse, _ map[string]any) model.ValidationResponse { +// KafkaConnectValidateToConsoleMongoDBHook adds connection fields: URL, username and password +func KafkaConnectValidateToConsoleMongoDBHook(response model.ValidationResponse, _ map[string]any) model.ValidationResponse { response.Configs = append(response.Configs, model.ConfigDefinition{ Definition: model.ConfigDefinitionKey{ diff --git a/backend/pkg/connector/interceptor/mongo_hook_test.go b/backend/pkg/connector/interceptor/mongo_hook_test.go index 3ed11fc02..0d8d23242 100644 --- a/backend/pkg/connector/interceptor/mongo_hook_test.go +++ b/backend/pkg/connector/interceptor/mongo_hook_test.go @@ -414,3 +414,149 @@ func TestConsoleToKafkaConnectMongoDBHook(t *testing.T) { }) } } + +func TestKafkaConnectToConsoleMongoDBHook(t *testing.T) { + tests := []struct { + name string + config map[string]string + want map[string]string + }{ + { + name: "Should remove connection.url", + config: map[string]string{ + "connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net", + "connection.url": "mongodb+srv://cluster0.abcd.mongodb.net", + }, + want: map[string]string{ + "connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net", + }, + }, + { + name: "Should remove default post.processor.chain", + config: map[string]string{ + "connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net", + "post.processor.chain": 
"com.mongodb.kafka.connect.sink.processor.DocumentIdAdder", + }, + want: map[string]string{ + "connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net", + }, + }, + { + name: "Should not remove custom post.processor.chain", + config: map[string]string{ + "connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net", + "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,Custom", + }, + want: map[string]string{ + "connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net", + "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,Custom", + }, + }, + { + name: "Should remove post.processor.chain for allowlist", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.projection.type": "allowlist", + "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.projection.type": "allowlist", + }, + }, + { + name: "Should remove post.processor.chain for blocklist", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.projection.type": "blocklist", + "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.projection.type": "blocklist", + }, + }, + { + name: "Should remove output.format.key for JsonConverter", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "output.format.key": "schema", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + }, + }, + { + name: "Should not remove custom output.format.key for JsonConverter", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": 
"org.apache.kafka.connect.json.JsonConverter", + "output.format.key": "json", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "output.format.key": "json", + }, + }, + { + name: "Should remove output.format.key for AvroConverter", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "io.confluent.connect.avro.AvroConverter", + "output.format.key": "schema", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "io.confluent.connect.avro.AvroConverter", + }, + }, + { + name: "Should remove output.format.key for StringConverter", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "output.format.key": "json", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + }, + }, + { + name: "Should not remove custom output.format.key for StringConverter", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "output.format.key": "schema", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "output.format.key": "schema", + }, + }, + { + name: "Should remove output.format.key for ByteArrayConverter", + config: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", + "output.format.key": "bson", + }, + want: map[string]string{ + "connection.uri": "mongodb://", + "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := KafkaConnectToConsoleMongoDBHook(tt.config); !reflect.DeepEqual(got, tt.want) { + 
t.Errorf("KafkaConnectToConsoleMongoDBHook() = %v, want %v", got, tt.want) + } + }) + } +}