Skip to content

Commit

Permalink
Fix OTel GRPC trace exporter dropping spans (#37692)
Browse files Browse the repository at this point in the history
Fixes otel exporter dropping spans in some cases, for example:
- If, while waiting for a response to an export request, the span buffer
is flushed again.
- If the span buffer is flushed after an upstream span ends, and there
exists a corresponding downstream span, and the buffer is flushed again
when the downstream span ends. This can be triggered by setting the
`tracing.opentelemetry.min_flush_spans` runtime variable to `1`, or if
the flush timer expires just after the upstream span ends. This is
similar to the first case, but the specific order of http callbacks
prevented the exporter client from ever being reset before the
downstream request completed, if it was used to export spans after the
upstream request completed.

Risk Level: 
Testing: Yes; added an integration test that I think will cover this.
Updated mock usage in other otel tests to account for refactors to the
exporter.

---------

Signed-off-by: Joe Kralicky <[email protected]>
  • Loading branch information
kralicky authored Dec 23, 2024
1 parent fb52980 commit e22980c
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 139 deletions.
35 changes: 32 additions & 3 deletions source/extensions/tracers/opentelemetry/grpc_trace_exporter.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "source/extensions/tracers/opentelemetry/grpc_trace_exporter.h"

#include "source/common/common/logger.h"
#include "source/common/grpc/status.h"
#include "source/extensions/tracers/opentelemetry/otlp_utils.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -9,11 +11,38 @@ namespace OpenTelemetry {

OpenTelemetryGrpcTraceExporter::OpenTelemetryGrpcTraceExporter(
const Grpc::RawAsyncClientSharedPtr& client)
: client_(client, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"opentelemetry.proto.collector.trace.v1.TraceService.Export")) {}
: client_(client),
service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"opentelemetry.proto.collector.trace.v1.TraceService.Export")) {}

void OpenTelemetryGrpcTraceExporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
metadata.setReferenceUserAgent(OtlpUtils::getOtlpUserAgentHeader());
}

void OpenTelemetryGrpcTraceExporter::onSuccess(
Grpc::ResponsePtr<ExportTraceServiceResponse>&& response, Tracing::Span&) {
if (response->has_partial_success()) {
auto msg = response->partial_success().error_message();
auto rejected_spans = response->partial_success().rejected_spans();
if (rejected_spans > 0 || !msg.empty()) {
if (msg.empty()) {
msg = "empty message";
}
ENVOY_LOG(debug, "OTLP partial success: {} ({} spans rejected)", msg, rejected_spans);
}
}
}

void OpenTelemetryGrpcTraceExporter::onFailure(Grpc::Status::GrpcStatus status,
const std::string& message, Tracing::Span&) {
ENVOY_LOG(debug, "OTLP trace export failed with status: {}, message: {}",
Grpc::Utility::grpcStatusToString(status), message);
}

bool OpenTelemetryGrpcTraceExporter::log(const ExportTraceServiceRequest& request) {
return client_.log(request);
client_->send(service_method_, request, *this, Tracing::NullSpan::instance(),
Http::AsyncClient::RequestOptions());
return true;
}

} // namespace OpenTelemetry
Expand Down
83 changes: 10 additions & 73 deletions source/extensions/tracers/opentelemetry/grpc_trace_exporter.h
Original file line number Diff line number Diff line change
@@ -1,99 +1,36 @@
#pragma once

#include "envoy/grpc/async_client_manager.h"

#include "source/common/common/logger.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/extensions/tracers/opentelemetry/otlp_utils.h"
#include "source/extensions/tracers/opentelemetry/trace_exporter.h"

#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
using opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse;

/**
* Exporter client for OTLP Traces. Provides abstraction on top of gRPC stream.
*/
class OpenTelemetryGrpcTraceExporterClient : Logger::Loggable<Logger::Id::tracing> {
class OpenTelemetryGrpcTraceExporter
: public OpenTelemetryTraceExporter,
public Grpc::AsyncRequestCallbacks<ExportTraceServiceResponse> {
public:
OpenTelemetryGrpcTraceExporterClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method)
: client_(client), service_method_(service_method) {}

struct LocalStream : public Grpc::AsyncStreamCallbacks<
opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse> {
LocalStream(OpenTelemetryGrpcTraceExporterClient& parent) : parent_(parent) {}

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override {
metadata.setReferenceUserAgent(OtlpUtils::getOtlpUserAgentHeader());
}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(
std::unique_ptr<opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse>&&)
override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
ASSERT(parent_.stream_ != nullptr);
if (parent_.stream_->stream_ != nullptr) {
// Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
// stream data in send().
parent_.stream_.reset();
}
}
OpenTelemetryGrpcTraceExporter(const Grpc::RawAsyncClientSharedPtr& client);
~OpenTelemetryGrpcTraceExporter() override = default;

OpenTelemetryGrpcTraceExporterClient& parent_;
Grpc::AsyncStream<opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest>
stream_{};
};
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;

bool log(const ExportTraceServiceRequest& request) {
// If we don't have a stream already, we need to initialize it.
if (!stream_) {
stream_ = std::make_unique<LocalStream>(*this);
}
void onSuccess(Grpc::ResponsePtr<ExportTraceServiceResponse>&& response, Tracing::Span&) override;

// If we don't have a Grpc AsyncStream, we need to initialize it.
if (stream_->stream_ == nullptr) {
stream_->stream_ =
client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions());
}
void onFailure(Grpc::Status::GrpcStatus status, const std::string& message,
Tracing::Span&) override;

// If we do have a Grpc AsyncStream, we can first check if we are above the write buffer, and
// send message if it's ok; if we don't have a stream, we need to clear out the stream data
// after stream creation failed.
if (stream_->stream_ != nullptr) {
if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
return false;
}
stream_->stream_->sendMessage(request, true);
} else {
stream_.reset();
}
return true;
}
bool log(const ExportTraceServiceRequest& request) override;

Grpc::AsyncClient<ExportTraceServiceRequest, ExportTraceServiceResponse> client_;
std::unique_ptr<LocalStream> stream_;
const Protobuf::MethodDescriptor& service_method_;
};

class OpenTelemetryGrpcTraceExporter : public OpenTelemetryTraceExporter {
public:
OpenTelemetryGrpcTraceExporter(const Grpc::RawAsyncClientSharedPtr& client);

bool log(const ExportTraceServiceRequest& request) override;

private:
OpenTelemetryGrpcTraceExporterClient client_;
};

} // namespace OpenTelemetry
} // namespace Tracers
} // namespace Extensions
Expand Down
1 change: 1 addition & 0 deletions source/extensions/tracers/opentelemetry/trace_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"

using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
using opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse;

namespace Envoy {
namespace Extensions {
Expand Down
18 changes: 18 additions & 0 deletions test/extensions/tracers/opentelemetry/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,21 @@ envoy_extension_cc_test(
"//test/test_common:utility_lib",
],
)

envoy_extension_cc_test(
name = "grpc_trace_exporter_integration_test",
srcs = ["grpc_trace_exporter_integration_test.cc"],
extension_names = ["envoy.tracers.opentelemetry"],
rbe_pool = "6gig",
deps = [
"//source/extensions/tracers/opentelemetry:config",
"//source/extensions/tracers/opentelemetry:opentelemetry_tracer_lib",
"//source/extensions/tracers/opentelemetry:trace_exporter",
"//test/common/config:dummy_config_proto_cc_proto",
"//test/integration:http_integration_lib",
"//test/test_common:test_runtime_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/trace/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#include <cstddef>

#include "envoy/config/trace/v3/opentelemetry.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"

#include "test/integration/http_integration.h"

#include "gtest/gtest.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"

namespace Envoy {

using envoy::config::trace::v3::OpenTelemetryConfig;
using envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager;
using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
using opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse;

constexpr auto timeout = std::chrono::milliseconds(500);

class OpenTelemetryTraceExporterIntegrationTest
: public testing::TestWithParam<std::tuple<int, int>>,
public HttpIntegrationTest {
public:
OpenTelemetryTraceExporterIntegrationTest();

~OpenTelemetryTraceExporterIntegrationTest() override {
if (connection_) {
AssertionResult result = connection_->close();
RELEASE_ASSERT(result, result.message());
result = connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
connection_.reset();
}
}

void createUpstreams() override {
HttpIntegrationTest::createUpstreams();
addFakeUpstream(Http::CodecType::HTTP2);
grpc_receiver_upstream_ = fake_upstreams_.back().get();
}

void setFlushIntervalMs(int64_t ms) {
(*otel_runtime_config_.mutable_fields())["tracing.opentelemetry.flush_interval_ms"]
.set_number_value(ms);
}

void setMinFlushSpans(int64_t ms) {
(*otel_runtime_config_.mutable_fields())["tracing.opentelemetry.min_flush_spans"]
.set_number_value(ms);
}

void initialize() override {
setFlushIntervalMs(99999'000); // disable flush interval
setUpstreamCount(1);
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* grpc_receiver_cluster = bootstrap.mutable_static_resources()->add_clusters();
grpc_receiver_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]);
grpc_receiver_cluster->set_name("grpc-receiver");

auto* layer = bootstrap.mutable_layered_runtime()->add_layers();
layer->set_name("test_otel_layer");
auto* static_layer = layer->mutable_static_layer();
layer->set_name("test_otel_static_layer");
*static_layer = otel_runtime_config_;
ConfigHelper::setHttp2(*grpc_receiver_cluster);
});

config_helper_.addConfigModifier([&](HttpConnectionManager& hcm) -> void {
HttpConnectionManager::Tracing tracing;
tracing.mutable_random_sampling()->set_value(100);
tracing.mutable_spawn_upstream_span()->set_value(true);

OpenTelemetryConfig otel_config;
otel_config.set_service_name("my-service");
otel_config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("grpc-receiver");
*otel_config.mutable_grpc_service()->mutable_timeout() =
Protobuf::util::TimeUtil::MillisecondsToDuration(250);

tracing.mutable_provider()->set_name("envoy.tracers.opentelemetry");
tracing.mutable_provider()->mutable_typed_config()->PackFrom(otel_config);

*hcm.mutable_tracing() = tracing;
});
HttpIntegrationTest::initialize();
}

void cleanup() { cleanupUpstreamAndDownstream(); }

void doHttpRequest() {
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));

auto response = sendRequestAndWaitForResponse(default_request_headers_, 0,
default_response_headers_, 0, 0, timeout);

codec_client_->close();
auto _ = codec_client_->waitForDisconnect(timeout);
}

FakeUpstream* grpc_receiver_upstream_{};
ProtobufWkt::Struct otel_runtime_config_;

FakeHttpConnectionPtr connection_;
std::vector<FakeStreamPtr> streams_;
};

struct TestCase {};

OpenTelemetryTraceExporterIntegrationTest::OpenTelemetryTraceExporterIntegrationTest()
: HttpIntegrationTest(Http::CodecType::HTTP1, Network::Address::IpVersion::v4){};

INSTANTIATE_TEST_SUITE_P(All, OpenTelemetryTraceExporterIntegrationTest,
// values are (min_flush_spans, num_requests)
testing::Values(std::make_tuple(1, 1), std::make_tuple(1, 2),
std::make_tuple(2, 1), std::make_tuple(2, 2),
std::make_tuple(5, 5), std::make_tuple(6, 3)));

TEST_P(OpenTelemetryTraceExporterIntegrationTest, GrpcExporter) {
auto [min_flush_spans, num_requests] = GetParam();
setMinFlushSpans(min_flush_spans);

initialize();

dispatcher_->post([this, num_requests = num_requests]() {
// each request will create two spans, one upstream and one downstream
for (auto i = 0; i < num_requests; i++) {
doHttpRequest();
}
});

// verify that we receive the correct number of export requests, each with the correct number
// of spans (there should be no unexported spans remaining)
auto num_expected_exports = (num_requests * 2) / min_flush_spans;
ASSERT_TRUE(grpc_receiver_upstream_->waitForHttpConnection(*dispatcher_, connection_));

std::map<std::string, int> name_counts;
for (auto i = 0; i < num_expected_exports; i++) {
FakeStreamPtr stream;
ASSERT_TRUE(connection_->waitForNewStream(*dispatcher_, stream, timeout))
<< "Expected to receive " << num_expected_exports << " export requests, but got " << i;
ExportTraceServiceRequest req;
ASSERT_TRUE(stream->waitForGrpcMessage(*dispatcher_, req, timeout));
stream->startGrpcStream();
ExportTraceServiceResponse resp;
stream->sendGrpcMessage(resp);
stream->finishGrpcStream(Grpc::Status::WellKnownGrpcStatus::Ok);

ASSERT_EQ(1, req.resource_spans().size());
ASSERT_EQ(1, req.resource_spans(0).scope_spans().size());
ASSERT_EQ(min_flush_spans, req.resource_spans(0).scope_spans(0).spans().size());
for (auto j = 0; j < min_flush_spans; j++) {
++name_counts[req.resource_spans(0).scope_spans(0).spans().at(j).name()];
}
ASSERT_TRUE(stream->waitForEndStream(*dispatcher_, timeout));
streams_.push_back(std::move(stream));
}

// the number of upstream and downstream spans received should be equal
ASSERT_EQ(2, name_counts.size());
ASSERT_THAT(name_counts,
testing::AllOf(testing::Contains(testing::Pair("ingress", testing::Eq(num_requests))),
testing::Contains(testing::Pair("router cluster_0 egress",
testing::Eq(num_requests)))));

cleanup();
}

} // namespace Envoy
Loading

0 comments on commit e22980c

Please sign in to comment.