Skip to content

Commit

Permalink
[Instrumentation.AWS]: always add context propagation data to requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
cfbao authored Jan 8, 2025
1 parent fbba3f8 commit 5512176
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 17 deletions.
5 changes: 5 additions & 0 deletions src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Unreleased

* Context propagation data is always added to SQS and SNS requests regardless of
sampling decision. This enables downstream services to make consistent sampling
decisions and prevents incomplete traces.
([#2447](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2447))

## 1.10.0-rc.1

Released 2025-Jan-06
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
return ret;
}

private static void AddPropagationDataToRequest(Activity activity, IRequestContext requestContext)
{
var service = requestContext.ServiceMetaData.ServiceId;

if (AWSServiceType.IsSqsService(service))
{
SqsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsSnsService(service))
{
SnsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
}

#if NET
[System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage(
"Trimming",
Expand Down Expand Up @@ -167,16 +183,6 @@ private void AddRequestSpecificInformation(Activity activity, IRequestContext re
{
this.awsSemanticConventions.TagBuilder.SetTagAttributeDbSystemToDynamoDb(activity);
}
else if (AWSServiceType.IsSqsService(service))
{
SqsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsSnsService(service))
{
SnsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsBedrockRuntimeService(service))
{
this.awsSemanticConventions.TagBuilder.SetTagAttributeGenAiSystemToBedrock(activity);
Expand All @@ -202,14 +208,21 @@ private void ProcessEndRequest(Activity? activity, IExecutionContext executionCo

var currentActivity = Activity.Current;

if (currentActivity == null
|| !currentActivity.Source.Name.StartsWith(TelemetryConstants.TelemetryScopePrefix, StringComparison.Ordinal)
|| !currentActivity.IsAllDataRequested)
if (currentActivity == null)
{
return null;
}

this.AddRequestSpecificInformation(currentActivity, executionContext.RequestContext);
if (currentActivity.IsAllDataRequested
&& currentActivity.Source.Name.StartsWith(TelemetryConstants.TelemetryScopePrefix, StringComparison.Ordinal))
{
this.AddRequestSpecificInformation(currentActivity, executionContext.RequestContext);
}

// Context propagation should always happen regardless of sampling decision (which affects Activity.IsAllDataRequested and Activity.Source).
// Otherwise, downstream services can make inconsistent sampling decisions and create incomplete traces.
AddPropagationDataToRequest(currentActivity, executionContext.RequestContext);

return currentActivity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,18 @@ public async Task TestDDBScanUnsuccessful()

[Fact]
#if NETFRAMEWORK
public void TestSQSSendMessageSuccessful()
public void TestSQSSendMessageSuccessfulSampled()
#else
public async Task TestSQSSendMessageSuccessful()
public async Task TestSQSSendMessageSuccessfulSampled()
#endif
{
var exportedItems = new List<Activity>();

var parent = new Activity("parent").Start();
var requestId = @"fakerequ-esti-dfak-ereq-uestidfakere";

SendMessageRequest send_msg_req;

using (Sdk.CreateTracerProviderBuilder()
.AddXRayTraceId()
.SetSampler(new AlwaysOnSampler())
Expand All @@ -208,7 +210,7 @@ public async Task TestSQSSendMessageSuccessful()
var sqs = new AmazonSQSClient(new AnonymousAWSCredentials(), RegionEndpoint.USEast1);
var dummyResponse = "{}";
CustomResponses.SetResponse(sqs, dummyResponse, requestId, true);
var send_msg_req = new SendMessageRequest
send_msg_req = new SendMessageRequest
{
QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/MyTestQueue",
MessageBody = "Hello from OT",
Expand All @@ -230,6 +232,65 @@ public async Task TestSQSSendMessageSuccessful()

Assert.Equal(ActivityStatusCode.Unset, awssdk_activity.Status);
Assert.Equal(requestId, Utils.GetTagValue(awssdk_activity, "aws.request_id"));

Assert.Equal(2, send_msg_req.MessageAttributes.Count);
Assert.Contains(
send_msg_req.MessageAttributes,
kv => kv.Key == "traceparent" && kv.Value.StringValue == $"00-{awssdk_activity.TraceId}-{awssdk_activity.SpanId}-01");
Assert.Contains(
send_msg_req.MessageAttributes,
kv => kv.Key == "Custom" && kv.Value.StringValue == "Value");
}

[Fact]
#if NETFRAMEWORK
public void TestSQSSendMessageSuccessfulNotSampled()
#else
public async Task TestSQSSendMessageSuccessfulNotSampled()
#endif
{
var exportedItems = new List<Activity>();

var parent = new Activity("parent").Start();
var requestId = @"fakerequ-esti-dfak-ereq-uestidfakere";

SendMessageRequest send_msg_req;

using (Sdk.CreateTracerProviderBuilder()
.AddXRayTraceId()
.SetSampler(new AlwaysOffSampler())
.AddAWSInstrumentation(o =>
{
o.SemanticConventionVersion = SemanticConventionVersion.Latest;
})
.AddInMemoryExporter(exportedItems)
.Build())
{
var sqs = new AmazonSQSClient(new AnonymousAWSCredentials(), RegionEndpoint.USEast1);
var dummyResponse = "{}";
CustomResponses.SetResponse(sqs, dummyResponse, requestId, true);
send_msg_req = new SendMessageRequest
{
QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/MyTestQueue",
MessageBody = "Hello from OT",
};
send_msg_req.MessageAttributes.Add("Custom", new MessageAttributeValue { StringValue = "Value", DataType = "String" });
#if NETFRAMEWORK
sqs.SendMessage(send_msg_req);
#else
await sqs.SendMessageAsync(send_msg_req);
#endif
}

Assert.Empty(exportedItems);

Assert.Equal(2, send_msg_req.MessageAttributes.Count);
Assert.Contains(
send_msg_req.MessageAttributes,
kv => kv.Key == "traceparent" && kv.Value.StringValue == $"00-{parent.TraceId}-{parent.SpanId}-00");
Assert.Contains(
send_msg_req.MessageAttributes,
kv => kv.Key == "Custom" && kv.Value.StringValue == "Value");
}

[Fact]
Expand Down

0 comments on commit 5512176

Please sign in to comment.