Skip to content

Add tracing support using Micrometer #1695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions driver-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {

optionalImplementation(libs.snappy.java)
optionalImplementation(libs.zstd.jni)
optionalImplementation(libs.micrometer)

testImplementation(project(path = ":bson", configuration = "testArtifacts"))
testImplementation(libs.reflections)
Expand Down
29 changes: 29 additions & 0 deletions driver-core/src/main/com/mongodb/MongoClientSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.event.CommandListener;
import com.mongodb.internal.tracing.Tracer;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.InetAddressResolver;
Expand Down Expand Up @@ -118,6 +119,7 @@ public final class MongoClientSettings {
private final InetAddressResolver inetAddressResolver;
@Nullable
private final Long timeoutMS;
private final Tracer tracer;

/**
* Gets the default codec registry. It includes the following providers:
Expand Down Expand Up @@ -238,6 +240,7 @@ public static final class Builder {
private ContextProvider contextProvider;
private DnsClient dnsClient;
private InetAddressResolver inetAddressResolver;
private Tracer tracer;

private Builder() {
}
Expand Down Expand Up @@ -275,6 +278,7 @@ private Builder(final MongoClientSettings settings) {
if (settings.heartbeatSocketTimeoutSetExplicitly) {
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
}
tracer = settings.tracer;
}

/**
Expand Down Expand Up @@ -723,6 +727,20 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
return this;
}

/**
* Sets the tracer to use for creating Spans for operations, commands and transactions.
*
* @param tracer the tracer
* @see com.mongodb.tracing.MicrometerTracer
* @return this
* @since 5.6
*/
@Alpha(Reason.CLIENT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Is this going to be released as alpha?

public Builder tracer(final Tracer tracer) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that Tracer and related types are part of the API, they should not be in com.mongodb.internal

this.tracer = tracer;
return this;
}

/**
* Build an instance of {@code MongoClientSettings}.
*
Expand Down Expand Up @@ -1040,6 +1058,16 @@ public ContextProvider getContextProvider() {
return contextProvider;
}

/**
* Get the tracer to create Spans for operations, commands and transactions.
*
* @return the configured Tracer
* @since 5.6
*/
public Tracer getTracer() {
return tracer;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -1156,5 +1184,6 @@ private MongoClientSettings(final Builder builder) {
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
contextProvider = builder.contextProvider;
timeoutMS = builder.timeoutMS;
tracer = (builder.tracer == null) ? Tracer.NO_OP : builder.tracer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.internal.tracing.Span;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryReader;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -94,6 +96,18 @@
import static com.mongodb.internal.connection.ProtocolHelper.isCommandOk;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static com.mongodb.internal.thread.InterruptionUtil.translateInterruptedException;
import static com.mongodb.internal.tracing.Tags.CLIENT_CONNECTION_ID;
import static com.mongodb.internal.tracing.Tags.CURSOR_ID;
import static com.mongodb.internal.tracing.Tags.NAMESPACE;
import static com.mongodb.internal.tracing.Tags.QUERY_SUMMARY;
import static com.mongodb.internal.tracing.Tags.QUERY_TEXT;
import static com.mongodb.internal.tracing.Tags.SERVER_ADDRESS;
import static com.mongodb.internal.tracing.Tags.SERVER_CONNECTION_ID;
import static com.mongodb.internal.tracing.Tags.SERVER_PORT;
import static com.mongodb.internal.tracing.Tags.SERVER_TYPE;
import static com.mongodb.internal.tracing.Tags.SESSION_ID;
import static com.mongodb.internal.tracing.Tags.SYSTEM;
import static com.mongodb.internal.tracing.Tags.TRANSACTION_NUMBER;
import static java.util.Arrays.asList;

/**
Expand Down Expand Up @@ -432,15 +446,44 @@ public boolean reauthenticationIsTriggered(@Nullable final Throwable t) {
private <T> T sendAndReceiveInternal(final CommandMessage message, final Decoder<T> decoder,
final OperationContext operationContext) {
CommandEventSender commandEventSender;

try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) {
message.encode(bsonOutput, operationContext);
commandEventSender = createCommandEventSender(message, bsonOutput, operationContext);
commandEventSender.sendStartedEvent();
Span tracingSpan = createTracingSpan(message, operationContext, bsonOutput);

boolean isLoggingCommandNeeded = isLoggingCommandNeeded();
boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled();

// Only hydrate the command document if necessary
BsonDocument commandDocument = null;
if (isLoggingCommandNeeded || isTracingCommandPayloadNeeded) {
commandDocument = message.getCommandDocument(bsonOutput);
}
if (isLoggingCommandNeeded) {
commandEventSender = new LoggingCommandEventSender(
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
operationContext, message, commandDocument,
COMMAND_PROTOCOL_LOGGER, loggerSettings);
commandEventSender.sendStartedEvent();
} else {
commandEventSender = new NoOpCommandEventSender();
}
if (isTracingCommandPayloadNeeded) {
tracingSpan.tag(QUERY_TEXT, commandDocument.toJson());
}

try {
sendCommandMessage(message, bsonOutput, operationContext);
} catch (Exception e) {
if (tracingSpan != null) {
tracingSpan.error(e);
}
commandEventSender.sendFailedEvent(e);
throw e;
} finally {
if (tracingSpan != null) {
tracingSpan.end();
}
}
}

Expand Down Expand Up @@ -553,7 +596,18 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final

try {
message.encode(bsonOutput, operationContext);
CommandEventSender commandEventSender = createCommandEventSender(message, bsonOutput, operationContext);

CommandEventSender commandEventSender;
if (isLoggingCommandNeeded()) {
BsonDocument commandDocument = message.getCommandDocument(bsonOutput);
commandEventSender = new LoggingCommandEventSender(
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
operationContext, message, commandDocument,
COMMAND_PROTOCOL_LOGGER, loggerSettings);
} else {
commandEventSender = new NoOpCommandEventSender();
}

commandEventSender.sendStartedEvent();
Compressor localSendCompressor = sendCompressor;
if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
Expand Down Expand Up @@ -952,19 +1006,74 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t

private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger("protocol.command");

private CommandEventSender createCommandEventSender(final CommandMessage message, final ByteBufferBsonOutput bsonOutput,
final OperationContext operationContext) {
private boolean isLoggingCommandNeeded() {
boolean listensOrLogs = commandListener != null || COMMAND_PROTOCOL_LOGGER.isRequired(DEBUG, getClusterId());
if (!recordEverything && (isMonitoringConnection || !opened() || !authenticated.get() || !listensOrLogs)) {
return new NoOpCommandEventSender();
}
return new LoggingCommandEventSender(
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
operationContext, message, bsonOutput,
COMMAND_PROTOCOL_LOGGER, loggerSettings);
return recordEverything || (!isMonitoringConnection && opened() && authenticated.get() && listensOrLogs);
}

private ClusterId getClusterId() {
return description.getConnectionId().getServerId().getClusterId();
}

/**
* Creates a tracing span for the given command message.
* <p>
* The span is only created if tracing is enabled and the command is not security-sensitive.
* It attaches various tags to the span, such as database system, namespace, query summary, opcode,
* server address, port, server type, client and server connection IDs, and, if applicable,
* transaction number and session ID. For cursor fetching commands, the parent context is retrieved using the cursor ID.
* If command payload tracing is enabled, the command document is also attached as a tag.
*
* @param message the command message to trace
* @param operationContext the operation context containing tracing and session information
* @param bsonOutput the BSON output used to serialize the command
* @return the created {@link Span}, or {@code null} if tracing is not enabled or the command is security-sensitive
*/
@Nullable
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext, final ByteBufferBsonOutput bsonOutput) {

TracingManager tracingManager = operationContext.getTracingManager();
BsonDocument command = message.getCommandDocument(bsonOutput);

String commandName = command.getFirstKey();

if (!tracingManager.isEnabled()
|| SECURITY_SENSITIVE_COMMANDS.contains(commandName)
|| SECURITY_SENSITIVE_HELLO_COMMANDS.contains(commandName)) {
return null;
}

Span span = tracingManager
.addSpan("Command " + commandName, operationContext.getTracingSpanContext())
.tag(SYSTEM, "mongodb")
.tag(NAMESPACE, message.getNamespace().getDatabaseName())
.tag(QUERY_SUMMARY, commandName);

if (command.containsKey("getMore")) {
span.tag(CURSOR_ID, command.getInt64("getMore").longValue());
}

tagServerAndConnectionInfo(span, message);
tagSessionAndTransactionInfo(span, operationContext);

return span;
}

private void tagServerAndConnectionInfo(final Span span, final CommandMessage message) {
span.tag(SERVER_ADDRESS, serverId.getAddress().getHost())
.tag(SERVER_PORT, String.valueOf(serverId.getAddress().getPort()))
.tag(SERVER_TYPE, message.getSettings().getServerType().name())
.tag(CLIENT_CONNECTION_ID, this.description.getConnectionId().toString())
.tag(SERVER_CONNECTION_ID, String.valueOf(this.description.getConnectionId().getServerValue()));
}

private void tagSessionAndTransactionInfo(final Span span, final OperationContext operationContext) {
SessionContext sessionContext = operationContext.getSessionContext();
if (sessionContext.hasSession() && !sessionContext.isImplicitSession()) {
span.tag(TRANSACTION_NUMBER, String.valueOf(sessionContext.getTransactionNumber()))
.tag(SESSION_ID, String.valueOf(sessionContext.getSessionId()
.get(sessionContext.getSessionId().getFirstKey())
.asBinary().asUuid()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class LoggingCommandEventSender implements CommandEventSender {
@Nullable final CommandListener commandListener,
final OperationContext operationContext,
final CommandMessage message,
final ByteBufferBsonOutput bsonOutput,
final BsonDocument commandDocument,
final StructuredLogger logger,
final LoggerSettings loggerSettings) {
this.description = description;
Expand All @@ -88,7 +88,7 @@ class LoggingCommandEventSender implements CommandEventSender {
this.loggerSettings = loggerSettings;
this.startTimeNanos = System.nanoTime();
this.message = message;
this.commandDocument = message.getCommandDocument(bsonOutput);
this.commandDocument = commandDocument;
this.commandName = commandDocument.getFirstKey();
this.redactionRequired = securitySensitiveCommands.contains(commandName)
|| (securitySensitiveHelloCommands.contains(commandName) && commandDocument.containsKey("speculativeAuthenticate"));
Expand Down
Loading