From 6b4f215c6d80ec970fdde9f304d4a28b3e920c98 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Mon, 22 Jan 2024 09:07:13 -0800 Subject: [PATCH] Destination Postgres: DV2 Beta (#34372) ## What * Adding postgres DV2 beta launch option --- .../metadata.yaml | 4 +--- .../src/test/resources/expected_spec.json | 12 ++++++++++ .../destination-postgres/metadata.yaml | 2 +- .../postgres/PostgresSqlOperations.java | 24 +++++++++++++++---- .../src/main/resources/spec.json | 12 ++++++++++ docs/integrations/destinations/postgres.md | 1 + 6 files changed, 47 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index 573dbcf99fd9..f605bec22b5d 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.5.5 + dockerImageTag: 0.6.0 dockerRepository: airbyte/destination-postgres-strict-encrypt documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres @@ -15,10 +15,8 @@ data: normalizationTag: 0.4.1 registries: cloud: - dockerImageTag: 0.4.0 enabled: false oss: - dockerImageTag: 0.4.0 enabled: false releaseStage: alpha supportsDbt: true diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json index 5410a917e982..0b366183eb52 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json @@ -209,6 +209,18 @@ "type": "string", "order": 8 }, + "use_1s1t_format": { + "type": "boolean", + "description": "(Early Access) Use Destinations V2.", + "title": "Use Destinations V2 (Early Access)", + "order": 9 + }, + "raw_data_schema": { + "type": "string", + "description": "(Early Access) The schema to write raw tables into", + "title": "Destinations V2 Raw Table Schema (Early Access)", + "order": 10 + }, "tunnel_method": { "type": "object", "title": "SSH Tunnel Method", diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index b0983ff2e291..71253170afd0 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.5.5 + dockerImageTag: 0.6.0 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java index 01e4904b5684..210cc6d9bc4d 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.postgres; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*; + import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; @@ -17,6 +19,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; @@ -49,8 +52,11 @@ protected void insertRecordsInternalV2(final JdbcDatabase database, final String schemaName, final String tableName) throws Exception { - // idk apparently this just works - insertRecordsInternal(database, records, schemaName, tableName); + insertRecordsInternal(database, records, schemaName, tableName, + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_DATA, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_LOADED_AT); } @Override @@ -59,10 +65,20 @@ public void insertRecordsInternal(final JdbcDatabase database, final String schemaName, final String tmpTableName) throws SQLException { + insertRecordsInternal(database, records, schemaName, tmpTableName, COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT); + } + + private void insertRecordsInternal(final JdbcDatabase database, + final List records, + final String schemaName, + final String tmpTableName, + final String... columnNames) + throws SQLException { if (records.isEmpty()) { return; } - + // Explicitly passing column order to avoid order mismatches between CREATE TABLE and COPY statement + final String orderedColumnNames = StringUtils.join(columnNames, ", "); database.execute(connection -> { File tmpFile = null; try { @@ -70,7 +86,7 @@ public void insertRecordsInternal(final JdbcDatabase database, writeBatchToFile(tmpFile, records); final var copyManager = new CopyManager(connection.unwrap(BaseConnection.class)); - final var sql = String.format("COPY %s.%s FROM stdin DELIMITER ',' CSV", schemaName, tmpTableName); + final var sql = String.format("COPY %s.%s (%s) FROM stdin DELIMITER ',' CSV", schemaName, tmpTableName, orderedColumnNames); final var bufferedReader = new BufferedReader(new FileReader(tmpFile, StandardCharsets.UTF_8)); copyManager.copyIn(sql, bufferedReader); } catch (final Exception e) { diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json index e310cb5a10f0..cf067a7338d9 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json @@ -215,6 +215,18 @@ "title": "JDBC URL Params", "type": "string", "order": 8 + }, + "use_1s1t_format": { + "type": "boolean", + "description": "(Early Access) Use Destinations V2.", + "title": "Use Destinations V2 (Early Access)", + "order": 9 + }, + "raw_data_schema": { + "type": "string", + "description": "(Early Access) The schema to write raw tables into", + "title": "Destinations V2 Raw Table Schema (Early Access)", + "order": 10 } } } diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index b8fffc91ab93..05b845a14bc0 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------| +| 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec | | 0.5.5 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.1; Add indexes in raw table for query optimization | | 0.5.4 | 2024-01-11 | [34177](https://github.com/airbytehq/airbyte/pull/34177) | Add code for DV2 beta (no user-visible changes) | | 0.5.3 | 2024-01-10 | [34135](https://github.com/airbytehq/airbyte/pull/34135) | Use published CDK missed in previous release |