diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index 54398f71bd38f..a1f833df547f3 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -5,7 +5,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.20.4' features = ['db-destinations', 'datastore-postgres', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } application { diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index 62e975dbee861..93c51df74259f 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -14,13 +14,16 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.factory.DataSourceFactory; import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler; import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; @@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() { return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); } + @Override + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + return new PostgresDestinationHandler(databaseName, database); + } + @Override public boolean isV2Destination() { return true; diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java new file mode 100644 index 0000000000000..1cb6bacef3484 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java @@ -0,0 +1,46 @@ +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; + +public class PostgresDestinationHandler extends JdbcDestinationHandler { + + public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) { + super(databaseName, jdbcDatabase); + } + + @Override + protected String toJdbcTypeName(AirbyteType airbyteType) { + // This is mostly identical to the postgres implementation, but swaps jsonb to super + if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) { + return toJdbcTypeName(airbyteProtocolType); + } + return switch (airbyteType.getTypeName()) { + case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb"; + // No nested Unions supported so this will definitely not result in infinite recursion. + case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType()); + default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType); + }; + } + + private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { + return switch (airbyteProtocolType) { + case STRING -> "varchar"; + case NUMBER -> "numeric"; + case INTEGER -> "int8"; + case BOOLEAN -> "bool"; + case TIMESTAMP_WITH_TIMEZONE -> "timestamptz"; + case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp"; + case TIME_WITH_TIMEZONE -> "timetz"; + case TIME_WITHOUT_TIMEZONE -> "time"; + case DATE -> "date"; + case UNKNOWN -> "jsonb"; + }; + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java index 0918226b3227a..54ad001b0f491 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java @@ -54,13 +54,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator { public static final DataType<?> JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb"); - private static final Map<String, String> POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of( - "numeric", "decimal", - "int8", "bigint", - "bool", "boolean", - "timestamptz", "timestamp with time zone", - "timetz", "time with time zone"); - public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) { super(namingTransformer); } @@ -309,29 +302,6 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op .orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME); } - @Override - public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { - // Check that the columns match, with special handling for the metadata columns. - // This is mostly identical to the redshift implementation, but swaps super to jsonb - final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream() - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()), - LinkedHashMap::putAll); - final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream() - .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream() - .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())), - LinkedHashMap::putAll); - - final boolean sameColumns = actualColumns.equals(intendedColumns) - && "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type()) - && "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type()) - && "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type()); - - return sameColumns; - } - /** * Extract a raw field, leaving it as jsonb */ @@ -343,8 +313,4 @@ private Field<String> jsonTypeof(final Field<?> field) { return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field); } - private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) { - return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType); - } - } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java index 3f744c846b083..dfafc51f6bea1 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java @@ -7,6 +7,7 @@ import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE; import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; @@ -18,10 +19,12 @@ import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.destination.postgres.PostgresDestination; import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; +import java.util.List; import java.util.Optional; import javax.sql.DataSource; import org.jooq.DataType; @@ -76,8 +79,8 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected DestinationHandler<TableDefinition> getDestinationHandler() { - return new JdbcDestinationHandler(databaseName, database); + protected DestinationHandler getDestinationHandler() { + return new PostgresDestinationHandler(databaseName, database); } @Override @@ -96,29 +99,11 @@ public void testCreateTableIncremental() throws Exception { final Sql sql = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(sql); - final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id()); - - assertTrue(existingTable.isPresent()); - assertAll( - () -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()), - () -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()), - () -> assertEquals("int8", existingTable.get().columns().get("id1").type()), - () -> assertEquals("int8", existingTable.get().columns().get("id2").type()), - () -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("array").type()), - () -> assertEquals("varchar", existingTable.get().columns().get("string").type()), - () -> assertEquals("numeric", existingTable.get().columns().get("number").type()), - () -> assertEquals("int8", existingTable.get().columns().get("integer").type()), - () -> assertEquals("bool", existingTable.get().columns().get("boolean").type()), - () -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()), - () -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()), - () -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()), - () -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()), - () -> assertEquals("date", existingTable.get().columns().get("date").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type())); - // TODO assert on table indexing, etc. + List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStates.size()); + final DestinationInitialState initialState = initialStates.getFirst(); + assertTrue(initialState.isFinalTablePresent()); + assertFalse(initialState.isSchemaMismatch()); } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java index 50b1da44fa6cf..128d8d2de1cf0 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java @@ -44,7 +44,7 @@ private String generateBigString() { } @Override - protected SqlGenerator<?> getSqlGenerator() { + protected SqlGenerator getSqlGenerator() { return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); }