diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtils.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtils.java index ea3b4985d11c..69996fc903f3 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtils.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtils.java @@ -151,7 +151,7 @@ private static ObjectNode readDocument(final BsonReader reader, } else if (ARRAY.equals(fieldType)) { jsonNodes.set(fieldName, readArray(reader, includedFields, fieldName)); } else { - readField(reader, jsonNodes, includedFields, fieldName, fieldType, false); + readField(reader, jsonNodes, fieldName, fieldType); } transformToStringIfMarked(jsonNodes, includedFields, fieldName); } else { @@ -176,7 +176,7 @@ private static JsonNode readArray(final BsonReader reader, final Set col // recursion is used to read inner array elements.add(readArray(reader, columnNames, fieldName)); } else { - final var element = readField(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), columnNames, fieldName, currentBsonType, true); + final var element = readField(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), fieldName, currentBsonType); elements.add(element.get(fieldName)); } } @@ -186,30 +186,24 @@ private static JsonNode readArray(final BsonReader reader, final Set col private static ObjectNode readField(final BsonReader reader, final ObjectNode o, - final Set includedFields, final String fieldName, - final BsonType fieldType, - final boolean allowAllFields) { - if (shouldIncludeField(fieldName, includedFields, allowAllFields)) { - switch (fieldType) { - case BOOLEAN -> o.put(fieldName, reader.readBoolean()); - case INT32 -> o.put(fieldName, reader.readInt32()); - case INT64 -> o.put(fieldName, reader.readInt64()); - case DOUBLE -> o.put(fieldName, reader.readDouble()); - case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128())); - case TIMESTAMP -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readTimestamp().getValue())); - case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readDateTime())); - case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData())); - case SYMBOL -> o.put(fieldName, reader.readSymbol()); - case STRING -> o.put(fieldName, reader.readString()); - case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId())); - case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript()); - case JAVASCRIPT_WITH_SCOPE -> readJavaScriptWithScope(o, reader, fieldName); - case REGULAR_EXPRESSION -> o.put(fieldName, readRegularExpression(reader.readRegularExpression())); - default -> reader.skipValue(); - } - } else { - reader.skipValue(); + final BsonType fieldType) { + switch (fieldType) { + case BOOLEAN -> o.put(fieldName, reader.readBoolean()); + case INT32 -> o.put(fieldName, reader.readInt32()); + case INT64 -> o.put(fieldName, reader.readInt64()); + case DOUBLE -> o.put(fieldName, reader.readDouble()); + case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128())); + case TIMESTAMP -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readTimestamp().getValue())); + case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readDateTime())); + case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData())); + case SYMBOL -> o.put(fieldName, reader.readSymbol()); + case STRING -> o.put(fieldName, reader.readString()); + case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId())); + case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript()); + case JAVASCRIPT_WITH_SCOPE -> readJavaScriptWithScope(o, reader, fieldName); + case REGULAR_EXPRESSION -> o.put(fieldName, readRegularExpression(reader.readRegularExpression())); + default -> reader.skipValue(); } return o; diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtilsTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtilsTest.java index e97cb22ce874..4e83d2f023d0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtilsTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbCdcEventUtilsTest.java @@ -93,7 +93,8 @@ void testTransformDataTypes() { .append("field12", new BsonJavaScript("code")) .append("field13", new BsonJavaScriptWithScope("code2", new BsonDocument("scope", new BsonString("scope")))) .append("field14", new BsonRegularExpression("pattern")) - .append("field15", new BsonNull()); + .append("field15", new BsonNull()) + .append("field16", new Document("key", "value")); final String documentAsJson = document.toJson(); final ObjectNode transformed = MongoDbCdcEventUtils.transformDataTypes(documentAsJson, document.keySet()); @@ -116,6 +117,7 @@ void testTransformDataTypes() { assertEquals("scope", transformed.get("field13").get("scope").get("scope").asText()); assertEquals("pattern", transformed.get("field14").asText()); assertFalse(transformed.has("field15")); + assertEquals("value", transformed.get("field16").get("key").asText()); } @Test @@ -137,7 +139,8 @@ void testTransformDataTypesWithFilteredFields() { .append("field12", new BsonJavaScript("code")) .append("field13", new BsonJavaScriptWithScope("code2", new BsonDocument("scope", new BsonString("scope")))) .append("field14", new BsonRegularExpression("pattern")) - .append("field15", new BsonNull()); + .append("field15", new BsonNull()) + .append("field16", new Document("key", "value")); final String documentAsJson = document.toJson(); final ObjectNode transformed = MongoDbCdcEventUtils.transformDataTypes(documentAsJson, Set.of("field1", "field2", "field3")); @@ -159,6 +162,7 @@ void testTransformDataTypesWithFilteredFields() { assertFalse(transformed.has("field13")); assertFalse(transformed.has("field14")); assertFalse(transformed.has("field15")); + assertFalse(transformed.has("field16")); } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile index 82ed0b2b386e..6d8d9fbe1df7 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION source-mongodb-v2 COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.0 +LABEL io.airbyte.version=1.0.1 LABEL io.airbyte.name=airbyte/source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 756d8aa48b77..ce4ed529f235 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.0.0 + dockerImageTag: 1.0.1 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 7a06de7f1081..0f0f3b40a2b3 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -190,6 +190,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------| :-------------------------------------------------------------------------------------------------------- | +| 1.0.1 | 2023-10-03 | [31034](https://github.com/airbytehq/airbyte/pull/31034) | Fix field filtering logic related to nested documents | 1.0.0 | 2023-10-03 | [29969](https://github.com/airbytehq/airbyte/pull/29969) | General availability release using Change Data Capture (CDC) | | 0.2.5 | 2023-07-27 | [28815](https://github.com/airbytehq/airbyte/pull/28815) | Revert back to version 0.2.0 | | 0.2.4 | 2023-07-26 | [28760](https://github.com/airbytehq/airbyte/pull/28760) | Fix bug preventing some syncs from succeeding when collecting stats |