Skip to content

Commit

Permalink
Destination Postgres: DV2 Beta (#34372)
Browse files Browse the repository at this point in the history
## What
* Adding postgres DV2 beta launch option
  • Loading branch information
gisripa authored Jan 22, 2024
1 parent 2570ec9 commit 6b4f215
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.5.5
dockerImageTag: 0.6.0
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand All @@ -15,10 +15,8 @@ data:
normalizationTag: 0.4.1
registries:
cloud:
dockerImageTag: 0.4.0
enabled: false
oss:
dockerImageTag: 0.4.0
enabled: false
releaseStage: alpha
supportsDbt: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@
"type": "string",
"order": 8
},
"use_1s1t_format": {
"type": "boolean",
"description": "(Early Access) Use <a href=\"https://docs.airbyte.com/understanding-airbyte/typing-deduping\" target=\"_blank\">Destinations V2</a>.",
"title": "Use Destinations V2 (Early Access)",
"order": 9
},
"raw_data_schema": {
"type": "string",
"description": "(Early Access) The schema to write raw tables into",
"title": "Destinations V2 Raw Table Schema (Early Access)",
"order": 10
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.5.5
dockerImageTag: 0.6.0
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.postgres;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
Expand All @@ -17,6 +19,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;

Expand Down Expand Up @@ -49,8 +52,11 @@ protected void insertRecordsInternalV2(final JdbcDatabase database,
final String schemaName,
final String tableName)
throws Exception {
// idk apparently this just works
insertRecordsInternal(database, records, schemaName, tableName);
insertRecordsInternal(database, records, schemaName, tableName,
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT);
}

@Override
Expand All @@ -59,18 +65,28 @@ public void insertRecordsInternal(final JdbcDatabase database,
final String schemaName,
final String tmpTableName)
throws SQLException {
insertRecordsInternal(database, records, schemaName, tmpTableName, COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT);
}

private void insertRecordsInternal(final JdbcDatabase database,
final List<PartialAirbyteMessage> records,
final String schemaName,
final String tmpTableName,
final String... columnNames)
throws SQLException {
if (records.isEmpty()) {
return;
}

// Explicitly passing column order to avoid order mismatches between CREATE TABLE and COPY statement
final String orderedColumnNames = StringUtils.join(columnNames, ", ");
database.execute(connection -> {
File tmpFile = null;
try {
tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile();
writeBatchToFile(tmpFile, records);

final var copyManager = new CopyManager(connection.unwrap(BaseConnection.class));
final var sql = String.format("COPY %s.%s FROM stdin DELIMITER ',' CSV", schemaName, tmpTableName);
final var sql = String.format("COPY %s.%s (%s) FROM stdin DELIMITER ',' CSV", schemaName, tmpTableName, orderedColumnNames);
final var bufferedReader = new BufferedReader(new FileReader(tmpFile, StandardCharsets.UTF_8));
copyManager.copyIn(sql, bufferedReader);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@
"title": "JDBC URL Params",
"type": "string",
"order": 8
},
"use_1s1t_format": {
"type": "boolean",
"description": "(Early Access) Use <a href=\"https://docs.airbyte.com/understanding-airbyte/typing-deduping\" target=\"_blank\">Destinations V2</a>.",
"title": "Use Destinations V2 (Early Access)",
"order": 9
},
"raw_data_schema": {
"type": "string",
"description": "(Early Access) The schema to write raw tables into",
"title": "Destinations V2 Raw Table Schema (Early Access)",
"order": 10
}
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec |
| 0.5.5 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.1; Add indexes in raw table for query optimization |
| 0.5.4 | 2024-01-11 | [34177](https://github.com/airbytehq/airbyte/pull/34177) | Add code for DV2 beta (no user-visible changes) |
| 0.5.3 | 2024-01-10 | [34135](https://github.com/airbytehq/airbyte/pull/34135) | Use published CDK missed in previous release |
Expand Down

0 comments on commit 6b4f215

Please sign in to comment.