diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f7310f4c1b07..8fe186d4e518 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. | | 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. | | 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. | | 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java index 08f4d794d92b..826195d3fbcc 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java @@ -129,4 +129,9 @@ protected SshTunnel getTunnelInstance(final JsonNode config) throws Exception { : getInstance(config, hostKey, portKey); } + @Override + public boolean isV2Destination() { + return delegate.isV2Destination(); + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 21b134115f08..5ca3b8c87d35 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.3 \ No newline at end of file +version=0.20.4 \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle index 6d672fa3616e..87d5e2b99a77 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle @@ -1,26 +1,13 @@ plugins { - id 'application' id 'airbyte-java-connector' } airbyteJavaConnector { - cdkVersionRequired = '0.16.3' - features = [ - 'db-sources', // required for tests - 'db-destinations' - ] + cdkVersionRequired = '0.20.4' + features = ['db-destinations', 'typing-deduping', 'datastore-postgres'] useLocalCdk = false } -//remove once upgrading the CDK version to 0.4.x or later -java { - compileJava { - options.compilerArgs.remove("-Werror") - } -} - -airbyteJavaConnector.addCdkDependencies() - application { mainClass = 'io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt' applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] @@ -28,21 +15,6 @@ application { dependencies { implementation project(':airbyte-integrations:connectors:destination-postgres') - // TODO: declare typing-deduping as a CDK feature instead of importing from source. - implementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping') - - testImplementation libs.testcontainers.jdbc - testImplementation libs.testcontainers.postgresql - - testFixturesImplementation libs.testcontainers.jdbc - testFixturesImplementation libs.testcontainers.postgresql integrationTestJavaImplementation testFixtures(project(':airbyte-integrations:connectors:destination-postgres')) - integrationTestJavaImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')) -} - -configurations.all { - resolutionStrategy { - force libs.jooq - } } diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index 117c6570d909..fb475fa2995e 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -2,22 +2,27 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.6.3 + dockerImageTag: 2.0.0 dockerRepository: airbyte/destination-postgres-strict-encrypt documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres icon: postgresql.svg license: ELv2 name: Postgres - normalizationConfig: - normalizationIntegrationType: postgres - normalizationRepository: airbyte/normalization - normalizationTag: 0.4.1 registries: cloud: enabled: false oss: enabled: false + releases: + breakingChanges: + 2.0.0: + message: > + This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. + To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). + These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. For more controlled upgrade [see instructions](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#upgrading-connections-one-by-one-with-dual-writing). + upgradeDeadline: "2024-05-31" releaseStage: alpha supportsDbt: true tags: diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java index 28aa74c8861f..2ac912ae5962 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java @@ -60,6 +60,11 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception { return super.check(config); } + @Override + public boolean isV2Destination() { + return true; + } + public static void main(final String[] args) throws Exception { final Destination destination = new PostgresDestinationStrictEncrypt(); LOGGER.info("starting destination: {}", PostgresDestinationStrictEncrypt.class); diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java index 128f8903f0a7..7b01a95ab65b 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.") public class PostgresDestinationStrictEncryptAcceptanceTest extends AbstractPostgresDestinationAcceptanceTest { private PostgresTestDatabase testDb; diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresStrictEncryptTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresStrictEncryptTypingDedupingTest.java new file mode 100644 index 000000000000..73d9866dde6a --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresStrictEncryptTypingDedupingTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.integrations.destination.postgres.PostgresDestination; +import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; +import io.airbyte.integrations.destination.postgres.PostgresTestDatabase.BaseImage; +import javax.sql.DataSource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +// TODO: This test is added to ensure coverage missed by disabling DATs. Redundant when DATs +// enabled. +public class PostgresStrictEncryptTypingDedupingTest extends AbstractPostgresTypingDedupingTest { + + protected static PostgresTestDatabase testContainer; + + @BeforeAll + public static void setupPostgres() { + // Postgres-13 is alpine image and SSL conf is failing to load, intentionally using 12:bullseye + testContainer = PostgresTestDatabase.in(BaseImage.POSTGRES_12, PostgresTestDatabase.ContainerModifier.CERT); + } + + @AfterAll + public static void teardownPostgres() { + testContainer.close(); + } + + @Override + protected ObjectNode getBaseConfig() { + return (ObjectNode) testContainer.configBuilder() + .with("schema", "public") + .withDatabase() + .withResolvedHostAndPort() + .withCredentials() + .withSsl(ImmutableMap.builder() + .put("mode", "verify-ca") // verify-full will not work since the spawned container is only allowed for 127.0.0.1/32 CIDRs + .put("ca_certificate", testContainer.getCertificates().caCertificate()) + .build()) + .build(); + } + + @Override + protected DataSource getDataSource(final JsonNode config) { + // Intentionally ignore the config and rebuild it. + // The config param has the resolved (i.e. in-docker) host/port. + // We need the unresolved host/port since the test wrapper code is running from the docker host + // rather than in a container. + return new PostgresDestination().getDataSource(testContainer.configBuilder() + .with("schema", "public") + .withDatabase() + .withHostAndPort() + .withCredentials() + .withoutSsl() + .build()); + } + + @Override + protected String getImageName() { + return "airbyte/destination-postgres-strict-encrypt:dev"; + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl new file mode 100644 index 000000000000..9f11b2293a95 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl new file mode 100644 index 000000000000..7f75f0f804e2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl @@ -0,0 +1,4 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl new file mode 100644 index 000000000000..61024be7867d --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl @@ -0,0 +1,5 @@ +// Keep the Alice record with more recent updated_at +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl new file mode 100644 index 000000000000..b2bf47df66c1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl @@ -0,0 +1 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00.000000Z", "name": "Someone completely different"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl new file mode 100644 index 000000000000..f3a225756ced --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl @@ -0,0 +1,6 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +// Invalid columns are nulled out (i.e. SQL null, not JSON null) +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl new file mode 100644 index 000000000000..4012c086a9e6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl @@ -0,0 +1,6 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +// Invalid data is still allowed in the raw table. +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl new file mode 100644 index 000000000000..b489accda1bb --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl @@ -0,0 +1 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl new file mode 100644 index 000000000000..c26d4a49aacd --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +// Charlie wasn't reemitted with updated_at, so it still has a null cursor +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "name": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl new file mode 100644 index 000000000000..03f28e155af5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl @@ -0,0 +1,7 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl new file mode 100644 index 000000000000..0989dfc17ed0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl @@ -0,0 +1,9 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} + +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000Z"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl new file mode 100644 index 000000000000..9d1f1499469f --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000Z"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl new file mode 100644 index 000000000000..33bc3280be27 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl new file mode 100644 index 000000000000..fd2a4b3adbf3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl @@ -0,0 +1,4 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +// Delete Bob, keep Charlie +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl new file mode 100644 index 000000000000..53c304c89d31 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl @@ -0,0 +1 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00.000000Z", "name": "Someone completely different v2"} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl new file mode 100644 index 000000000000..2f634c6ad4e9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl @@ -0,0 +1,10 @@ +// We keep the records from the first sync +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} +// And append the records from the second sync +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl new file mode 100644 index 000000000000..88b8ee7746c1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl @@ -0,0 +1,2 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different v2"}} diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json index b291203568e8..c076616ec0b1 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json @@ -209,16 +209,17 @@ "type": "string", "order": 8 }, - "use_1s1t_format": { - "type": "boolean", - "description": "(Early Access) Use Destinations V2.", - "title": "Use Destinations V2 (Early Access)", - "order": 9 - }, "raw_data_schema": { "type": "string", - "description": "(Early Access) The schema to write raw tables into", - "title": "Destinations V2 Raw Table Schema (Early Access)", + "description": "The schema to write raw tables into", + "title": "Raw table schema (defaults to airbyte_internal)", + "order": 9 + }, + "disable_type_dedupe": { + "type": "boolean", + "default": false, + "description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions", + "title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)", "order": 10 }, "tunnel_method": { diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index 6b49a55578df..54398f71bd38 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -1,50 +1,18 @@ plugins { - id 'application' id 'airbyte-java-connector' } airbyteJavaConnector { - cdkVersionRequired = '0.16.3' - features = [ - 'db-sources', // required for tests - 'db-destinations', - ] + cdkVersionRequired = '0.20.4' + features = ['db-destinations', 'datastore-postgres', 'typing-deduping'] useLocalCdk = false } -//remove once upgrading the CDK version to 0.4.x or later -java { - compileJava { - options.compilerArgs.remove("-Werror") - } -} - -airbyteJavaConnector.addCdkDependencies() - application { mainClass = 'io.airbyte.integrations.destination.postgres.PostgresDestination' applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] } dependencies { - implementation libs.postgresql - // TODO: declare typing-deduping as a CDK feature instead of importing from source. - implementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping') - - testImplementation libs.testcontainers.jdbc - testImplementation libs.testcontainers.postgresql - - testFixturesImplementation libs.testcontainers.jdbc - testFixturesImplementation libs.testcontainers.postgresql - - - integrationTestJavaImplementation libs.testcontainers.postgresql - - integrationTestJavaImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')) -} - -configurations.all { - resolutionStrategy { - force libs.jooq - } + testFixturesApi 'org.testcontainers:postgresql:1.19.0' } diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index bcb00a6873d5..d380f44acd77 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,23 +5,28 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.6.3 + dockerImageTag: 2.0.0 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres icon: postgresql.svg license: ELv2 name: Postgres - normalizationConfig: - normalizationIntegrationType: postgres - normalizationRepository: airbyte/normalization - normalizationTag: 0.4.3 registries: cloud: dockerRepository: airbyte/destination-postgres-strict-encrypt enabled: true oss: enabled: true + releases: + breakingChanges: + 2.0.0: + message: > + This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. + To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). + These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. For more controlled upgrade [see instructions](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#upgrading-connections-one-by-one-with-dual-writing). + upgradeDeadline: "2024-05-31" releaseStage: alpha supportLevel: community supportsDbt: true diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index 3a2a36ce446b..62e975dbee86 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -24,6 +24,7 @@ import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -52,18 +53,21 @@ protected DataSourceFactory.DataSourceBuilder modifyDataSourceBuilder(final Data // This avoids issues with creating the same function concurrently (e.g. if multiple syncs run // at the same time). // Function definition copied from https://dba.stackexchange.com/a/203986 - return builder.withConnectionInitSql(""" - CREATE FUNCTION pg_temp.airbyte_safe_cast(_in text, INOUT _out ANYELEMENT) - LANGUAGE plpgsql AS - $func$ - BEGIN - EXECUTE format('SELECT %L::%s', $1, pg_typeof(_out)) - INTO _out; - EXCEPTION WHEN others THEN - -- do nothing: _out already carries default - END - $func$; - """); + + // Adding 60 seconds to connection timeout, for ssl connections, default 10 seconds is not enough + return builder.withConnectionTimeout(Duration.ofSeconds(60)) + .withConnectionInitSql(""" + CREATE FUNCTION pg_temp.airbyte_safe_cast(_in text, INOUT _out ANYELEMENT) + LANGUAGE plpgsql AS + $func$ + BEGIN + EXECUTE format('SELECT %L::%s', $1, pg_typeof(_out)) + INTO _out; + EXCEPTION WHEN others THEN + -- do nothing: _out already carries default + END + $func$; + """); } @Override @@ -123,6 +127,11 @@ protected JdbcSqlGenerator getSqlGenerator() { return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); } + @Override + public boolean isV2Destination() { + return true; + } + public static void main(final String[] args) throws Exception { final Destination destination = PostgresDestination.sshWrappedDestination(); LOGGER.info("starting destination: {}", PostgresDestination.class); diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json index 86900109c06c..4c775be8b887 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json @@ -216,16 +216,17 @@ "type": "string", "order": 8 }, - "use_1s1t_format": { - "type": "boolean", - "description": "(Early Access) Use Destinations V2.", - "title": "Use Destinations V2 (Early Access)", - "order": 9 - }, "raw_data_schema": { "type": "string", - "description": "(Early Access) The schema to write raw tables into", - "title": "Destinations V2 Raw Table Schema (Early Access)", + "description": "The schema to write raw tables into", + "title": "Raw table schema (defaults to airbyte_internal)", + "order": 9 + }, + "disable_type_dedupe": { + "type": "boolean", + "default": false, + "description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions", + "title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)", "order": 10 } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java index 369ecdd5caf2..1bec8925facb 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java @@ -7,7 +7,9 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase.BaseImage; import java.util.HashSet; +import org.junit.jupiter.api.Disabled; +@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.") public class PostgresDestinationAcceptanceTest extends AbstractPostgresDestinationAcceptanceTest { private PostgresTestDatabase testDb; diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java index 37265484202d..fa92f3a9f663 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java @@ -10,6 +10,7 @@ import java.util.HashSet; import org.junit.jupiter.api.Disabled; +@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.") public class PostgresDestinationSSLFullCertificateAcceptanceTest extends AbstractPostgresDestinationAcceptanceTest { private PostgresTestDatabase testDb; diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java index ce0e53ca1fdf..0bab6cb695e5 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java @@ -5,7 +5,9 @@ package io.airbyte.integrations.destination.postgres; import io.airbyte.cdk.integrations.base.ssh.SshTunnel; +import org.junit.jupiter.api.Disabled; +@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.") public class SshKeyPostgresDestinationAcceptanceTest extends SshPostgresDestinationAcceptanceTest { @Override diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java index 8404ee16f4fb..454058e4a99c 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java @@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider.TestCompatibility; import org.junit.jupiter.api.Disabled; +@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.") public class SshPasswordPostgresDestinationAcceptanceTest extends SshPostgresDestinationAcceptanceTest { @Override diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideDisableTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideDisableTypingDedupingTest.java new file mode 100644 index 000000000000..a4841e7b44d1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideDisableTypingDedupingTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +public class PostgresRawOverrideDisableTypingDedupingTest extends PostgresTypingDedupingTest { + + @Override + protected ObjectNode getBaseConfig() { + return super.getBaseConfig() + .put("raw_data_schema", "overridden_raw_dataset") + .put("disable_type_dedupe", true); + } + + @Override + protected String getRawSchema() { + return "overridden_raw_dataset"; + } + + @Override + protected boolean disableFinalTableComparison() { + return true; + } + + @Disabled + @Test + @Override + public void identicalNameSimultaneousSync() {} + + @Disabled + @Test + @Override + public void testVarcharLimitOver64K() {} + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java index ee80c3e12ab5..3f744c846b08 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java @@ -10,23 +10,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.db.jdbc.JdbcSourceOperations; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest; -import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.destination.postgres.PostgresDestination; import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Optional; import javax.sql.DataSource; import org.jooq.DataType; @@ -43,30 +38,6 @@ public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegra private static String databaseName; private static JdbcDatabase database; - /** - * See - * {@link io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations}. - * Copied here to avoid weird dependencies. - */ - public static class PostgresSourceOperations extends JdbcSourceOperations { - - @Override - public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { - final String columnName = resultSet.getMetaData().getColumnName(colIndex); - final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase(); - - switch (columnTypeName) { - // JSONB has no equivalent in JDBCType - case "jsonb" -> json.set(columnName, Jsons.deserializeExact(resultSet.getString(colIndex))); - // For some reason, the driver maps these to their timezoneless equivalents (TIME and TIMESTAMP) - case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex); - case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); - default -> super.copyToJsonField(resultSet, colIndex, json); - } - } - - } - @BeforeAll public static void setupPostgres() { testContainer = PostgresTestDatabase.in(PostgresTestDatabase.BaseImage.POSTGRES_13); diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java index 22b525af74f7..4bf10317d207 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java @@ -4,53 +4,18 @@ package io.airbyte.integrations.destination.postgres.typing_deduping; -import static org.junit.jupiter.api.Assertions.assertEquals; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; -import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.destination.postgres.PostgresDestination; -import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; import javax.sql.DataSource; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest { +public class PostgresTypingDedupingTest extends AbstractPostgresTypingDedupingTest { protected static PostgresTestDatabase testContainer; - private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535; - - private static final Random RANDOM = new Random(); - - private String generateBigString() { - // Generate exactly 2 chars over the limit - final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2; - return RANDOM - .ints('a', 'z' + 1) - .limit(length) - .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) - .toString(); - } - @BeforeAll public static void setupPostgres() { testContainer = PostgresTestDatabase.in(PostgresTestDatabase.BaseImage.POSTGRES_13); @@ -63,14 +28,13 @@ public static void teardownPostgres() { @Override protected ObjectNode getBaseConfig() { - final ObjectNode config = (ObjectNode) testContainer.configBuilder() + return (ObjectNode) testContainer.configBuilder() .with("schema", "public") .withDatabase() .withResolvedHostAndPort() .withCredentials() .withoutSsl() .build(); - return config.put("use_1s1t_format", true); } @Override @@ -93,77 +57,4 @@ protected String getImageName() { return "airbyte/destination-postgres:dev"; } - @Override - protected SqlGenerator getSqlGenerator() { - return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); - } - - @Override - protected JdbcCompatibleSourceOperations getSourceOperations() { - return new PostgresSqlGeneratorIntegrationTest.PostgresSourceOperations(); - } - - @Test - public void testMixedCasedSchema() throws Exception { - streamName = "MixedCaseSchema" + streamName; - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) - .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); - - // First sync - final List messages1 = readMessages("dat/sync1_messages.jsonl"); - - runSync(catalog, messages1); - - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); - verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); - } - - @Override - protected List dumpRawTableRecords(String streamNamespace, String streamName) throws Exception { - return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase()); - } - - @Test - public void testVarcharLimitOver64K() throws Exception { - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) - .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); - - final AirbyteMessage message = new AirbyteMessage(); - final String largeString = generateBigString(); - final Map data = ImmutableMap.of( - "id1", 1, - "id2", 200, - "updated_at", "2021-01-01T00:00:00Z", - "name", largeString); - message.setType(Type.RECORD); - message.setRecord(new AirbyteRecordMessage() - .withNamespace(streamNamespace) - .withStream(streamName) - .withData(Jsons.jsonNode(data)) - .withEmittedAt(1000L)); - final List messages1 = new ArrayList<>(); - messages1.add(message); - runSync(catalog, messages1); - - // Only assert on the large varchar string landing in final table. - // Rest of the fields' correctness is tested by other means in other tests. - final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); - assertEquals(1, actualFinalRecords.size()); - assertEquals(largeString, actualFinalRecords.get(0).get("name").asText()); - - } - } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl index c805113dc6c2..61024be7867d 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl @@ -2,3 +2,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl index 8aa852183061..f3a225756ced 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl @@ -3,3 +3,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} // Invalid columns are nulled out (i.e. SQL null, not JSON null) {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl index 80fac124d28d..4012c086a9e6 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl @@ -3,3 +3,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} // Invalid data is still allowed in the raw table. {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl index 6e9258bab255..0989dfc17ed0 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl @@ -2,6 +2,7 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl index 13c59b2f9912..fd2a4b3adbf3 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl @@ -1,3 +1,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} // Delete Bob, keep Charlie {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl index 32a7e57b1c14..2f634c6ad4e9 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl @@ -3,6 +3,7 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} // And append the records from the second sync {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java index 1a19f9eb50d5..60e588214f72 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java @@ -16,17 +16,11 @@ * TODO: This class is a copy from source-postgres:testFixtures. Eventually merge into a common * fixtures module. */ -public class PostgresContainerFactory implements ContainerFactory> { +public class PostgresContainerFactory extends ContainerFactory> { @Override - public PostgreSQLContainer createNewContainer(DockerImageName imageName) { + protected PostgreSQLContainer createNewContainer(DockerImageName imageName) { return new PostgreSQLContainer<>(imageName.asCompatibleSubstituteFor("postgres")); - - } - - @Override - public Class getContainerClass() { - return PostgreSQLContainer.class; } /** diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java new file mode 100644 index 000000000000..50b1da44fa6c --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; +import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; +import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.junit.jupiter.api.Test; + +public abstract class AbstractPostgresTypingDedupingTest extends JdbcTypingDedupingTest { + + private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535; + + private static final Random RANDOM = new Random(); + + private String generateBigString() { + // Generate exactly 2 chars over the limit + final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2; + return RANDOM + .ints('a', 'z' + 1) + .limit(length) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } + + @Override + protected SqlGenerator getSqlGenerator() { + return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); + } + + @Override + protected JdbcCompatibleSourceOperations getSourceOperations() { + return new PostgresSourceOperations(); + } + + @Test + public void testMixedCasedSchema() throws Exception { + streamName = "MixedCaseSchema" + streamName; + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + // First sync + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + + runSync(catalog, messages1); + + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); + } + + @Override + protected List dumpRawTableRecords(String streamNamespace, String streamName) throws Exception { + return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase()); + } + + @Test + public void testVarcharLimitOver64K() throws Exception { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + final AirbyteMessage message = new AirbyteMessage(); + final String largeString = generateBigString(); + final Map data = ImmutableMap.of( + "id1", 1, + "id2", 200, + "updated_at", "2021-01-01T00:00:00Z", + "name", largeString); + message.setType(Type.RECORD); + message.setRecord(new AirbyteRecordMessage() + .withNamespace(streamNamespace) + .withStream(streamName) + .withData(Jsons.jsonNode(data)) + .withEmittedAt(1000L)); + final List messages1 = new ArrayList<>(); + messages1.add(message); + runSync(catalog, messages1); + + // Only assert on the large varchar string landing in final table. + // Rest of the fields' correctness is tested by other means in other tests. + final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); + assertEquals(1, actualFinalRecords.size()); + assertEquals(largeString, actualFinalRecords.get(0).get("name").asText()); + + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSourceOperations.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSourceOperations.java new file mode 100644 index 000000000000..997aa66bf2a2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSourceOperations.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.db.jdbc.JdbcSourceOperations; +import io.airbyte.commons.json.Jsons; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * See + * {@link io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations}. + * Copied here to avoid weird dependencies. + */ +public class PostgresSourceOperations extends JdbcSourceOperations { + + @Override + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + final String columnName = resultSet.getMetaData().getColumnName(colIndex); + final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase(); + + switch (columnTypeName) { + // JSONB has no equivalent in JDBCType + case "jsonb" -> json.set(columnName, Jsons.deserializeExact(resultSet.getString(colIndex))); + // For some reason, the driver maps these to their timezoneless equivalents (TIME and TIMESTAMP) + case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex); + case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); + default -> super.copyToJsonField(resultSet, colIndex, json); + } + } + +} diff --git a/docs/integrations/destinations/postgres-migrations.md b/docs/integrations/destinations/postgres-migrations.md new file mode 100644 index 000000000000..7e9d1a5ba3dd --- /dev/null +++ b/docs/integrations/destinations/postgres-migrations.md @@ -0,0 +1,14 @@ +# Postgres Migration Guide + +## Upgrading to 2.0.0 + +This version introduces [Destinations V2](/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early. + +Worthy of specific mention, this version includes: + +- Per-record error handling +- Clearer table structure +- Removal of sub-tables for nested properties +- Removal of SCD tables + +Learn more about what's new in Destinations V2 [here](/using-airbyte/core-concepts/typing-deduping). diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index 3fa8fd2cb5f7..eb07756183d4 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -82,9 +82,13 @@ From [Postgres SQL Identifiers syntax](https://www.postgresql.org/docs/9.0/sql-s lower case. - In order to make your applications portable and less error-prone, use consistent quoting with each name (either always quote it or never quote it). -Note, that Airbyte Postgres destination will create tables and schemas using the Unquoted -identifiers when possible or fallback to Quoted Identifiers if the names are containing special -characters. +:::info + +Airbyte Postgres destination will create raw tables and schemas using the Unquoted +identifiers by replacing any special characters with an underscore. All final tables and their corresponding +columns are created using Quoted identifiers preserving the case sensitivity. + +::: **For Airbyte Cloud:** @@ -148,17 +152,36 @@ following[ sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-s ## Schema map -#### Output Schema +### Output Schema (Raw Tables) -Each stream will be mapped to a separate table in Postgres. Each table will contain 3 columns: +Each stream will be mapped to a separate raw table in Postgres. The default schema in which the raw tables are +created is `airbyte_internal`. This can be overridden in the configuration. +Each table will contain 3 columns: -- `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in +- `_airbyte_raw_id`: a uuid assigned by Airbyte to each event that is processed. The column type in Postgres is `VARCHAR`. -- `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. +- `_airbyte_extracted_at`: a timestamp representing when the event was pulled from the data source. The column type in Postgres is `TIMESTAMP WITH TIME ZONE`. +- `_airbyte_loaded_at`: a timestamp representing when the row was processed into final table. + The column type in Postgres is `TIMESTAMP WITH TIME ZONE`. - `_airbyte_data`: a json blob representing with the event data. The column type in Postgres is `JSONB`. +### Final Tables Data type mapping +| Airbyte Type | Postgres Type | +|:---------------------------|:-------------------------| +| string | VARCHAR | +| number | DECIMAL | +| integer | BIGINT | +| boolean | BOOLEAN | +| object | JSONB | +| array | JSONB | +| timestamp_with_timezone | TIMESTAMP WITH TIME ZONE | +| timestamp_without_timezone | TIMESTAMP | +| time_with_timezone | TIME WITH TIME ZONE | +| time_without_timezone | TIME | +| date | DATE | + ## Tutorials Now that you have set up the Postgres destination connector, check out the following tutorials: @@ -170,6 +193,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------| +| 2.0.0 | 2024-02-09 | [35042](https://github.com/airbytehq/airbyte/pull/35042) | GA release V2 destinations format. | | 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults | | 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib | | 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. | diff --git a/docs/release_notes/destinations_v2.js b/docs/release_notes/destinations_v2.js index 9a7cf6a0b0eb..d880e3b5fdf2 100644 --- a/docs/release_notes/destinations_v2.js +++ b/docs/release_notes/destinations_v2.js @@ -148,6 +148,43 @@ INSERT INTO "${raw_schema}".${v2RawTableName} ( ); }; +export const PostgresMigrationGenerator = () => { + // StandardNameTransformer + identifier should start with a letter or an underscore + function postgresConvertStreamName(str) { + str = convertStreamName(str); + if (str.charAt(0).match(/[A-Za-z_]/)) { + return str; + } else { + return "_" + str; + } + } + function generateSql(og_namespace, new_namespace, name, raw_schema) { + let v2RawTableName = + concatenateRawTableName(new_namespace, name).toLowerCase(); + let v1namespace = postgresConvertStreamName(og_namespace); + let v1name = postgresConvertStreamName("_airbyte_raw_" + name).toLowerCase(); + return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}"; +DROP TABLE IF EXISTS "${raw_schema}".${v2RawTableName}; +CREATE TABLE "${raw_schema}".${v2RawTableName} ( + "_airbyte_raw_id" VARCHAR(36) NOT NULL PRIMARY KEY + , "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + , "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL + , "_airbyte_data" JSONB +); +INSERT INTO "${raw_schema}".${v2RawTableName} ( + SELECT + _airbyte_ab_id AS "_airbyte_raw_id", + _airbyte_emitted_at AS "_airbyte_extracted_at", + CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS "_airbyte_loaded_at", + _airbyte_data AS "_airbyte_data" + FROM ${v1namespace}.${v1name} +);`; + } + return ( + + ); +}; + export const MigrationGenerator = ({ destination, generateSql }) => { const defaultMessage = `Enter your stream's name and namespace to see the SQL output. If your stream has no namespace, take the default value from the destination connector's settings.`; diff --git a/docs/release_notes/upgrading_to_destinations_v2.md b/docs/release_notes/upgrading_to_destinations_v2.md index 61f93c060df4..e0f27a394e64 100644 --- a/docs/release_notes/upgrading_to_destinations_v2.md +++ b/docs/release_notes/upgrading_to_destinations_v2.md @@ -1,6 +1,6 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -import {SnowflakeMigrationGenerator, BigQueryMigrationGenerator, RedshiftMigrationGenerator} from './destinations_v2.js' +import {SnowflakeMigrationGenerator, BigQueryMigrationGenerator, RedshiftMigrationGenerator, PostgresMigrationGenerator} from './destinations_v2.js' # Upgrading to Destinations V2 @@ -53,6 +53,17 @@ Whenever possible, we've taken this opportunity to use the best data type for st ![Upgrade Path](./assets/airbyte_destinations_v2_upgrade_prompt.png) +:::caution Upgrade Warning + +* The upgrading process entails hydrating the v2 format raw table by querying the v1 raw table through a standard query, such as "INSERT INTO v2_raw_table SELECT * FROM v1_raw_table." +The duration of this process can vary significantly based on the data size and may encounter failures contingent on the Destination's capacity to execute the query. +In some cases, creating a new Airbyte connection, rather than migrating your existing connection, may be faster. Note that in these cases, all data will be re-imported. +* Following the successful migration of v1 raw tables to v2, the v1 raw tables will be dropped. However, it is essential to note that if there are any derived objects (materialized views) or referential +constraints (foreign keys) linked to the old raw table, this operation may encounter failure, resulting in an unsuccessful upgrade or broken derived objects (like materialized views etc). + +If any of the above concerns are applicable to your existing setup, we recommend [Upgrading Connections One by One with Dual-Writing](#upgrading-connections-one-by-one-with-dual-writing) for a more controlled upgrade process +::: + After upgrading the out-of-date destination to a [Destinations V2 compatible version](#destinations-v2-effective-versions), the following will occur at the next sync **for each connection** sending data to the updated destination: 1. Existing raw tables replicated to this destination will be copied to a new `airbyte_internal` schema. @@ -72,7 +83,7 @@ Versions are tied to the destination. When you update the destination, **all con - [Testing Destinations V2 on a Single Connection](#testing-destinations-v2-for-a-single-connection) - [Upgrading Connections One by One Using CDC](#upgrade-paths-for-connections-using-cdc) - [Upgrading as a User of Raw Tables](#upgrading-as-a-user-of-raw-tables) -- [Rolling back to Legacy Normalization](#oss-only-rolling-back-to-legacy-normalization) +- [Rolling back to Legacy Normalization](#open-source-only-rolling-back-to-legacy-normalization) ## Advanced Upgrade Paths @@ -109,6 +120,9 @@ These steps allow you to dual-write for connections incrementally syncing data w + + + 2. Navigate to the existing connection you are duplicating, and navigate to the `Settings` tab. Open the `Advanced` settings to see the connection state (which manages incremental syncs). Copy the state to your clipboard. @@ -146,7 +160,7 @@ As a user previously not running Normalization, Upgrading to Destinations V2 wil For each [CDC-supported](https://docs.airbyte.com/understanding-airbyte/cdc) source connector, we recommend the following: | CDC Source | Recommendation | Notes | -| ---------- | ------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|------------|--------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Postgres | [Upgrade connection in place](#quick-start-to-upgrading) | You can optionally dual write, but this requires resyncing historical data from the source. You must create a new Postgres source with a different replication slot than your existing source to preserve the integrity of your existing connection. | | MySQL | [All above upgrade paths supported](#advanced-upgrade-paths) | You can upgrade the connection in place, or dual write. When dual writing, Airbyte can leverage the state of an existing, active connection to ensure historical data is not re-replicated from MySQL. | @@ -155,11 +169,11 @@ For each [CDC-supported](https://docs.airbyte.com/understanding-airbyte/cdc) sou For each destination connector, Destinations V2 is effective as of the following versions: | Destination Connector | Safe Rollback Version | Destinations V2 Compatible | Upgrade Deadline | -| --------------------- | --------------------- | -------------------------- | ------------------------ | +|-----------------------|-----------------------|----------------------------|--------------------------| | BigQuery | 1.10.2 | 2.0.6+ | November 7, 2023 | | Snowflake | 2.1.7 | 3.1.0+ | November 7, 2023 | | Redshift | 0.8.0 | 2.0.0+ | March 15, 2024 | -| Postgres | 0.4.0 | [coming soon] 2.0.0+ | [coming soon] early 2024 | +| Postgres | 0.6.3 | 2.0.0+ | May 31, 2024 | | MySQL | 0.2.0 | [coming soon] 2.0.0+ | [coming soon] early 2024 | Note that legacy normalization will be deprecated for ClickHouse, DuckDB, MSSQL, TiDB, and Oracle DB in early 2024. If you wish to add Destinations V2 capability to these destinations, please reference our implementation guide (coming soon).