From bcd32a0a8b6736879b4d9a288e2890b7e6e9d05c Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Wed, 20 Mar 2024 13:01:53 -0700 Subject: [PATCH] Resumable full refresh source-mongodb (#35845) --- .../source-mongodb-v2/metadata.yaml | 2 +- .../mongodb/InitialSnapshotHandler.java | 27 ++-- .../source/mongodb/MongoCatalogHelper.java | 2 +- .../source/mongodb/MongoDbSource.java | 44 +++++- .../cdc/MongoDbCdcInitialSnapshotUtils.java | 19 +++ .../mongodb/cdc/MongoDbCdcInitializer.java | 34 +++-- .../mongodb/state/InitialSnapshotStatus.java | 6 +- .../mongodb/state/MongoDbStateManager.java | 70 ++++++---- .../mongodb/InitialSnapshotHandlerTest.java | 95 ++++++++++--- .../source/mongodb/MongoDbSourceTest.java | 38 ++---- .../mongodb/MongoDbStateManagerTest.java | 127 +++++++++++++++++- .../cdc/MongoDbCdcInitializerTest.java | 29 ++-- docs/integrations/sources/mongodb-v2.md | 8 ++ 13 files changed, 368 insertions(+), 133 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 18a4f31c2152..48656fbbc361 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.16 + dockerImageTag: 1.3.0 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java index 08321ded6853..4930fb43b352 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java @@ -6,11 +6,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.Accumulators; -import com.mongodb.client.model.Aggregates; -import com.mongodb.client.model.Filters; -import com.mongodb.client.model.Projections; -import com.mongodb.client.model.Sorts; +import com.mongodb.client.model.*; import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator; import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency; import io.airbyte.commons.exceptions.ConfigErrorException; @@ -22,16 +18,10 @@ import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.SyncMode; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import org.bson.BsonDocument; -import org.bson.BsonInt32; -import org.bson.BsonInt64; -import org.bson.BsonObjectId; -import org.bson.BsonString; -import org.bson.Document; +import org.bson.*; import org.bson.conversions.Bson; import org.bson.types.ObjectId; import org.slf4j.Logger; @@ -53,15 +43,11 @@ public List> getIterators( final List streams, final MongoDbStateManager stateManager, final MongoDatabase database, - final int checkpointInterval, - final boolean isEnforceSchema) { + final MongoDbSourceConfig config) { + final boolean isEnforceSchema = config.getEnforceSchema(); + final var checkpointInterval = config.getCheckpointInterval(); return streams .stream() - .peek(airbyteStream -> { - if (!airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) - LOGGER.warn("Stream {} configured with unsupported sync mode: {}", airbyteStream.getStream().getName(), airbyteStream.getSyncMode()); - }) - .filter(airbyteStream -> airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) .map(airbyteStream -> { final var collectionName = airbyteStream.getStream().getName(); final var collection = database.getCollection(collectionName); @@ -88,6 +74,9 @@ public List> getIterators( // "where _id > [last saved state] order by _id ASC". // If no state exists, it will create a query akin to "where 1=1 order by _id ASC" final Bson filter = existingState + // Full refresh streams that finished set their id to null + // This tells us to start over + .filter(state -> state.id() != null) .map(state -> Filters.gt(MongoConstants.ID_FIELD, switch (state.idType()) { case STRING -> new BsonString(state.id()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoCatalogHelper.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoCatalogHelper.java index 37f0c51dd1ba..bb395870efcd 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoCatalogHelper.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoCatalogHelper.java @@ -35,7 +35,7 @@ public class MongoCatalogHelper { /** * The list of supported sync modes for a given stream. */ - public static final List SUPPORTED_SYNC_MODES = List.of(SyncMode.INCREMENTAL); + public static final List SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL); /** * Name of the property in the JSON representation of an Airbyte stream that contains the discovered diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSource.java index 39974e73cec7..487252e7e3e1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSource.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.source.mongodb; +import static io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitialSnapshotUtils.validateStateSyncMode; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mongodb.MongoCommandException; @@ -19,9 +21,11 @@ import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcConnectorMetadataInjector; import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer; +import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcState; import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager; import io.airbyte.protocol.models.v0.*; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +130,7 @@ public AutoCloseableIterator read(final JsonNode config, final var stateManager = MongoDbStateManager.createStateManager(state, sourceConfig); if (catalog != null) { + validateStateSyncMode(stateManager, catalog.getStreams()); MongoUtil.checkSchemaModeMismatch(sourceConfig.getEnforceSchema(), stateManager.getCdcState() != null ? stateManager.getCdcState().schema_enforced() : sourceConfig.getEnforceSchema(), catalog); } @@ -133,12 +138,23 @@ public AutoCloseableIterator read(final JsonNode config, try { // WARNING: do not close the client here since it needs to be used by the iterator final MongoClient mongoClient = createMongoClient(sourceConfig); - try { - final var iteratorList = - cdcInitializer.createCdcIterators(mongoClient, cdcMetadataInjector, catalog, - stateManager, emittedAt, sourceConfig); - return AutoCloseableIterators.concatWithEagerClose(iteratorList, AirbyteTraceMessageUtility::emitStreamStatusTrace); + final List fullRefreshStreams = + catalog.getStreams().stream().filter(s -> s.getSyncMode() == SyncMode.FULL_REFRESH).toList(); + final List incrementalStreams = catalog.getStreams().stream().filter(s -> !fullRefreshStreams.contains(s)).toList(); + + List> iterators = new ArrayList<>(); + if (!fullRefreshStreams.isEmpty()) { + LOGGER.info("There are {} Full refresh streams", fullRefreshStreams.size()); + iterators.addAll(createFullRefreshIterators(sourceConfig, mongoClient, fullRefreshStreams, stateManager, emittedAt)); + } + + if (!incrementalStreams.isEmpty()) { + LOGGER.info("There are {} Incremental streams", incrementalStreams.size()); + iterators + .addAll(cdcInitializer.createCdcIterators(mongoClient, cdcMetadataInjector, incrementalStreams, stateManager, emittedAt, sourceConfig)); + } + return AutoCloseableIterators.concatWithEagerClose(iterators, AirbyteTraceMessageUtility::emitStreamStatusTrace); } catch (final Exception e) { mongoClient.close(); throw e; @@ -153,4 +169,22 @@ protected MongoClient createMongoClient(final MongoDbSourceConfig config) { return MongoConnectionUtils.createMongoClient(config); } + List> createFullRefreshIterators(final MongoDbSourceConfig sourceConfig, + final MongoClient mongoClient, + final List streams, + final MongoDbStateManager stateManager, + final Instant emmitedAt) { + final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler(); + if (stateManager.getCdcState() == null) { + stateManager.updateCdcState(new MongoDbCdcState(null, sourceConfig.getEnforceSchema())); + } + final List> fullRefreshIterators = initialSnapshotHandler.getIterators( + streams, + stateManager, + mongoClient.getDatabase(sourceConfig.getDatabaseName()), + sourceConfig); + + return fullRefreshIterators; + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitialSnapshotUtils.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitialSnapshotUtils.java index 1e844f4949ca..4387326396ce 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitialSnapshotUtils.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitialSnapshotUtils.java @@ -9,10 +9,12 @@ import com.google.common.collect.Sets; import com.mongodb.client.MongoClient; import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.mongodb.MongoUtil; import io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus; import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager; +import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState; import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -38,6 +40,9 @@ public class MongoDbCdcInitialSnapshotUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbCdcInitialSnapshotUtils.class); private static final Predicate SYNC_MODE_FILTER = c -> SyncMode.INCREMENTAL.equals(c.getSyncMode()); + private static final Map> syncModeToStatusValidationMap = Map.of( + SyncMode.INCREMENTAL, List.of(InitialSnapshotStatus.IN_PROGRESS, InitialSnapshotStatus.COMPLETE), + SyncMode.FULL_REFRESH, List.of(InitialSnapshotStatus.FULL_REFRESH)); /** * Returns the list of configured Airbyte streams that need to perform the initial snapshot portion @@ -130,4 +135,18 @@ private static void estimateInitialSnapshotSyncSize(final MongoClient mongoClien }); } + private static boolean isValidInitialSnapshotStatus(final SyncMode syncMode, final MongoDbStreamState state) { + return syncModeToStatusValidationMap.get(syncMode).contains(state.status()); + } + + public static void validateStateSyncMode(final MongoDbStateManager stateManager, final List streams) { + streams.forEach(stream -> { + final var existingState = stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()); + if (existingState.isPresent() && !isValidInitialSnapshotStatus(stream.getSyncMode(), existingState.get())) { + throw new ConfigErrorException("Stream " + stream.getStream().getName() + " is " + stream.getSyncMode() + " but the saved status " + + existingState.get().status() + " doesn't match. Please reset this stream"); + } + }); + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 9760b0d4cacc..0c7661bc4b94 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -78,14 +78,15 @@ public MongoDbCdcInitializer() { public List> createCdcIterators( final MongoClient mongoClient, final MongoDbCdcConnectorMetadataInjector cdcMetadataInjector, - final ConfiguredAirbyteCatalog catalog, + final List streams, final MongoDbStateManager stateManager, final Instant emittedAt, final MongoDbSourceConfig config) { + ConfiguredAirbyteCatalog incrementalOnlyStreamsCatalog = new ConfiguredAirbyteCatalog().withStreams(streams); final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); // #35059: debezium heartbeats are not sent on the expected interval. this is - // a worksaround to allow making subsequent wait time configurable. + // a workaround to allow making subsequent wait time configurable. final Duration subsequentRecordWaitTime = firstRecordWaitTime; LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); @@ -93,15 +94,17 @@ public List> createCdcIterators( final boolean isEnforceSchema = config.getEnforceSchema(); final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties(); logOplogInfo(mongoClient); - final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, catalog); + final BsonDocument initialResumeToken = + MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, incrementalOnlyStreamsCatalog); final JsonNode initialDebeziumState = mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName); - final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null) - ? new MongoDbCdcState(initialDebeziumState, isEnforceSchema) - : new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced()); + final MongoDbCdcState cdcState = + (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null || stateManager.getCdcState().state().isNull()) + ? new MongoDbCdcState(initialDebeziumState, isEnforceSchema) + : new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced()); final Optional optSavedOffset = mongoDbDebeziumStateUtil.savedOffset( Jsons.clone(defaultDebeziumProperties), - catalog, + incrementalOnlyStreamsCatalog, cdcState.state(), config.getDatabaseConfig(), mongoClient); @@ -131,23 +134,26 @@ public List> createCdcIterators( } final MongoDbCdcState stateToBeUsed = - (!savedOffsetIsValid || stateManager.getCdcState() == null || stateManager.getCdcState().state() == null) - ? new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema()) - : stateManager.getCdcState(); + (!savedOffsetIsValid || stateManager.getCdcState() == null || stateManager.getCdcState().state() == null + || stateManager.getCdcState().state().isNull()) + ? new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema()) + : stateManager.getCdcState(); final List initialSnapshotStreams = - MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, catalog, savedOffsetIsValid); + MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, incrementalOnlyStreamsCatalog, savedOffsetIsValid); final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler(); final List> initialSnapshotIterators = initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName), - config.getCheckpointInterval(), isEnforceSchema); + config); final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(), new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false); final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed); - final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog); - final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig()); + final var propertiesManager = + new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog); + final var eventConverter = + new MongoDbDebeziumEventConverter(cdcMetadataInjector, incrementalOnlyStreamsCatalog, emittedAt, config.getDatabaseConfig()); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/InitialSnapshotStatus.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/InitialSnapshotStatus.java index 192ef6607e81..44c4b8bc7adf 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/InitialSnapshotStatus.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/InitialSnapshotStatus.java @@ -12,5 +12,9 @@ public enum InitialSnapshotStatus { IN_PROGRESS, - COMPLETE + COMPLETE, + // A Full Refresh stream state behaves like In Progress, + // but its value set to null when collection is fully read + // Rather than turning into Complete + FULL_REFRESH } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/MongoDbStateManager.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/MongoDbStateManager.java index 47f7cd87b657..2a23fc2e9710 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/MongoDbStateManager.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/MongoDbStateManager.java @@ -4,6 +4,10 @@ package io.airbyte.integrations.source.mongodb.state; +import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.FULL_REFRESH; +import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS; +import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL; + import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -16,16 +20,8 @@ import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcConnectorMetadataInjector; import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcEventUtils; import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcState; -import io.airbyte.protocol.models.v0.AirbyteGlobalState; -import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.*; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.AirbyteStreamState; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.StreamDescriptor; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -160,6 +156,12 @@ public void updateStreamState(final String streamName, final String streamNamesp pairToStreamState.put(airbyteStreamNameNamespacePair, streamState); } + public void deleteStreamState(final String streamName, final String streamNamespace) { + final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair = new AirbyteStreamNameNamespacePair(streamName, streamNamespace); + LOGGER.debug("Deleting stream state for stream {}:{} ...", streamNamespace, streamName); + pairToStreamState.remove(airbyteStreamNameNamespacePair); + } + /** * Resets the state stored in this manager by overwriting the CDC state and clearing the stream * state. @@ -238,14 +240,16 @@ private boolean isValidStreamDescriptor(final StreamDescriptor streamDescriptor) @Override public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) { final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); - + final var syncMode = stream.getSyncMode(); // Assuming we will always process at least 1 record message before sending out the state message. // shouldEmitStateMessage should guard this. var lastId = streamPairToLastIdMap.get(pair); if (lastId != null) { final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName()) .orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName())); - final var state = new MongoDbStreamState(lastId.toString(), InitialSnapshotStatus.IN_PROGRESS, idType); + final var state = new MongoDbStreamState(lastId.toString(), + syncMode == INCREMENTAL ? IN_PROGRESS : FULL_REFRESH, + idType); updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), state); } return toState(); @@ -260,7 +264,7 @@ public AirbyteMessage processRecordMessage(final ConfiguredAirbyteStream stream, final var jsonNode = isEnforceSchema ? MongoDbCdcEventUtils.toJsonNode(document, fields) : MongoDbCdcEventUtils.toJsonNodeNoSchema(document); - var lastId = document.get(MongoConstants.ID_FIELD); + final var lastId = document.get(MongoConstants.ID_FIELD); final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); streamPairToLastIdMap.put(pair, lastId); @@ -270,7 +274,7 @@ public AirbyteMessage processRecordMessage(final ConfiguredAirbyteStream stream, .withStream(stream.getStream().getName()) .withNamespace(stream.getStream().getNamespace()) .withEmittedAt(emittedAt.toEpochMilli()) - .withData(injectMetadata(jsonNode))); + .withData((stream.getSyncMode() == INCREMENTAL) ? injectMetadata(jsonNode) : jsonNode)); } private JsonNode injectMetadata(final JsonNode jsonNode) { @@ -286,20 +290,32 @@ private JsonNode injectMetadata(final JsonNode jsonNode) { */ @Override public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream stream) { - final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); - if (!streamPairToLastIdMap.containsKey(pair)) { - var initialLastId = getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()).map(MongoDbStreamState::id).orElse(null); - streamPairToLastIdMap.put(pair, initialLastId); - } - var lastId = streamPairToLastIdMap.get(pair); - if (lastId != null) { - LOGGER.debug("Emitting final state status for stream {}:{}...", stream.getStream().getNamespace(), stream.getStream().getName()); - final var finalStateStatus = InitialSnapshotStatus.COMPLETE; - final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName()) - .orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName())); - final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType); - - updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), state); + if (stream.getSyncMode() == INCREMENTAL) { + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); + if (!streamPairToLastIdMap.containsKey(pair)) { + var initialLastId = getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()).map(MongoDbStreamState::id).orElse(null); + streamPairToLastIdMap.put(pair, initialLastId); + } + var lastId = streamPairToLastIdMap.get(pair); + if (lastId != null) { + LOGGER.debug("Emitting final state status for stream {}:{}...", stream.getStream().getNamespace(), stream.getStream().getName()); + final var finalStateStatus = InitialSnapshotStatus.COMPLETE; + final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName()) + .orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName())); + final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType); + + updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), state); + } + } else { + // deleteStreamState(stream.getStream().getName(), stream.getStream().getNamespace()); + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); + var lastId = streamPairToLastIdMap.get(pair); + if (lastId != null) { + final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName()) + .orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName())); + updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), + new MongoDbStreamState(null, FULL_REFRESH, idType)); + } } return toState(); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandlerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandlerTest.java index 3acc708372a0..d750f188c74a 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandlerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandlerTest.java @@ -26,6 +26,7 @@ import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants; import io.airbyte.integrations.source.mongodb.state.IdType; +import io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus; import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager; import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState; import io.airbyte.protocol.models.Field; @@ -64,12 +65,15 @@ class InitialSnapshotHandlerTest { private static final String OBJECT_ID1_STRING = "64c0029d95ad260d69ef28a1"; private static final String OBJECT_ID2_STRING = "64c0029d95ad260d69ef28a2"; private static final String OBJECT_ID3_STRING = "64c0029d95ad260d69ef28a3"; + private static final String OBJECT_ID4_STRING = "64c0029d95ad260d69ef28a4"; + private static final String OBJECT_ID5_STRING = "64c0029d95ad260d69ef28a5"; + private static final String OBJECT_ID6_STRING = "64c0029d95ad260d69ef28a6"; private static final ObjectId OBJECT_ID1 = new ObjectId(OBJECT_ID1_STRING); private static final ObjectId OBJECT_ID2 = new ObjectId(OBJECT_ID2_STRING); private static final ObjectId OBJECT_ID3 = new ObjectId(OBJECT_ID3_STRING); - private static final ObjectId OBJECT_ID4 = new ObjectId("64c0029d95ad260d69ef28a4"); - private static final ObjectId OBJECT_ID5 = new ObjectId("64c0029d95ad260d69ef28a5"); - private static final ObjectId OBJECT_ID6 = new ObjectId("64c0029d95ad260d69ef28a6"); + private static final ObjectId OBJECT_ID4 = new ObjectId(OBJECT_ID4_STRING); + private static final ObjectId OBJECT_ID5 = new ObjectId(OBJECT_ID5_STRING); + private static final ObjectId OBJECT_ID6 = new ObjectId(OBJECT_ID6_STRING); private static final String NAME1 = "name1"; private static final String NAME2 = "name2"; @@ -160,13 +164,13 @@ void testGetIteratorsEmptyInitialState() { final MongoDbStateManager ogStateManager = MongoDbStateManager.createStateManager(null, CONFIG); final MongoDbStateManager stateManager = spy(ogStateManager); final List> iterators = - initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), - MongoConstants.CHECKPOINT_INTERVAL, true); + initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), CONFIG); - assertEquals(iterators.size(), 2, "Only two streams are configured as incremental, full refresh streams should be ignored"); + assertEquals(iterators.size(), 3); final AutoCloseableIterator collection1 = iterators.get(0); final AutoCloseableIterator collection2 = iterators.get(1); + final AutoCloseableIterator collection3 = iterators.get(2); // collection1 final AirbyteMessage collection1StreamMessage1 = collection1.next(); @@ -217,6 +221,20 @@ void testGetIteratorsEmptyInitialState() { assertEquals(Type.STATE, collection2SateMessage.getType(), "State message is expected after all records in a stream are emitted"); assertFalse(collection2.hasNext()); + + final AirbyteMessage collection3StreamMessage1 = collection3.next(); + assertEquals(Type.RECORD, collection3StreamMessage1.getType()); + assertEquals(COLLECTION3, collection3StreamMessage1.getRecord().getStream()); + assertEquals(OBJECT_ID6.toString(), collection3StreamMessage1.getRecord().getData().get(CURSOR_FIELD).asText()); + // Full refresh record have no cdc fields + assertTrue(collection3StreamMessage1.getRecord().getData().has(CURSOR_FIELD)); + assertFalse(collection3StreamMessage1.getRecord().getData().has(CDC_UPDATED_AT)); + assertFalse(collection3StreamMessage1.getRecord().getData().has(CDC_DELETED_AT)); + assertFalse(collection3StreamMessage1.getRecord().getData().has(CDC_DEFAULT_CURSOR)); + + final AirbyteMessage collection3SateMessage = collection3.next(); + assertEquals(Type.STATE, collection3SateMessage.getType(), "State message is expected after all records in a stream are emitted"); + } @Test @@ -234,19 +252,29 @@ void testGetIteratorsNonEmptyInitialState() { CURSOR_FIELD, OBJECT_ID3, NAME_FIELD, NAME3)))); + insertDocuments(COLLECTION3, List.of( + new Document(Map.of( + CURSOR_FIELD, OBJECT_ID4, + NAME_FIELD, NAME4)), + new Document(Map.of( + CURSOR_FIELD, OBJECT_ID5, + NAME_FIELD, NAME5)))); + final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler(); final MongoDbStateManager ogStateManager = MongoDbStateManager.createStateManager(null, CONFIG); final MongoDbStateManager stateManager = spy(ogStateManager); when(stateManager.getStreamState(COLLECTION1, NAMESPACE)) .thenReturn(Optional.of(new MongoDbStreamState(OBJECT_ID1_STRING, null, IdType.OBJECT_ID))); + when(stateManager.getStreamState(COLLECTION3, NAMESPACE)) + .thenReturn(Optional.of(new MongoDbStreamState(OBJECT_ID4_STRING, InitialSnapshotStatus.FULL_REFRESH, IdType.OBJECT_ID))); final List> iterators = - initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), - MongoConstants.CHECKPOINT_INTERVAL, true); + initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), CONFIG); - assertEquals(iterators.size(), 2, "Only two streams are configured as incremental, full refresh streams should be ignored"); + assertEquals(iterators.size(), 3); final AutoCloseableIterator collection1 = iterators.get(0); final AutoCloseableIterator collection2 = iterators.get(1); + final AutoCloseableIterator collection3 = iterators.get(2); // collection1, first document should be skipped final AirbyteMessage collection1StreamMessage1 = collection1.next(); @@ -274,6 +302,17 @@ void testGetIteratorsNonEmptyInitialState() { assertEquals(Type.STATE, collection2SateMessage.getType(), "State message is expected after all records in a stream are emitted"); assertFalse(collection2.hasNext()); + + // collection3 will skip the first document + final AirbyteMessage collection3StreamMessage1 = collection3.next(); + assertEquals(Type.RECORD, collection3StreamMessage1.getType()); + assertEquals(COLLECTION3, collection3StreamMessage1.getRecord().getStream()); + assertEquals(OBJECT_ID5.toString(), collection3StreamMessage1.getRecord().getData().get(CURSOR_FIELD).asText()); + assertEquals(NAME5, collection3StreamMessage1.getRecord().getData().get(NAME_FIELD).asText()); + + final AirbyteMessage collection3StateMessage = collection3.next(); + assertEquals(Type.STATE, collection3StateMessage.getType(), "State message is expected after all records in a stream are emitted"); + assertFalse(collection3.hasNext()); } @Test @@ -291,7 +330,7 @@ void testGetIteratorsThrowsExceptionWhenThereAreDifferentIdTypes() { final var thrown = assertThrows(ConfigErrorException.class, () -> initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), - MongoConstants.CHECKPOINT_INTERVAL, true)); + /* MongoConstants.CHECKPOINT_INTERVAL, true */ CONFIG)); assertTrue(thrown.getMessage().contains("must be consistently typed")); } @@ -307,7 +346,7 @@ void testGetIteratorsThrowsExceptionWhenThereAreUnsupportedIdTypes() { final var thrown = assertThrows(ConfigErrorException.class, () -> initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), - MongoConstants.CHECKPOINT_INTERVAL, true)); + /* MongoConstants.CHECKPOINT_INTERVAL, true */ CONFIG)); assertTrue(thrown.getMessage().contains("_id fields with the following types are currently supported")); } @@ -333,13 +372,13 @@ void testGetIteratorsWithOneEmptyCollection() { final MongoDbStateManager ogStateManager = MongoDbStateManager.createStateManager(null, CONFIG); final MongoDbStateManager stateManager = spy(ogStateManager); final List> iterators = - initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), - MongoConstants.CHECKPOINT_INTERVAL, true); + initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), CONFIG); - assertEquals(iterators.size(), 2, "Only two streams are configured as incremental, full refresh streams should be ignored"); + assertEquals(iterators.size(), 3); final AutoCloseableIterator collection1 = iterators.get(0); final AutoCloseableIterator collection2 = iterators.get(1); + final AutoCloseableIterator collection3 = iterators.get(2); // collection1 final AirbyteMessage collection1StreamMessage1 = collection1.next(); @@ -360,6 +399,12 @@ void testGetIteratorsWithOneEmptyCollection() { final AirbyteMessage collection2StateMessage = collection2.next(); assertEquals(Type.STATE, collection2StateMessage.getType(), "State message is expected after all records in a stream are emitted"); assertFalse(collection2.hasNext()); + + // collection3 will generate a final state. + + final AirbyteMessage collection3StateMessage = collection3.next(); + assertEquals(Type.STATE, collection3StateMessage.getType(), "State message is expected after all records in a stream are emitted"); + assertFalse(collection3.hasNext()); } @Test @@ -377,25 +422,31 @@ void testGetIteratorsWithInitialStateNonDefaultIdType() { CURSOR_FIELD, OBJECT_ID3_STRING, NAME_FIELD, NAME3)))); + insertDocuments(COLLECTION3, List.of( + new Document(Map.of( + CURSOR_FIELD, OBJECT_ID4_STRING, + NAME_FIELD, NAME4)))); + final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler(); final MongoDbStateManager ogStateManager = MongoDbStateManager.createStateManager(null, CONFIG); final MongoDbStateManager stateManager = spy(ogStateManager); when(stateManager.getStreamState(COLLECTION1, NAMESPACE)) .thenReturn(Optional.of(new MongoDbStreamState(OBJECT_ID1_STRING, null, IdType.STRING))); final List> iterators = - initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), - MongoConstants.CHECKPOINT_INTERVAL, true); + initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), CONFIG); - assertEquals(iterators.size(), 2, "Only two streams are configured as incremental, full refresh streams should be ignored"); + assertEquals(iterators.size(), 3); final AutoCloseableIterator collection1 = iterators.get(0); final AutoCloseableIterator collection2 = iterators.get(1); + final AutoCloseableIterator collection3 = iterators.get(2); // collection1, first document should be skipped final AirbyteMessage collection1StreamMessage1 = collection1.next(); System.out.println("message 1: " + collection1StreamMessage1); final AirbyteMessage collection2StreamMessage1 = collection2.next(); System.out.println("message 2: " + collection2StreamMessage1); + final AirbyteMessage collection3StreamMessage1 = collection3.next(); assertEquals(Type.RECORD, collection1StreamMessage1.getType()); assertEquals(COLLECTION1, collection1StreamMessage1.getRecord().getStream()); @@ -420,6 +471,16 @@ void testGetIteratorsWithInitialStateNonDefaultIdType() { assertEquals(Type.STATE, collection2SateMessage.getType(), "State message is expected after all records in a stream are emitted"); assertFalse(collection2.hasNext()); + + // collection3, no documents should be skipped + assertEquals(Type.RECORD, collection3StreamMessage1.getType()); + assertEquals(COLLECTION3, collection3StreamMessage1.getRecord().getStream()); + assertEquals(OBJECT_ID4.toString(), collection3StreamMessage1.getRecord().getData().get(CURSOR_FIELD).asText()); + + final AirbyteMessage collection3SateMessage = collection3.next(); + assertEquals(Type.STATE, collection3SateMessage.getType(), "State message is expected after all records in a stream are emitted"); + + assertFalse(collection3.hasNext()); } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java index 6b6f661ebb78..76cf614d04d9 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java @@ -5,34 +5,17 @@ package io.airbyte.integrations.source.mongodb; import static io.airbyte.integrations.source.mongodb.MongoCatalogHelper.DEFAULT_CURSOR_FIELD; -import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY; -import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static io.airbyte.integrations.source.mongodb.MongoConstants.*; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.mongodb.MongoCredential; import com.mongodb.MongoSecurityException; -import com.mongodb.client.AggregateIterable; -import com.mongodb.client.ChangeStreamIterable; -import com.mongodb.client.MongoChangeStreamCursor; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.MongoIterable; +import com.mongodb.client.*; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterType; import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter; @@ -45,11 +28,7 @@ import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import org.bson.BsonDocument; import org.bson.Document; import org.junit.jupiter.api.BeforeEach; @@ -60,6 +39,7 @@ class MongoDbSourceTest { private static final String DB_NAME = "airbyte_test"; private JsonNode airbyteSourceConfig; + private JsonNode airbyteSourceConfigWithoutSchema; private MongoDbSourceConfig sourceConfig; private MongoClient mongoClient; private MongoDbCdcInitializer cdcInitializer; @@ -68,6 +48,8 @@ class MongoDbSourceTest { @BeforeEach void setup() { airbyteSourceConfig = createConfiguration(Optional.empty(), Optional.empty(), true); + airbyteSourceConfigWithoutSchema = createConfiguration(Optional.empty(), Optional.empty(), false); + sourceConfig = new MongoDbSourceConfig(airbyteSourceConfig); mongoClient = mock(MongoClient.class); cdcInitializer = mock(MongoDbCdcInitializer.class); @@ -302,7 +284,7 @@ void testReadKeepsMongoClientOpen() { when(changeStreamIterable.cursor()).thenReturn(mongoChangeStreamCursor); when(mongoClient.watch(BsonDocument.class)).thenReturn(changeStreamIterable); when(cdcInitializer.createCdcIterators(any(), any(), any(), any(), any(), any())).thenReturn(Collections.emptyList()); - source.read(airbyteSourceConfig, null, null); + source.read(airbyteSourceConfigWithoutSchema, new ConfiguredAirbyteCatalog(), null); verify(mongoClient, never()).close(); } @@ -312,7 +294,7 @@ private static JsonNode createConfiguration(final Optional username, fin MongoConstants.CONNECTION_STRING_CONFIGURATION_KEY, "mongodb://localhost:27017/", MongoConstants.AUTH_SOURCE_CONFIGURATION_KEY, "admin", MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, DEFAULT_DISCOVER_SAMPLE_SIZE, - MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced); + SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced); final Map config = new HashMap<>(baseConfig); username.ifPresent(u -> config.put(MongoConstants.USERNAME_CONFIGURATION_KEY, u)); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateManagerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateManagerTest.java index d830fd53acab..bf9919a3d4c5 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateManagerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateManagerTest.java @@ -32,12 +32,15 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.bson.Document; import org.bson.types.ObjectId; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; @@ -155,8 +158,9 @@ public Document answer(final InvocationOnMock invocation) { assertFalse(iter.hasNext(), "should have no more records"); } - @Test - void treatHasNextExceptionAsFalse() { + @ParameterizedTest + @MethodSource("provideCatalogArguments") + void treatHasNextExceptionAsFalse(final ConfiguredAirbyteCatalog catalog) { final var docs = docs(); // on the second hasNext call, throw an exception @@ -166,7 +170,7 @@ void treatHasNextExceptionAsFalse() { when(mongoCursor.next()).thenReturn(docs.get(0)); - final var stream = catalog().getStreams().stream().findFirst().orElseThrow(); + final var stream = catalog.getStreams().stream().findFirst().orElseThrow(); final var iter = new SourceStateIterator(mongoCursor, stream, stateManager, new StateEmitFrequency(CHECKPOINT_INTERVAL, MongoConstants.CHECKPOINT_DURATION)); @@ -207,7 +211,8 @@ void anInvalidIdFieldThrowsAnException() { assertThrows(ConfigErrorException.class, iter::hasNext); } - @Test + @ParameterizedTest + @MethodSource("provideCatalogArguments") void initialStateIsReturnedIfUnderlyingIteratorIsEmpty() { // underlying cursor is empty. when(mongoCursor.hasNext()).thenReturn(false); @@ -241,7 +246,8 @@ void initialStateIsReturnedIfUnderlyingIteratorIsEmpty() { assertFalse(iter.hasNext(), "should have no more records"); } - @Test + @ParameterizedTest + @MethodSource("provideCatalogArguments") void stateEmittedAfterDuration() throws InterruptedException { // force a 1.5s wait between messages when(mongoCursor.hasNext()) @@ -322,7 +328,8 @@ void stateEmittedAfterDuration() throws InterruptedException { assertFalse(iter.hasNext(), "should have no more records"); } - @Test + @ParameterizedTest + @MethodSource("provideCatalogArguments") void hasNextNoInitialStateAndNoMoreRecordsInCursor() { when(mongoCursor.hasNext()).thenReturn(false); final var stream = catalog().getStreams().stream().findFirst().orElseThrow(); @@ -335,7 +342,7 @@ void hasNextNoInitialStateAndNoMoreRecordsInCursor() { assertFalse(iter.hasNext()); } - private ConfiguredAirbyteCatalog catalog() { + private static ConfiguredAirbyteCatalog catalog() { return new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) @@ -351,6 +358,92 @@ private ConfiguredAirbyteCatalog catalog() { .withDefaultCursorField(List.of("_id"))))); } + @Test + void happyPathFullRefresh() { + final var docs = docs(); + + when(mongoCursor.hasNext()).thenAnswer(new Answer() { + + private int count = 0; + + @Override + public Boolean answer(final InvocationOnMock invocation) { + count++; + // hasNext will be called for each doc plus for each state message + return count <= (docs.size() + (docs.size() % CHECKPOINT_INTERVAL)); + } + + }); + + when(mongoCursor.next()).thenAnswer(new Answer() { + + private int offset = 0; + + @Override + public Document answer(final InvocationOnMock invocation) { + final var doc = docs.get(offset); + offset++; + return doc; + } + + }); + + final var stream = catalogFullRefresh().getStreams().stream().findFirst().orElseThrow(); + + final var iter = new SourceStateIterator(mongoCursor, stream, stateManager, new StateEmitFrequency(CHECKPOINT_INTERVAL, + MongoConstants.CHECKPOINT_DURATION)); + + // with a batch size of 2, the MongoDbStateIterator should return the following after each + // `hasNext`/`next` call: + // true, record Air Force Blue + // true, record Alice Blue + // true, state (with Alice Blue as the state) + // true, record Alizarin Crimson + // true, state (with Alizarin Crimson) + // false + AirbyteMessage message; + assertTrue(iter.hasNext(), "air force blue should be next"); + message = iter.next(); + assertEquals(Type.RECORD, message.getType()); + assertEquals(docs.get(0).get("_id").toString(), message.getRecord().getData().get("_id").asText()); + + assertTrue(iter.hasNext(), "alice blue should be next"); + message = iter.next(); + assertEquals(Type.RECORD, message.getType()); + assertEquals(docs.get(1).get("_id").toString(), message.getRecord().getData().get("_id").asText()); + + assertTrue(iter.hasNext(), "state should be next"); + message = iter.next(); + assertEquals(Type.STATE, message.getType()); + assertEquals( + docs.get(1).get("_id").toString(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("id").asText(), + "state id should match last record id"); + Assertions.assertEquals( + InitialSnapshotStatus.FULL_REFRESH.toString(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("status").asText(), + "state status should remain full_refresh"); + + assertTrue(iter.hasNext(), "alizarin crimson should be next"); + message = iter.next(); + assertEquals(Type.RECORD, message.getType()); + assertEquals(docs.get(2).get("_id").toString(), message.getRecord().getData().get("_id").asText()); + + assertTrue(iter.hasNext(), "state should be next"); + message = iter.next(); + assertEquals(Type.STATE, message.getType()); + assertEquals( + "null", + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("id").asText(), + "state id should be null upon completion"); + assertEquals( + InitialSnapshotStatus.FULL_REFRESH.toString(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("status").asText(), + "state status should remain full_refresh upon completion"); + + assertFalse(iter.hasNext(), "should have no more records"); + } + private List docs() { return List.of( new Document("_id", new ObjectId("64c0029d95ad260d69ef28a0")) @@ -361,4 +454,24 @@ private List docs() { .append("name", "Alizarin Crimson").append("hex", "#e32636")); } + private static ConfiguredAirbyteCatalog catalogFullRefresh() { + return new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withCursorField(List.of("_id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withCursorField(List.of("_id")) + .withStream(CatalogHelpers.createAirbyteStream( + "test.unit", + Field.of("_id", JsonSchemaType.STRING), + Field.of("name", JsonSchemaType.STRING), + Field.of("hex", JsonSchemaType.STRING)) + .withSupportedSyncModes(List.of(SyncMode.INCREMENTAL)) + .withDefaultCursorField(List.of("_id"))))); + } + + private static Stream provideCatalogArguments() { + return Stream.of(catalog(), catalogFullRefresh()); + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java index f7b48c8bcb95..0417f119fe92 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java @@ -95,7 +95,7 @@ class MongoDbCdcInitializerTest { .withSupportedSyncModes(List.of(SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("_id"))))); protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = toConfiguredCatalog(CATALOG); - + protected static final List CONFIGURED_CATALOG_STREAMS = CONFIGURED_CATALOG.getStreams(); final MongoDbSourceConfig CONFIG = new MongoDbSourceConfig(Jsons.jsonNode( Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of( @@ -167,7 +167,7 @@ void setUp() { void testCreateCdcIteratorsEmptyInitialState() { final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(null, CONFIG); final List> iterators = cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG); assertNotNull(iterators); assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream"); assertTrue(iterators.get(0).hasNext(), @@ -179,7 +179,7 @@ void testCreateCdcIteratorsEmptyInitialStateEmptyCollections() { when(findCursor.hasNext()).thenReturn(false); final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(null, CONFIG); final List> iterators = cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG); assertNotNull(iterators); assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream"); } @@ -189,7 +189,7 @@ void testCreateCdcIteratorsFromInitialStateWithInProgressInitialSnapshot() { final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.IN_PROGRESS), CONFIG); final List> iterators = cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG); assertNotNull(iterators); assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream"); assertTrue(iterators.get(0).hasNext(), @@ -201,7 +201,7 @@ void testCreateCdcIteratorsFromInitialStateWithCompletedInitialSnapshot() { final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE), CONFIG); final List> iterators = cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG); assertNotNull(iterators); assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream"); assertFalse(iterators.get(0).hasNext(), "Initial snapshot iterator should have no messages if its snapshot state is set as complete"); @@ -215,8 +215,9 @@ void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalidDefault .thenReturn(mongoChangeStreamCursor); final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE), CONFIG); - assertThrows(ConfigErrorException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, - stateManager, EMITTED_AT, CONFIG)); + assertThrows(ConfigErrorException.class, + () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, + stateManager, EMITTED_AT, CONFIG)); } @Test @@ -227,8 +228,9 @@ void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetFailOption() { .thenReturn(mongoChangeStreamCursor); final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE), CONFIG); - assertThrows(ConfigErrorException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, - stateManager, EMITTED_AT, CONFIG)); + assertThrows(ConfigErrorException.class, + () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, + stateManager, EMITTED_AT, CONFIG)); } @Test @@ -241,7 +243,7 @@ void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalidResyncO final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE), CONFIG); final List> iterators = cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, resyncConfig); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, resyncConfig); assertNotNull(iterators); assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream"); assertTrue(iterators.get(0).hasNext(), @@ -264,7 +266,8 @@ void testUnableToExtractOffsetFromStateException() { MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE), CONFIG); doReturn(Optional.empty()).when(mongoDbDebeziumStateUtil).savedOffset(any(), any(), any(), any(), any()); assertThrows(RuntimeException.class, - () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG)); + () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, + CONFIG)); } @Test @@ -280,7 +283,7 @@ void testMultipleIdTypesThrowsException() { MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.IN_PROGRESS), CONFIG); final var thrown = assertThrows(ConfigErrorException.class, () -> cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG)); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG)); assertTrue(thrown.getMessage().contains("must be consistently typed")); } @@ -295,7 +298,7 @@ void testUnsupportedIdTypeThrowsException() { final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(null, CONFIG); final var thrown = assertThrows(ConfigErrorException.class, () -> cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG)); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG)); assertTrue(thrown.getMessage().contains("_id fields with the following types are currently supported")); } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 7bde62a2ff67..f93c096b241b 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -4,6 +4,7 @@ Airbyte's certified MongoDB connector offers the following features: * [Change Data Capture (CDC)](https://docs.airbyte.com/understanding-airbyte/cdc) via [MongoDB's change streams](https://www.mongodb.com/docs/manual/changeStreams/)/[Replica Set Oplog](https://www.mongodb.com/docs/manual/core/replica-set-oplog/). * Reliable replication of any collection size with [checkpointing](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#state--checkpointing) and chunking of data reads. +* ***NEW*** Full refresh syncing of collections. ## Quick Start @@ -131,11 +132,17 @@ The source will test the connection to the MongoDB instance upon creation. ## Replication Methods The MongoDB source utilizes change data capture (CDC) as a reliable way to keep your data up to date. +In addtion MongoDB source now allows for syncing in a full refresh mode. ### CDC Airbyte utilizes [the change streams feature](https://www.mongodb.com/docs/manual/changeStreams/) of a [MongoDB replica set](https://www.mongodb.com/docs/manual/replication/) to incrementally capture inserts, updates and deletes using a replication plugin. To learn more how Airbyte implements CDC, refer to [Change Data Capture (CDC)](https://docs.airbyte.com/understanding-airbyte/cdc/). +### Full Refresh +The Full refresh sync mode added in v4.0.0 allows for reading a the entire contents of a collection, repeatedly. +The MongoDB source connector is using checkpointing in Full Refresh read so a sync job that failed for netwrok error for example, +Rather than starting over it will continue its full refresh read from a last known point. + ### Schema Enforcement By default the MongoDB V2 source connector enforces a schema. This means that while setting up a connector it will sample a configureable number of docuemnts and will create a set of fields to sync. From that set of fields, an admin can then deselect specific fields from the Replication screen to filter them out from the sync. @@ -214,6 +221,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.3.0 | 2024-03-15 | [35669](https://github.com/airbytehq/airbyte/pull/35669) | Full refresh read of collections. | | 1.2.16 | 2024-03-06 | [35669](https://github.com/airbytehq/airbyte/pull/35669) | State message will now include record count. | | 1.2.15 | 2024-02-27 | [35673](https://github.com/airbytehq/airbyte/pull/35673) | Consume user provided connection string. | | 1.2.14 | 2024-02-27 | [35675](https://github.com/airbytehq/airbyte/pull/35675) | Fix invalid cdc error message. |