Skip to content

Commit

Permalink
binary id
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed May 9, 2024
1 parent 82e28bf commit 3e8edea
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.mongodb;

import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;
import static java.util.Base64.getEncoder;

import com.google.common.collect.AbstractIterator;
import com.mongodb.client.MongoCollection;
Expand All @@ -15,14 +16,15 @@
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;

import java.util.Base64;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import java.util.UUID;

import org.bson.*;
import org.bson.conversions.Bson;
import org.bson.internal.UuidHelper;
import org.bson.types.Binary;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,9 +85,24 @@ protected Document computeNext() {
}

private Optional<MongoDbStreamState> getCurrentState(Object currentId) {
LOGGER.info("*** Current ID : {} {}", currentId, currentId.getClass().getSimpleName());
final var idType = IdType.findByJavaType(currentId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + currentId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(currentId.toString(),
// final var id = (idType == IdType.BINARY) ? java.util.Base64.getEncoder().encodeToString(((Binary)currentId).getData()) : currentId.toString();
final String id;
if (idType == IdType.BINARY) {
final var binLastId = (Binary) currentId;
if (binLastId.getType() == 4) {
id = UuidHelper.decodeBinaryToUuid(binLastId.getData(), binLastId.getType(), UuidRepresentation.STANDARD).toString();
} else {
id = getEncoder().encodeToString(binLastId.getData());
}
} else {
id = currentId.toString();
}

LOGGER.info("*** id {}", id);
final var state = new MongoDbStreamState(id,
IN_PROGRESS,
idType);
return Optional.of(state);
Expand All @@ -100,6 +117,7 @@ public void close() throws Exception {

private MongoCursor<Document> buildNewQueryIterator() {
Bson filter = buildFilter();
LOGGER.info("*** filter {}", filter);
return isEnforceSchema ? collection.find()
.filter(filter)
.projection(fields)
Expand Down Expand Up @@ -130,6 +148,17 @@ private Bson buildFilter() {
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
case INT -> new BsonInt32(Integer.parseInt(state.id()));
case LONG -> new BsonInt64(Long.parseLong(state.id()));
case BINARY -> {
try {
final var uuid = UUID.fromString(state.id());
LOGGER.info("*** UUID {}", uuid);
// yield new BsonBinary(BsonBinarySubType.UUID_STANDARD, UuidHelper.encodeUuidToBinary(uuid, UuidRepresentation.STANDARD));
yield new BsonBinary(uuid);
} catch (final Exception ex) {
LOGGER.info("*** Base64 {}", Base64.getDecoder().decode(state.id()));
yield new BsonBinary(Base64.getDecoder().decode(state.id()));
}
}
}))
// if nothing was found, return a new BsonDocument
.orElseGet(BsonDocument::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

package io.airbyte.integrations.source.mongodb.state;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.bson.types.ObjectId;
Expand All @@ -21,7 +18,9 @@ public enum IdType {
OBJECT_ID("objectId", "ObjectId", ObjectId::new),
STRING("string", "String", s -> s),
INT("int", "Integer", Integer::valueOf),
LONG("long", "Long", Long::valueOf);
LONG("long", "Long", Long::valueOf),
// BINARY("binData", "Binary",UUID::fromString);
BINARY("binData", "Binary", s -> s);

private static final Map<String, IdType> byBsonType = new HashMap<>();
static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.FULL_REFRESH;
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
import static java.util.Base64.getEncoder;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -23,13 +24,12 @@
import io.airbyte.protocol.models.v0.*;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import org.bson.Document;
import org.bson.UuidRepresentation;
import org.bson.internal.UuidHelper;
import org.bson.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -206,6 +206,7 @@ private List<AirbyteStreamState> generateStreamStateList(final Map<AirbyteStream

private AirbyteStreamState generateStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair,
final MongoDbStreamState streamState) {
LOGGER.info("*** {} to {}", streamState, Jsons.jsonNode(streamState));
return new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor()
Expand Down Expand Up @@ -302,7 +303,19 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
final var finalStateStatus = InitialSnapshotStatus.COMPLETE;
final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType);
// final var id = idType == IdType.BINARY ? java.util.Base64.getEncoder().encodeToString(((org.bson.types.Binary) lastId).getData()) : lastId.toString();
String id;
if (idType == IdType.BINARY) {
final var binLastId = (Binary) lastId;
if (binLastId.getType() == 4) {
id = UuidHelper.decodeBinaryToUuid(binLastId.getData(), binLastId.getType(), UuidRepresentation.STANDARD).toString();
} else {
id = getEncoder().encodeToString(binLastId.getData());
}
} else {
id = lastId.toString();
}
final var state = new MongoDbStreamState(id, finalStateStatus, idType);

updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@

package io.airbyte.integrations.source.mongodb.state;

public record MongoDbStreamState(String id, InitialSnapshotStatus status, IdType idType) {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Takes a value converting it to the appropriate MongoDb type based on the IdType of this record.
*
* @param value the value to convert
* @return a converted value.
*/
public Object idTypeAsMongoDbType(final String value) {
return idType.convert(value);
}
public record MongoDbStreamState(String id, InitialSnapshotStatus status, IdType idType) {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamState.class);
// /**
// * Takes a value converting it to the appropriate MongoDb type based on the IdType of this record.
// *
// * @param value the value to convert
// * @return a converted value.
// */
// public Object idTypeAsMongoDbType(final String value) {
// LOGGER.info("***{} converted to {}", value , idType.convert(value));
// return idType.convert(value);
// }

}

0 comments on commit 3e8edea

Please sign in to comment.