diff --git a/backend/pkg/connector/guide/iceberg_sink.go b/backend/pkg/connector/guide/iceberg_sink.go new file mode 100644 index 000000000..80d792ab9 --- /dev/null +++ b/backend/pkg/connector/guide/iceberg_sink.go @@ -0,0 +1,85 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package guide + +import "github.com/redpanda-data/console/backend/pkg/connector/model" + +// NewIcebergSinkGuide returns a new guide for Iceberg sink connector. +func NewIcebergSinkGuide(opts ...Option) Guide { + var o Options + for _, opt := range opts { + opt(&o) + } + + return &WizardGuide{ + className: "io.tabular.iceberg.connect.IcebergSinkConnector", + options: o, + wizardSteps: []model.ValidationResponseStep{ + { + Name: "Topics to export", + Groups: []model.ValidationResponseStepGroup{ + { + // No Group name and description here + ConfigKeys: []string{"topics", "topics.regex"}, + }, + }, + }, + + { + Name: "Connection", + Groups: []model.ValidationResponseStepGroup{ + { + ConfigKeys: []string{ + "iceberg.catalog.type", + "iceberg.catalog.uri", + "iceberg.catalog.s3.access-key-id", + "iceberg.catalog.s3.secret-access-key", + "iceberg.catalog.client.region", + "iceberg.catalog.s3.path-style-access", + "iceberg.catalog.s3.endpoint", + }, + }, + }, + }, + + { + Name: "Connector configuration", + Groups: []model.ValidationResponseStepGroup{ + { + // No Group name and description here + ConfigKeys: append([]string{ + "key.converter", + "key.converter.schemas.enable", + "value.converter", + "value.converter.schemas.enable", + "header.converter", + "iceberg.control.topic", + "iceberg.control.group.id", + "iceberg.catalog", + "iceberg.control.commitIntervalMs", + "iceberg.control.commitThreads", + "iceberg.control.commitTimeoutMs", + "iceberg.tables", + "iceberg.tables.cdcField", + "iceberg.tables.dynamic.enabled", + "iceberg.tables.routeField", + "iceberg.tables.upsertModeEnabled", + "consumer.override.auto.offset.reset", + }, dlq()...), + }, + }, + }, + + sizing(), + + reviewAndLaunch(), + }, + } +} diff --git a/backend/pkg/connector/interceptor/iceberg_sink_hook.go b/backend/pkg/connector/interceptor/iceberg_sink_hook.go new file mode 100644 index 000000000..4c6a611b9 --- /dev/null +++ b/backend/pkg/connector/interceptor/iceberg_sink_hook.go @@ -0,0 +1,209 @@ +package interceptor + +import ( + "github.com/redpanda-data/console/backend/pkg/connector/model" +) + +// KafkaConnectToConsoleIcebergSinkHook adds Iceberg sink specific config options +// missing in Validate Kafka Connect response +func KafkaConnectToConsoleIcebergSinkHook(response model.ValidationResponse, config map[string]any) model.ValidationResponse { + response.Configs = append(response.Configs, model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.type", + Type: "STRING", + DefaultValue: "rest", + Importance: model.ConfigDefinitionImportanceMedium, + Required: false, + DisplayName: "Iceberg catalog type", + Documentation: "The Iceberg catalog type", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.type", + Value: "rest", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + Metadata: model.ConfigDefinitionMetadata{ + ComponentType: model.ComponentRadioGroup, + RecommendedValues: []model.RecommendedValueWithMetadata{ + {Value: "rest", DisplayName: "REST"}, + {Value: "hive", DisplayName: "HIVE"}, + {Value: "hadoop", DisplayName: "HADOOP"}, + }, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.uri", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog uri", + Documentation: "Iceberg catalog uri", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.uri", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.secret-access-key", + Type: "PASSWORD", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog S3 Secret Access Key", + Documentation: "", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.secret-access-key", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.access-key-id", + Type: "PASSWORD", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog S3 Access Key ID", + Documentation: "", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.access-key-id", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.endpoint", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceMedium, + Required: false, + DisplayName: "Iceberg catalog S3 endpoint", + Documentation: "", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.endpoint", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.path-style-access", + Type: "BOOLEAN", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog S3 path style access", + Documentation: "", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.path-style-access", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.client.region", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog client region", + Documentation: "", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.client.region", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.credential", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog credential", + Documentation: "", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.credential", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.warehouse", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog warehouse", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.warehouse", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.uri", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: false, + DisplayName: "Iceberg catalog URI", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.uri", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + ) + + return KafkaConnectToConsoleJSONSchemaHook(response, config) +} diff --git a/backend/pkg/connector/interceptor/interceptor.go b/backend/pkg/connector/interceptor/interceptor.go index d00e1f3c7..46b3b7678 100644 --- a/backend/pkg/connector/interceptor/interceptor.go +++ b/backend/pkg/connector/interceptor/interceptor.go @@ -57,6 +57,7 @@ func CommunityPatches() []patch.ConfigPatch { patch.NewConfigPatchJdbcSink(), patch.NewConfigPatchJdbcSource(), patch.NewConfigPatchHTTPSource(), + patch.NewConfigPatchIcebergSink(), patch.NewConfigPatchMirrorSource(), patch.NewConfigPatchMirrorHeartbeat(), patch.NewConfigPatchMongoDB(), @@ -88,6 +89,7 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide { guide.NewJdbcSourceGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleTopicCreationHook)), guide.NewHTTPSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectHTTPSourceHook), guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleHTTPSourceHook)), + guide.NewIcebergSinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleIcebergSinkHook)), guide.NewMirrorSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook), guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)), guide.NewMirrorCheckpointGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook), diff --git a/backend/pkg/connector/patch/http_source.go b/backend/pkg/connector/patch/http_source.go index 129553556..5ebcff06c 100644 --- a/backend/pkg/connector/patch/http_source.go +++ b/backend/pkg/connector/patch/http_source.go @@ -18,15 +18,15 @@ import ( ) // ConfigPatchHTTPSource is a config patch that includes changes that shall be applied to the -// BigQuery source connector. +// HTTP source connector. type ConfigPatchHTTPSource struct { ConfigurationKeySelector IncludeExcludeSelector ConnectorClassSelector IncludeExcludeSelector } -var _ ConfigPatch = (*ConfigPatchBigQuery)(nil) +var _ ConfigPatch = (*ConfigPatchHTTPSource)(nil) -// NewConfigPatchHTTPSource returns a new Patch for the BigQuery source connector. +// NewConfigPatchHTTPSource returns a new Patch for the HTTP source connector. func NewConfigPatchHTTPSource() *ConfigPatchHTTPSource { return &ConfigPatchHTTPSource{ ConfigurationKeySelector: IncludeExcludeSelector{ diff --git a/backend/pkg/connector/patch/iceberg_sink.go b/backend/pkg/connector/patch/iceberg_sink.go new file mode 100644 index 000000000..af871f5a9 --- /dev/null +++ b/backend/pkg/connector/patch/iceberg_sink.go @@ -0,0 +1,82 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package patch + +import ( + "regexp" + "strings" + + "github.com/redpanda-data/console/backend/pkg/connector/model" + "github.com/redpanda-data/console/backend/pkg/random" +) + +// ConfigPatchIcebergSink is a config patch that includes changes that shall be applied to the +// Iceberg sink connector. +type ConfigPatchIcebergSink struct { + ConfigurationKeySelector IncludeExcludeSelector + ConnectorClassSelector IncludeExcludeSelector +} + +var _ ConfigPatch = (*ConfigPatchIcebergSink)(nil) + +// NewConfigPatchIcebergSink returns a new Patch for the Iceberg sink connector. +func NewConfigPatchIcebergSink() *ConfigPatchIcebergSink { + return &ConfigPatchIcebergSink{ + ConfigurationKeySelector: IncludeExcludeSelector{ + Include: regexp.MustCompile(`.*`), + Exclude: nil, + }, + ConnectorClassSelector: IncludeExcludeSelector{ + Include: regexp.MustCompile(`io.tabular.iceberg.connect.IcebergSinkConnector`), + Exclude: nil, + }, + } +} + +// IsMatch implements the ConfigPatch.IsMatch interface. +func (c *ConfigPatchIcebergSink) IsMatch(configKey, connectorClass string) bool { + return c.ConfigurationKeySelector.IsMatch(configKey) && c.ConnectorClassSelector.IsMatch(connectorClass) +} + +// PatchDefinition implements the ConfigPatch.PatchDefinition interface. +func (*ConfigPatchIcebergSink) PatchDefinition(d model.ConfigDefinition, _ string) model.ConfigDefinition { + // Misc patches + switch d.Definition.Name { + case keyConverter: + d.SetImportance(model.ConfigDefinitionImportanceHigh). + ClearRecommendedValuesWithMetadata(). + AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO"). + AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON"). + SetDefaultValue("org.apache.kafka.connect.json.JsonConverter") + case valueConverter: + d.ClearRecommendedValuesWithMetadata(). + AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO"). + AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON"). + SetDefaultValue("org.apache.kafka.connect.json.JsonConverter") + case name: + d.SetDefaultValue("iceberg-sink-connector-" + strings.ToLower(random.String(4))) + case "iceberg.control.group.id": + d.SetVisible(false) + case "iceberg.control.commitIntervalMs": + d.SetDisplayName("Iceberg commit interval ms") + case "iceberg.control.commitThreads": + d.SetDisplayName("Iceberg commit threads") + case "iceberg.control.commitTimeoutMs": + d.SetDisplayName("Iceberg commit timeout ms") + case "iceberg.tables.upsertModeEnabled": + d.SetDisplayName("Iceberg tables upsert mode enabled") + case "iceberg.control.topic": + d.SetImportance(model.ConfigDefinitionImportanceHigh). + SetDocumentation("Name of the control topic. Control topic has to exist before creating the connector. It has to be unique for each Iceberg connector working in the same cluster"). + SetDefaultValue("iceberg-connector-control-" + strings.ToLower(random.String(4))) + } + + return d +} diff --git a/backend/pkg/connector/patch/jdbc_sink.go b/backend/pkg/connector/patch/jdbc_sink.go index e10e7f88f..a43631667 100644 --- a/backend/pkg/connector/patch/jdbc_sink.go +++ b/backend/pkg/connector/patch/jdbc_sink.go @@ -18,15 +18,15 @@ import ( ) // ConfigPatchJdbcSink is a config patch that includes changes that shall be applied to the -// BigQuery sink connector. +// JDBC sink connector. type ConfigPatchJdbcSink struct { ConfigurationKeySelector IncludeExcludeSelector ConnectorClassSelector IncludeExcludeSelector } -var _ ConfigPatch = (*ConfigPatchBigQuery)(nil) +var _ ConfigPatch = (*ConfigPatchJdbcSink)(nil) -// NewConfigPatchJdbcSink returns a new Patch for the BigQuery sink connector. +// NewConfigPatchJdbcSink returns a new Patch for the JDBC sink connector. func NewConfigPatchJdbcSink() *ConfigPatchJdbcSink { return &ConfigPatchJdbcSink{ ConfigurationKeySelector: IncludeExcludeSelector{ diff --git a/backend/pkg/connector/patch/jdbc_source.go b/backend/pkg/connector/patch/jdbc_source.go index 15e3666c2..4c54e75ff 100644 --- a/backend/pkg/connector/patch/jdbc_source.go +++ b/backend/pkg/connector/patch/jdbc_source.go @@ -18,15 +18,15 @@ import ( ) // ConfigPatchJdbcSource is a config patch that includes changes that shall be applied to the -// BigQuery source connector. +// JDBC source connector. type ConfigPatchJdbcSource struct { ConfigurationKeySelector IncludeExcludeSelector ConnectorClassSelector IncludeExcludeSelector } -var _ ConfigPatch = (*ConfigPatchBigQuery)(nil) +var _ ConfigPatch = (*ConfigPatchJdbcSource)(nil) -// NewConfigPatchJdbcSource returns a new Patch for the BigQuery source connector. +// NewConfigPatchJdbcSource returns a new Patch for the JDBC source connector. func NewConfigPatchJdbcSource() *ConfigPatchJdbcSource { return &ConfigPatchJdbcSource{ ConfigurationKeySelector: IncludeExcludeSelector{