Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open telemetry reporter - Use an AddOtlpExporter #393

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
jobs:
build:
docker:
- image: mcr.microsoft.com/dotnet/sdk:6.0
- image: mcr.microsoft.com/dotnet/sdk:8.0
steps:
- checkout
- run: dotnet tool install --global dotnet-sonarscanner --version 5.11.0
- run: dotnet tool install --global dotnet-sonarscanner --version 9.0.0
- run: echo 'export PATH="$PATH:/root/.dotnet/tools"' >> $BASH_ENV
#- run: echo "deb http://ftp.us.debian.org/debian stretch main contrib non-free" >> /etc/apt/sources.list
- run: echo "deb http://ftp.debian.org/debian stable main contrib non-free" >> /etc/apt/sources.list
Expand All @@ -26,7 +26,7 @@
- run: export JAVA_HOME
- run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs,**State/Cache/Internal/*.cs"
- run: dotnet build
- run: dotnet test --no-restore --no-build --verbosity normal -f net6.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet test --no-restore --no-build --verbosity normal -f net8.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet sonarscanner end /d:sonar.login=${SONAR_TOKEN}

workflows:
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/build-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ jobs:
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.202
- name: Setup .NET 7.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 7.0.410
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 8.0.404
# BEGIN Dependencies for RocksDB
- run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
- run: sudo apt install -y bzip2 lz4 librocksdb-dev
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: integration

on:
push:
branches: [ integration ]
branches: [ master ]

jobs:
build:
Expand All @@ -11,10 +11,10 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Setup .NET 6.0
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.202
dotnet-version: 8.0.10
# BEGIN Dependencies for RocksDB
- run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
- run: sudo apt install -y bzip2 lz4 librocksdb-dev
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ jobs:
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.202
- name: Setup .NET 7.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 7.0.20
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 8.0.10
- name: Install dependencies
run: dotnet restore
- name: Build
Expand Down
4 changes: 2 additions & 2 deletions core/Crosscutting/DictionaryExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Crosscutting
{
Expand Down Expand Up @@ -51,6 +49,7 @@ public static bool AddIfNotExist<K, V>(this IDictionary<K, V> map, K key, V valu
return false;
}

#if NETSTANDARD2_0 || NET5_0 || NET6_0 || NET7_0
/// <summary>
/// Convert enumerable of <see cref="KeyValuePair{K, V}"/> to <see cref="IDictionary{K, V}"/>
/// </summary>
Expand All @@ -65,6 +64,7 @@ public static IDictionary<K, V> ToDictionary<K, V>(this IEnumerable<KeyValuePair
r.Add(s.Key, s.Value);
return r;
}
#endif


/// <summary>
Expand Down
1 change: 1 addition & 0 deletions core/Kafka/Internal/RecordCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
Expand Down
34 changes: 33 additions & 1 deletion core/KafkaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Streamiz.Kafka.Net.Stream.Internal;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
Expand Down Expand Up @@ -253,7 +254,7 @@ public override string ToString()
private readonly string clientId;
private readonly ILogger logger;
private readonly string logPrefix;
private readonly object stateLock = new object();
private readonly object stateLock = new();
private readonly QueryableStoreProvider queryableStoreProvider;
private readonly GlobalStreamThread globalStreamThread;
private readonly StreamStateManager manager;
Expand Down Expand Up @@ -330,6 +331,37 @@ string Protect(string str)
() => threads.Count(t => t.State != ThreadState.DEAD && t.State != ThreadState.PENDING_SHUTDOWN),
metricsRegistry);

#region Temporary

// Approximates the process's total memory use; exposed as the
// "total-memory" client-level gauge registered just below.
long GetMemoryUse()
{
// Process handle is disposable; dispose promptly after reading counters.
using var process = Process.GetCurrentProcess();
#if NETSTANDARD2_0
// netstandard2.0 build: GC.GetGCMemoryInfo is unavailable, so fall back
// to the private (committed) memory of the process.
return process.PrivateMemorySize64;
#else
// Working set plus paged system memory, minus the bytes the GC has
// committed but not currently handed out (TotalCommittedBytes minus the
// live heap from GC.GetTotalMemory(false)).
// NOTE(review): this accounting mixes OS-level and GC-level counters —
// confirm the intended definition of "total memory" for this metric.
return process.WorkingSet64 + process.PagedSystemMemorySize64 -
(GC.GetGCMemoryInfo().TotalCommittedBytes - GC.GetTotalMemory(false));
#endif
}

string TOTAL_MEMORY = "total-memory";
string TOTAL_MEMORY_DESCRIPTION = "The application information metrics";
var sensor = metricsRegistry.ClientLevelSensor(
TOTAL_MEMORY,
TOTAL_MEMORY_DESCRIPTION,
MetricsRecordingLevel.INFO);
var tags = metricsRegistry.ClientTags();
tags.Add(GeneralClientMetrics.APPLICATION_ID, configuration.ApplicationId);

SensorHelper.AddMutableValueMetricToSensor(
sensor,
StreamMetricsRegistry.CLIENT_LEVEL_GROUP,
tags,
TOTAL_MEMORY,
TOTAL_MEMORY_DESCRIPTION,
GetMemoryUse);
#endregion

threads = new IThread[numStreamThreads];
var threadState = new Dictionary<long, Processors.ThreadState>();

Expand Down
2 changes: 1 addition & 1 deletion core/Metrics/Sensor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
private readonly Dictionary<MetricName, StreamMetric> metrics;
private readonly IList<IMeasurableStat> stats;
private MetricConfig config = new MetricConfig();
private MetricConfig config = new();

/// <summary>
/// Lock object to synchronize recording
Expand Down Expand Up @@ -70,7 +70,7 @@

#region Add

internal bool AddStatMetric(MetricName keyMetricName, MetricName metricName, IMeasurableStat stat, MetricConfig config = null)

Check warning on line 73 in core/Metrics/Sensor.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
{
if (!NoRunnable)
{
Expand All @@ -88,7 +88,7 @@
return false;
}

internal virtual bool AddStatMetric(MetricName name, IMeasurableStat stat, MetricConfig config = null)

Check warning on line 91 in core/Metrics/Sensor.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
{
if (!NoRunnable)
{
Expand All @@ -106,7 +106,7 @@
return false;
}

internal virtual bool AddImmutableMetric<T>(MetricName name, T value, MetricConfig config = null)

Check warning on line 109 in core/Metrics/Sensor.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
{
if (!NoRunnable)
{
Expand All @@ -126,7 +126,7 @@
return false;
}

internal virtual bool AddProviderMetric<T>(MetricName name, Func<T> provider, MetricConfig config = null)

Check warning on line 129 in core/Metrics/Sensor.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
{
if (!NoRunnable)
{
Expand Down
10 changes: 4 additions & 6 deletions core/Processors/Internal/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ internal static StreamTask CurrentTask
return UnassignedStreamTask.Create();
return _currentTask;
}
set
{
_currentTask = value;
}
set => _currentTask = value;
}

private readonly ILogger log = Logger.GetLogger(typeof(TaskManager));
Expand Down Expand Up @@ -143,6 +140,8 @@ public void RevokeTasks(ICollection<TopicPartition> revokeAssignment)
foreach(var acT in commitNeededActiveTask)
acT.PostCommit(false);

revokedTask.Clear();
commitNeededActiveTask.Clear();
}

public StreamTask ActiveTaskFor(TopicPartition partition)
Expand All @@ -160,13 +159,11 @@ public StreamTask ActiveTaskFor(TopicPartition partition)

public void Close()
{
List<StreamTask> tasksToCommit = new List<StreamTask>();
List<TopicPartitionOffset> consumedOffsets = new List<TopicPartitionOffset>();
CurrentTask = null;
foreach (var t in activeTasks)
{
CurrentTask = t.Value;
tasksToCommit.Add(t.Value);
consumedOffsets.AddRange(t.Value.PrepareCommit());
}

Expand Down Expand Up @@ -237,6 +234,7 @@ internal int CommitAll()
if (committed > 0) // try to purge the committed records for repartition topics if possible
PurgeCommittedRecords(purgeOffsets);

tasksToCommit.Clear();
return committed;
}

Expand Down
12 changes: 6 additions & 6 deletions core/State/Cache/Internal/MemoryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ public MemoryCacheStatistics GetCurrentStatistics()
{
if (wr.TryGetTarget(out Stats? stats))
{
hits += Interlocked.Read(ref stats.Hits);
misses += Interlocked.Read(ref stats.Misses);
hits += stats.Hits;
misses += stats.Misses;
}
}

Expand Down Expand Up @@ -351,8 +351,8 @@ private void RemoveFromStats(Stats current)
}
}

_accumulatedStats!.Hits += Interlocked.Read(ref current.Hits);
_accumulatedStats.Misses += Interlocked.Read(ref current.Misses);
_accumulatedStats!.Hits += current.Hits;
_accumulatedStats.Misses += current.Misses;
_allStats.TrimExcess();
}
}
Expand Down Expand Up @@ -550,13 +550,13 @@ private sealed class CoherentState<K, V>

internal long Count => Entries.Count;

internal long Size => Volatile.Read(ref CacheSize);
internal long Size => CacheSize;

internal void RemoveEntry(CacheEntry<K, V> entry, MemoryCacheOptions options)
{
if (Entries.TryRemove(entry.Key))
{
Interlocked.Add(ref CacheSize, -entry.Size);
CacheSize -= entry.Size;
entry.InvokeEvictionCallbacks();
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/Streamiz.Kafka.Net.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net5.0;net6.0</TargetFrameworks>
<TargetFrameworks>net5.0;net6.0;netstandard2.0;net7.0;net8.0</TargetFrameworks>
<RootNamespace>Streamiz.Kafka.Net</RootNamespace>
<AssemblyName>Streamiz.Kafka.Net</AssemblyName>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
30 changes: 30 additions & 0 deletions environment/confs/otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# OpenTelemetry Collector configuration for the local test environment:
# receives OTLP metrics over gRPC and exposes them to Prometheus.
receivers:
otlp:
protocols:
grpc:
# Listen on all interfaces; default OTLP/gRPC port.
endpoint: 0.0.0.0:4317

exporters:
# "debug" logs received telemetry to the collector's stdout.
debug:
prometheus:
# Scrape endpoint that Prometheus pulls from (see prometheus.yaml).
endpoint: "0.0.0.0:8889"
const_labels:
label1: value1

processors:
# Batch processor with default settings (reduces export calls).
batch:

extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679

service:
extensions: [pprof, zpages, health_check]
pipelines:
# Single metrics pipeline: OTLP in -> batch -> debug log + Prometheus out.
metrics:
receivers: [otlp]
processors: [batch]
exporters: [debug, prometheus]
6 changes: 6 additions & 0 deletions environment/confs/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Prometheus scrape configuration for the local test environment.
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 10s
static_configs:
# 8889: metrics exported by the collector's prometheus exporter.
- targets: ['otel-collector:8889']
# 8888: the collector's own internal telemetry.
- targets: ['otel-collector:8888']
16 changes: 0 additions & 16 deletions environment/datagen_connector.json

This file was deleted.

Loading
Loading