Skip to content

Commit

Permalink
✨ Source MongoDB V2: Fix nested object filtering (#31034)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Oct 3, 2023
1 parent 4c251c1 commit 5d6597e
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private static ObjectNode readDocument(final BsonReader reader,
} else if (ARRAY.equals(fieldType)) {
jsonNodes.set(fieldName, readArray(reader, includedFields, fieldName));
} else {
readField(reader, jsonNodes, includedFields, fieldName, fieldType, false);
readField(reader, jsonNodes, fieldName, fieldType);
}
transformToStringIfMarked(jsonNodes, includedFields, fieldName);
} else {
Expand All @@ -176,7 +176,7 @@ private static JsonNode readArray(final BsonReader reader, final Set<String> col
// recursion is used to read inner array
elements.add(readArray(reader, columnNames, fieldName));
} else {
final var element = readField(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), columnNames, fieldName, currentBsonType, true);
final var element = readField(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), fieldName, currentBsonType);
elements.add(element.get(fieldName));
}
}
Expand All @@ -186,30 +186,24 @@ private static JsonNode readArray(final BsonReader reader, final Set<String> col

private static ObjectNode readField(final BsonReader reader,
final ObjectNode o,
final Set<String> includedFields,
final String fieldName,
final BsonType fieldType,
final boolean allowAllFields) {
if (shouldIncludeField(fieldName, includedFields, allowAllFields)) {
switch (fieldType) {
case BOOLEAN -> o.put(fieldName, reader.readBoolean());
case INT32 -> o.put(fieldName, reader.readInt32());
case INT64 -> o.put(fieldName, reader.readInt64());
case DOUBLE -> o.put(fieldName, reader.readDouble());
case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128()));
case TIMESTAMP -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readTimestamp().getValue()));
case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readDateTime()));
case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData()));
case SYMBOL -> o.put(fieldName, reader.readSymbol());
case STRING -> o.put(fieldName, reader.readString());
case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId()));
case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript());
case JAVASCRIPT_WITH_SCOPE -> readJavaScriptWithScope(o, reader, fieldName);
case REGULAR_EXPRESSION -> o.put(fieldName, readRegularExpression(reader.readRegularExpression()));
default -> reader.skipValue();
}
} else {
reader.skipValue();
final BsonType fieldType) {
switch (fieldType) {
case BOOLEAN -> o.put(fieldName, reader.readBoolean());
case INT32 -> o.put(fieldName, reader.readInt32());
case INT64 -> o.put(fieldName, reader.readInt64());
case DOUBLE -> o.put(fieldName, reader.readDouble());
case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128()));
case TIMESTAMP -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readTimestamp().getValue()));
case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601StringWithMilliseconds(reader.readDateTime()));
case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData()));
case SYMBOL -> o.put(fieldName, reader.readSymbol());
case STRING -> o.put(fieldName, reader.readString());
case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId()));
case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript());
case JAVASCRIPT_WITH_SCOPE -> readJavaScriptWithScope(o, reader, fieldName);
case REGULAR_EXPRESSION -> o.put(fieldName, readRegularExpression(reader.readRegularExpression()));
default -> reader.skipValue();
}

return o;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ void testTransformDataTypes() {
.append("field12", new BsonJavaScript("code"))
.append("field13", new BsonJavaScriptWithScope("code2", new BsonDocument("scope", new BsonString("scope"))))
.append("field14", new BsonRegularExpression("pattern"))
.append("field15", new BsonNull());
.append("field15", new BsonNull())
.append("field16", new Document("key", "value"));

final String documentAsJson = document.toJson();
final ObjectNode transformed = MongoDbCdcEventUtils.transformDataTypes(documentAsJson, document.keySet());
Expand All @@ -116,6 +117,7 @@ void testTransformDataTypes() {
assertEquals("scope", transformed.get("field13").get("scope").get("scope").asText());
assertEquals("pattern", transformed.get("field14").asText());
assertFalse(transformed.has("field15"));
assertEquals("value", transformed.get("field16").get("key").asText());
}

@Test
Expand All @@ -137,7 +139,8 @@ void testTransformDataTypesWithFilteredFields() {
.append("field12", new BsonJavaScript("code"))
.append("field13", new BsonJavaScriptWithScope("code2", new BsonDocument("scope", new BsonString("scope"))))
.append("field14", new BsonRegularExpression("pattern"))
.append("field15", new BsonNull());
.append("field15", new BsonNull())
.append("field16", new Document("key", "value"));

final String documentAsJson = document.toJson();
final ObjectNode transformed = MongoDbCdcEventUtils.transformDataTypes(documentAsJson, Set.of("field1", "field2", "field3"));
Expand All @@ -159,6 +162,7 @@ void testTransformDataTypesWithFilteredFields() {
assertFalse(transformed.has("field13"));
assertFalse(transformed.has("field14"));
assertFalse(transformed.has("field15"));
assertFalse(transformed.has("field16"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-mongodb-v2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.0.0
dockerImageTag: 1.0.1
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------| :-------------------------------------------------------------------------------------------------------- |
| 1.0.1 | 2023-10-03 | [31034](https://github.com/airbytehq/airbyte/pull/31034) | Fix field filtering logic related to nested documents
| 1.0.0 | 2023-10-03 | [29969](https://github.com/airbytehq/airbyte/pull/29969) | General availability release using Change Data Capture (CDC) |
| 0.2.5 | 2023-07-27 | [28815](https://github.com/airbytehq/airbyte/pull/28815) | Revert back to version 0.2.0 |
| 0.2.4 | 2023-07-26 | [28760](https://github.com/airbytehq/airbyte/pull/28760) | Fix bug preventing some syncs from succeeding when collecting stats |
Expand Down

0 comments on commit 5d6597e

Please sign in to comment.