Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce OpenTelemetry #2358

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions src/Marten.OpenTelemetry/DiagnosticSourceSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.Diagnostics;

namespace Marten.OpenTelemetry;
internal class DiagnosticSourceSubscriber: IDisposable, IObserver<DiagnosticListener>
{
private readonly Func<string, IObserver<KeyValuePair<string, object?>>> listnerFactory;
private readonly Func<DiagnosticListener, bool> diagnosticSourceFilter;
private readonly Func<string, object?, object?, bool>? isEnabledFilter;
private long disposed;
private IDisposable? allSourcesSubscription;
private readonly List<IDisposable> listenerSubscriptions;
public DiagnosticSourceSubscriber(
Func<string, IObserver<KeyValuePair<string, object?>>> listnerFactory,
Func<DiagnosticListener, bool> diagnosticSourceFilter,
Func<string, object?, object?, bool>? isEnabledFilter)
{
listenerSubscriptions = new List<IDisposable>();
this.listnerFactory = listnerFactory;
this.diagnosticSourceFilter = diagnosticSourceFilter;
this.isEnabledFilter = isEnabledFilter;
}

public void Subscribe()
{
allSourcesSubscription ??= DiagnosticListener.AllListeners.Subscribe(this);
}

public void OnNext(DiagnosticListener value)
{
if ((Interlocked.Read(ref disposed) == 0) &&
diagnosticSourceFilter(value))
{
var listener = listnerFactory(value.Name);
var subscription = isEnabledFilter == null ?
value.Subscribe(listener) :
value.Subscribe(listener, isEnabledFilter);

lock (listenerSubscriptions)
{
listenerSubscriptions.Add(subscription);
}
}
}

public void OnCompleted()
{
}

public void OnError(Exception error)
{
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) == 1)
{
return;
}

lock (listenerSubscriptions)
{
foreach (var listenerSubscription in listenerSubscriptions)
{
listenerSubscription?.Dispose();
}

listenerSubscriptions.Clear();
}

allSourcesSubscription?.Dispose();
allSourcesSubscription = null;
}
}

5 changes: 5 additions & 0 deletions src/Marten.OpenTelemetry/InstrumentationOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

namespace Marten.OpenTelemetry;
public class MartenInstrumentationOptions
{
}
88 changes: 88 additions & 0 deletions src/Marten.OpenTelemetry/Internal/MartenListener.Stream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using Marten.Diagnostics.Events;
using System.Diagnostics;

namespace Marten.OpenTelemetry.Internal;

internal partial class MartenListener
{
private void StreamCreated(object? payload)
{
StreamBaseDiagnosticEvent? @event = payload switch
{
StreamCreatedDiagnosticEvent => payload as StreamCreatedDiagnosticEvent,
StreamAppendedDiagnosticEvent => payload as StreamAppendedDiagnosticEvent,
_ => null
};

if (@event == null)
return;

var parent = Activity.Current!.GetCustomProperty("parent") as Activity
?? null;

if (parent == null && @event.CorrelationId != null)
{
var corr = ActivitySource.StartActivity("Correlation", ActivityKind.Internal);
if (corr == null)
return;

corr!.SetCustomProperty("parent", corr);
parent = corr;
}

if (parent != null)
{
Activity.Current = parent;
parent.SetCustomProperty("stop_parent", true);
}
else
{
parent = Activity.Current;
parent.SetCustomProperty("parent", parent);
}

var activity = ActivitySource.StartActivity(@event.EventId.Name!, ActivityKind.Internal);
if (activity != null)
{
activity.DisplayName = @event.DisplayName!;
activity.AddStreamTags(@event);
activity.SetCustomProperty("parent", parent);

var childs = parent.GetCustomProperty("childs_count") as int? ?? 0;
parent.SetCustomProperty(@event.StreamAction.Id.ToString(), activity);
parent.SetCustomProperty("childs_count", ++childs);
}
}
private void StreamChangesFinished(object? payload)
{
StreamBaseDiagnosticEvent? @event = payload switch
{
StreamChangesCompletedDiagnosticEvent => payload as StreamChangesCompletedDiagnosticEvent,
StreamChangesFailedDiagnosticEvent => payload as StreamChangesFailedDiagnosticEvent,
_ => null
};

if (@event == null)
return;

if (Activity.Current?.GetCustomProperty("parent") is Activity parent)
{
var activity = parent.GetCustomProperty(@event.StreamAction.Id.ToString()) as Activity;
activity?.SetEndTime(DateTime.UtcNow);
activity?.SetTag("aggregate_type", @event.StreamAction.AggregateTypeName);
activity?.SetTag("expected_version_onserver", @event.StreamAction.ExpectedVersionOnServer?.ToString() ?? string.Empty);
activity?.AddStreamEvents(@event);
activity?.Stop();

var stopParent = parent.GetCustomProperty("stop_parent") as bool?;
if (stopParent != true)
return;

var done = parent.GetCustomProperty("childs_done_count") as int? ?? 0;
var childs = parent.GetCustomProperty("childs_count") as int? ?? 0;
parent.SetCustomProperty("childs_done_count", ++done);
if (done == childs)
parent.Stop();
}
}
}
40 changes: 40 additions & 0 deletions src/Marten.OpenTelemetry/Internal/MartenListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Marten.Diagnostics;
using System.Diagnostics;

namespace Marten.OpenTelemetry.Internal;

internal partial class MartenListener: IObserver<KeyValuePair<string, object?>>
{
private static readonly string version = typeof(MartenListener).Assembly.GetName().Version?.ToString() ?? "1.0.0";
public static string SourceName => "Marten.OpenTelemetry";
private ActivitySource ActivitySource { get; } = new(SourceName, version);
private Dictionary<string[], Action<object?>> handlers { get; } = new();

public MartenListener()
{
handlers.Add(new[]
{
DiagnosticEventId.StreamCreated.Name!,
DiagnosticEventId.StreamAppended.Name!
}, StreamCreated);

handlers.Add(new[]
{
DiagnosticEventId.StreamChangesCompleted.Name!,
DiagnosticEventId.StreamChangesFailed.Name!
}, StreamChangesFinished);
}

public virtual void OnNext(KeyValuePair<string, object?> evt)
{
var key = handlers.Keys.FirstOrDefault(key => key.Contains(evt.Key));
if (key == null)
return;

handlers.TryGetValue(key, out var handler);
handler?.Invoke(evt.Value);
}

public virtual void OnCompleted() { }
public virtual void OnError(Exception error) { }
}
32 changes: 32 additions & 0 deletions src/Marten.OpenTelemetry/Internal/StreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Marten.Diagnostics.Events;
using System.Diagnostics;

namespace Marten.OpenTelemetry.Internal;

internal static class StreamExtensions
{
public static void AddStreamEvents(this Activity activity, StreamBaseDiagnosticEvent @event)
{
foreach (var evt in @event.StreamAction.Events)
{
activity.AddEvent(new(
evt.EventTypeName,
evt.Timestamp,
new ActivityTagsCollection(new Dictionary<string, object?>()
{
{"event_id", evt.Id},
{"causation_id", evt.CausationId??string.Empty},
{ "dotnet_type_name",evt.DotNetTypeName},
{ "version",evt.Version},
{ "sequence",evt.Sequence},
})));
}
}
public static void AddStreamTags(this Activity activity, StreamBaseDiagnosticEvent @event)
{
activity.SetTag("correlation_id", @event.CorrelationId ?? string.Empty);
activity.SetTag("stream_id", @event.StreamAction.Id);
activity.SetTag("action_type", @event.StreamAction.ActionType);
activity.SetTag("tenant_id", @event.StreamAction.TenantId?.ToString() ?? string.Empty);
}
}
27 changes: 27 additions & 0 deletions src/Marten.OpenTelemetry/Marten.OpenTelemetry.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>10.0</LangVersion>
<Description>Marten OpenTelemetry Instrumentation</Description>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
<GenerateAssemblyProductAttribute>true</GenerateAssemblyProductAttribute>
<GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<GenerateAssemblyFileVersionAttribute>true</GenerateAssemblyFileVersionAttribute>
<GenerateAssemblyInformationalVersionAttribute>true</GenerateAssemblyInformationalVersionAttribute>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" Version="1.4.0-alpha.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Marten\Marten.csproj" />
</ItemGroup>

</Project>
23 changes: 23 additions & 0 deletions src/Marten.OpenTelemetry/OpentelemetryInstrumentation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Marten.Diagnostics;
using Marten.OpenTelemetry.Internal;

namespace Marten.OpenTelemetry;
internal class OpenTelemetryInstrumentation: IDisposable
{
private readonly DiagnosticSourceSubscriber subscriber;
public OpenTelemetryInstrumentation(MartenInstrumentationOptions options)
{
options ??= new();
MartenListener listener = new();
subscriber = new DiagnosticSourceSubscriber(
name => listener,
listener => listener.Name == DiagnosticCategory.Name,
null);

subscriber.Subscribe();
}
public void Dispose()
{
subscriber.Dispose();
}
}
16 changes: 16 additions & 0 deletions src/Marten.OpenTelemetry/TraceProviderBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

using Marten.OpenTelemetry;
using Marten.OpenTelemetry.Internal;

namespace OpenTelemetry.Trace;
public static class TraceProviderBuilderExtensions
{
public static TracerProviderBuilder AddMartenInstrumentation(this TracerProviderBuilder builder, Action<MartenInstrumentationOptions>? configure = null)
{
MartenInstrumentationOptions options = new();
configure?.Invoke(options);
builder.AddSource(MartenListener.SourceName);
builder.AddInstrumentation(() => new OpenTelemetryInstrumentation(options));
return builder;
}
}
Loading