Skip to content

Commit

Permalink
Add tracing instrumentation in transport service (opensearch-project#…
Browse files Browse the repository at this point in the history
…10042)

* Add instrumentation to transport service

Signed-off-by: Gagan Juneja <[email protected]>

* Add instrumentation to transport service

Signed-off-by: Gagan Juneja <[email protected]>

* Add javadoc

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Sep 15, 2023
1 parent 74fcaab commit 483c2e3
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))
- Allow parameterization of tests with OpenSearchIntegTestCase.SuiteScopeTestCase annotation ([#9916](https://github.com/opensearch-project/OpenSearch/pull/9916))
- Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840))
- Add instrumentation in transport service. ([#10042](https://github.com/opensearch-project/OpenSearch/pull/10042))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {

InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
if (!exporter.getFinishedSpanItems().isEmpty()) {
validators.validate(exporter.getFinishedSpanItems(), 2);
validators.validate(exporter.getFinishedSpanItems(), 6);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void testSingleNodesDoNotDiscoverEachOther() throws IOException, Interrup
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(featureFlagSettings())
.put("discovery.type", "single-node")
.put("transport.type", getTestTransportType())
/*
Expand Down Expand Up @@ -142,6 +143,7 @@ public boolean innerMatch(final LogEvent event) {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(featureFlagSettings())
.put("discovery.type", "zen")
.put("transport.type", getTestTransportType())
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.handler;

import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.util.Objects;

/**
* Tracer wrapped {@link TransportResponseHandler}
* @param <T> TransportResponse
*/
public class TraceableTransportResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {

private final Span span;
private final TransportResponseHandler<T> delegate;
private final Tracer tracer;

/**
* Constructor.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
private TraceableTransportResponseHandler(TransportResponseHandler<T> delegate, Span span, Tracer tracer) {
this.delegate = Objects.requireNonNull(delegate);
this.span = Objects.requireNonNull(span);
this.tracer = Objects.requireNonNull(tracer);
}

/**
* Factory method.
* @param delegate delegate
* @param span span
* @param tracer tracer
* @return transportResponseHandler
*/
public static <S extends TransportResponse> TransportResponseHandler<S> create(
TransportResponseHandler<S> delegate,
Span span,
Tracer tracer
) {
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
return new TraceableTransportResponseHandler<S>(delegate, span, tracer);
} else {
return delegate;
}
}

@Override
public T read(StreamInput in) throws IOException {
return delegate.read(in);
}

@Override
public void handleResponse(T response) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.handleResponse(response);
} finally {
span.endSpan();
}
}

@Override
public void handleException(TransportException exp) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.handleException(exp);
} finally {
span.setError(exp);
span.endSpan();
}
}

@Override
public String executor() {
return delegate.executor();
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public void handleRejection(Exception exp) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.handleRejection(exp);
} finally {
span.endSpan();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for tracing requests.
*/
package org.opensearch.telemetry.tracing.handler;
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public interface TransportResponseHandler<T extends TransportResponse> extends W

String executor();

/**
* This method should be handling the rejection/failure scenarios where connection to the node is rejected or failed.
* It should be used to clear up the resources held by the {@link TransportResponseHandler}.
* @param exp exception
*/
default void handleRejection(Exception exp) {};

default <Q extends TransportResponse> TransportResponseHandler<Q> wrap(Function<Q, T> converter, Writeable.Reader<Q> reader) {
final TransportResponseHandler<T> self = this;
return new TransportResponseHandler<Q>() {
Expand Down
99 changes: 57 additions & 42 deletions server/src/main/java/org/opensearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -333,6 +337,7 @@ protected void doStop() {
getExecutorService().execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
holderToNotify.handler().handleRejection(e);
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
Expand All @@ -345,6 +350,7 @@ public void onRejection(Exception e) {

@Override
public void onFailure(Exception e) {
holderToNotify.handler().handleRejection(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
Expand Down Expand Up @@ -861,53 +867,60 @@ public final <T extends TransportResponse> void sendRequest(
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
try {
logger.debug("Action: " + action);
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode());
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer);
try {
logger.debug("Action: " + action);
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
final Releasable unregisterChildNode = taskManager.registerChildNode(
request.getParentTask().getId(),
connection.getNode()
);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
traceableTransportResponseHandler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
traceableTransportResponseHandler.handleException(exp);
}

@Override
public String executor() {
return handler.executor();
}
@Override
public String executor() {
return traceableTransportResponseHandler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override
public T read(StreamInput in) throws IOException {
return traceableTransportResponseHandler.read(in);
}

@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = traceableTransportResponseHandler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
traceableTransportResponseHandler.handleException(te);
}
handler.handleException(te);
}
}

Expand Down Expand Up @@ -1017,6 +1030,7 @@ private <T extends TransportResponse> void sendRequestInternal(
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
contextToNotify.handler().handleRejection(e);
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
Expand All @@ -1029,6 +1043,7 @@ public void onRejection(Exception e) {

@Override
public void onFailure(Exception e) {
contextToNotify.handler().handleRejection(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2300,9 +2300,8 @@ public static void afterClass() throws Exception {
INSTANCE.printTestMessage("cleaning up after");
INSTANCE.afterInternal(true);
checkStaticState(true);
StrictCheckSpanProcessor.validateTracingStateOnShutdown();
}

StrictCheckSpanProcessor.validateTracingStateOnShutdown();
} finally {
SUITE_SEED = null;
currentCluster = null;
Expand Down

0 comments on commit 483c2e3

Please sign in to comment.