diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index 298b24ec4012..c55b76b11eb9 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -1,10 +1,11 @@ plugins { id 'application' id 'airbyte-java-connector' + id 'org.jetbrains.kotlin.jvm' version '1.9.22' } airbyteJavaConnector { - cdkVersionRequired = '0.23.2' + cdkVersionRequired = '0.23.11' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index 368369cfe5d2..909685693541 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 2.1.8 + dockerImageTag: 2.1.9 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java index a4ba7a669557..81521b03b9fa 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java @@ -22,6 +22,7 @@ import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator; +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState; import io.airbyte.integrations.destination.redshift.util.RedshiftUtil; import java.time.Duration; import java.util.HashMap; @@ -115,8 +116,10 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) { - return new RedshiftDestinationHandler(databaseName, database); + protected JdbcDestinationHandler getDestinationHandler(final String databaseName, + final JdbcDatabase database, + String rawTableSchema) { + return new RedshiftDestinationHandler(databaseName, database, rawTableSchema); } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index 16189ce2004b..97e8b4393890 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -20,6 +20,7 @@ import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.base.Destination; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; @@ -50,6 +51,7 @@ import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator; +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState; import io.airbyte.integrations.destination.redshift.util.RedshiftUtil; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -58,6 +60,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Consumer; import javax.sql.DataSource; @@ -176,8 +179,10 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) { - return new RedshiftDestinationHandler(databaseName, database); + protected JdbcDestinationHandler getDestinationHandler(final String databaseName, + final JdbcDatabase database, + String rawTableSchema) { + return new RedshiftDestinationHandler(databaseName, database, rawTableSchema); } @Override @@ -217,22 +222,26 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final TyperDeduper typerDeduper; final JdbcDatabase database = getDatabase(getDataSource(config)); final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); - final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database); final CatalogParser catalogParser; + final String rawNamespace; if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) { - catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()); + rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get(); + catalogParser = new CatalogParser(sqlGenerator, rawNamespace); } else { + rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; catalogParser = new CatalogParser(sqlGenerator); } + final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database, rawNamespace); parsedCatalog = catalogParser.parseCatalog(catalog); final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName); final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator); + typerDeduper = + new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of()); } else { typerDeduper = - new DefaultTyperDeduper(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator); + new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of()); } return StagingConsumerFactory.builder( outputRecordCollector, @@ -252,7 +261,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN /** * Retrieves user configured file buffer amount so as long it doesn't exceed the maximum number of * file buffers and sets the minimum number to the default - * + *

* NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause as this hard limit has * not been thoroughly load tested across all instance sizes * diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java index 5a47c2436d00..497d6469cd05 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java @@ -4,8 +4,7 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*; - +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; @@ -20,12 +19,14 @@ import java.util.List; import java.util.UUID; import lombok.extern.slf4j.Slf4j; +import org.jooq.SQLDialect; @Slf4j -public class RedshiftDestinationHandler extends JdbcDestinationHandler { +public class RedshiftDestinationHandler extends JdbcDestinationHandler { - public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase) { - super(databaseName, jdbcDatabase); + public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase, String rawNamespace) { + // :shrug: apparently this works better than using POSTGRES + super(databaseName, jdbcDatabase, rawNamespace, SQLDialect.DEFAULT); } @Override @@ -69,6 +70,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) { }; } + @Override + protected RedshiftState toDestinationState(JsonNode json) { + return new RedshiftState( + json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean()); + } + private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { return switch (airbyteProtocolType) { case STRING -> "varchar"; diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftState.kt b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftState.kt new file mode 100644 index 000000000000..d2200ea9a60c --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftState.kt @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.redshift.typing_deduping + +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState + +data class RedshiftState(val needsSoftReset: Boolean) : MinimumDestinationState { + override fun needsSoftReset(): Boolean { + return needsSoftReset + } + + override fun withSoftReset(needsSoftReset: Boolean): T { + return copy(needsSoftReset = needsSoftReset) as T + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java index 854fe35cfff6..75515b5130b7 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java @@ -21,7 +21,7 @@ import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; @@ -46,7 +46,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest { +public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest { /** * Redshift's JDBC driver doesn't map certain data types onto {@link java.sql.JDBCType} usefully. @@ -151,8 +151,8 @@ protected DSLContext getDslContext() { } @Override - protected DestinationHandler getDestinationHandler() { - return new RedshiftDestinationHandler(databaseName, database); + protected DestinationHandler getDestinationHandler() { + return new RedshiftDestinationHandler(databaseName, database, namespace); } @Override @@ -180,11 +180,11 @@ protected Field toJsonValue(final String valueAsString) { public void testCreateTableIncremental() throws Exception { final Sql sql = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(sql); - List initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); - assertEquals(1, initialStates.size()); - final DestinationInitialState initialState = initialStates.getFirst(); - assertTrue(initialState.isFinalTablePresent()); - assertFalse(initialState.isSchemaMismatch()); + List> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStatuses.size()); + final DestinationInitialStatus initialStatus = initialStatuses.getFirst(); + assertTrue(initialStatus.isFinalTablePresent()); + assertFalse(initialStatus.isSchemaMismatch()); // TODO assert on table clustering, etc. } diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index c37e01dcbb67..0df49130d9bf 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.1.9 | 2024-03-04 | [\#35316](https://github.com/airbytehq/airbyte/pull/35316) | Update to CDK 0.23.11; Adopt migration framework | | 2.1.8 | 2024-02-09 | [\#35354](https://github.com/airbytehq/airbyte/pull/35354) | Update to CDK 0.23.0; Gather required initial state upfront, remove dependency on svv_table_info for table empty check | | 2.1.7 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Switch back to jooq-based sql execution for standard insert | | 2.1.6 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Update to CDK version 0.17.0 |