From 55121760115c94e11571057f4263c43f4d71ad21 Mon Sep 17 00:00:00 2001 From: Chenfeng Bao Date: Tue, 7 Jan 2025 23:31:41 -0800 Subject: [PATCH] [Instrumentation.AWS]: always add context propagation data to requests (#2447) --- .../CHANGELOG.md | 5 ++ .../AWSTracingPipelineHandler.cs | 41 ++++++++---- .../TestAWSClientInstrumentation.cs | 67 ++++++++++++++++++- 3 files changed, 96 insertions(+), 17 deletions(-) diff --git a/src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md b/src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md index e1894dcb5a..86a45429b7 100644 --- a/src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md +++ b/src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +* Context propagation data is always added to SQS and SNS requests regardless of + sampling decision. This enables downstream services to make consistent sampling + decisions and prevents incomplete traces. + ([#2447](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2447)) + ## 1.10.0-rc.1 Released 2025-Jan-06 diff --git a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs index ef28d3cc5e..c2a9ee300e 100644 --- a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs +++ b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs @@ -46,6 +46,22 @@ public override async Task InvokeAsync(IExecutionContext executionContext) return ret; } + private static void AddPropagationDataToRequest(Activity activity, IRequestContext requestContext) + { + var service = requestContext.ServiceMetaData.ServiceId; + + if (AWSServiceType.IsSqsService(service)) + { + SqsRequestContextHelper.AddAttributes( + requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current))); + } + else if (AWSServiceType.IsSnsService(service)) + { + 
SnsRequestContextHelper.AddAttributes( + requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current))); + } + } + #if NET [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage( "Trimming", @@ -167,16 +183,6 @@ private void AddRequestSpecificInformation(Activity activity, IRequestContext re { this.awsSemanticConventions.TagBuilder.SetTagAttributeDbSystemToDynamoDb(activity); } - else if (AWSServiceType.IsSqsService(service)) - { - SqsRequestContextHelper.AddAttributes( - requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current))); - } - else if (AWSServiceType.IsSnsService(service)) - { - SnsRequestContextHelper.AddAttributes( - requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current))); - } else if (AWSServiceType.IsBedrockRuntimeService(service)) { this.awsSemanticConventions.TagBuilder.SetTagAttributeGenAiSystemToBedrock(activity); @@ -202,14 +208,21 @@ private void ProcessEndRequest(Activity? activity, IExecutionContext executionCo var currentActivity = Activity.Current; - if (currentActivity == null - || !currentActivity.Source.Name.StartsWith(TelemetryConstants.TelemetryScopePrefix, StringComparison.Ordinal) - || !currentActivity.IsAllDataRequested) + if (currentActivity == null) { return null; } - this.AddRequestSpecificInformation(currentActivity, executionContext.RequestContext); + if (currentActivity.IsAllDataRequested + && currentActivity.Source.Name.StartsWith(TelemetryConstants.TelemetryScopePrefix, StringComparison.Ordinal)) + { + this.AddRequestSpecificInformation(currentActivity, executionContext.RequestContext); + } + + // Context propagation should always happen regardless of sampling decision (which affects Activity.IsAllDataRequested and Activity.Source). + // Otherwise, downstream services can make inconsistent sampling decisions and create incomplete traces. 
+ AddPropagationDataToRequest(currentActivity, executionContext.RequestContext); + return currentActivity; } } diff --git a/test/OpenTelemetry.Instrumentation.AWS.Tests/TestAWSClientInstrumentation.cs b/test/OpenTelemetry.Instrumentation.AWS.Tests/TestAWSClientInstrumentation.cs index 487ededdb6..1e975c5568 100644 --- a/test/OpenTelemetry.Instrumentation.AWS.Tests/TestAWSClientInstrumentation.cs +++ b/test/OpenTelemetry.Instrumentation.AWS.Tests/TestAWSClientInstrumentation.cs @@ -185,9 +185,9 @@ public async Task TestDDBScanUnsuccessful() [Fact] #if NETFRAMEWORK - public void TestSQSSendMessageSuccessful() + public void TestSQSSendMessageSuccessfulSampled() #else - public async Task TestSQSSendMessageSuccessful() + public async Task TestSQSSendMessageSuccessfulSampled() #endif { var exportedItems = new List<Activity>(); @@ -195,6 +195,8 @@ public async Task TestSQSSendMessageSuccessful() var parent = new Activity("parent").Start(); var requestId = @"fakerequ-esti-dfak-ereq-uestidfakere"; + SendMessageRequest send_msg_req; + using (Sdk.CreateTracerProviderBuilder() .AddXRayTraceId() .SetSampler(new AlwaysOnSampler()) @@ -208,7 +210,7 @@ public async Task TestSQSSendMessageSuccessful() var sqs = new AmazonSQSClient(new AnonymousAWSCredentials(), RegionEndpoint.USEast1); var dummyResponse = "{}"; CustomResponses.SetResponse(sqs, dummyResponse, requestId, true); - var send_msg_req = new SendMessageRequest + send_msg_req = new SendMessageRequest { QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/MyTestQueue", MessageBody = "Hello from OT", @@ -230,6 +232,65 @@ public async Task TestSQSSendMessageSuccessful() Assert.Equal(ActivityStatusCode.Unset, awssdk_activity.Status); Assert.Equal(requestId, Utils.GetTagValue(awssdk_activity, "aws.request_id")); + + Assert.Equal(2, send_msg_req.MessageAttributes.Count); + Assert.Contains( + send_msg_req.MessageAttributes, + kv => kv.Key == "traceparent" && kv.Value.StringValue == 
$"00-{awssdk_activity.TraceId}-{awssdk_activity.SpanId}-01"); + Assert.Contains( + send_msg_req.MessageAttributes, + kv => kv.Key == "Custom" && kv.Value.StringValue == "Value"); + } + + [Fact] +#if NETFRAMEWORK + public void TestSQSSendMessageSuccessfulNotSampled() +#else + public async Task TestSQSSendMessageSuccessfulNotSampled() +#endif + { + var exportedItems = new List<Activity>(); + + var parent = new Activity("parent").Start(); + var requestId = @"fakerequ-esti-dfak-ereq-uestidfakere"; + + SendMessageRequest send_msg_req; + + using (Sdk.CreateTracerProviderBuilder() + .AddXRayTraceId() + .SetSampler(new AlwaysOffSampler()) + .AddAWSInstrumentation(o => + { + o.SemanticConventionVersion = SemanticConventionVersion.Latest; + }) + .AddInMemoryExporter(exportedItems) + .Build()) + { + var sqs = new AmazonSQSClient(new AnonymousAWSCredentials(), RegionEndpoint.USEast1); + var dummyResponse = "{}"; + CustomResponses.SetResponse(sqs, dummyResponse, requestId, true); + send_msg_req = new SendMessageRequest + { + QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/MyTestQueue", + MessageBody = "Hello from OT", + }; + send_msg_req.MessageAttributes.Add("Custom", new MessageAttributeValue { StringValue = "Value", DataType = "String" }); +#if NETFRAMEWORK + sqs.SendMessage(send_msg_req); +#else + await sqs.SendMessageAsync(send_msg_req); +#endif + } + + Assert.Empty(exportedItems); + + Assert.Equal(2, send_msg_req.MessageAttributes.Count); + Assert.Contains( + send_msg_req.MessageAttributes, + kv => kv.Key == "traceparent" && kv.Value.StringValue == $"00-{parent.TraceId}-{parent.SpanId}-00"); + Assert.Contains( + send_msg_req.MessageAttributes, + kv => kv.Key == "Custom" && kv.Value.StringValue == "Value"); } [Fact]