-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
**DRAFT** feat: Azure Service Bus Instrumentation #2880
Changes from all commits
ec5de82
aacdccd
9393eda
49a7159
be27089
2880aae
5499146
a0919a5
f843d47
2d8a4bb
f6a6a9e
81f48ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -423,6 +423,7 @@ jobs: | |
matrix: | ||
namespace: | ||
[ | ||
AzureServiceBus, | ||
CosmosDB, | ||
Couchbase, | ||
Elasticsearch, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
<PropertyGroup> | ||
<TargetFrameworks>net462;netstandard2.0</TargetFrameworks> | ||
<AssemblyName>NewRelic.Providers.Wrapper.AzureServiceBus</AssemblyName> | ||
<RootNamespace>NewRelic.Providers.Wrapper.AzureServiceBus</RootNamespace> | ||
<Description>Azure Service Bus Wrapper Provider for New Relic .NET Agent</Description> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<Content Include="Instrumentation.xml"> | ||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> | ||
</Content> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\..\..\NewRelic.Agent.Extensions\NewRelic.Agent.Extensions.csproj" /> | ||
</ItemGroup> | ||
</Project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
// Copyright 2020 New Relic, Inc. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Collections.ObjectModel; | ||
using System.Threading.Tasks; | ||
using NewRelic.Agent.Api; | ||
using NewRelic.Agent.Extensions.Providers.Wrapper; | ||
using NewRelic.Reflection; | ||
|
||
namespace NewRelic.Providers.Wrapper.AzureServiceBus; | ||
|
||
public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase | ||
{ | ||
private static readonly ConcurrentDictionary<Type, Func<object, object>> _getResultFromGenericTask = new(); | ||
|
||
public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) | ||
{ | ||
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiveWrapper)); | ||
return new CanWrapResponse(canWrap); | ||
} | ||
|
||
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) | ||
{ | ||
dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget; | ||
string queueName = serviceBusReceiver.EntityPath; // some-queue-name | ||
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net | ||
|
||
MessageBrokerAction action = | ||
instrumentedMethodCall.MethodCall.Method.MethodName switch | ||
{ | ||
"ReceiveMessagesAsync" => MessageBrokerAction.Consume, | ||
"ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume, | ||
"PeekMessagesInternalAsync" => MessageBrokerAction.Peek, | ||
"AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? Abandon sends the message back to the queue for re-delivery | ||
"CompleteMessageAsync" => MessageBrokerAction.Consume, | ||
"DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? | ||
"DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values??? | ||
"RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values??? | ||
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") | ||
}; | ||
|
||
// start a message broker segment | ||
var segment = transaction.StartMessageBrokerSegment( | ||
instrumentedMethodCall.MethodCall, | ||
MessageBrokerDestinationType.Queue, | ||
action, | ||
BrokerVendorName, | ||
queueName, | ||
serverAddress: fqns ); | ||
|
||
return instrumentedMethodCall.IsAsync | ||
? | ||
// return an async delegate | ||
Delegates.GetAsyncDelegateFor<Task>( | ||
agent, | ||
segment, | ||
false, | ||
HandleResponse, | ||
TaskContinuationOptions.ExecuteSynchronously) | ||
: Delegates.GetDelegateFor<object>( | ||
onFailure: transaction.NoticeError, | ||
onComplete: segment.End, | ||
onSuccess: ExtractDTHeadersIfAvailable); | ||
|
||
void HandleResponse(Task responseTask) | ||
{ | ||
try | ||
{ | ||
if (responseTask.IsFaulted) | ||
{ | ||
transaction.NoticeError(responseTask.Exception); | ||
return; | ||
} | ||
|
||
var resultObj = GetTaskResultFromObject(responseTask); | ||
ExtractDTHeadersIfAvailable(resultObj); | ||
} | ||
finally | ||
{ | ||
segment.End(); | ||
} | ||
} | ||
|
||
|
||
|
||
void ExtractDTHeadersIfAvailable(object resultObj) | ||
{ | ||
if (resultObj != null) | ||
{ | ||
switch (instrumentedMethodCall.MethodCall.Method.MethodName) | ||
{ | ||
case "ReceiveMessagesAsync": | ||
case "ReceiveDeferredMessagesAsync": | ||
case "PeekMessagesInternalAsync": | ||
// the response contains a list of messages. | ||
// get the first message from the response and extract DT headers | ||
dynamic messages = resultObj; | ||
if (messages.Count > 0) | ||
{ | ||
var msg = messages[0]; | ||
if (msg.ApplicationProperties is ReadOnlyDictionary<string, object> applicationProperties) | ||
{ | ||
transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); | ||
} | ||
} | ||
break; | ||
} | ||
} | ||
IEnumerable<string> ProcessHeaders(ReadOnlyDictionary<string, object> applicationProperties, string key) | ||
{ | ||
var headerValues = new List<string>(); | ||
foreach (var item in applicationProperties) | ||
{ | ||
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
headerValues.Add(item.Value as string); | ||
} | ||
} | ||
|
||
return headerValues; | ||
} | ||
} | ||
} | ||
|
||
private static object GetTaskResultFromObject(object taskObj) | ||
{ | ||
var task = taskObj as Task; | ||
if (task == null) | ||
{ | ||
return null; | ||
} | ||
if (task.IsFaulted) | ||
{ | ||
return null; | ||
} | ||
|
||
var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Result")); | ||
return getResponse(task); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Copyright 2020 New Relic, Inc. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using NewRelic.Agent.Api; | ||
using NewRelic.Agent.Extensions.Providers.Wrapper; | ||
|
||
namespace NewRelic.Providers.Wrapper.AzureServiceBus; | ||
|
||
public class AzureServiceBusSendWrapper : AzureServiceBusWrapperBase | ||
{ | ||
public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) | ||
{ | ||
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusSendWrapper)); | ||
return new CanWrapResponse(canWrap); | ||
} | ||
|
||
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) | ||
{ | ||
dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget; | ||
string queueName = serviceBusReceiver.EntityPath; // some-queue-name | ||
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net | ||
|
||
// determine message broker action based on method name | ||
MessageBrokerAction action = | ||
instrumentedMethodCall.MethodCall.Method.MethodName switch | ||
{ | ||
"SendMessagesAsync" => MessageBrokerAction.Produce, | ||
"ScheduleMessagesAsync" => MessageBrokerAction.Produce, | ||
"CancelScheduledMessagesAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? | ||
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") | ||
}; | ||
|
||
// start a message broker segment | ||
var segment = transaction.StartMessageBrokerSegment( | ||
instrumentedMethodCall.MethodCall, | ||
MessageBrokerDestinationType.Queue, | ||
action, | ||
BrokerVendorName, | ||
queueName, | ||
serverAddress: fqns); | ||
|
||
if (action == MessageBrokerAction.Produce) | ||
{ | ||
dynamic messages = instrumentedMethodCall.MethodCall.MethodArguments[0]; | ||
|
||
// iterate all messages that are being sent, | ||
// insert DT headers into each message | ||
foreach (var message in messages) | ||
{ | ||
if (message.ApplicationProperties is IDictionary<string, object> applicationProperties) | ||
transaction.InsertDistributedTraceHeaders(applicationProperties, ProcessHeaders); | ||
} | ||
|
||
void ProcessHeaders(IDictionary<string, object> applicationProperties, string key, string value) | ||
{ | ||
applicationProperties.Add(key, value); | ||
} | ||
} | ||
|
||
return instrumentedMethodCall.IsAsync ? Delegates.GetAsyncDelegateFor<Task>(agent, segment) : Delegates.GetDelegateFor(segment); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
// Copyright 2020 New Relic, Inc. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using NewRelic.Agent.Api; | ||
using NewRelic.Agent.Extensions.Providers.Wrapper; | ||
|
||
namespace NewRelic.Providers.Wrapper.AzureServiceBus | ||
{ | ||
public abstract class AzureServiceBusWrapperBase : IWrapper | ||
{ | ||
protected const string BrokerVendorName = "AzureServiceBus"; | ||
|
||
public bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems different compared with other MQ that we instrument. Shouldn't the receive generate a transaction? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably a topic for pairing on Tuesday. But this implementation is consistent with our MSMQ instrumentation. Unlike RabbitMQ, there's not an "eventing" layer that we could instrument that would wrap processing time on the client application. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we could potentially instrument |
||
|
||
public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo); | ||
|
||
public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,ITransaction transaction); | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems correct based on the description of the method.