Skip to content

Commit

Permalink
[Source-mongo] : Op log logging (#33549)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Dec 18, 2023
1 parent 63e96fb commit 279dd98
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 4 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ MavenLocal debugging steps:
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling |
| 0.7.9 | 2023-12-18 | [\#33549](https://github.com/airbytehq/airbyte/pull/33549) | Improve MongoDB logging. |
| 0.7.8 | 2023-12-18 | [\#33365](https://github.com/airbytehq/airbyte/pull/33365) | Emit stream statuses more consistently |
| 0.7.7 | 2023-12-18 | [\#33434](https://github.com/airbytehq/airbyte/pull/33307) | Remove LEGACY state |
| 0.7.6 | 2023-12-14 | [\#32328](https://github.com/airbytehq/airbyte/pull/33307) | Add schema less mode for mongodb CDC. Fixes for non standard mongodb id type. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil {
*/
public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) {
final String replicaSet = getReplicaSetName(mongoClient);
LOGGER.info("Initial resume token '{}' constructed", ResumeTokens.getData(resumeToken).asString().getValue());
final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue());
LOGGER.info("Initial Debezium state constructed: {}", state);
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ public class DebugUtil {
public static void debug(final Source debugSource) throws Exception {
final JsonNode debugConfig = DebugUtil.getConfig();
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = DebugUtil.getCatalog();
final JsonNode state = DebugUtil.getState();
JsonNode state;
try {
state = DebugUtil.getState();
} catch (final Exception e) {
state = null;
}

debugSource.check(debugConfig);
debugSource.discover(debugConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.7'
cdkVersionRequired = '0.7.9'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.0
dockerImageTag: 1.2.1
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
Expand All @@ -32,6 +33,7 @@
import java.util.function.Supplier;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,6 +91,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final String databaseName = config.getDatabaseName();
final boolean isEnforceSchema = config.getEnforceSchema();
final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties();
logOplogInfo(mongoClient);
final BsonDocument resumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient);
final JsonNode initialDebeziumState =
mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeToken, mongoClient, databaseName);
Expand Down Expand Up @@ -155,4 +158,18 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
return List.of(initialSnapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null));
}

private void logOplogInfo(final MongoClient mongoClient) {
try {
final MongoDatabase localDatabase = mongoClient.getDatabase("local");
final Document command = new Document("collStats", "oplog.rs");
final Document result = localDatabase.runCommand(command);
if (result != null) {
LOGGER.info("Max oplog size is {} bytes", result.getInteger("maxSize"));
LOGGER.info("Free space in oplog is {} bytes", result.getInteger("freeStorageSize"));
}
} catch (final Exception e) {
LOGGER.warn("Unable to query for op log stats, exception: {}" + e.getMessage());
}
}

}
3 changes: 2 additions & 1 deletion docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,10 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. |
| 1.2.0 | 2023-12-18 | [33438](https://github.com/airbytehq/airbyte/pull/33438) | Remove LEGACY state flag |
| 1.1.0 | 2023-12-14 | [32328](https://github.com/airbytehq/airbyte/pull/32328) | Schema less mode in mongodb. |
| 1.0.12 | 2023-12-13 | [33430](https://github.com/airbytehq/airbyte/pull/33430) | Support for better debugging tools. |
| 1.0.12 | 2023-12-13 | [33430](https://github.com/airbytehq/airbyte/pull/33430) | Add more verbose logging. |
| 1.0.11 | 2023-11-28 | [33356](https://github.com/airbytehq/airbyte/pull/33356) | Support for better debugging tools. |
| 1.0.10 | 2023-11-28 | [32886](https://github.com/airbytehq/airbyte/pull/32886) | Handle discover phase OOMs |
| 1.0.9 | 2023-11-08 | [32285](https://github.com/airbytehq/airbyte/pull/32285) | Additional support to read UUIDs |
Expand Down

0 comments on commit 279dd98

Please sign in to comment.