diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 2ae3e4cfe..bd4bdaa05 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublish(exchange, routingKey, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length) + ? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties) : default; ulong publishSequenceNumber = 0; @@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length) + ? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties) : default; ulong publishSequenceNumber = 0; diff --git a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs index 764828ce3..faa813bdd 100644 --- a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs @@ -66,7 +66,7 @@ public static bool UseRoutingKeyAsOperationName new KeyValuePair(ProtocolVersion, "0.9.1") }; - internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, + internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties, ActivityContext linkedContext = default) { if (!s_publisherSource.HasListeners()) @@ -83,7 +83,7 @@ public static bool UseRoutingKeyAsOperationName ActivityKind.Producer, linkedContext); if (activity != null && activity.IsAllDataRequested) { - PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity); + PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity); } return activity; diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 6591638ce..08ae4406c 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -75,120 +75,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name) Assert.True(activity.GetTagItem(name) is int result && result > 0); } - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; - var _activities = new List(); - using ActivityListener activityListener = StartActivityListener(_activities); - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true); - } - - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; - var _activities = new List(); - using ActivityListener activityListener = StartActivityListener(_activities); - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - CachedString exchange = new CachedString(""); - CachedString routingKey = new CachedString(q.QueueName); - await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true); - } - - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; - var _activities = new List(); - using ActivityListener activityListener = StartActivityListener(_activities); - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); - await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true); - } - [Theory] [InlineData(true, true)] [InlineData(true, false)] @@ -307,11 +193,15 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn } [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + [InlineData(true, true, true)] + [InlineData(true, true, false)] + [InlineData(true, false, true)] + [InlineData(true, false, false)] + [InlineData(false, true, true)] + [InlineData(false, true, false)] + [InlineData(false, false, true)] + [InlineData(false, false, false)] + public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId) { RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; @@ -321,10 +211,12 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera string queue = $"queue-{Guid.NewGuid()}"; const string msg = "for basic.get"; + var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties(); + try { await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); + await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg)); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); BasicGetResult res = await _channel.BasicGetAsync(queue, true); @@ -332,7 +224,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false); + AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false, basicProps.MessageId); } finally { @@ -345,7 +237,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera [InlineData(true, false)] [InlineData(false, true)] [InlineData(false, false)] - public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + public async Task TestPublisherWithCachedStringsAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) { RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; @@ -381,7 +273,7 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use [InlineData(true, false)] [InlineData(false, true)] [InlineData(false, false)] - public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) { RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; @@ -427,7 +319,7 @@ private static ActivityListener StartActivityListener(List activities) } private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName, - List activityList, bool isDeliver = false) + List activityList, bool isDeliver = false, string messageId = null) { string childName = isDeliver ? "deliver" : "fetch"; Activity[] activities = activityList.ToArray(); @@ -480,6 +372,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize); AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize); AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize); + + if (messageId is not null) + { + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId); + AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId); + } } } } diff --git a/projects/Test/SequentialIntegration/TestHeartbeats.cs b/projects/Test/SequentialIntegration/TestHeartbeats.cs index 976ccbb1d..520cdbfbc 100644 --- a/projects/Test/SequentialIntegration/TestHeartbeats.cs +++ b/projects/Test/SequentialIntegration/TestHeartbeats.cs @@ -59,7 +59,7 @@ public override Task InitializeAsync() [SkippableFact(Timeout = 35000)] [Trait("Category", "LongRunning")] - public async Task TestThatHeartbeatWriterUsesConfigurableInterval() + public async Task TestThatHeartbeatWriterUsesConfigurableIntervalAsync() { Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test"); @@ -72,7 +72,7 @@ public async Task TestThatHeartbeatWriterUsesConfigurableInterval() [SkippableFact] [Trait("Category", "LongRunning")] - public async Task TestThatHeartbeatWriterWithTLSEnabled() + public async Task TestThatHeartbeatWriterWithTLSEnabledAsync() { Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test"); @@ -94,7 +94,7 @@ public async Task TestThatHeartbeatWriterWithTLSEnabled() [SkippableFact(Timeout = 90000)] [Trait("Category", "LongRunning")] - public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval() + public async Task TestHundredsOfConnectionsWithRandomHeartbeatIntervalAsync() { Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test"); diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index 254c310b5..0523e9836 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -94,64 +94,6 @@ public void TestDefaultTracingOptions() Assert.True(RabbitMQActivitySource.TracingOptions.UsePublisherAsParent); } - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - var exportedItems = new List(); - using var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(options => - { - options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - options.UsePublisherAsParent = usePublisherAsParent; - }) - .AddInMemoryExporter(exportedItems) - .Build(); - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - string baggageItem = Baggage.GetBaggage("TestItem"); - if (baggageItem == baggageGuid) - { - consumerReceivedTcs.SetResult(true); - } - else - { - consumerReceivedTcs.SetException( - EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); - } - - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, exportedItems, true); - } - [Theory] [InlineData(true, true)] [InlineData(true, false)] @@ -333,11 +275,15 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo } [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + [InlineData(true, true, true)] + [InlineData(true, true, false)] + [InlineData(true, false, true)] + [InlineData(true, false, false)] + [InlineData(false, true, true)] + [InlineData(false, true, false)] + [InlineData(false, false, true)] + [InlineData(false, false, false)] + public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId) { var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() @@ -355,10 +301,12 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera string queue = $"queue-{Guid.NewGuid()}"; const string msg = "for basic.get"; + var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties(); + try { await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); + await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg)); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); @@ -368,7 +316,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, exportedItems, false); + AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, exportedItems, false, basicProps.MessageId); } finally { @@ -377,7 +325,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera } private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName, - List activityList, bool isDeliver = false, string baggageGuid = null) + List activityList, bool isDeliver = false, string messageId = null) { string childName = isDeliver ? "deliver" : "fetch"; string childType = isDeliver ? "process" : "receive"; @@ -432,6 +380,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send"); AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish"); + if (messageId is not null) + { + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId); + AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId); + } } } }