Skip to content

Commit

Permalink
[otlp] Replace the current trace implementation with the new one (#6003)
Browse files Browse the repository at this point in the history
  • Loading branch information
rajkumar-rangaraj authored Nov 27, 2024
1 parent 7eeddf5 commit 84e6afb
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 899 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient
{
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
internal static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);

internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
using TraceOtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;

namespace OpenTelemetry.Exporter;

Expand Down Expand Up @@ -99,42 +98,6 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions)
{
var exportClient = GetTraceExportClient(options);

// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
// 2. If the user configures timeout via the exporter options, then the timeout set for the `HttpClient` initialized by the exporter will be set to user provided value.
double timeoutMilliseconds = exportClient is OtlpHttpTraceExportClient httpTraceExportClient
? httpTraceExportClient.HttpClient.Timeout.TotalMilliseconds
: options.TimeoutMilliseconds;

if (experimentalOptions.EnableInMemoryRetry)
{
return new OtlpExporterRetryTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest>(exportClient, timeoutMilliseconds);
}
else if (experimentalOptions.EnableDiskRetry)
{
Debug.Assert(!string.IsNullOrEmpty(experimentalOptions.DiskRetryDirectoryPath), $"{nameof(experimentalOptions.DiskRetryDirectoryPath)} is null or empty");

return new OtlpExporterPersistentStorageTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest>(
exportClient,
timeoutMilliseconds,
(byte[] data) =>
{
var request = new TraceOtlpCollector.ExportTraceServiceRequest();
request.MergeFrom(data);
return request;
},
Path.Combine(experimentalOptions.DiskRetryDirectoryPath, "traces"));
}
else
{
return new OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest>(exportClient, timeoutMilliseconds);
}
}

public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
{
var exportClient = GetProtobufExportClient(options, otlpSignalType);
Expand Down Expand Up @@ -169,6 +132,11 @@ public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOpt
{
var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.");

if (options.Protocol != OtlpExportProtocol.Grpc && options.Protocol != OtlpExportProtocol.HttpProtobuf)
{
throw new NotSupportedException($"Protocol {options.Protocol} is not supported.");
}

return otlpSignalType switch
{
OtlpSignalType.Traces => options.Protocol == OtlpExportProtocol.Grpc
Expand Down Expand Up @@ -255,16 +223,6 @@ public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOpt
}
}

public static IExportClient<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{
OtlpExportProtocol.Grpc => new OtlpGrpcTraceExportClient(options),
OtlpExportProtocol.HttpProtobuf => new OtlpHttpTraceExportClient(
options,
options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.")),
_ => throw new NotSupportedException($"Protocol {options.Protocol} is not supported."),
};

public static IExportClient<MetricsOtlpCollector.ExportMetricsServiceRequest> GetMetricsExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers.Binary;
using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
using OtlpResource = OpenTelemetry.Proto.Resource.V1;
using OpenTelemetry.Resources;

namespace OpenTelemetry.Exporter;

Expand All @@ -16,9 +17,15 @@ namespace OpenTelemetry.Exporter;
public class OtlpTraceExporter : BaseExporter<Activity>
{
private readonly SdkLimitOptions sdkLimitOptions;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest> transmissionHandler;
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

private OtlpResource.Resource? processResource;
private Resource? resource;

// Initial buffer size set to ~732KB.
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
// by the 7th doubling to maintain efficient allocation without frequent resizing.
private byte[] buffer = new byte[750000];

/// <summary>
/// Initializes a new instance of the <see cref="OtlpTraceExporter"/> class.
Expand All @@ -40,31 +47,40 @@ internal OtlpTraceExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest>? transmissionHandler = null)
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");

this.sdkLimitOptions = sdkLimitOptions!;

this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetTraceExportTransmissionHandler(experimentalOptions);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}

internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource();
internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();

/// <inheritdoc/>
public override ExportResult Export(in Batch<Activity> activityBatch)
{
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();

var request = new OtlpCollector.ExportTraceServiceRequest();

try
{
request.AddBatch(this.sdkLimitOptions, this.ProcessResource, activityBatch);
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);

if (this.startWritePosition == 5)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

if (!this.transmissionHandler.TrySubmitRequest(request))
if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
{
return ExportResult.Failure;
}
Expand All @@ -74,17 +90,10 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}
finally
{
request.Return();
}

return ExportResult.Success;
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,7 @@ internal static BaseProcessor<Activity> BuildOtlpExporterProcessor(

exporterOptions!.TryEnableIHttpClientFactoryIntegration(serviceProvider!, "OtlpTraceExporter");

BaseExporter<Activity> otlpExporter;

if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer)
{
otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
}
else
{
otlpExporter = new OtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
}
BaseExporter<Activity> otlpExporter = new OtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);

if (configureExporterInstance != null)
{
Expand Down
Loading

0 comments on commit 84e6afb

Please sign in to comment.