Skip to content

Commit

Permalink
Resumable full refresh source-mongodb (#35845)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Mar 20, 2024
1 parent 23b3d4e commit bcd32a0
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.16
dockerImageTag: 1.3.0
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.*;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.exceptions.ConfigErrorException;
Expand All @@ -22,16 +18,10 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.*;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
Expand All @@ -53,15 +43,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final MongoDatabase database,
final int checkpointInterval,
final boolean isEnforceSchema) {
final MongoDbSourceConfig config) {
final boolean isEnforceSchema = config.getEnforceSchema();
final var checkpointInterval = config.getCheckpointInterval();
return streams
.stream()
.peek(airbyteStream -> {
if (!airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL))
LOGGER.warn("Stream {} configured with unsupported sync mode: {}", airbyteStream.getStream().getName(), airbyteStream.getSyncMode());
})
.filter(airbyteStream -> airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL))
.map(airbyteStream -> {
final var collectionName = airbyteStream.getStream().getName();
final var collection = database.getCollection(collectionName);
Expand All @@ -88,6 +74,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
// "where _id > [last saved state] order by _id ASC".
// If no state exists, it will create a query akin to "where 1=1 order by _id ASC"
final Bson filter = existingState
// Full refresh streams that finished set their id to null
// This tells us to start over
.filter(state -> state.id() != null)
.map(state -> Filters.gt(MongoConstants.ID_FIELD,
switch (state.idType()) {
case STRING -> new BsonString(state.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MongoCatalogHelper {
/**
* The list of supported sync modes for a given stream.
*/
public static final List<SyncMode> SUPPORTED_SYNC_MODES = List.of(SyncMode.INCREMENTAL);
public static final List<SyncMode> SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL);

/**
* Name of the property in the JSON representation of an Airbyte stream that contains the discovered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb;

import static io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitialSnapshotUtils.validateStateSyncMode;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.MongoCommandException;
Expand All @@ -19,9 +21,11 @@
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcConnectorMetadataInjector;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcState;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.protocol.models.v0.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -126,19 +130,31 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final var stateManager = MongoDbStateManager.createStateManager(state, sourceConfig);

if (catalog != null) {
validateStateSyncMode(stateManager, catalog.getStreams());
MongoUtil.checkSchemaModeMismatch(sourceConfig.getEnforceSchema(),
stateManager.getCdcState() != null ? stateManager.getCdcState().schema_enforced() : sourceConfig.getEnforceSchema(), catalog);
}

try {
// WARNING: do not close the client here since it needs to be used by the iterator
final MongoClient mongoClient = createMongoClient(sourceConfig);

try {
final var iteratorList =
cdcInitializer.createCdcIterators(mongoClient, cdcMetadataInjector, catalog,
stateManager, emittedAt, sourceConfig);
return AutoCloseableIterators.concatWithEagerClose(iteratorList, AirbyteTraceMessageUtility::emitStreamStatusTrace);
final List<ConfiguredAirbyteStream> fullRefreshStreams =
catalog.getStreams().stream().filter(s -> s.getSyncMode() == SyncMode.FULL_REFRESH).toList();
final List<ConfiguredAirbyteStream> incrementalStreams = catalog.getStreams().stream().filter(s -> !fullRefreshStreams.contains(s)).toList();

List<AutoCloseableIterator<AirbyteMessage>> iterators = new ArrayList<>();
if (!fullRefreshStreams.isEmpty()) {
LOGGER.info("There are {} Full refresh streams", fullRefreshStreams.size());
iterators.addAll(createFullRefreshIterators(sourceConfig, mongoClient, fullRefreshStreams, stateManager, emittedAt));
}

if (!incrementalStreams.isEmpty()) {
LOGGER.info("There are {} Incremental streams", incrementalStreams.size());
iterators
.addAll(cdcInitializer.createCdcIterators(mongoClient, cdcMetadataInjector, incrementalStreams, stateManager, emittedAt, sourceConfig));
}
return AutoCloseableIterators.concatWithEagerClose(iterators, AirbyteTraceMessageUtility::emitStreamStatusTrace);
} catch (final Exception e) {
mongoClient.close();
throw e;
Expand All @@ -153,4 +169,22 @@ protected MongoClient createMongoClient(final MongoDbSourceConfig config) {
return MongoConnectionUtils.createMongoClient(config);
}

List<AutoCloseableIterator<AirbyteMessage>> createFullRefreshIterators(final MongoDbSourceConfig sourceConfig,
final MongoClient mongoClient,
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final Instant emmitedAt) {
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
if (stateManager.getCdcState() == null) {
stateManager.updateCdcState(new MongoDbCdcState(null, sourceConfig.getEnforceSchema()));
}
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators = initialSnapshotHandler.getIterators(
streams,
stateManager,
mongoClient.getDatabase(sourceConfig.getDatabaseName()),
sourceConfig);

return fullRefreshIterators;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import com.google.common.collect.Sets;
import com.mongodb.client.MongoClient;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mongodb.MongoUtil;
import io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand All @@ -38,6 +40,9 @@ public class MongoDbCdcInitialSnapshotUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbCdcInitialSnapshotUtils.class);

private static final Predicate<ConfiguredAirbyteStream> SYNC_MODE_FILTER = c -> SyncMode.INCREMENTAL.equals(c.getSyncMode());
private static final Map<SyncMode, List<InitialSnapshotStatus>> syncModeToStatusValidationMap = Map.of(
SyncMode.INCREMENTAL, List.of(InitialSnapshotStatus.IN_PROGRESS, InitialSnapshotStatus.COMPLETE),
SyncMode.FULL_REFRESH, List.of(InitialSnapshotStatus.FULL_REFRESH));

/**
* Returns the list of configured Airbyte streams that need to perform the initial snapshot portion
Expand Down Expand Up @@ -130,4 +135,18 @@ private static void estimateInitialSnapshotSyncSize(final MongoClient mongoClien
});
}

private static boolean isValidInitialSnapshotStatus(final SyncMode syncMode, final MongoDbStreamState state) {
return syncModeToStatusValidationMap.get(syncMode).contains(state.status());
}

public static void validateStateSyncMode(final MongoDbStateManager stateManager, final List<ConfiguredAirbyteStream> streams) {
streams.forEach(stream -> {
final var existingState = stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace());
if (existingState.isPresent() && !isValidInitialSnapshotStatus(stream.getSyncMode(), existingState.get())) {
throw new ConfigErrorException("Stream " + stream.getStream().getName() + " is " + stream.getSyncMode() + " but the saved status "
+ existingState.get().status() + " doesn't match. Please reset this stream");
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,33 @@ public MongoDbCdcInitializer() {
public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final MongoClient mongoClient,
final MongoDbCdcConnectorMetadataInjector cdcMetadataInjector,
final ConfiguredAirbyteCatalog catalog,
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final Instant emittedAt,
final MongoDbSourceConfig config) {

ConfiguredAirbyteCatalog incrementalOnlyStreamsCatalog = new ConfiguredAirbyteCatalog().withStreams(streams);
final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds());
// #35059: debezium heartbeats are not sent on the expected interval. this is
// a worksaround to allow making subsequent wait time configurable.
// a workaround to allow making subsequent wait time configurable.
final Duration subsequentRecordWaitTime = firstRecordWaitTime;
LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime);
final int queueSize = MongoUtil.getDebeziumEventQueueSize(config);
final String databaseName = config.getDatabaseName();
final boolean isEnforceSchema = config.getEnforceSchema();
final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties();
logOplogInfo(mongoClient);
final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, catalog);
final BsonDocument initialResumeToken =
MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, incrementalOnlyStreamsCatalog);
final JsonNode initialDebeziumState =
mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName);
final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null)
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
: new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced());
final MongoDbCdcState cdcState =
(stateManager.getCdcState() == null || stateManager.getCdcState().state() == null || stateManager.getCdcState().state().isNull())
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
: new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced());
final Optional<BsonDocument> optSavedOffset = mongoDbDebeziumStateUtil.savedOffset(
Jsons.clone(defaultDebeziumProperties),
catalog,
incrementalOnlyStreamsCatalog,
cdcState.state(),
config.getDatabaseConfig(),
mongoClient);
Expand Down Expand Up @@ -131,23 +134,26 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
}

final MongoDbCdcState stateToBeUsed =
(!savedOffsetIsValid || stateManager.getCdcState() == null || stateManager.getCdcState().state() == null)
? new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())
: stateManager.getCdcState();
(!savedOffsetIsValid || stateManager.getCdcState() == null || stateManager.getCdcState().state() == null
|| stateManager.getCdcState().state().isNull())
? new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())
: stateManager.getCdcState();

final List<ConfiguredAirbyteStream> initialSnapshotStreams =
MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, catalog, savedOffsetIsValid);
MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, incrementalOnlyStreamsCatalog, savedOffsetIsValid);
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
final List<AutoCloseableIterator<AirbyteMessage>> initialSnapshotIterators =
initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName),
config.getCheckpointInterval(), isEnforceSchema);
config);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(),
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog);
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig());
final var propertiesManager =
new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog);
final var eventConverter =
new MongoDbDebeziumEventConverter(cdcMetadataInjector, incrementalOnlyStreamsCatalog, emittedAt, config.getDatabaseConfig());

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@
public enum InitialSnapshotStatus {

IN_PROGRESS,
COMPLETE
COMPLETE,
// A Full Refresh stream state behaves like In Progress,
// but its value set to null when collection is fully read
// Rather than turning into Complete
FULL_REFRESH
}
Loading

0 comments on commit bcd32a0

Please sign in to comment.