Skip to content

Commit

Permalink
[source-mongodb] record count in state & initial iterator refactor (#…
Browse files Browse the repository at this point in the history
…35669)

Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Augustin <[email protected]>
Co-authored-by: Subodh Kant Chaturvedi <[email protected]>
Co-authored-by: Anatolii Yatsuk <[email protected]>
Co-authored-by: Baz <[email protected]>
Co-authored-by: Artem Inzhyyants <[email protected]>
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
Co-authored-by: Tim Roes <[email protected]>
Co-authored-by: benmoriceau <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Marius Posta <[email protected]>
Co-authored-by: Evan Tahler <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
Co-authored-by: Anton Karpets <[email protected]>
Co-authored-by: Patrick Nilan <[email protected]>
Co-authored-by: Akash Kulkarni <[email protected]>
Co-authored-by: Tyler B <[email protected]>
Co-authored-by: bgroff <[email protected]>
Co-authored-by: mjgatz <[email protected]>
Co-authored-by: mgreene <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Rodi Reich Zilberman <[email protected]>
Co-authored-by: Daryna Ishchenko <[email protected]>
Co-authored-by: Stephane Geneix <[email protected]>
Co-authored-by: Joe Reuter <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
Co-authored-by: Akash Kulkarni <[email protected]>
Co-authored-by: Roman Yermilov [GL] <[email protected]>
Co-authored-by: Alexandre Girard <[email protected]>
Co-authored-by: girarda <[email protected]>
Co-authored-by: Brian Lai <[email protected]>
Co-authored-by: brianjlai <[email protected]>
Co-authored-by: Catherine Noll <[email protected]>
Co-authored-by: midavadim <[email protected]>
Co-authored-by: Julien COUTAND <[email protected]>
Co-authored-by: Christo Grabowski <[email protected]>
Co-authored-by: maxi297 <[email protected]>
Co-authored-by: Bindi Pankhudi <[email protected]>
Co-authored-by: Bindi Pankhudi <[email protected]>
Co-authored-by: Ben Drucker <[email protected]>
Co-authored-by: TornadoContre <[email protected]>
Co-authored-by: Natik Gadzhi <[email protected]>
Co-authored-by: Thomas Dippel <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
Co-authored-by: Alex Birdsall <[email protected]>
Co-authored-by: ambirdsall <[email protected]>
Co-authored-by: Jose Gerardo Pineda <[email protected]>
Co-authored-by: alafanechere <[email protected]>
Co-authored-by: Pedro S. Lopez <[email protected]>
Co-authored-by: Ella Rohm-Ensing <[email protected]>
Co-authored-by: Siarhei Ivanou <[email protected]>
Co-authored-by: Anatolii Yatsuk <[email protected]>
Co-authored-by: Ryan Waskewich <[email protected]>
Co-authored-by: Sajarin <[email protected]>
Co-authored-by: artem1205 <[email protected]>
Co-authored-by: perangel <[email protected]>
Co-authored-by: Joe Bell <[email protected]>
Co-authored-by: Obioma Anomnachi <[email protected]>
Co-authored-by: maxi297 <[email protected]>
Co-authored-by: SatishChGit <[email protected]>
Co-authored-by: Brian Leonard <[email protected]>
Co-authored-by: David Wallace <[email protected]>
Co-authored-by: pmossman <[email protected]>
Co-authored-by: Stephane Geneix <[email protected]>
Co-authored-by: Alexandre Cuoci <[email protected]>
Co-authored-by: Danny Tiesling <[email protected]>
  • Loading branch information
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.6'
cdkVersionRequired = '0.23.8'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.15
dockerImageTag: 1.2.16
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcConnectorMetadataInjector;
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -53,8 +53,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final MongoDatabase database,
final MongoDbCdcConnectorMetadataInjector cdcConnectorMetadataInjector,
final Instant emittedAt,
final int checkpointInterval,
final boolean isEnforceSchema) {
return streams
Expand Down Expand Up @@ -113,10 +111,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor();

final var stateIterator =
new MongoDbStateIterator(cursor, stateManager, Optional.ofNullable(cdcConnectorMetadataInjector),
airbyteStream, emittedAt, checkpointInterval, MongoConstants.CHECKPOINT_DURATION, isEnforceSchema);
new SourceStateIterator<>(cursor, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
MongoConstants.CHECKPOINT_DURATION));
return AutoCloseableIterators.fromIterator(stateIterator, cursor::close, null);
})
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final JsonNode state) {
final var emittedAt = Instant.now();
final var cdcMetadataInjector = MongoDbCdcConnectorMetadataInjector.getInstance(emittedAt);
final var stateManager = MongoDbStateManager.createStateManager(state);
final MongoDbSourceConfig sourceConfig = new MongoDbSourceConfig(config);
final var stateManager = MongoDbStateManager.createStateManager(state, sourceConfig);

if (catalog != null) {
MongoUtil.checkSchemaModeMismatch(sourceConfig.getEnforceSchema(),
stateManager.getCdcState() != null ? stateManager.getCdcState().schema_enforced() : sourceConfig.getEnforceSchema(), catalog);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, catalog, savedOffsetIsValid);
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
final List<AutoCloseableIterator<AirbyteMessage>> initialSnapshotIterators =
initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName), cdcMetadataInjector,
emittedAt, config.getCheckpointInterval(), isEnforceSchema);
initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName),
config.getCheckpointInterval(), isEnforceSchema);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(),
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
Expand Down
Loading

0 comments on commit d5e91ae

Please sign in to comment.