From 555d838a545fe1e2cc81878918c5bbcd6cfb8a96 Mon Sep 17 00:00:00 2001 From: Martin Bochenek Date: Thu, 3 Aug 2023 22:39:37 +0200 Subject: [PATCH] connectors: Iceberg sink connector guide, bugfixes --- backend/pkg/connector/guide/iceberg_sink.go | 84 +++++++ .../interceptor/iceberg_sink_hook.go | 220 ++++++++++++++++++ .../pkg/connector/interceptor/interceptor.go | 2 + backend/pkg/connector/patch/http_source.go | 6 +- backend/pkg/connector/patch/iceberg_sink.go | 81 +++++++ backend/pkg/connector/patch/jdbc_sink.go | 6 +- backend/pkg/connector/patch/jdbc_source.go | 6 +- .../pkg/connector/patch/redpanda_s3_sink.go | 2 +- backend/pkg/connector/patch/util.go | 3 +- .../src/components/pages/connect/helper.tsx | 2 +- 10 files changed, 400 insertions(+), 12 deletions(-) create mode 100644 backend/pkg/connector/guide/iceberg_sink.go create mode 100644 backend/pkg/connector/interceptor/iceberg_sink_hook.go create mode 100644 backend/pkg/connector/patch/iceberg_sink.go diff --git a/backend/pkg/connector/guide/iceberg_sink.go b/backend/pkg/connector/guide/iceberg_sink.go new file mode 100644 index 000000000..42219a04f --- /dev/null +++ b/backend/pkg/connector/guide/iceberg_sink.go @@ -0,0 +1,84 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package guide + +import "github.com/redpanda-data/console/backend/pkg/connector/model" + +// NewIcebergSinkGuide returns a new guide for Iceberg sink connector. 
+func NewIcebergSinkGuide(opts ...Option) Guide {
+	// Fold the functional options into a single Options value.
+	// NOTE(review): o is populated from opts here but is not referenced again in
+	// this function — the WizardGuide literal below does not receive it, so hooks
+	// passed via opts (e.g. WithKafkaConnectValidateToConsoleHookFn) appear to be
+	// dropped. Confirm against WizardGuide's fields and pass o through if so.
+	var o Options
+	for _, opt := range opts {
+		opt(&o)
+	}
+
+	return &WizardGuide{
+		className: "io.tabular.iceberg.connect.IcebergSinkConnector",
+		wizardSteps: []model.ValidationResponseStep{
+			// Step 1: which topics the sink reads from.
+			{
+				Name: "Topics to export",
+				Groups: []model.ValidationResponseStepGroup{
+					{
+						// No Group name and description here
+						ConfigKeys: []string{"topics", "topics.regex"},
+					},
+				},
+			},
+
+			// Step 2: catalog connection settings.
+			{
+				Name: "Connection",
+				Groups: []model.ValidationResponseStepGroup{
+					{
+						ConfigKeys: []string{
+							"iceberg.catalog.type",
+							"iceberg.catalog.uri",
+							"iceberg.catalog.s3.access-key-id",
+							"iceberg.catalog.s3.secret-access-key",
+							"iceberg.catalog.client.region",
+							"iceberg.catalog.s3.path-style-access",
+							"iceberg.catalog.s3.endpoint",
+						},
+					},
+				},
+			},
+
+			// Step 3: converters, control topic, table routing; the keys returned by
+			// dlq() are appended after the connector-specific ones.
+			{
+				Name: "Connector configuration",
+				Groups: []model.ValidationResponseStepGroup{
+					{
+						// No Group name and description here
+						ConfigKeys: append([]string{
+							"key.converter",
+							"key.converter.schemas.enable",
+							"value.converter",
+							"value.converter.schemas.enable",
+							"header.converter",
+							"iceberg.control.topic",
+							"iceberg.control.group.id",
+							"iceberg.catalog",
+							"iceberg.control.commitIntervalMs",
+							"iceberg.control.commitThreads",
+							"iceberg.control.commitTimeoutMs",
+							"iceberg.tables",
+							"iceberg.tables.cdcField",
+							"iceberg.tables.dynamic.enabled",
+							"iceberg.tables.routeField",
+							"iceberg.tables.upsertModeEnabled",
+							"consumer.override.auto.offset.reset",
+						}, dlq()...),
+					},
+				},
+			},
+
+			sizing(),
+
+			reviewAndLaunch(),
+		},
+	}
+}
diff --git a/backend/pkg/connector/interceptor/iceberg_sink_hook.go b/backend/pkg/connector/interceptor/iceberg_sink_hook.go
new file mode 100644
index 000000000..fa9d0482d
--- /dev/null
+++ b/backend/pkg/connector/interceptor/iceberg_sink_hook.go
@@ -0,0 +1,220 @@
+package interceptor
+
+import (
+	"github.com/redpanda-data/console/backend/pkg/connector/model"
+	"github.com/redpanda-data/console/backend/pkg/connector/patch"
+)
+
+// 
KafkaConnectToConsoleIcebergSinkHook adds Iceberg sink specific config options
+// missing in Validate Kafka Connect response.
+//
+// It appends the catalog definitions (type, URI, S3 credentials, S3 endpoint,
+// path-style access, client region, credential and warehouse) to
+// response.Configs and then delegates to KafkaConnectToConsoleJSONSchemaHook
+// with the same config map. Every config key is appended exactly once.
+func KafkaConnectToConsoleIcebergSinkHook(response model.ValidationResponse, config map[string]any) model.ValidationResponse {
+	// Shared documentation for options that only apply to the REST catalog.
+	const restOnlyDoc = "Use for Iceberg REST catalog type. Use JSON configuration for other catalog types"
+
+	// Visibility of the REST-only options, derived from the catalog type found in
+	// the incoming response. Computed once, before the append below.
+	// NOTE(review): "iceberg.catalog.type" is only added to response.Configs by
+	// the append below, so unless the incoming response already carries that key
+	// this is always true (isRestCatalogType's fallback). The type's Value is
+	// likewise fixed to "rest" and does not read the config argument — confirm
+	// that this is intended.
+	restOnlyVisible := isRestCatalogType(response.Configs)
+
+	response.Configs = append(response.Configs,
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.type",
+				Type:          "STRING",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg catalog type",
+				Documentation: "To set the catalog type. For other catalog types, you need to instead set 'iceberg.catalog.catalog-impl' to the name of the catalog class",
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.type",
+				Value:             "rest",
+				RecommendedValues: []string{},
+				Visible:           true,
+				Errors:            []string{},
+			},
+			Metadata: model.ConfigDefinitionMetadata{
+				ComponentType: model.ComponentRadioGroup,
+				RecommendedValues: []model.RecommendedValueWithMetadata{
+					{Value: "rest", DisplayName: "REST"},
+					{Value: "hive", DisplayName: "HIVE"},
+					{Value: "hadoop", DisplayName: "HADOOP"},
+				},
+			},
+		},
+		// iceberg.catalog.uri is defined exactly once; a second definition with the
+		// same name would produce a duplicate entry in the response.
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.uri",
+				Type:          "STRING",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg REST catalog URI",
+				Documentation: restOnlyDoc,
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.uri",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           restOnlyVisible,
+				Errors:            []string{},
+			},
+		},
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.s3.secret-access-key",
+				Type:          "PASSWORD",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg REST catalog S3 Secret Access Key",
+				Documentation: restOnlyDoc,
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.s3.secret-access-key",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           restOnlyVisible,
+				Errors:            []string{},
+			},
+		},
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.s3.access-key-id",
+				Type:          "PASSWORD",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg REST catalog S3 Access Key ID",
+				Documentation: restOnlyDoc,
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.s3.access-key-id",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           restOnlyVisible,
+				Errors:            []string{},
+			},
+		},
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.s3.endpoint",
+				Type:          "STRING",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceMedium,
+				Required:      false,
+				DisplayName:   "Iceberg REST catalog S3 endpoint",
+				Documentation: restOnlyDoc,
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.s3.endpoint",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           restOnlyVisible,
+				Errors:            []string{},
+			},
+		},
+		// The following options are always visible, regardless of catalog type.
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.s3.path-style-access",
+				Type:          "BOOLEAN",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg REST catalog S3 path style access",
+				Documentation: restOnlyDoc,
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.s3.path-style-access",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           true,
+				Errors:            []string{},
+			},
+		},
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.client.region",
+				Type:          "STRING",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg REST catalog client region",
+				Documentation: restOnlyDoc,
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.client.region",
+				Value:             "us-east-1",
+				RecommendedValues: patch.AwsRegions,
+				Visible:           true,
+				Errors:            []string{},
+			},
+		},
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:          "iceberg.catalog.credential",
+				Type:          "STRING",
+				DefaultValue:  "",
+				Importance:    model.ConfigDefinitionImportanceHigh,
+				Required:      false,
+				DisplayName:   "Iceberg catalog credential",
+				Documentation: "Iceberg catalog credential",
+				Dependents:    []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.credential",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           true,
+				Errors:            []string{},
+			},
+		},
+		model.ConfigDefinition{
+			Definition: model.ConfigDefinitionKey{
+				Name:         "iceberg.catalog.warehouse",
+				Type:         "STRING",
+				DefaultValue: "",
+				Importance:   model.ConfigDefinitionImportanceHigh,
+				Required:     false,
+				DisplayName:  "Iceberg catalog warehouse",
+				Dependents:   []string{},
+			},
+			Value: model.ConfigDefinitionValue{
+				Name:              "iceberg.catalog.warehouse",
+				Value:             "",
+				RecommendedValues: []string{},
+				Visible:           true,
+				Errors:            []string{},
+			},
+		},
+	)
+
+	return KafkaConnectToConsoleJSONSchemaHook(response, config)
+}
+
+// isRestCatalogType reports whether the "iceberg.catalog.type" entry in configs
+// has the value "rest". The first entry with that name decides the result; if no
+// such entry exists it returns true, i.e. REST is treated as the default.
+func isRestCatalogType(configs []model.ConfigDefinition) bool {
+	for _, def := range configs {
+		if def.Value.Name == "iceberg.catalog.type" {
+			return def.Value.Value == "rest"
+		}
+	}
+	return true
+}
diff --git a/backend/pkg/connector/interceptor/interceptor.go b/backend/pkg/connector/interceptor/interceptor.go
index 572af76b8..cfa463a1c 100644
--- a/backend/pkg/connector/interceptor/interceptor.go
+++ b/backend/pkg/connector/interceptor/interceptor.go
@@ -57,6 +57,7 @@ func CommunityPatches() []patch.ConfigPatch {
 		patch.NewConfigPatchJdbcSink(),
 		patch.NewConfigPatchJdbcSource(),
 		patch.NewConfigPatchHTTPSource(),
+		patch.NewConfigPatchIcebergSink(),
 		patch.NewConfigPatchMirrorSource(),
 		patch.NewConfigPatchMirrorHeartbeat(),
 		patch.NewConfigPatchMongoDB(),
@@ -81,6 +82,7 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide {
 		),
 		guide.NewDebeziumMySQLGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleDebeziumMysqlSourceHook),
 			guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleCloudEventsConverterHook)),
+		guide.NewIcebergSinkGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleIcebergSinkHook)),
 		guide.NewSnowflakeSinkGuide(guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleSnowflakeHook)),
 		guide.NewBigQuerySinkGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectBigQueryHook),
 			guide.WithKafkaConnectValidateToConsoleHookFn(KafkaConnectToConsoleJSONSchemaHook)),
diff --git 
a/backend/pkg/connector/patch/http_source.go b/backend/pkg/connector/patch/http_source.go index 129553556..5ebcff06c 100644 --- a/backend/pkg/connector/patch/http_source.go +++ b/backend/pkg/connector/patch/http_source.go @@ -18,15 +18,15 @@ import ( ) // ConfigPatchHTTPSource is a config patch that includes changes that shall be applied to the -// BigQuery source connector. +// HTTP source connector. type ConfigPatchHTTPSource struct { ConfigurationKeySelector IncludeExcludeSelector ConnectorClassSelector IncludeExcludeSelector } -var _ ConfigPatch = (*ConfigPatchBigQuery)(nil) +var _ ConfigPatch = (*ConfigPatchHTTPSource)(nil) -// NewConfigPatchHTTPSource returns a new Patch for the BigQuery source connector. +// NewConfigPatchHTTPSource returns a new Patch for the HTTP source connector. func NewConfigPatchHTTPSource() *ConfigPatchHTTPSource { return &ConfigPatchHTTPSource{ ConfigurationKeySelector: IncludeExcludeSelector{ diff --git a/backend/pkg/connector/patch/iceberg_sink.go b/backend/pkg/connector/patch/iceberg_sink.go new file mode 100644 index 000000000..a48755b4c --- /dev/null +++ b/backend/pkg/connector/patch/iceberg_sink.go @@ -0,0 +1,81 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package patch + +import ( + "regexp" + "strings" + + "github.com/redpanda-data/console/backend/pkg/connector/model" + "github.com/redpanda-data/console/backend/pkg/random" +) + +// ConfigPatchIcebergSink is a config patch that includes changes that shall be applied to the +// Iceberg sink connector. 
+type ConfigPatchIcebergSink struct {
+	// ConfigurationKeySelector decides which config keys this patch applies to.
+	ConfigurationKeySelector IncludeExcludeSelector
+	// ConnectorClassSelector decides which connector classes this patch applies to.
+	ConnectorClassSelector IncludeExcludeSelector
+}
+
+var _ ConfigPatch = (*ConfigPatchIcebergSink)(nil)
+
+// NewConfigPatchIcebergSink returns a new Patch for the Iceberg sink connector.
+// The patch applies to every config key of the Iceberg sink connector class.
+func NewConfigPatchIcebergSink() *ConfigPatchIcebergSink {
+	return &ConfigPatchIcebergSink{
+		ConfigurationKeySelector: IncludeExcludeSelector{
+			Include: regexp.MustCompile(`.*`),
+			Exclude: nil,
+		},
+		ConnectorClassSelector: IncludeExcludeSelector{
+			// Dots are escaped so that only the literal class name matches; an
+			// unescaped '.' would match any character.
+			Include: regexp.MustCompile(`io\.tabular\.iceberg\.connect\.IcebergSinkConnector`),
+			Exclude: nil,
+		},
+	}
+}
+
+// IsMatch implements the ConfigPatch.IsMatch interface.
+// It returns true only when both the config key and the connector class are
+// accepted by their respective selectors.
+func (c *ConfigPatchIcebergSink) IsMatch(configKey, connectorClass string) bool {
+	return c.ConfigurationKeySelector.IsMatch(configKey) && c.ConnectorClassSelector.IsMatch(connectorClass)
+}
+
+// PatchDefinition implements the ConfigPatch.PatchDefinition interface.
+// It adjusts the definition selected by d.Definition.Name (converters, connector
+// name, control topic and a few display names) and returns d; definitions with
+// any other name are returned unchanged.
+func (*ConfigPatchIcebergSink) PatchDefinition(d model.ConfigDefinition, _ string) model.ConfigDefinition {
+	// Misc patches
+	switch d.Definition.Name {
+	case keyConverter:
+		d.SetImportance(model.ConfigDefinitionImportanceHigh).
+			ClearRecommendedValuesWithMetadata().
+			AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO").
+			AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON").
+			SetDefaultValue("org.apache.kafka.connect.json.JsonConverter")
+	case valueConverter:
+		d.ClearRecommendedValuesWithMetadata().
+			AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO").
+			AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON").
+			SetDefaultValue("org.apache.kafka.connect.json.JsonConverter")
+	case name:
+		// A random suffix is added to the default connector name.
+		d.SetDefaultValue("iceberg-sink-connector-" + strings.ToLower(random.String(4)))
+	case "iceberg.control.commitIntervalMs":
+		d.SetDisplayName("Iceberg commit interval ms")
+	case "iceberg.control.commitThreads":
+		d.SetDisplayName("Iceberg commit threads")
+	case "iceberg.control.commitTimeoutMs":
+		d.SetDisplayName("Iceberg commit timeout ms")
+	case "iceberg.tables.upsertModeEnabled":
+		d.SetDisplayName("Iceberg tables upsert mode enabled")
+	case "iceberg.control.topic":
+		// The default control topic name also gets its own random suffix.
+		d.SetImportance(model.ConfigDefinitionImportanceHigh).
+			SetDocumentation("Name of the control topic. Control topic has to exist before creating the connector. It has to be unique for each Iceberg connector working in the same cluster").
+			SetDefaultValue("iceberg-connector-control-" + strings.ToLower(random.String(4))).
+			SetRequired(true)
+	}
+
+	return d
+}
diff --git a/backend/pkg/connector/patch/jdbc_sink.go b/backend/pkg/connector/patch/jdbc_sink.go
index e10e7f88f..a43631667 100644
--- a/backend/pkg/connector/patch/jdbc_sink.go
+++ b/backend/pkg/connector/patch/jdbc_sink.go
@@ -18,15 +18,15 @@ import (
 )
 
 // ConfigPatchJdbcSink is a config patch that includes changes that shall be applied to the
-// BigQuery sink connector.
+// JDBC sink connector.
 type ConfigPatchJdbcSink struct {
 	ConfigurationKeySelector IncludeExcludeSelector
 	ConnectorClassSelector   IncludeExcludeSelector
 }
 
-var _ ConfigPatch = (*ConfigPatchBigQuery)(nil)
+var _ ConfigPatch = (*ConfigPatchJdbcSink)(nil)
 
-// NewConfigPatchJdbcSink returns a new Patch for the BigQuery sink connector.
+// NewConfigPatchJdbcSink returns a new Patch for the JDBC sink connector.
func NewConfigPatchJdbcSink() *ConfigPatchJdbcSink { return &ConfigPatchJdbcSink{ ConfigurationKeySelector: IncludeExcludeSelector{ diff --git a/backend/pkg/connector/patch/jdbc_source.go b/backend/pkg/connector/patch/jdbc_source.go index 15e3666c2..4c54e75ff 100644 --- a/backend/pkg/connector/patch/jdbc_source.go +++ b/backend/pkg/connector/patch/jdbc_source.go @@ -18,15 +18,15 @@ import ( ) // ConfigPatchJdbcSource is a config patch that includes changes that shall be applied to the -// BigQuery source connector. +// JDBC source connector. type ConfigPatchJdbcSource struct { ConfigurationKeySelector IncludeExcludeSelector ConnectorClassSelector IncludeExcludeSelector } -var _ ConfigPatch = (*ConfigPatchBigQuery)(nil) +var _ ConfigPatch = (*ConfigPatchJdbcSource)(nil) -// NewConfigPatchJdbcSource returns a new Patch for the BigQuery source connector. +// NewConfigPatchJdbcSource returns a new Patch for the JDBC source connector. func NewConfigPatchJdbcSource() *ConfigPatchJdbcSource { return &ConfigPatchJdbcSource{ ConfigurationKeySelector: IncludeExcludeSelector{ diff --git a/backend/pkg/connector/patch/redpanda_s3_sink.go b/backend/pkg/connector/patch/redpanda_s3_sink.go index b7510c1f5..4e2515092 100644 --- a/backend/pkg/connector/patch/redpanda_s3_sink.go +++ b/backend/pkg/connector/patch/redpanda_s3_sink.go @@ -80,7 +80,7 @@ func (*ConfigPatchRedpandaS3) PatchDefinition(d model.ConfigDefinition, _ string case "aws.s3.bucket.name": d.SetDocumentation("") case "aws.s3.region": - d.SetRecommendedValues(awsRegions) + d.SetRecommendedValues(AwsRegions) d.SetDocumentation("") case name: d.SetDefaultValue("s3-connector-" + strings.ToLower(random.String(4))) diff --git a/backend/pkg/connector/patch/util.go b/backend/pkg/connector/patch/util.go index edcbe5532..eb9a8f325 100644 --- a/backend/pkg/connector/patch/util.go +++ b/backend/pkg/connector/patch/util.go @@ -14,7 +14,8 @@ import ( "strings" ) -var awsRegions = []string{ +// AwsRegions static list of AWS regions 
used in connectors configs UI dropdowns +var AwsRegions = []string{ "us-east-1", "us-east-2", "us-west-1", diff --git a/frontend/src/components/pages/connect/helper.tsx b/frontend/src/components/pages/connect/helper.tsx index 678fc29cd..462290bba 100644 --- a/frontend/src/components/pages/connect/helper.tsx +++ b/frontend/src/components/pages/connect/helper.tsx @@ -262,7 +262,7 @@ const connectorMetadata: ConnectorMetadata[] = [ author: 'Tabular', friendlyName: 'Iceberg', description: 'Exports messages to Iceberg tables', - learnMoreLink: 'https://docs.redpanda.com/docs/deploy/deployment-option/cloud/managed-connectors/' + learnMoreLink: 'https://docs.redpanda.com/docs/deploy/deployment-option/cloud/managed-connectors/create-iceberg-sink-connector/' } as const, // JMS Connectors