diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index dd1016731614..d7e10879216a 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=3.1.8 +LABEL io.airbyte.version=3.1.9 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index b3360a9f0b54..ce6939c4bee7 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -6,7 +6,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.1.8 + dockerImageTag: 3.1.9 maxSecondsBetweenMessages: 7200 dockerRepository: airbyte/source-postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index e2bf92238a5d..862cad842e78 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -217,7 +217,13 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); case ARRAY -> putArray(json, columnName, resultSet, colIndex); - default -> json.put(columnName, value); + default -> { + if (columnInfo.columnType.isArrayType()) { + putArray(json, columnName, resultSet, colIndex); + } else { + json.put(columnName, value); + } + } } } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java index 878f5497dbf1..6afda89271ae 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java @@ -113,6 +113,15 @@ public Integer getVendorTypeNumber() { return type; } + /** + * Returns true if the PostgresType is an array type, false otherwise. + * + * @return true if the PostgresType is an array type, false otherwise. + */ + public boolean isArrayType() { + return type == Types.ARRAY; + } + /** * Returns the {@code JDBCType} that corresponds to the specified {@code Types} value * diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java index ef490c34cfad..1f78ee0e9bca 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java @@ -283,13 +283,13 @@ public PreparedStatement createCtidQueryStatement(final Connection connection, final PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setObject(1, lowerBound.toString()); preparedStatement.setObject(2, upperBound.toString()); - LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement); + LOGGER.info("Executing query for table {}: {} with bindings {} and {}", tableName, sql, lowerBound, upperBound); return preparedStatement; } else { final String sql = "SELECT ctid::text, %s FROM %s WHERE ctid > ?::tid".formatted(wrappedColumnNames, fullTableName); final PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setObject(1, lowerBound.toString()); - LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement); + LOGGER.info("Executing query for table {}: {} with binding {}", tableName, sql, lowerBound); return preparedStatement; } } catch (final SQLException e) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index d7dbe4e02ad8..ec343a91d0ad 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -847,6 +847,17 @@ private void addArraysTestData() { .addExpectedValues("[131070.237689,231072.476596593]") .build()); + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("jsonb_array") + .fullSourceDataType("JSONB[]") + .airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY) + .withItems(JsonSchemaType.builder(JsonSchemaPrimitive.STRING).build()) + .build()) + .addInsertValues("ARRAY['{\"foo\":\"bar\"}'::JSONB, NULL]") + .addExpectedValues("[\"{\\\"foo\\\": \\\"bar\\\"}\",null]") + .build()); + addDataTypeTestData( TestDataHolder.builder() .sourceType("money_array") diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 282f3a06a07b..564d461619fd 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.1.9 | 2023-09-25 | [30534](https://github.com/airbytehq/airbyte/pull/30534) | Fix JSONB[] column type handling bug. | | 3.1.8 | 2023-09-20 | [30125](https://github.com/airbytehq/airbyte/pull/30125) | Improve initial load performance for older versions of PostgreSQL. | | 3.1.7 | 2023-09-05 | [29672](https://github.com/airbytehq/airbyte/pull/29672) | Handle VACUUM happening during initial sync | | 3.1.6 | 2023-08-24 | [29821](https://github.com/airbytehq/airbyte/pull/29821) | Set replication_method display_type to radio, update titles and descriptions, and make CDC the default choice |