Skip to content

Commit

Permalink
postgres/cdk-td-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed Feb 20, 2024
1 parent eca8629 commit ee06cd5
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
Expand Down Expand Up @@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
return new PostgresDestinationHandler(databaseName, database);
}

@Override
public boolean isV2Destination() {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.airbyte.integrations.destination.postgres.typing_deduping;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;

public class PostgresDestinationHandler extends JdbcDestinationHandler {

public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
}

@Override
protected String toJdbcTypeName(AirbyteType airbyteType) {
// This is mostly identical to the postgres implementation, but swaps jsonb to super
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
return toJdbcTypeName(airbyteProtocolType);
}
return switch (airbyteType.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb";
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
};
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
case NUMBER -> "numeric";
case INTEGER -> "int8";
case BOOLEAN -> "bool";
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
case TIME_WITH_TIMEZONE -> "timetz";
case TIME_WITHOUT_TIMEZONE -> "time";
case DATE -> "date";
case UNKNOWN -> "jsonb";
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator {

public static final DataType<?> JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb");

private static final Map<String, String> POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
"numeric", "decimal",
"int8", "bigint",
"bool", "boolean",
"timestamptz", "timestamp with time zone",
"timetz", "time with time zone");

public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) {
super(namingTransformer);
}
Expand Down Expand Up @@ -309,29 +302,6 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
.orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME);
}

@Override
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
// Check that the columns match, with special handling for the metadata columns.
// This is mostly identical to the redshift implementation, but swaps super to jsonb
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
LinkedHashMap::putAll);
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())),
LinkedHashMap::putAll);

final boolean sameColumns = actualColumns.equals(intendedColumns)
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
&& "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());

return sameColumns;
}

/**
* Extract a raw field, leaving it as jsonb
*/
Expand All @@ -343,8 +313,4 @@ private Field<String> jsonTypeof(final Field<?> field) {
return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field);
}

private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) {
return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -18,10 +19,12 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import java.util.List;
import java.util.Optional;
import javax.sql.DataSource;
import org.jooq.DataType;
Expand Down Expand Up @@ -76,8 +79,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected DestinationHandler<TableDefinition> getDestinationHandler() {
return new JdbcDestinationHandler(databaseName, database);
protected DestinationHandler getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database);
}

@Override
Expand All @@ -96,29 +99,11 @@ public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);

final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());

assertTrue(existingTable.isPresent());
assertAll(
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("array").type()),
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type()));
// TODO assert on table indexing, etc.
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private String generateBigString() {
}

@Override
protected SqlGenerator<?> getSqlGenerator() {
protected SqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
}

Expand Down

0 comments on commit ee06cd5

Please sign in to comment.