Skip to content

Commit

Permalink
connectors: Iceberg sink connector guide, bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bochenekmartin committed Aug 15, 2023
1 parent 9bb0eca commit 13dcf84
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 9 deletions.
85 changes: 85 additions & 0 deletions backend/pkg/connector/guide/iceberg_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package guide

import "github.com/redpanda-data/console/backend/pkg/connector/model"

// NewIcebergSinkGuide returns a new guide for Iceberg sink connector.
func NewIcebergSinkGuide(opts ...Option) Guide {
var o Options
for _, opt := range opts {
opt(&o)
}

return &WizardGuide{
className: "io.tabular.iceberg.connect.IcebergSinkConnector",
options: o,
wizardSteps: []model.ValidationResponseStep{
{
Name: "Topics to export",
Groups: []model.ValidationResponseStepGroup{
{
// No Group name and description here
ConfigKeys: []string{"topics", "topics.regex"},
},
},
},

{
Name: "Connection",
Groups: []model.ValidationResponseStepGroup{
{
ConfigKeys: []string{
"iceberg.catalog.type",
"iceberg.catalog.uri",
"iceberg.catalog.s3.access-key-id",
"iceberg.catalog.s3.secret-access-key",
"iceberg.catalog.client.region",
"iceberg.catalog.s3.path-style-access",
"iceberg.catalog.s3.endpoint",
},
},
},
},

{
Name: "Connector configuration",
Groups: []model.ValidationResponseStepGroup{
{
// No Group name and description here
ConfigKeys: append([]string{
"key.converter",
"key.converter.schemas.enable",
"value.converter",
"value.converter.schemas.enable",
"header.converter",
"iceberg.control.topic",
"iceberg.control.group.id",
"iceberg.catalog",
"iceberg.control.commitIntervalMs",
"iceberg.control.commitThreads",
"iceberg.control.commitTimeoutMs",
"iceberg.tables",
"iceberg.tables.cdcField",
"iceberg.tables.dynamic.enabled",
"iceberg.tables.routeField",
"iceberg.tables.upsertModeEnabled",
"consumer.override.auto.offset.reset",
}, dlq()...),
},
},
},

sizing(),

reviewAndLaunch(),
},
}
}
209 changes: 209 additions & 0 deletions backend/pkg/connector/interceptor/iceberg_sink_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package interceptor

import (
"github.com/redpanda-data/console/backend/pkg/connector/model"
)

// KafkaConnectToConsoleIcebergSinkHook adds Iceberg sink specific config options
// missing in Validate Kafka Connect response
func KafkaConnectToConsoleIcebergSinkHook(response model.ValidationResponse, config map[string]any) model.ValidationResponse {
response.Configs = append(response.Configs, model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.type",
Type: "STRING",
DefaultValue: "rest",
Importance: model.ConfigDefinitionImportanceMedium,
Required: false,
DisplayName: "Iceberg catalog type",
Documentation: "The Iceberg catalog type",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.type",
Value: "rest",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
Metadata: model.ConfigDefinitionMetadata{
ComponentType: model.ComponentRadioGroup,
RecommendedValues: []model.RecommendedValueWithMetadata{
{Value: "rest", DisplayName: "REST"},
{Value: "hive", DisplayName: "HIVE"},
{Value: "hadoop", DisplayName: "HADOOP"},
},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.uri",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog uri",
Documentation: "Iceberg catalog uri",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.uri",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.secret-access-key",
Type: "PASSWORD",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog S3 Secret Access Key",
Documentation: "",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.secret-access-key",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.access-key-id",
Type: "PASSWORD",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog S3 Access Key ID",
Documentation: "",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.access-key-id",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.endpoint",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceMedium,
Required: false,
DisplayName: "Iceberg catalog S3 endpoint",
Documentation: "",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.endpoint",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.path-style-access",
Type: "BOOLEAN",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog S3 path style access",
Documentation: "",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.path-style-access",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.client.region",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog client region",
Documentation: "",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.client.region",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.credential",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog credential",
Documentation: "",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.credential",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.warehouse",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog warehouse",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.warehouse",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.uri",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: false,
DisplayName: "Iceberg catalog URI",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.uri",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
)

return KafkaConnectToConsoleJSONSchemaHook(response, config)
}
2 changes: 2 additions & 0 deletions backend/pkg/connector/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func CommunityPatches() []patch.ConfigPatch {
patch.NewConfigPatchJdbcSink(),
patch.NewConfigPatchJdbcSource(),
patch.NewConfigPatchHTTPSource(),
patch.NewConfigPatchIcebergSink(),
patch.NewConfigPatchMirrorSource(),
patch.NewConfigPatchMirrorHeartbeat(),
patch.NewConfigPatchMongoDB(),
Expand Down Expand Up @@ -88,6 +89,7 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide {
guide.NewJdbcSourceGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleTopicCreationHook)),
guide.NewHTTPSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectHTTPSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleHTTPSourceHook)),
guide.NewIcebergSinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleIcebergSinkHook)),
guide.NewMirrorSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)),
guide.NewMirrorCheckpointGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook),
Expand Down
6 changes: 3 additions & 3 deletions backend/pkg/connector/patch/http_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
)

// ConfigPatchHTTPSource is a config patch that includes changes that shall be applied to the
// BigQuery source connector.
// HTTP source connector.
type ConfigPatchHTTPSource struct {
ConfigurationKeySelector IncludeExcludeSelector
ConnectorClassSelector IncludeExcludeSelector
}

var _ ConfigPatch = (*ConfigPatchBigQuery)(nil)
var _ ConfigPatch = (*ConfigPatchHTTPSource)(nil)

// NewConfigPatchHTTPSource returns a new Patch for the BigQuery source connector.
// NewConfigPatchHTTPSource returns a new Patch for the HTTP source connector.
func NewConfigPatchHTTPSource() *ConfigPatchHTTPSource {
return &ConfigPatchHTTPSource{
ConfigurationKeySelector: IncludeExcludeSelector{
Expand Down
Loading

0 comments on commit 13dcf84

Please sign in to comment.