Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination redshift: Upgrade cdk #35316

Merged
merged 8 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
plugins {
id 'application'
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.1.8
dockerImageTag: 2.1.9
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -115,8 +116,10 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
protected JdbcDestinationHandler<RedshiftState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
String rawTableSchema) {
return new RedshiftDestinationHandler(databaseName, database, rawTableSchema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
Expand Down Expand Up @@ -50,6 +51,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
Expand All @@ -58,6 +60,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.sql.DataSource;
Expand Down Expand Up @@ -176,8 +179,10 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
protected JdbcDestinationHandler<RedshiftState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
String rawTableSchema) {
return new RedshiftDestinationHandler(databaseName, database, rawTableSchema);
}

@Override
Expand Down Expand Up @@ -217,22 +222,26 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database);
final CatalogParser catalogParser;
final String rawNamespace;
if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get();
catalogParser = new CatalogParser(sqlGenerator, rawNamespace);
} else {
rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
catalogParser = new CatalogParser(sqlGenerator);
}
final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database, rawNamespace);
parsedCatalog = catalogParser.parseCatalog(catalog);
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper =
new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
}
return StagingConsumerFactory.builder(
outputRecordCollector,
Expand All @@ -252,7 +261,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
/**
* Retrieves user configured file buffer amount so as long it doesn't exceed the maximum number of
* file buffers and sets the minimum number to the default
*
* <p>
* NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause as this hard limit has
* not been thoroughly load tested across all instance sizes
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

package io.airbyte.integrations.destination.redshift.typing_deduping;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
Expand All @@ -20,12 +19,14 @@
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.jooq.SQLDialect;

@Slf4j
public class RedshiftDestinationHandler extends JdbcDestinationHandler {
public class RedshiftDestinationHandler extends JdbcDestinationHandler<RedshiftState> {

public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase, String rawNamespace) {
// :shrug: apparently this works better than using POSTGRES
super(databaseName, jdbcDatabase, rawNamespace, SQLDialect.DEFAULT);
}

@Override
Expand Down Expand Up @@ -69,6 +70,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
};
}

@Override
protected RedshiftState toDestinationState(JsonNode json) {
return new RedshiftState(
json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift.typing_deduping

import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

data class RedshiftState(val needsSoftReset: Boolean) : MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}

override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
return copy(needsSoftReset = needsSoftReset) as T
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination;
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
Expand All @@ -46,7 +46,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest {
public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<RedshiftState> {

/**
* Redshift's JDBC driver doesn't map certain data types onto {@link java.sql.JDBCType} usefully.
Expand Down Expand Up @@ -151,8 +151,8 @@ protected DSLContext getDslContext() {
}

@Override
protected DestinationHandler getDestinationHandler() {
return new RedshiftDestinationHandler(databaseName, database);
protected DestinationHandler<RedshiftState> getDestinationHandler() {
return new RedshiftDestinationHandler(databaseName, database, namespace);
}

@Override
Expand Down Expand Up @@ -180,11 +180,11 @@ protected Field<?> toJsonValue(final String valueAsString) {
public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
List<DestinationInitialStatus<RedshiftState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStatuses.size());
final DestinationInitialStatus<RedshiftState> initialStatus = initialStatuses.getFirst();
assertTrue(initialStatus.isFinalTablePresent());
assertFalse(initialStatus.isSchemaMismatch());
// TODO assert on table clustering, etc.
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.9 | 2024-03-04 | [\#35316](https://github.com/airbytehq/airbyte/pull/35316) | Update to CDK 0.23.11; Adopt migration framework |
| 2.1.8 | 2024-02-09 | [\#35354](https://github.com/airbytehq/airbyte/pull/35354) | Update to CDK 0.23.0; Gather required initial state upfront, remove dependency on svv_table_info for table empty check |
| 2.1.7 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Switch back to jooq-based sql execution for standard insert |
| 2.1.6 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Update to CDK version 0.17.0 |
Expand Down
Loading