Skip to content

Commit

Permalink
backend: add reverse config hook option for MM2
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasz-sadura committed Sep 11, 2023
1 parent 2227979 commit b1e07c1
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 31 deletions.
8 changes: 6 additions & 2 deletions backend/pkg/connector/guide/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (g *DefaultGuide) ConsoleToKafkaConnect(configs map[string]any) map[string]

// KafkaConnectToConsole implements Guide.KafkaConnectToConsole.
func (g *DefaultGuide) KafkaConnectToConsole(configs map[string]string) map[string]string {
if g.options.kafkaConnectValidateToConsoleHookFn != nil {
configs = g.options.kafkaConnectToConsoleHookFn(configs)
}

result := make(map[string]string)
for key, value := range configs {
if !g.wasInjected(key, value) {
Expand Down Expand Up @@ -175,10 +179,10 @@ func (g *DefaultGuide) KafkaConnectValidateToConsole(pluginClassName string, pat
},
}

if g.options.kafkaConnectToConsoleHookFn == nil {
if g.options.kafkaConnectValidateToConsoleHookFn == nil {
return validationResponse
}
return g.options.kafkaConnectToConsoleHookFn(validationResponse, originalConfig)
return g.options.kafkaConnectValidateToConsoleHookFn(validationResponse, originalConfig)
}

func (g *DefaultGuide) wasInjected(key string, value string) bool {
Expand Down
34 changes: 27 additions & 7 deletions backend/pkg/connector/guide/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import (
"github.com/redpanda-data/console/backend/pkg/connector/model"
)

// KafkaConnectToConsoleHook is a function that lets you modify the validation response
// KafkaConnectToConsoleHook is a function that lets you modify the connector config
// before it is sent to the Console frontend.
type KafkaConnectToConsoleHook = func(result model.ValidationResponse, config map[string]any) model.ValidationResponse
type KafkaConnectToConsoleHook = func(config map[string]string) map[string]string

// KafkaConnectValidateToConsoleHook is a function that lets you modify the validation response
// before it is sent to the Console frontend.
type KafkaConnectValidateToConsoleHook = func(result model.ValidationResponse, config map[string]any) model.ValidationResponse

// ConsoleToKafkaConnectHook is a function that lets you modify the configuration key/value
// pairs before they are sent from Console to Kafka Connect.
Expand All @@ -25,8 +29,9 @@ type ConsoleToKafkaConnectHook = func(map[string]any) map[string]any
type Options struct {
injectedValues map[string]injectedValue

consoleToKafkaConnectHookFn ConsoleToKafkaConnectHook
kafkaConnectToConsoleHookFn KafkaConnectToConsoleHook
consoleToKafkaConnectHookFn ConsoleToKafkaConnectHook
kafkaConnectValidateToConsoleHookFn KafkaConnectValidateToConsoleHook
kafkaConnectToConsoleHookFn KafkaConnectToConsoleHook
}

type injectedValue struct {
Expand Down Expand Up @@ -70,15 +75,30 @@ func WithConsoleToKafkaConnectHookFn(fn ConsoleToKafkaConnectHook) Option {
}
}

// WithKafkaConnectToConsoleHookFn lets you pass a hook which can modify the connector's validation
// WithKafkaConnectValidateToConsoleHookFn lets you pass a hook which can modify the connector's validation
// results before they are sent to the Console frontend. This hook will be called at the end
// of the Guide's KafkaConnectValidateToConsole func, thus it may have done certain modifications already.
func WithKafkaConnectValidateToConsoleHookFn(fn KafkaConnectValidateToConsoleHook) Option {
return func(o *Options) {
if o.kafkaConnectValidateToConsoleHookFn != nil {
previousHook := o.kafkaConnectValidateToConsoleHookFn
o.kafkaConnectValidateToConsoleHookFn = func(result model.ValidationResponse, config map[string]any) model.ValidationResponse {
return fn(previousHook(result, config), config)
}
} else {
o.kafkaConnectValidateToConsoleHookFn = fn
}
}
}

// WithKafkaConnectToConsoleHookFn lets you pass a hook which can modify the connector's config
// before it is sent to the Console frontend.
func WithKafkaConnectToConsoleHookFn(fn KafkaConnectToConsoleHook) Option {
return func(o *Options) {
if o.kafkaConnectToConsoleHookFn != nil {
previousHook := o.kafkaConnectToConsoleHookFn
o.kafkaConnectToConsoleHookFn = func(result model.ValidationResponse, config map[string]any) model.ValidationResponse {
return fn(previousHook(result, config), config)
o.kafkaConnectToConsoleHookFn = func(config map[string]string) map[string]string {
return fn(previousHook(config))
}
} else {
o.kafkaConnectToConsoleHookFn = fn
Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/connector/guide/wizard.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (g *WizardGuide) KafkaConnectValidateToConsole(pluginClassName string, patc
Configs: configs,
Steps: g.wizardSteps,
}
if g.options.kafkaConnectToConsoleHookFn == nil {
if g.options.kafkaConnectValidateToConsoleHookFn == nil {
return validationResponse
}
return g.options.kafkaConnectToConsoleHookFn(validationResponse, originalConfig)
return g.options.kafkaConnectValidateToConsoleHookFn(validationResponse, originalConfig)
}
34 changes: 18 additions & 16 deletions backend/pkg/connector/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,36 @@ func CommunityPatches() []patch.ConfigPatch {
// Kafka connect cluster.
func CommunityGuides(opts ...guide.Option) []guide.Guide {
return []guide.Guide{
guide.NewRedpandaAwsS3SinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleAvroCodecHook)),
guide.NewRedpandaGCSSinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleAvroCodecHook)),
guide.NewRedpandaAwsS3SinkGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleAvroCodecHook)),
guide.NewRedpandaGCSSinkGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleAvroCodecHook)),
guide.NewDebeziumPostgresGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectDebeziumPostgresConfigsHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleDebeziumPostgresSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleCloudEventsConverterHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleDebeziumPostgresSourceHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleCloudEventsConverterHook),
),
guide.NewDebeziumMySQLGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleDebeziumMysqlSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleCloudEventsConverterHook)),
guide.NewSnowflakeSinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleSnowflakeHook)),
guide.NewDebeziumMySQLGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleDebeziumMysqlSourceHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleCloudEventsConverterHook)),
guide.NewSnowflakeSinkGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleSnowflakeHook)),
guide.NewBigQuerySinkGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectBigQueryHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.NewJdbcSinkGuide(opts...),
guide.NewJdbcSourceGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleTopicCreationHook)),
guide.NewJdbcSourceGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleTopicCreationHook)),
guide.NewHTTPSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectHTTPSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleHTTPSourceHook)),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleHTTPSourceHook)),
guide.NewMirrorSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectValidateToConsoleMirrorSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)),
guide.NewMirrorCheckpointGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectValidateToConsoleMirrorSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)),
guide.NewMirrorHeartbeatGuide(opts...),
guide.NewMongoSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMongoDBHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMongoDBHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.NewMongoSinkGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMongoDBHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMongoDBHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleMongoDBHook),
guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
}
}

Expand Down
16 changes: 14 additions & 2 deletions backend/pkg/connector/interceptor/mirror_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,21 @@ func ConsoleToKafkaConnectMirrorSourceHook(config map[string]any) map[string]any
return config
}

// KafkaConnectToConsoleMirrorSourceHook adds MirrorMaker source specific config options
// KafkaConnectToConsoleMirrorSourceHook removed MirrorMaker source specific config options
func KafkaConnectToConsoleMirrorSourceHook(configs map[string]string) map[string]string {
result := make(map[string]string)
for key, value := range configs {
if key != "security.protocol" {
result[key] = value
}
}

return result
}

// KafkaConnectValidateToConsoleMirrorSourceHook adds MirrorMaker source specific config options
// missing in Validate Kafka Connect response
func KafkaConnectToConsoleMirrorSourceHook(response model.ValidationResponse, _ map[string]any) model.ValidationResponse {
func KafkaConnectValidateToConsoleMirrorSourceHook(response model.ValidationResponse, _ map[string]any) model.ValidationResponse {
securityProtocol := getConfig(&response, "security.protocol")

sasl := false
Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/connector/interceptor/mirror_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,8 +1209,8 @@ func TestKafkaConnectToConsoleMirrorSourceHook(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := KafkaConnectToConsoleMirrorSourceHook(tt.args.response, tt.args.config); !reflect.DeepEqual(got, tt.want) {
t.Errorf("KafkaConnectToConsoleMirrorSourceHook() = %v, want %v", got, tt.want)
if got := KafkaConnectValidateToConsoleMirrorSourceHook(tt.args.response, tt.args.config); !reflect.DeepEqual(got, tt.want) {
t.Errorf("KafkaConnectValidateToConsoleMirrorSourceHook() = %v, want %v", got, tt.want)
}
})
}
Expand Down

0 comments on commit b1e07c1

Please sign in to comment.