Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into baz/source/recharge…
Browse files Browse the repository at this point in the history
…/migrate-to-low-code
  • Loading branch information
bazarnov committed Mar 20, 2024
2 parents 04fc570 + bcd32a0 commit 7aaf834
Show file tree
Hide file tree
Showing 16 changed files with 371 additions and 135 deletions.
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ E.G.: running Poe tasks on the modified internal packages of the current branch:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
| 4.6.1 | [#0](https://github.com/airbytehq/airbyte/pull/0) | Fix `ValueError` related to PR number in migrate-to-poetry |
| 4.6.0 | [#35583](https://github.com/airbytehq/airbyte/pull/35583) | Implement the `airbyte-ci connectors migrate-to-poetry` command. |
| 4.5.4 | [#36206](https://github.com/airbytehq/airbyte/pull/36206) | Revert poetry cache removal during nightly builds |
| 4.5.3 | [#34586](https://github.com/airbytehq/airbyte/pull/34586) | Extract connector changelog modification logic into its own class |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ async def run_connector_migration_to_poetry_pipeline(context: ConnectorContext,
await context.get_repo_dir(include=[str(context.connector.local_connector_documentation_directory)]),
new_version,
"Manage dependencies with Poetry.",
"TBD",
"0",
export_docs=True,
),
depends_on=[CONNECTOR_TEST_STEP_ID.REGRESSION_TEST],
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "4.6.0"
version = "4.6.1"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <[email protected]>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.16
dockerImageTag: 1.3.0
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.*;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.exceptions.ConfigErrorException;
Expand All @@ -22,16 +18,10 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.*;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
Expand All @@ -53,15 +43,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final MongoDatabase database,
final int checkpointInterval,
final boolean isEnforceSchema) {
final MongoDbSourceConfig config) {
final boolean isEnforceSchema = config.getEnforceSchema();
final var checkpointInterval = config.getCheckpointInterval();
return streams
.stream()
.peek(airbyteStream -> {
if (!airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL))
LOGGER.warn("Stream {} configured with unsupported sync mode: {}", airbyteStream.getStream().getName(), airbyteStream.getSyncMode());
})
.filter(airbyteStream -> airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL))
.map(airbyteStream -> {
final var collectionName = airbyteStream.getStream().getName();
final var collection = database.getCollection(collectionName);
Expand All @@ -88,6 +74,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
// "where _id > [last saved state] order by _id ASC".
// If no state exists, it will create a query akin to "where 1=1 order by _id ASC"
final Bson filter = existingState
// Full refresh streams that finished set their id to null
// This tells us to start over
.filter(state -> state.id() != null)
.map(state -> Filters.gt(MongoConstants.ID_FIELD,
switch (state.idType()) {
case STRING -> new BsonString(state.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MongoCatalogHelper {
/**
* The list of supported sync modes for a given stream.
*/
public static final List<SyncMode> SUPPORTED_SYNC_MODES = List.of(SyncMode.INCREMENTAL);
public static final List<SyncMode> SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL);

/**
* Name of the property in the JSON representation of an Airbyte stream that contains the discovered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb;

import static io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitialSnapshotUtils.validateStateSyncMode;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.MongoCommandException;
Expand All @@ -19,9 +21,11 @@
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcConnectorMetadataInjector;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcState;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.protocol.models.v0.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -126,19 +130,31 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final var stateManager = MongoDbStateManager.createStateManager(state, sourceConfig);

if (catalog != null) {
validateStateSyncMode(stateManager, catalog.getStreams());
MongoUtil.checkSchemaModeMismatch(sourceConfig.getEnforceSchema(),
stateManager.getCdcState() != null ? stateManager.getCdcState().schema_enforced() : sourceConfig.getEnforceSchema(), catalog);
}

try {
// WARNING: do not close the client here since it needs to be used by the iterator
final MongoClient mongoClient = createMongoClient(sourceConfig);

try {
final var iteratorList =
cdcInitializer.createCdcIterators(mongoClient, cdcMetadataInjector, catalog,
stateManager, emittedAt, sourceConfig);
return AutoCloseableIterators.concatWithEagerClose(iteratorList, AirbyteTraceMessageUtility::emitStreamStatusTrace);
final List<ConfiguredAirbyteStream> fullRefreshStreams =
catalog.getStreams().stream().filter(s -> s.getSyncMode() == SyncMode.FULL_REFRESH).toList();
final List<ConfiguredAirbyteStream> incrementalStreams = catalog.getStreams().stream().filter(s -> !fullRefreshStreams.contains(s)).toList();

List<AutoCloseableIterator<AirbyteMessage>> iterators = new ArrayList<>();
if (!fullRefreshStreams.isEmpty()) {
LOGGER.info("There are {} Full refresh streams", fullRefreshStreams.size());
iterators.addAll(createFullRefreshIterators(sourceConfig, mongoClient, fullRefreshStreams, stateManager, emittedAt));
}

if (!incrementalStreams.isEmpty()) {
LOGGER.info("There are {} Incremental streams", incrementalStreams.size());
iterators
.addAll(cdcInitializer.createCdcIterators(mongoClient, cdcMetadataInjector, incrementalStreams, stateManager, emittedAt, sourceConfig));
}
return AutoCloseableIterators.concatWithEagerClose(iterators, AirbyteTraceMessageUtility::emitStreamStatusTrace);
} catch (final Exception e) {
mongoClient.close();
throw e;
Expand All @@ -153,4 +169,22 @@ protected MongoClient createMongoClient(final MongoDbSourceConfig config) {
return MongoConnectionUtils.createMongoClient(config);
}

List<AutoCloseableIterator<AirbyteMessage>> createFullRefreshIterators(final MongoDbSourceConfig sourceConfig,
final MongoClient mongoClient,
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final Instant emmitedAt) {
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
if (stateManager.getCdcState() == null) {
stateManager.updateCdcState(new MongoDbCdcState(null, sourceConfig.getEnforceSchema()));
}
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators = initialSnapshotHandler.getIterators(
streams,
stateManager,
mongoClient.getDatabase(sourceConfig.getDatabaseName()),
sourceConfig);

return fullRefreshIterators;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import com.google.common.collect.Sets;
import com.mongodb.client.MongoClient;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mongodb.MongoUtil;
import io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand All @@ -38,6 +40,9 @@ public class MongoDbCdcInitialSnapshotUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbCdcInitialSnapshotUtils.class);

private static final Predicate<ConfiguredAirbyteStream> SYNC_MODE_FILTER = c -> SyncMode.INCREMENTAL.equals(c.getSyncMode());
private static final Map<SyncMode, List<InitialSnapshotStatus>> syncModeToStatusValidationMap = Map.of(
SyncMode.INCREMENTAL, List.of(InitialSnapshotStatus.IN_PROGRESS, InitialSnapshotStatus.COMPLETE),
SyncMode.FULL_REFRESH, List.of(InitialSnapshotStatus.FULL_REFRESH));

/**
* Returns the list of configured Airbyte streams that need to perform the initial snapshot portion
Expand Down Expand Up @@ -130,4 +135,18 @@ private static void estimateInitialSnapshotSyncSize(final MongoClient mongoClien
});
}

private static boolean isValidInitialSnapshotStatus(final SyncMode syncMode, final MongoDbStreamState state) {
return syncModeToStatusValidationMap.get(syncMode).contains(state.status());
}

public static void validateStateSyncMode(final MongoDbStateManager stateManager, final List<ConfiguredAirbyteStream> streams) {
streams.forEach(stream -> {
final var existingState = stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace());
if (existingState.isPresent() && !isValidInitialSnapshotStatus(stream.getSyncMode(), existingState.get())) {
throw new ConfigErrorException("Stream " + stream.getStream().getName() + " is " + stream.getSyncMode() + " but the saved status "
+ existingState.get().status() + " doesn't match. Please reset this stream");
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,33 @@ public MongoDbCdcInitializer() {
public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final MongoClient mongoClient,
final MongoDbCdcConnectorMetadataInjector cdcMetadataInjector,
final ConfiguredAirbyteCatalog catalog,
final List<ConfiguredAirbyteStream> streams,
final MongoDbStateManager stateManager,
final Instant emittedAt,
final MongoDbSourceConfig config) {

ConfiguredAirbyteCatalog incrementalOnlyStreamsCatalog = new ConfiguredAirbyteCatalog().withStreams(streams);
final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds());
// #35059: debezium heartbeats are not sent on the expected interval. this is
// a worksaround to allow making subsequent wait time configurable.
// a workaround to allow making subsequent wait time configurable.
final Duration subsequentRecordWaitTime = firstRecordWaitTime;
LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime);
final int queueSize = MongoUtil.getDebeziumEventQueueSize(config);
final String databaseName = config.getDatabaseName();
final boolean isEnforceSchema = config.getEnforceSchema();
final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties();
logOplogInfo(mongoClient);
final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, catalog);
final BsonDocument initialResumeToken =
MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, incrementalOnlyStreamsCatalog);
final JsonNode initialDebeziumState =
mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName);
final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null)
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
: new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced());
final MongoDbCdcState cdcState =
(stateManager.getCdcState() == null || stateManager.getCdcState().state() == null || stateManager.getCdcState().state().isNull())
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
: new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced());
final Optional<BsonDocument> optSavedOffset = mongoDbDebeziumStateUtil.savedOffset(
Jsons.clone(defaultDebeziumProperties),
catalog,
incrementalOnlyStreamsCatalog,
cdcState.state(),
config.getDatabaseConfig(),
mongoClient);
Expand Down Expand Up @@ -131,23 +134,26 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
}

final MongoDbCdcState stateToBeUsed =
(!savedOffsetIsValid || stateManager.getCdcState() == null || stateManager.getCdcState().state() == null)
? new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())
: stateManager.getCdcState();
(!savedOffsetIsValid || stateManager.getCdcState() == null || stateManager.getCdcState().state() == null
|| stateManager.getCdcState().state().isNull())
? new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())
: stateManager.getCdcState();

final List<ConfiguredAirbyteStream> initialSnapshotStreams =
MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, catalog, savedOffsetIsValid);
MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, incrementalOnlyStreamsCatalog, savedOffsetIsValid);
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
final List<AutoCloseableIterator<AirbyteMessage>> initialSnapshotIterators =
initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName),
config.getCheckpointInterval(), isEnforceSchema);
config);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(),
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog);
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig());
final var propertiesManager =
new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog);
final var eventConverter =
new MongoDbDebeziumEventConverter(cdcMetadataInjector, incrementalOnlyStreamsCatalog, emittedAt, config.getDatabaseConfig());

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@
public enum InitialSnapshotStatus {

IN_PROGRESS,
COMPLETE
COMPLETE,
// A Full Refresh stream state behaves like In Progress,
// but its value set to null when collection is fully read
// Rather than turning into Complete
FULL_REFRESH
}
Loading

0 comments on commit 7aaf834

Please sign in to comment.