From bddf0d3762f55d959ae76f3dea06259e7e795ebe Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 3 Oct 2023 16:07:34 -0400 Subject: [PATCH] Fix the HttpChannel trace termination in case of exceptions (#10325) Signed-off-by: Andriy Redko --- .../org/opensearch/telemetry/tracing/DefaultTracer.java | 5 ++++- .../org/opensearch/http/AbstractHttpServerTransport.java | 1 + server/src/main/java/org/opensearch/http/HttpChannel.java | 5 +++++ .../telemetry/tracing/channels/TraceableHttpChannel.java | 7 +++++++ .../tracing/handler/TraceableTransportResponseHandler.java | 1 + .../org/opensearch/transport/TransportResponseHandler.java | 2 +- 6 files changed, 19 insertions(+), 2 deletions(-) diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index d3c28b3a9cb5e..79b7e4aca6c2f 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -25,7 +25,10 @@ */ @InternalApi class DefaultTracer implements Tracer { - static final String THREAD_NAME = "th_name"; + /** + * Current thread name. + */ + static final String THREAD_NAME = "thread.name"; private final TracingTelemetry tracingTelemetry; private final TracerContextStorage tracerContextStorage; diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index ed44102d0abe4..b8f8abb6c2c23 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -298,6 +298,7 @@ static int resolvePublishPort(Settings settings, List boundAdd } public void onException(HttpChannel channel, Exception e) { + channel.handleException(e); if (lifecycle.started() == false) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); diff --git a/server/src/main/java/org/opensearch/http/HttpChannel.java b/server/src/main/java/org/opensearch/http/HttpChannel.java index 99aaed23c69b8..6dcdaf9034413 100644 --- a/server/src/main/java/org/opensearch/http/HttpChannel.java +++ b/server/src/main/java/org/opensearch/http/HttpChannel.java @@ -43,6 +43,11 @@ * @opensearch.internal */ public interface HttpChannel extends CloseableChannel { + /** + * Notify HTTP channel that exception happens and the response may not be sent (for example, timeout) + * @param ex the exception being raised + */ + default void handleException(Exception ex) {} /** * Sends an http response to the channel. The listener will be executed once the send process has been diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java index 9229d334dea01..03848e8e58207 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java @@ -56,6 +56,13 @@ public static HttpChannel create(HttpChannel delegate, Span span, Tracer tracer) } } + @Override + public void handleException(Exception ex) { + span.addEvent("The HttpChannel was closed without sending the response"); + span.setError(ex); + span.endSpan(); + } + @Override public void close() { delegate.close(); diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java index abddfcc6cebc1..538bf82a1dbec 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java @@ -101,6 +101,7 @@ public void handleRejection(Exception exp) { try (SpanScope scope = tracer.withSpanInScope(span)) { delegate.handleRejection(exp); } finally { + span.setError(exp); span.endSpan(); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java index 90e94e52515ce..8992af18edb48 100644 --- a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java @@ -57,7 +57,7 @@ public interface TransportResponseHandler extends W * It should be used to clear up the resources held by the {@link TransportResponseHandler}. * @param exp exception */ - default void handleRejection(Exception exp) {}; + default void handleRejection(Exception exp) {} default TransportResponseHandler wrap(Function converter, Writeable.Reader reader) { final TransportResponseHandler self = this;