Skip to content

Commit

Permalink
[Source-Mongodb] : Add config to throw an error on invalid CDC positi…
Browse files Browse the repository at this point in the history
…on (#35375)
  • Loading branch information
akashkulk authored and xiaohansong committed Feb 27, 2024
1 parent f0e7447 commit bcdd2d8
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@
"minimum": 10,
"maximum": 100000,
"group": "advanced"
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 11,
"group": "advanced"
}
},
"groups": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.10
dockerImageTag: 1.2.11
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class MongoConstants {
public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds";
public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300;

public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior";
public static final String FAIL_SYNC_OPTION = "Fail sync";
public static final String RESYNC_DATA_OPTION = "Re-sync data";

private MongoConstants() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.RESYNC_DATA_OPTION;
import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY;

Expand Down Expand Up @@ -96,4 +98,13 @@ public Integer getInitialWaitingTimeSeconds() {
}
}

public boolean shouldFailSyncOnInvalidCursor() {
if (rawConfig.has(INVALID_CDC_CURSOR_POSITION_PROPERTY)
&& rawConfig.get(INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(RESYNC_DATA_OPTION)) {
return false;
} else {
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -116,6 +117,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(

if (!savedOffsetIsValid) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
if (config.shouldFailSyncOnInvalidCursor()) {
throw new ConfigErrorException(
"Saved offset is not valid. Please reset the connection, and then increase oplog retention or reduce sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details");
}
LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh.");
// If the offset in the state is invalid, reset the state to the initial STATE
stateManager.resetState(new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@
"minimum": 10,
"maximum": 100000,
"group": "advanced"
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 11,
"group": "advanced"
}
},
"groups": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -508,14 +509,8 @@ void testSyncShouldHandlePurgedLogsGracefully() throws Exception {
stateMessage.getGlobal().setSharedState(Jsons.jsonNode(cdcState));
final JsonNode state = Jsons.jsonNode(List.of(stateMessage));

// Re-run the sync to prove that an initial snapshot is initiated due to invalid resume token
final List<AirbyteMessage> messages2 = runRead(configuredCatalog, state);

final List<AirbyteRecordMessage> recordMessages2 = filterRecords(messages2);
final List<AirbyteStateMessage> stateMessages2 = filterStateMessages(messages2);

assertEquals(recordCount, recordMessages2.size());
assertEquals(recordCount + 1, stateMessages2.size());
// Re-run the sync to prove that a config error is thrown due to invalid resume token
assertThrows(Exception.class, () -> runRead(configuredCatalog, state));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.integrations.source.mongodb.cdc;

import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.RESYNC_DATA_OPTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -20,6 +22,7 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.mongodb.MongoCommandException;
import com.mongodb.ServerAddress;
import com.mongodb.client.AggregateIterable;
Expand Down Expand Up @@ -205,20 +208,53 @@ void testCreateCdcIteratorsFromInitialStateWithCompletedInitialSnapshot() {
}

@Test
void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalid() {
void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalidDefaultBehavior() {
when(changeStreamIterable.cursor())
.thenReturn(mongoChangeStreamCursor)
.thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress()))
.thenReturn(mongoChangeStreamCursor);
final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE));
assertThrows(ConfigErrorException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG,
stateManager, EMITTED_AT, CONFIG));
}

@Test
void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetFailOption() {
when(changeStreamIterable.cursor())
.thenReturn(mongoChangeStreamCursor)
.thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress()))
.thenReturn(mongoChangeStreamCursor);
final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE));
assertThrows(ConfigErrorException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG,
stateManager, EMITTED_AT, CONFIG));
}

@Test
void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalidResyncOption() {
MongoDbSourceConfig resyncConfig = new MongoDbSourceConfig(createConfig(RESYNC_DATA_OPTION));
when(changeStreamIterable.cursor())
.thenReturn(mongoChangeStreamCursor)
.thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress()))
.thenReturn(mongoChangeStreamCursor);
final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE));
final List<AutoCloseableIterator<AirbyteMessage>> iterators = cdcInitializer
.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG);
.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, resyncConfig);
assertNotNull(iterators);
assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream");
assertTrue(iterators.get(0).hasNext(),
"Initial snapshot iterator should at least have one message if its snapshot state is set as complete but needs to start over due to invalid saved offset");
}

JsonNode createConfig(String cdcCursorFailBehaviour) {
return Jsons.jsonNode(ImmutableMap.builder()
.put(DATABASE_CONFIG_CONFIGURATION_KEY,
Map.of(
MongoDbDebeziumConstants.Configuration.CONNECTION_STRING_CONFIGURATION_KEY, "mongodb://host:12345/",
MongoDbDebeziumConstants.Configuration.DATABASE_CONFIGURATION_KEY, DATABASE))
.put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour)
.build());
}

@Test
void testUnableToExtractOffsetFromStateException() {
final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE));
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 1.2.11 | 2024-02-20 | [35375](https://github.com/airbytehq/airbyte/pull/35375) | Add config to throw an error on invalid CDC position and enable it by default. |
| 1.2.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes |
| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 |
| 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. |
Expand Down

0 comments on commit bcdd2d8

Please sign in to comment.