Skip to content

Commit

Permalink
backend: add reverse config hook for MongoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasz-sadura committed Sep 22, 2023
1 parent cda7365 commit 40ed8ca
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 41 deletions.
10 changes: 6 additions & 4 deletions backend/pkg/connector/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide {
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)),
guide.NewMirrorHeartbeatGuide(opts...),
guide.NewMongoSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectValidateToConsoleMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMongoDBHook)),
guide.NewMongoSinkGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectValidateToConsoleMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMongoDBHook)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/connector/interceptor/mirror_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ConsoleToKafkaConnectMirrorSourceHook(config map[string]any) map[string]any
return config
}

// KafkaConnectToConsoleMirrorSourceHook removed MirrorMaker source specific config options
// KafkaConnectToConsoleMirrorSourceHook removes MirrorMaker source specific config options
func KafkaConnectToConsoleMirrorSourceHook(configs map[string]string) map[string]string {
result := make(map[string]string)
for key, value := range configs {
Expand Down
95 changes: 59 additions & 36 deletions backend/pkg/connector/interceptor/mongo_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,37 @@ var (
// ConsoleToKafkaConnectMongoDBHook sets connection authentication, output format properties and post processor chain
func ConsoleToKafkaConnectMongoDBHook(config map[string]any) map[string]any {
setConnectionURI(config)
setFormatOutputStream(config)
setPostProcessorChain(config)
for _, field := range []string{"key", "value"} {
if config["output.format."+field] == nil {
if v := getFormatOutputString(config[field+".converter"]); v != "" {
config["output.format."+field] = v
}
}
}
config["post.processor.chain"] = getPostProcessorChain(config["post.processor.chain"], config["key.projection.type"], config["value.projection.type"], config["field.renamer.mapping"])

return config
}

// KafkaConnectToConsoleMongoDBHook removes MongoDB specific config options
func KafkaConnectToConsoleMongoDBHook(config map[string]string) map[string]string {
result := make(map[string]string)
for key, value := range config {
if key == "connection.url" {

Check failure on line 39 in backend/pkg/connector/interceptor/mongo_hook.go

View workflow job for this annotation

GitHub Actions / verify

ifElseChain: rewrite if-else to switch statement (gocritic)
// skip
} else if key == "output.format.key" && value == getFormatOutputString(config["key.converter"]) {

Check warning on line 41 in backend/pkg/connector/interceptor/mongo_hook.go

View workflow job for this annotation

GitHub Actions / verify

empty-block: this block is empty, you can remove it (revive)
// skip
} else if key == "output.format.value" && value == getFormatOutputString(config["value.converter"]) {

Check warning on line 43 in backend/pkg/connector/interceptor/mongo_hook.go

View workflow job for this annotation

GitHub Actions / verify

empty-block: this block is empty, you can remove it (revive)
// skip
} else if key == "post.processor.chain" && value == getPostProcessorChain(nil, config["key.projection.type"], config["value.projection.type"], config["field.renamer.mapping"]) {

Check warning on line 45 in backend/pkg/connector/interceptor/mongo_hook.go

View workflow job for this annotation

GitHub Actions / verify

empty-block: this block is empty, you can remove it (revive)
// skip
} else {
result[key] = value
}
}

return result
}

Check failure on line 53 in backend/pkg/connector/interceptor/mongo_hook.go

View workflow job for this annotation

GitHub Actions / verify

File is not `gofumpt`-ed (gofumpt)
func setConnectionURI(config map[string]any) {
if _, exists := config["connection.uri"]; !exists {
if _, exists := config["connection.url"]; exists {
Expand Down Expand Up @@ -62,63 +87,61 @@ func setConnectionURI(config map[string]any) {
}
}

func setFormatOutputStream(config map[string]any) {
for _, field := range []string{"key", "value"} {
if config["output.format."+field] == nil {
switch config[field+".converter"] {
case "org.apache.kafka.connect.json.JsonConverter",
"io.confluent.connect.avro.AvroConverter":
config["output.format."+field] = "schema"
case "org.apache.kafka.connect.storage.StringConverter":
config["output.format."+field] = "json"
case "org.apache.kafka.connect.converters.ByteArrayConverter":
config["output.format."+field] = "bson"
}
}
func getFormatOutputString(converter any) string {
switch converter {
case "org.apache.kafka.connect.json.JsonConverter",
"io.confluent.connect.avro.AvroConverter":
return "schema"
case "org.apache.kafka.connect.storage.StringConverter":
return "json"
case "org.apache.kafka.connect.converters.ByteArrayConverter":
return "bson"
}

return ""
}

func setPostProcessorChain(config map[string]any) {
var postProcessorChain string
if config["post.processor.chain"] != nil {
postProcessorChain = config["post.processor.chain"].(string)
func getPostProcessorChain(postProcessorChain any, keyProjectionType any, valueProjectionType any, fieldRenamerMapping any) string {
var postProcessorChainResult string
if postProcessorChain != nil {
postProcessorChainResult = postProcessorChain.(string)
} else {
postProcessorChain = "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder"
postProcessorChainResult = "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder"
}

switch config["key.projection.type"] {
switch keyProjectionType {
case "allowlist":
if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector") {
postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector"
if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector") {
postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector"
}
case "blocklist":
if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector") {
postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector"
if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector") {
postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector"
}
}

switch config["value.projection.type"] {
switch valueProjectionType {
case "allowlist":
if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.AllowListValueProjector") {
postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.AllowListValueProjector"
if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.AllowListValueProjector") {
postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.AllowListValueProjector"
}
case "blocklist":
if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector") {
postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.BlockListValueProjector"
if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector") {
postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.BlockListValueProjector"
}
}

if config["field.renamer.mapping"] != nil && config["field.renamer.mapping"] != "[]" {
if !strings.Contains(postProcessorChain, "com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping") {
postProcessorChain += ",com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping"
if fieldRenamerMapping != nil && fieldRenamerMapping != "" && fieldRenamerMapping != "[]" {
if !strings.Contains(postProcessorChainResult, "com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping") {
postProcessorChainResult += ",com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping"
}
}

config["post.processor.chain"] = postProcessorChain
return postProcessorChainResult
}

// KafkaConnectToConsoleMongoDBHook adds connection fields: URL, username and password
func KafkaConnectToConsoleMongoDBHook(response model.ValidationResponse, _ map[string]any) model.ValidationResponse {
// KafkaConnectValidateToConsoleMongoDBHook adds connection fields: URL, username and password
func KafkaConnectValidateToConsoleMongoDBHook(response model.ValidationResponse, _ map[string]any) model.ValidationResponse {
response.Configs = append(response.Configs,
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Expand Down
146 changes: 146 additions & 0 deletions backend/pkg/connector/interceptor/mongo_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,149 @@ func TestConsoleToKafkaConnectMongoDBHook(t *testing.T) {
})
}
}

func TestKafkaConnectToConsoleMongoDBHook(t *testing.T) {
tests := []struct {
name string
config map[string]string
want map[string]string
}{
{
name: "Should remove connection.url",
config: map[string]string{
"connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net",
"connection.url": "mongodb+srv://cluster0.abcd.mongodb.net",
},
want: map[string]string{
"connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net",
},
},
{
name: "Should remove default post.processor.chain",
config: map[string]string{
"connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
},
want: map[string]string{
"connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net",
},
},
{
name: "Should not remove custom post.processor.chain",
config: map[string]string{
"connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,Custom",
},
want: map[string]string{
"connection.uri": "mongodb+srv://cluster0.abcd.mongodb.net",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,Custom",
},
},
{
name: "Should remove post.processor.chain for allowlist",
config: map[string]string{
"connection.uri": "mongodb://",
"key.projection.type": "allowlist",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.projection.type": "allowlist",
},
},
{
name: "Should remove post.processor.chain for blocklist",
config: map[string]string{
"connection.uri": "mongodb://",
"key.projection.type": "blocklist",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.projection.type": "blocklist",
},
},
{
name: "Should remove output.format.key for JsonConverter",
config: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"output.format.key": "schema",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
},
},
{
name: "Should not remove custom output.format.key for JsonConverter",
config: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"output.format.key": "json",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"output.format.key": "json",
},
},
{
name: "Should remove output.format.key for AvroConverter",
config: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"output.format.key": "schema",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "io.confluent.connect.avro.AvroConverter",
},
},
{
name: "Should remove output.format.key for StringConverter",
config: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"output.format.key": "json",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
},
},
{
name: "Should not remove custom output.format.key for StringConverter",
config: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"output.format.key": "schema",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"output.format.key": "schema",
},
},
{
name: "Should remove output.format.key for ByteArrayConverter",
config: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"output.format.key": "bson",
},
want: map[string]string{
"connection.uri": "mongodb://",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := KafkaConnectToConsoleMongoDBHook(tt.config); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ConsoleToKafkaConnectMongoDBHook() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit 40ed8ca

Please sign in to comment.