From 4dc400afda23fbd1103d3c82b6df90f792c6573e Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 23 Oct 2024 12:27:08 -0700 Subject: [PATCH 1/6] Support .Net8 --- .circleci/config.yml | 6 ++-- .github/workflows/build-ci.yml | 8 +++++ .github/workflows/integration.yml | 4 +-- .github/workflows/release.yml | 8 +++++ core/Crosscutting/DictionaryExtensions.cs | 4 +-- core/Kafka/Internal/RecordCollector.cs | 1 + core/KafkaStream.cs | 34 ++++++++++++++++++- core/Metrics/Sensor.cs | 2 +- core/Processors/Internal/TaskManager.cs | 10 +++--- core/State/Cache/Internal/MemoryCache.cs | 12 +++---- core/Streamiz.Kafka.Net.csproj | 2 +- environment/datagen_connector.json | 5 ++- environment/start.sh | 13 ++++--- .../sample-stream-demo.csproj | 2 +- launcher/sample-stream/Program.cs | 25 ++++++++++---- launcher/sample-stream/sample-stream.csproj | 3 +- .../OpenTelemetryConfigExtension.cs | 23 ++++--------- .../OpenTelemetryMetricsExporter.cs | 4 +-- .../OpenTelemetryRunner.cs | 2 ++ ...miz.Kafka.Net.Metrics.OpenTelemetry.csproj | 9 ++--- .../PrometheusMetricServer.cs | 15 +++++--- ...reamiz.Kafka.Net.Metrics.Prometheus.csproj | 2 +- ...eamiz.Kafka.Net.Azure.RemoteStorage.csproj | 2 +- ...afka.Net.SchemaRegistry.SerDes.Avro.csproj | 2 +- ...afka.Net.SchemaRegistry.SerDes.Json.csproj | 2 +- ....Net.SchemaRegistry.SerDes.Protobuf.csproj | 2 +- ...miz.Kafka.Net.SchemaRegistry.SerDes.csproj | 2 +- ...reamiz.Kafka.Net.SerDes.CloudEvents.csproj | 2 +- ...Streamiz.Kafka.Net.IntegrationTests.csproj | 2 +- .../Streamiz.Kafka.Net.Tests.csproj | 2 +- 30 files changed, 133 insertions(+), 77 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 65c38f79..bccac2ce 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,10 +6,10 @@ jobs: build: docker: - - image: mcr.microsoft.com/dotnet/sdk:6.0 + - image: mcr.microsoft.com/dotnet/sdk:8.0 steps: - checkout - - run: dotnet tool install --global dotnet-sonarscanner --version 5.11.0 + - run: dotnet tool install --global dotnet-sonarscanner --version 9.0.0 - run: echo 'export PATH="$PATH:/root/.dotnet/tools"' >> $BASH_ENV #- run: echo "deb http://ftp.us.debian.org/debian stretch main contrib non-free" >> /etc/apt/sources.list - run: echo "deb http://ftp.debian.org/debian stable main contrib non-free" >> /etc/apt/sources.list @@ -26,7 +26,7 @@ - run: export JAVA_HOME - run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs,**State/Cache/Internal/*.cs" - run: dotnet build - - run: dotnet test --no-restore --no-build --verbosity normal -f net6.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj + - run: dotnet test --no-restore --no-build --verbosity normal -f net8.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj - run: dotnet sonarscanner end /d:sonar.login=${SONAR_TOKEN} workflows: diff --git a/.github/workflows/build-ci.yml b/.github/workflows/build-ci.yml index 6b3e96ed..83ea3890 100644 --- a/.github/workflows/build-ci.yml +++ b/.github/workflows/build-ci.yml @@ -25,6 +25,14 @@ jobs: uses: actions/setup-dotnet@v1 with: dotnet-version: 6.0.202 + - name: Setup .NET 7.0 + uses: 
actions/setup-dotnet@v1 + with: + dotnet-version: 7.0.20 + - name: Setup .NET 8.0 + uses: actions/setup-dotnet@v1 + with: + dotnet-version: 8.0.10 # BEGIN Dependencies for RocksDB - run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev - run: sudo apt install -y bzip2 lz4 librocksdb-dev diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 61ed3334..c87815d9 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -11,10 +11,10 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Setup .NET 6.0 + - name: Setup .NET 8.0 uses: actions/setup-dotnet@v1 with: - dotnet-version: 6.0.202 + dotnet-version: 8.0.10 # BEGIN Dependencies for RocksDB - run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev - run: sudo apt install -y bzip2 lz4 librocksdb-dev diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 45bb5873..025e978a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,6 +22,14 @@ jobs: uses: actions/setup-dotnet@v1 with: dotnet-version: 6.0.202 + - name: Setup .NET 7.0 + uses: actions/setup-dotnet@v1 + with: + dotnet-version: 7.0.20 + - name: Setup .NET 8.0 + uses: actions/setup-dotnet@v1 + with: + dotnet-version: 8.0.10 - name: Install dependencies run: dotnet restore - name: Build diff --git a/core/Crosscutting/DictionaryExtensions.cs b/core/Crosscutting/DictionaryExtensions.cs index 10bb6913..421e3fca 100644 --- a/core/Crosscutting/DictionaryExtensions.cs +++ b/core/Crosscutting/DictionaryExtensions.cs @@ -1,8 +1,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using Confluent.Kafka; namespace Streamiz.Kafka.Net.Crosscutting { @@ -51,6 +49,7 @@ public static bool AddIfNotExist(this IDictionary map, K key, V valu return false; } + #if NETSTANDARD2_0 || NET5_0 || NET6_0 || NET7_0 /// /// Convert enumerable of to /// @@ -65,6 +64,7 @@ public static IDictionary ToDictionary(this IEnumerable diff --git a/core/Kafka/Internal/RecordCollector.cs b/core/Kafka/Internal/RecordCollector.cs index 226b11d4..8cfea23a 100644 --- a/core/Kafka/Internal/RecordCollector.cs +++ b/core/Kafka/Internal/RecordCollector.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Text; using Confluent.Kafka; using Microsoft.Extensions.Logging; diff --git a/core/KafkaStream.cs b/core/KafkaStream.cs index 5965e2b8..3dcd5dd7 100644 --- a/core/KafkaStream.cs +++ b/core/KafkaStream.cs @@ -9,6 +9,7 @@ using Streamiz.Kafka.Net.Stream.Internal; using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; @@ -253,7 +254,7 @@ public override string ToString() private readonly string clientId; private readonly ILogger logger; private readonly string logPrefix; - private readonly object stateLock = new object(); + private readonly object stateLock = new(); private readonly QueryableStoreProvider queryableStoreProvider; private readonly GlobalStreamThread globalStreamThread; private readonly StreamStateManager manager; @@ -330,6 +331,37 @@ string Protect(string str) () => threads.Count(t => t.State != ThreadState.DEAD && t.State != ThreadState.PENDING_SHUTDOWN), metricsRegistry); + #region Temporary + + long GetMemoryUse() + { + 
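+                // Approximate the application's total memory footprint:
+                // on .NET Standard 2.0 fall back to the process's private bytes; otherwise take
+                // working set + paged system memory, minus the slice of GC-committed memory that
+                // holds no live objects (TotalCommittedBytes - GetTotalMemory(false)).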
using var process = Process.GetCurrentProcess(); + #if NETSTANDARD2_0 + return process.PrivateMemorySize64; + #else + return process.WorkingSet64 + process.PagedSystemMemorySize64 - + (GC.GetGCMemoryInfo().TotalCommittedBytes - GC.GetTotalMemory(false)); + #endif + } + + string TOTAL_MEMORY = "total-memory"; + string TOTAL_MEMORY_DESCRIPTION = "The application information metrics"; + var sensor = metricsRegistry.ClientLevelSensor( + TOTAL_MEMORY, + TOTAL_MEMORY_DESCRIPTION, + MetricsRecordingLevel.INFO); + var tags = metricsRegistry.ClientTags(); + tags.Add(GeneralClientMetrics.APPLICATION_ID, configuration.ApplicationId); + + SensorHelper.AddMutableValueMetricToSensor( + sensor, + StreamMetricsRegistry.CLIENT_LEVEL_GROUP, + tags, + TOTAL_MEMORY, + TOTAL_MEMORY_DESCRIPTION, + GetMemoryUse); + #endregion + threads = new IThread[numStreamThreads]; var threadState = new Dictionary(); diff --git a/core/Metrics/Sensor.cs b/core/Metrics/Sensor.cs index 5b1a4d0b..47ab913e 100644 --- a/core/Metrics/Sensor.cs +++ b/core/Metrics/Sensor.cs @@ -16,7 +16,7 @@ public class Sensor : IEquatable, IComparable { private readonly Dictionary metrics; private readonly IList stats; - private MetricConfig config = new MetricConfig(); + private MetricConfig config = new(); /// /// Lock object to synchronize recording diff --git a/core/Processors/Internal/TaskManager.cs b/core/Processors/Internal/TaskManager.cs index 99be0aaf..0fbe9826 100644 --- a/core/Processors/Internal/TaskManager.cs +++ b/core/Processors/Internal/TaskManager.cs @@ -25,10 +25,7 @@ internal static StreamTask CurrentTask return UnassignedStreamTask.Create(); return _currentTask; } - set - { - _currentTask = value; - } + set => _currentTask = value; } private readonly ILogger log = Logger.GetLogger(typeof(TaskManager)); @@ -143,6 +140,8 @@ public void RevokeTasks(ICollection revokeAssignment) foreach(var acT in commitNeededActiveTask) acT.PostCommit(false); + revokedTask.Clear(); + commitNeededActiveTask.Clear(); } public StreamTask ActiveTaskFor(TopicPartition partition) @@ -160,13 +159,11 @@ public StreamTask ActiveTaskFor(TopicPartition partition) public void Close() { - List tasksToCommit = new List(); List consumedOffsets = new List(); CurrentTask = null; foreach (var t in activeTasks) { CurrentTask = t.Value; - tasksToCommit.Add(t.Value); consumedOffsets.AddRange(t.Value.PrepareCommit()); } @@ -235,6 +232,7 @@ internal int CommitAll() if (committed > 0) // try to purge the committed records for repartition topics if possible PurgeCommittedRecords(purgeOffsets); + tasksToCommit.Clear(); return committed; } diff --git a/core/State/Cache/Internal/MemoryCache.cs b/core/State/Cache/Internal/MemoryCache.cs index 094c2be2..91b33862 100644 --- a/core/State/Cache/Internal/MemoryCache.cs +++ b/core/State/Cache/Internal/MemoryCache.cs @@ -310,8 +310,8 @@ public MemoryCacheStatistics GetCurrentStatistics() { if (wr.TryGetTarget(out Stats? 
stats)) { - hits += Interlocked.Read(ref stats.Hits); - misses += Interlocked.Read(ref stats.Misses); + hits += stats.Hits; + misses += stats.Misses; } } @@ -351,8 +351,8 @@ private void RemoveFromStats(Stats current) } } - _accumulatedStats!.Hits += Interlocked.Read(ref current.Hits); - _accumulatedStats.Misses += Interlocked.Read(ref current.Misses); + _accumulatedStats!.Hits += current.Hits; + _accumulatedStats.Misses += current.Misses; _allStats.TrimExcess(); } } @@ -550,13 +550,13 @@ private sealed class CoherentState internal long Count => Entries.Count; - internal long Size => Volatile.Read(ref CacheSize); + internal long Size => CacheSize; internal void RemoveEntry(CacheEntry entry, MemoryCacheOptions options) { if (Entries.TryRemove(entry.Key)) { - Interlocked.Add(ref CacheSize, -entry.Size); + CacheSize -= entry.Size; entry.InvokeEvictionCallbacks(); } } diff --git a/core/Streamiz.Kafka.Net.csproj b/core/Streamiz.Kafka.Net.csproj index 4ae09b2e..89a39532 100644 --- a/core/Streamiz.Kafka.Net.csproj +++ b/core/Streamiz.Kafka.Net.csproj @@ -1,7 +1,7 @@  - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 Streamiz.Kafka.Net Streamiz.Kafka.Net true diff --git a/environment/datagen_connector.json b/environment/datagen_connector.json index cb0eda80..7963839a 100644 --- a/environment/datagen_connector.json +++ b/environment/datagen_connector.json @@ -1,9 +1,8 @@ { - "name": "datagen-products", + "name": "datagen-orders", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "kafka.topic": "users", - "quickstart": "users", + "kafka.topic": "input", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", diff --git a/environment/start.sh b/environment/start.sh index 1074d89f..1658a766 100644 --- a/environment/start.sh +++ b/environment/start.sh @@ -1,21 +1,20 @@ #!/bin/bash -curl -i -X PUT http://localhost:8083/connectors/datagen_product/config \ +curl -i -X PUT http://localhost:8083/connectors/datagen_order/config \ -H "Content-Type: application/json" \ -d '{ "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "kafka.topic": "product3", + "kafka.topic": "input", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", - "max.interval": 50, + "max.interval": 100, "iterations": 10000000, "tasks.max": "1", "schema.filename": "/home/appuser/order.avsc", "schema.keyfield": "name" }' - -# curl -i -X PUT http://localhost:8083/connectors/datagen_product/pause -# curl -i -X PUT http://localhost:8083/connectors/datagen_product/resume -# curl -X DELETE http://localhost:8083/connectors/datagen_product \ No newline at end of file +# curl -i -X PUT http://localhost:8083/connectors/datagen_order/pause +# curl -i -X PUT http://localhost:8083/connectors/datagen_order/resume +# curl -X DELETE http://localhost:8083/connectors/datagen_order \ No newline at end of file diff --git a/launcher/sample-stream-demo/sample-stream-demo.csproj b/launcher/sample-stream-demo/sample-stream-demo.csproj index 38ba6dfb..9cf98468 100644 --- a/launcher/sample-stream-demo/sample-stream-demo.csproj +++ b/launcher/sample-stream-demo/sample-stream-demo.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 sample_stream_demo Linux diff --git a/launcher/sample-stream/Program.cs b/launcher/sample-stream/Program.cs index 
c35f4564..99744b86 100644 --- a/launcher/sample-stream/Program.cs +++ b/launcher/sample-stream/Program.cs @@ -1,14 +1,18 @@ using Streamiz.Kafka.Net; using System; -using System.Collections.Generic; -using System.Reflection; +using System.Diagnostics.Metrics; +using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; +using OpenTelemetry; +using Streamiz.Kafka.Net.Metrics.OpenTelemetry; using Streamiz.Kafka.Net.SerDes; -using Streamiz.Kafka.Net.State; using Streamiz.Kafka.Net.Stream; -using Streamiz.Kafka.Net.Table; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; +using Streamiz.Kafka.Net.Metrics; +using Streamiz.Kafka.Net.Metrics.Prometheus; namespace sample_stream { @@ -20,12 +24,17 @@ public static async Task Main(string[] args) ApplicationId = $"test-app", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest, + MetricsRecording = MetricsRecordingLevel.DEBUG, + MetricsIntervalMs = 500, Logger = LoggerFactory.Create((b) => { b.AddConsole(); b.SetMinimumLevel(LogLevel.Debug); }) }; + + config.UseOpenTelemetryReporter(); + //config.UsePrometheusReporter(9090); var t = BuildTopology(); var stream = new KafkaStream(t, config); @@ -41,8 +50,10 @@ private static Topology BuildTopology() { var builder = new StreamBuilder(); - builder.Stream("input2") - .GroupByKey() + builder.Stream("input") + .To("output2"); + + /*.GroupByKey() .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1))) .Count() .Suppress(SuppressedBuilder.UntilWindowClose, long>(TimeSpan.Zero, @@ -50,7 +61,7 @@ private static Topology BuildTopology() .WithKeySerdes(new TimeWindowedSerDes(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds))) .ToStream() .Map((k,v, r) => new KeyValuePair(k.Key, v)) - .To("output2"); + .To("output");*/ return builder.Build(); } diff --git a/launcher/sample-stream/sample-stream.csproj b/launcher/sample-stream/sample-stream.csproj index 91d8e479..a0f35a82 100644 --- a/launcher/sample-stream/sample-stream.csproj +++ b/launcher/sample-stream/sample-stream.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 sample_stream @@ -17,6 +17,7 @@ + diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs index 72dd8322..771510ca 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Security.Cryptography; using Microsoft.Extensions.Logging; using OpenTelemetry; @@ -22,25 +23,13 @@ public static IStreamConfig UseOpenTelemetryReporter( .SetResourceBuilder( ResourceBuilder.CreateDefault() .AddService(serviceName: "Streamiz")); - + + //meterProviderBuilder.AddPrometheusExporter(); + meterProviderBuilder.AddPrometheusHttpListener(); + meterProviderBuilder.AddRuntimeInstrumentation(); + actionMeterProviderBuilder?.Invoke(meterProviderBuilder); - // ONLY FOR TEST - // var port = RandomGenerator.GetInt32(10000); - // if (port < 5000) - // port += 5000; - // - // logger.LogInformation($"Open telemetry remote port is {port}"); - // - // meterProviderBuilder.AddPrometheusExporter((options) => - // { - // // for test - // options.StartHttpListener = true; - // // Use your endpoint and port here - // options.HttpListenerPrefixes = new string[] { $"http://localhost:{port}/" }; - // 
options.ScrapeResponseCacheDurationMilliseconds = 0; - // }); - var tracerProvider = meterProviderBuilder.Build(); var openTelemetryExporter = new OpenTelemetryMetricsExporter(); var openTelemetryRunner = new OpenTelemetryRunner(tracerProvider, openTelemetryExporter); diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs index f71595d2..75d6d85d 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs @@ -20,7 +20,7 @@ public void ExposeMetrics(IEnumerable sensors) foreach (var metric in metrics) { var metricKey = MetricKey(metric.Value); - + //meter. meter.CreateObservableGauge( metricKey, () => new[] @@ -35,7 +35,7 @@ public void ExposeMetrics(IEnumerable sensors) public void Dispose() { - meter.Dispose(); + meter?.Dispose(); } } } \ No newline at end of file diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs index 11bfbdf4..2ea11ae1 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs @@ -1,4 +1,6 @@ +using System.Diagnostics.Metrics; using System.Threading; +using OpenTelemetry; using OpenTelemetry.Metrics; namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj index 3da3f32e..3f88be28 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 1.6.0-RC1 1.6.0-RC1 1.6.0 @@ -18,9 +18,10 @@ - - - + + + + diff --git a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs index 9381156a..716707a2 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs @@ -58,7 +58,7 @@ protected Task StartServer(CancellationToken cancel) { // This will ensure that any failures to start are nicely thrown from StartServerAsync. _httpListener.Start(); - + // Kick off the actual processing to a new thread and return a Task for the processing thread. 
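+                // The 'cancel' token passed to StartServer is threaded through the async writes
+                // below, so cancelling it can interrupt in-flight scrape responses during shutdown.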
return Task.Factory.StartNew(() => { @@ -84,7 +84,7 @@ protected Task StartServer(CancellationToken cancel) try { - List tmpGauges = null; + List tmpGauges; lock (@lock) { @@ -115,8 +115,15 @@ protected Task StartServer(CancellationToken cancel) //response.OutputStream.Write(memoryBuffer, 0, memoryBuffer.Length); await ms.CopyToAsync(response.OutputStream, 2048*2*2, cancel); } - - response.OutputStream.Dispose(); + + tmpGauges.Clear(); + + await response.OutputStream.FlushAsync(cancel); + #if NETSTANDARD2_0 + response.OutputStream.Dispose(); + #else + await response.OutputStream.DisposeAsync(); + #endif } catch (Exception ex) { diff --git a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj index 7e50572e..15c8b167 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj +++ b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 1.6.0-RC1 1.6.0-RC1 1.6.0 diff --git a/remote/Streamiz.Kafka.Net.Azure.RemoteStorage/Streamiz.Kafka.Net.Azure.RemoteStorage.csproj b/remote/Streamiz.Kafka.Net.Azure.RemoteStorage/Streamiz.Kafka.Net.Azure.RemoteStorage.csproj index e01ff285..d6cec968 100644 --- a/remote/Streamiz.Kafka.Net.Azure.RemoteStorage/Streamiz.Kafka.Net.Azure.RemoteStorage.csproj +++ b/remote/Streamiz.Kafka.Net.Azure.RemoteStorage/Streamiz.Kafka.Net.Azure.RemoteStorage.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 1.6.0-RC1 1.6.0-RC1 1.6.0 diff --git a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro.csproj b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro.csproj index b25d35c9..fd2c194e 100644 --- a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro.csproj +++ b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro true diff --git a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json.csproj b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json.csproj index 16c6c35c..bfb10bbd 100644 --- a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json.csproj +++ b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json Streamiz.Kafka.Net.SchemaRegistry.SerDes.Json true diff --git a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf.csproj b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf.csproj index c27522ee..3c5eb830 100644 --- a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf.csproj +++ 
b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf/Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf.csproj @@ -1,7 +1,7 @@  - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 @LGouellec Copyright 2020 MIT diff --git a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.csproj b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.csproj index 41b1f86a..b694e34b 100644 --- a/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.csproj +++ b/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/Streamiz.Kafka.Net.SchemaRegistry.SerDes.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 @LGouellec @LGouellec Copyright 2020 diff --git a/serdes/Streamiz.Kafka.Net.SerDes.CloudEvents/Streamiz.Kafka.Net.SerDes.CloudEvents.csproj b/serdes/Streamiz.Kafka.Net.SerDes.CloudEvents/Streamiz.Kafka.Net.SerDes.CloudEvents.csproj index b42f03c3..27950936 100644 --- a/serdes/Streamiz.Kafka.Net.SerDes.CloudEvents/Streamiz.Kafka.Net.SerDes.CloudEvents.csproj +++ b/serdes/Streamiz.Kafka.Net.SerDes.CloudEvents/Streamiz.Kafka.Net.SerDes.CloudEvents.csproj @@ -1,7 +1,7 @@ - netstandard2.0;net5.0;net6.0 + net5.0;net6.0;netstandard2.0;net7.0;net8.0 Streamiz.Kafka.Net.SerDes.CloudEvents true @LGouellec diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj b/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj index f414f871..079a2d4a 100644 --- a/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj +++ b/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 false diff --git a/test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj b/test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj index ab8e3012..837d083f 100644 --- a/test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj +++ b/test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj @@ -1,7 +1,6 @@  - net6.0 Streamiz.Kafka.Net.Tests false Streamiz.Kafka.Net.Tests @@ -12,6 +11,7 @@ 1.6.0-RC1 1.6.0 1.6.0 + net8.0 From 3fed909de93033975961211f72c15ac7e3d07b7c Mon Sep 17 00:00:00 2001 From: LGouellec Date: Mon, 28 Oct 2024 14:34:56 -0700 Subject: [PATCH 2/6] Switch for OtlpExporter --- environment/confs/otel-collector-config.yaml | 30 ++++++ environment/confs/prometheus.yaml | 6 ++ environment/datagen_connector.json | 15 --- environment/docker-compose-with-connect.yml | 102 ------------------ environment/docker-compose-with-mongo.yml | 86 --------------- environment/docker-compose.yml | 52 ++++++++- environment/mongo-init/init.js | 4 - .../OpenTelemetryConfigExtension.cs | 14 +-- ...miz.Kafka.Net.Metrics.OpenTelemetry.csproj | 2 +- run-demo.sh | 10 +- 10 files changed, 100 insertions(+), 221 deletions(-) create mode 100644 environment/confs/otel-collector-config.yaml create mode 100644 environment/confs/prometheus.yaml delete mode 100644 environment/datagen_connector.json delete mode 100644 environment/docker-compose-with-connect.yml delete mode 100644 environment/docker-compose-with-mongo.yml delete mode 100644 environment/mongo-init/init.js diff --git a/environment/confs/otel-collector-config.yaml b/environment/confs/otel-collector-config.yaml new file mode 100644 index 00000000..d4080fc0 --- /dev/null +++ b/environment/confs/otel-collector-config.yaml @@ -0,0 +1,30 @@ +receivers: + otlp: + 
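+    # Receives OTLP over gRPC on 0.0.0.0:4317, the default endpoint targeted by the
+    # application's AddOtlpExporter() call (OtlpExportProtocol.Grpc, localhost:4317).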
protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + debug: + prometheus: + endpoint: "0.0.0.0:8889" + const_labels: + label1: value1 + +processors: + batch: + +extensions: + health_check: + pprof: + endpoint: :1888 + zpages: + endpoint: :55679 + +service: + extensions: [pprof, zpages, health_check] + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug, prometheus] \ No newline at end of file diff --git a/environment/confs/prometheus.yaml b/environment/confs/prometheus.yaml new file mode 100644 index 00000000..eb9d8ccc --- /dev/null +++ b/environment/confs/prometheus.yaml @@ -0,0 +1,6 @@ +scrape_configs: + - job_name: 'otel-collector' + scrape_interval: 10s + static_configs: + - targets: ['otel-collector:8889'] + - targets: ['otel-collector:8888'] \ No newline at end of file diff --git a/environment/datagen_connector.json b/environment/datagen_connector.json deleted file mode 100644 index 7963839a..00000000 --- a/environment/datagen_connector.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "name": "datagen-orders", - "config": { - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "kafka.topic": "input", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "max.interval": 50, - "iterations": 10000000, - "tasks.max": "1", - "schema.filename": "/home/appuser/order.avsc", - "schema.keyfield": "name" - } -} \ No newline at end of file diff --git a/environment/docker-compose-with-connect.yml b/environment/docker-compose-with-connect.yml deleted file mode 100644 index 6e358fe9..00000000 --- a/environment/docker-compose-with-connect.yml +++ /dev/null @@ -1,102 +0,0 @@ ---- -version: '2' -services: - zookeeper: - image: confluentinc/cp-zookeeper:7.6.1 - hostname: zookeeper - container_name: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*" - - broker: - image: confluentinc/cp-server:7.6.1 - hostname: broker - container_name: broker - depends_on: - - zookeeper - ports: - - "9092:9092" - - "9101:9101" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9101 - KAFKA_JMX_HOSTNAME: localhost - KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 - CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 - CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 - CONFLUENT_METRICS_ENABLE: 'true' - CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' - - schema-registry: - image: confluentinc/cp-schema-registry:7.6.1 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - - connect: - image: 
cnfldemos/kafka-connect-datagen:0.6.4-7.6.0 - container_name: connect - depends_on: - - broker - - schema-registry - ports: - - 8083:8083 - environment: - CONNECT_BOOTSTRAP_SERVERS: "broker:29092" - CONNECT_REST_ADVERTISED_HOST_NAME: "connect" - CONNECT_GROUP_ID: kafka-connect - CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs - CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets - CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status - CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' - CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' - CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" - CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" - volumes: - - ./confs/order.avsc:/home/appuser/order.avsc - - akhq: - image: tchiotludo/akhq:latest - environment: - AKHQ_CONFIGURATION: | - akhq: - server: - access-log: - enabled: false - connections: - docker-kafka-server: - properties: - bootstrap.servers: "broker:29092" - schema-registry: - type: "confluent" - url: "http://schema-registry:8081" - ports: - - 8082:8080 - links: - - broker \ No newline at end of file diff --git a/environment/docker-compose-with-mongo.yml b/environment/docker-compose-with-mongo.yml deleted file mode 100644 index 22c0f4f4..00000000 --- a/environment/docker-compose-with-mongo.yml +++ /dev/null @@ -1,86 +0,0 @@ ---- -version: '2' -services: - zookeeper: - image: confluentinc/cp-zookeeper:7.4.0 - hostname: zookeeper - container_name: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*" - - broker: - image: confluentinc/cp-server:7.4.0 - hostname: broker - container_name: broker - depends_on: - - zookeeper - ports: - - "9092:9092" - - "9101:9101" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9101 - KAFKA_JMX_HOSTNAME: localhost - KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 - CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 - CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 - CONFLUENT_METRICS_ENABLE: 'true' - CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' - - schema-registry: - image: confluentinc/cp-schema-registry:7.4.0 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - - akhq: - image: tchiotludo/akhq:latest - environment: - AKHQ_CONFIGURATION: | - akhq: - server: - access-log: - enabled: false - connections: - 
docker-kafka-server: - properties: - bootstrap.servers: "broker:29092" - schema-registry: - type: "confluent" - url: "http://schema-registry:8081" - ports: - - 8082:8080 - links: - - broker - - mongo: - image: 'mongo' - environment: - MONGO_INITDB_ROOT_USERNAME: admin - MONGO_INITDB_ROOT_PASSWORD: admin - volumes: - - ./mongo-init/:/docker-entrypoint-initdb.d/:ro - ports: - - "27017:27017" - diff --git a/environment/docker-compose.yml b/environment/docker-compose.yml index bd806051..fe76a002 100644 --- a/environment/docker-compose.yml +++ b/environment/docker-compose.yml @@ -109,4 +109,54 @@ services: - 8082:8080 links: - broker-1 - - broker-2 \ No newline at end of file + - broker-2 + + connect: + image: cnfldemos/kafka-connect-datagen:0.6.4-7.6.0 + container_name: connect + depends_on: + - broker-1 + - broker-2 + - schema-registry + ports: + - 8083:8083 + environment: + CONNECT_BOOTSTRAP_SERVERS: "broker-1:29092,broker-2:29093" + CONNECT_REST_ADVERTISED_HOST_NAME: "connect" + CONNECT_GROUP_ID: kafka-connect + CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs + CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status + CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' + CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' + CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" + CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" + volumes: + - ./confs/order.avsc:/home/appuser/order.avsc + + otel-collector: + image: otel/opentelemetry-collector:0.112.0 + restart: always + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./confs/otel-collector-config.yaml:/etc/otel-collector-config.yaml + ports: + - "1888:1888" # pprof extension + - "8888:8888" # Prometheus metrics exposed by the collector + - "8889:8889" # Prometheus exporter metrics + - "13133:13133" # health_check extension + - "4317:4317" # OTLP gRPC receiver + - "55679:55679" # zpages extension + + prometheus: + image: prom/prometheus:latest + restart: always + volumes: + - ./confs/prometheus.yaml:/etc/prometheus/prometheus.yml + ports: + - "9090:9090" \ No newline at end of file diff --git a/environment/mongo-init/init.js b/environment/mongo-init/init.js deleted file mode 100644 index 0c084051..00000000 --- a/environment/mongo-init/init.js +++ /dev/null @@ -1,4 +0,0 @@ -db = db.getSiblingDB('streamiz'); -db.createCollection('adress'); -db.adress.insert({ "address": { "city": "Paris", "zip": "123" }, "name": "Mike", "phone": "1234" }); -db.adress.insert({ "address": { "city": "Marsel", "zip": "321" }, "name": "Helga", "phone": "4321" }); \ No newline at end of file diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs index 771510ca..62a52fbd 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs @@ -1,11 +1,8 @@ using System; -using System.Collections.Generic; -using System.Security.Cryptography; -using Microsoft.Extensions.Logging; using OpenTelemetry; +using 
OpenTelemetry.Exporter; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; -using Streamiz.Kafka.Net.Crosscutting; namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry { @@ -23,9 +20,12 @@ public static IStreamConfig UseOpenTelemetryReporter( .SetResourceBuilder( ResourceBuilder.CreateDefault() .AddService(serviceName: "Streamiz")); - - //meterProviderBuilder.AddPrometheusExporter(); - meterProviderBuilder.AddPrometheusHttpListener(); + + meterProviderBuilder.AddOtlpExporter(options => { + options.Protocol = OtlpExportProtocol.Grpc; + options.ExportProcessorType = ExportProcessorType.Batch; + }); + //meterProviderBuilder.AddPrometheusHttpListener(); meterProviderBuilder.AddRuntimeInstrumentation(); actionMeterProviderBuilder?.Invoke(meterProviderBuilder); diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj index 3f88be28..96802451 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj @@ -19,7 +19,7 @@ - + diff --git a/run-demo.sh b/run-demo.sh index 3e73553d..e9678bd0 100755 --- a/run-demo.sh +++ b/run-demo.sh @@ -21,8 +21,8 @@ do fi done -docker-compose -f environment/docker-compose.yml up -d broker -kafkaContainerId=`docker ps -f name=broker | tail -n 1 | awk '{print $1}'` +docker-compose -f environment/docker-compose.yml up -d broker-1 broker-2 +kafkaContainerId=`docker ps -f name=broker-1 | tail -n 1 | awk '{print $1}'` # Wait broker is UP test=true @@ -39,12 +39,12 @@ do done docker-compose -f environment/docker-compose.yml up -d schema-registry akhq -docker exec -i ${kafkaContainerId} kafka-topics --bootstrap-server broker:29092 --topic input --create --partitions 4 --replication-factor 1 > /dev/null 2>&1 -docker exec -i ${kafkaContainerId} kafka-topics --bootstrap-server broker:29092 --topic output --create --partitions 4 --replication-factor 1 > /dev/null 2>&1 +docker exec -i ${kafkaContainerId} kafka-topics --bootstrap-server broker-1:29092 --topic input --create --partitions 4 --replication-factor 1 > /dev/null 2>&1 +docker exec -i ${kafkaContainerId} kafka-topics --bootstrap-server broker-1:29092 --topic output --create --partitions 4 --replication-factor 1 > /dev/null 2>&1 echo "Topics created" echo "List all topics ..." -docker exec -i ${kafkaContainerId} kafka-topics --bootstrap-server broker:29092 --list +docker exec -i ${kafkaContainerId} kafka-topics --bootstrap-server broker-1:29092 --list echo "Restore, build and run demo sample" dotnet dev-certs https From 7e2b168340a319a4bfbdcc067888dcbe0180234c Mon Sep 17 00:00:00 2001 From: LGouellec Date: Fri, 8 Nov 2024 11:30:02 -0800 Subject: [PATCH 3/6] hit garbage collector more frequently [tentatively] --- launcher/sample-stream/Program.cs | 56 +++++++++++++++++-- .../OpenTelemetryMetricsExporter.cs | 29 +++++----- 2 files changed, 67 insertions(+), 18 deletions(-) diff --git a/launcher/sample-stream/Program.cs b/launcher/sample-stream/Program.cs index 99744b86..736cef3e 100644 --- a/launcher/sample-stream/Program.cs +++ b/launcher/sample-stream/Program.cs @@ -18,23 +18,68 @@ namespace sample_stream { public static class Program { + public static void Main2() + { + Meter? 
meter = null;
+
+            var meterProviderBuilder = Sdk
+                .CreateMeterProviderBuilder()
+                .AddMeter("Test")
+                .SetResourceBuilder(
+                    ResourceBuilder.CreateDefault()
+                        .AddService(serviceName: "Test"));
+
+            meterProviderBuilder.AddOtlpExporter((exporterOptions, readerOptions) =>
+            {
+                readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 5000;
+            });
+
+            using var meterProvider = meterProviderBuilder.Build();
+
+            while (true)
+            {
+                meter?.Dispose();
+                GC.Collect();
+                meter = new Meter("Test");
+                var rd = new Random();
+
+                meter.CreateObservableGauge<double>(
+                    "requests",
+                    () => new[]
+                    {
+                        new Measurement<double>(rd.NextDouble() * rd.Next(100)),
+                    },
+                    description: "Request per second");
+
+                // will see after a couple of minutes that the MetricReader contains a lot of MetricPoint[], even if we dispose the Meter after each iteration
+                Thread.Sleep(200);
+            }
+        }
+
         public static async Task Main(string[] args)
         {
             var config = new StreamConfig<StringSerDes, StringSerDes>{
                 ApplicationId = $"test-app",
-                BootstrapServers = "localhost:9092",
+                BootstrapServers = "pkc-p11xm.us-east-1.aws.confluent.cloud:9092",
+                SecurityProtocol = SecurityProtocol.SaslSsl,
+                SaslMechanism = SaslMechanism.Plain,
+                SaslUsername = "<redacted-sasl-username>",   // redacted: load secrets from the environment, never source control
+                CommitIntervalMs = 200,
+                SaslPassword = "<redacted-sasl-password>",   // redacted: load secrets from the environment, never source control
+                SessionTimeoutMs = 45000,
+                ClientId = "ccloud-csharp-client-f7bb4f5b-f37d-4956-851e-e106065963b8",
                 AutoOffsetReset = AutoOffsetReset.Earliest,
                 MetricsRecording = MetricsRecordingLevel.DEBUG,
                 MetricsIntervalMs = 500,
                 Logger = LoggerFactory.Create((b) =>
                 {
                     b.AddConsole();
-                    b.SetMinimumLevel(LogLevel.Debug);
+                    b.SetMinimumLevel(LogLevel.Information);
                 })
             };
 
             config.UseOpenTelemetryReporter();
-            //config.UsePrometheusReporter(9090);
+            //config.UsePrometheusReporter(9091);
 
             var t = BuildTopology();
             var stream = new KafkaStream(t, config);
@@ -50,8 +95,9 @@ private static Topology BuildTopology()
         {
             var builder = new StreamBuilder();
 
-            builder.Stream<string, string>("input")
-                .To("output2");
+            builder.Stream<string, string>("sample_data")
+                .Print(Printed<string, string>.ToOut());
+                //.To("output2");
 
             /*.GroupByKey()
             .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
             .Count()

diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs
index 75d6d85d..8676f893 100644
--- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs
+++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs
@@ -1,3 +1,4 @@
+using System;
 using System.Collections.Generic;
 using System.Diagnostics.Metrics;
 using System.Linq;
@@ -7,29 +8,31 @@ namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry
     public class OpenTelemetryMetricsExporter
     {
         private Meter meter;
-        private readonly IDictionary<string, ObservableGauge<double>> gauges = new Dictionary<string, ObservableGauge<double>>();
+
+        private readonly IDictionary<string, ObservableGauge<double>> gauges =
+            new Dictionary<string, ObservableGauge<double>>();
 
         public void ExposeMetrics(IEnumerable<Sensor> sensors)
         {
+            GC.Collect();
             meter?.Dispose();
             meter = new Meter("Streamiz");
 
             string MetricKey(StreamMetric metric) => $"{metric.Group}_{metric.Name}".Replace("-", "_");
 
             var metrics = sensors.SelectMany(s => s.Metrics);
 
             foreach (var metric in metrics)
             {
                 var metricKey = MetricKey(metric.Value);
-                //meter.
-                meter.CreateObservableGauge(
-                    metricKey,
-                    () => new[]
-                    {
-                        new Measurement<double>(
-                            Crosscutting.Utils.IsNumeric(metric.Value.Value, out var value) ? 
value : 1d,
-                            metric.Key.Tags.Select(kv => new KeyValuePair<string, object>(kv.Key, kv.Value)).ToArray())
-                    },
-                    description: metric.Key.Description);
+                meter.CreateObservableGauge<double>(
+                    metricKey,
+                    () => new[]
+                    {
+                        new Measurement<double>(
+                            Crosscutting.Utils.IsNumeric(metric.Value.Value, out var value) ? value : 1d,
+                            metric.Key.Tags.Select(kv => new KeyValuePair<string, object>(kv.Key, kv.Value)).ToArray())
+                    },
+                    description: metric.Key.Description);
             }
         }

From 33079161af239a7605703e2d3146950f2ce52cc4 Mon Sep 17 00:00:00 2001
From: LGouellec
Date: Thu, 5 Dec 2024 14:12:15 -0800
Subject: [PATCH 4/6] Add documentation

---
 .../OpenTelemetryMetricsExporter.cs           | 11 ++++++--
 .../{ => Internal}/OpenTelemetryRunner.cs     |  6 ++---
 .../OpenTelemetryConfigExtension.cs           | 26 ++++++++++++++++++-
 .../{ => Internal}/PrometheusMetricServer.cs  |  2 +-
 .../Internal/PrometheusMetricsExporter.cs     |  7 +++++
 .../PrometheusConfigExtension.cs              | 22 ++++++++++++++++
 6 files changed, 66 insertions(+), 8 deletions(-)
 rename metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/{ => Internal}/OpenTelemetryMetricsExporter.cs (78%)
 rename metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/{ => Internal}/OpenTelemetryRunner.cs (86%)
 rename metrics/Streamiz.Kafka.Net.Metrics.Prometheus/{ => Internal}/PrometheusMetricServer.cs (99%)

diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Internal/OpenTelemetryMetricsExporter.cs
similarity index 78%
rename from metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs
rename to metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Internal/OpenTelemetryMetricsExporter.cs
index 8676f893..d8184645 100644
--- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs
+++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Internal/OpenTelemetryMetricsExporter.cs
@@ -3,15 +3,22 @@
 using System.Diagnostics.Metrics;
 using System.Linq;
 
-namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry
+namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry.Internal
 {
-    public class OpenTelemetryMetricsExporter
+    /// <summary>
+    /// Export the metrics using an instance of <see cref="Meter"/>
+    /// </summary>
+    internal class OpenTelemetryMetricsExporter
     {
         private Meter meter;
 
         private readonly IDictionary<string, ObservableGauge<double>> gauges =
             new Dictionary<string, ObservableGauge<double>>();
 
+        /// <summary>
+        /// Expose the current sensors/metrics
+        /// </summary>
+        /// <param name="sensors">Sensors to emit</param>
         public void ExposeMetrics(IEnumerable<Sensor> sensors)
         {
             GC.Collect();

diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Internal/OpenTelemetryRunner.cs
similarity index 86%
rename from metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs
rename to metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Internal/OpenTelemetryRunner.cs
index 2ea11ae1..e6e0dca9 100644
--- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryRunner.cs
+++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Internal/OpenTelemetryRunner.cs
@@ -1,11 +1,9 @@
-using System.Diagnostics.Metrics;
 using System.Threading;
-using OpenTelemetry;
 using OpenTelemetry.Metrics;
 
-namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry
+namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry.Internal
 {
-    public class OpenTelemetryRunner : IStreamMiddleware
+    internal class OpenTelemetryRunner : IStreamMiddleware
     {
         private readonly MeterProvider meterProvider;
         private readonly OpenTelemetryMetricsExporter openTelemetryMetricsExporter;
diff --git 
a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs index 62a52fbd..9bef9400 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryConfigExtension.cs @@ -3,11 +3,26 @@ using OpenTelemetry.Exporter; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; +using Streamiz.Kafka.Net.Metrics.OpenTelemetry.Internal; namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry { + /// + /// Extensions class to configure the open telemetry reporter + /// public static class OpenTelemetryConfigExtension { + /// + /// This extension method configures an OpenTelemetry metrics reporter for a stream configuration. + /// It allows you to monitor and export application metrics, integrating OpenTelemetry with the stream processing system. + /// The method provides a customizable way to set up metric collection intervals, configure the OpenTelemetry MeterProvider, + /// and optionally expose statistics related to the underlying librdkafka library. + /// + /// The stream configuration object to which the OpenTelemetry metrics reporter will be added + /// The time interval (in milliseconds) at which metrics will be collected and reported. This is used to set the property of the configuration. + /// A delegate allowing additional customization of the MeterProviderBuilder. If not provided, the default setup is used. + /// Whether to expose statistics related to librdkafka. If set to true, additional configuration is applied to collect and export librdkafka statistics. + /// The configured stream instance (config) with the OpenTelemetry reporter integrated. public static IStreamConfig UseOpenTelemetryReporter( this IStreamConfig config, TimeSpan metricInterval, @@ -25,7 +40,6 @@ public static IStreamConfig UseOpenTelemetryReporter( options.Protocol = OtlpExportProtocol.Grpc; options.ExportProcessorType = ExportProcessorType.Batch; }); - //meterProviderBuilder.AddPrometheusHttpListener(); meterProviderBuilder.AddRuntimeInstrumentation(); actionMeterProviderBuilder?.Invoke(meterProviderBuilder); @@ -46,6 +60,16 @@ public static IStreamConfig UseOpenTelemetryReporter( return config; } + /// + /// This extension method configures an OpenTelemetry metrics reporter for a stream configuration. + /// It allows you to monitor and export application metrics, integrating OpenTelemetry with the stream processing system. + /// The method provides a customizable way to set up metric collection intervals, configure the OpenTelemetry MeterProvider, + /// and optionally expose statistics related to the underlying librdkafka library. + /// + /// The stream configuration object to which the OpenTelemetry metrics reporter will be added + /// A delegate allowing additional customization of the MeterProviderBuilder. If not provided, the default setup is used. + /// Whether to expose statistics related to librdkafka. If set to true, additional configuration is applied to collect and export librdkafka statistics. + /// The configured stream instance (config) with the OpenTelemetry reporter integrated. 
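+        /// <example>
+        /// A minimal usage sketch; the extra meter registered here is an illustrative
+        /// assumption, not part of this change:
+        /// <code>
+        /// var config = new StreamConfig();
+        /// config.UseOpenTelemetryReporter(b => b.AddMeter("MyCustomMeter"));
+        /// // sensors are then pushed through the configured OTLP exporter at the metrics interval
+        /// </code>
+        /// </example>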
public static IStreamConfig UseOpenTelemetryReporter( this IStreamConfig config, Action actionMeterProviderBuilder = null, diff --git a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricServer.cs similarity index 99% rename from metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs rename to metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricServer.cs index 716707a2..300f8d70 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusMetricServer.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricServer.cs @@ -11,7 +11,7 @@ namespace Streamiz.Kafka.Net.Metrics.Prometheus { - public class PrometheusMetricServer : IDisposable + internal class PrometheusMetricServer : IDisposable { private readonly HttpListener _httpListener = new (); private static readonly object @lock = new(); diff --git a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricsExporter.cs b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricsExporter.cs index 4ca59f04..966a9fe0 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricsExporter.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Internal/PrometheusMetricsExporter.cs @@ -2,6 +2,9 @@ namespace Streamiz.Kafka.Net.Metrics.Prometheus { + /// + /// Export the metrics through the prometheus exporter endpoint + /// internal class PrometheusMetricsExporter { private readonly PrometheusRunner prometheusRunner; @@ -11,6 +14,10 @@ public PrometheusMetricsExporter(PrometheusRunner prometheusRunner) this.prometheusRunner = prometheusRunner; } + /// + /// Expose the current sensors/metrics + /// + /// Sensors to emit public void ExposeMetrics(IEnumerable sensors) { prometheusRunner.Expose(sensors); diff --git a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusConfigExtension.cs b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusConfigExtension.cs index 969e2589..bbb0bddd 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusConfigExtension.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.Prometheus/PrometheusConfigExtension.cs @@ -2,8 +2,21 @@ namespace Streamiz.Kafka.Net.Metrics.Prometheus { + /// + /// Extensions class to configure the prometheus reporter + /// public static class PrometheusConfigExtension { + /// + /// This extension method configures a Prometheus metrics reporter for a stream configuration. + /// It integrates Prometheus-based monitoring into the stream processing system, allowing the application to expose metrics at a specific endpoint. + /// Additionally, it provides options to customize the metrics collection interval and expose statistics related to librdkafka. + /// + /// The stream configuration object to which the Prometheus metrics reporter will be added. + /// The time interval (in milliseconds) at which metrics will be collected and reported. This value is used to set the property of the configuration. + /// The port on which the Prometheus exporter will expose metrics. Default value is 9090. + /// A flag indicating whether librdkafka statistics should be exposed. If true, additional configuration is applied to collect and expose these statistics. + /// The configured stream instance (config) with the Prometheus reporter integrated. 
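+        /// <example>
+        /// A minimal usage sketch; the port shown is only illustrative:
+        /// <code>
+        /// var config = new StreamConfig();
+        /// config.UsePrometheusReporter(9090);
+        /// // Prometheus can then scrape this listener on port 9090
+        /// </code>
+        /// </example>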
        public static IStreamConfig UsePrometheusReporter(
            this IStreamConfig config,
            int prometheusReporterEndpointPort,

From 297cfa237771d54169263c031686562171996f66 Mon Sep 17 00:00:00 2001
From: LGouellec
Date: Thu, 5 Dec 2024 14:50:14 -0800
Subject: [PATCH 5/6] Update CI/CD

---
 .github/workflows/build-ci.yml    | 2 +-
 .github/workflows/integration.yml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build-ci.yml b/.github/workflows/build-ci.yml
index 83ea3890..ad7a3428 100644
--- a/.github/workflows/build-ci.yml
+++ b/.github/workflows/build-ci.yml
@@ -28,7 +28,7 @@ jobs:
       - name: Setup .NET 7.0
         uses: actions/setup-dotnet@v1
         with:
-          dotnet-version: 7.0.20
+          dotnet-version: 7.0.410
       - name: Setup .NET 8.0
         uses: actions/setup-dotnet@v1
         with:
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index c87815d9..59118c91 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -2,7 +2,7 @@ name: integration
 
 on:
   push:
-    branches: [ integration ]
+    branches: [ master ]
 
 jobs:
   build:

From 3818901b2cb4177bae6e292dcf80dce947c5639f Mon Sep 17 00:00:00 2001
From: LGouellec
Date: Thu, 5 Dec 2024 15:06:36 -0800
Subject: [PATCH 6/6] Update .NET 8 SDK version

---
 .github/workflows/build-ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build-ci.yml b/.github/workflows/build-ci.yml
index ad7a3428..e0d27198 100644
--- a/.github/workflows/build-ci.yml
+++ b/.github/workflows/build-ci.yml
@@ -32,7 +32,7 @@ jobs:
       - name: Setup .NET 8.0
         uses: actions/setup-dotnet@v1
         with:
-          dotnet-version: 8.0.10
+          dotnet-version: 8.0.404
 # BEGIN Dependencies for RocksDB
       - run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
       - run: sudo apt install -y bzip2 lz4 librocksdb-dev
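Taken together, the series leaves the metrics pipeline wired as follows: Streamiz sensors are exported over OTLP/gRPC to the collector declared in environment/docker-compose.yml, which re-exposes them on :8889 for the bundled Prometheus. A condensed sketch of that wiring follows; the topic names and serdes are illustrative assumptions, while the configuration surface (StreamConfig, MetricsRecordingLevel, UseOpenTelemetryReporter) comes from the diffs above:

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.Metrics;
    using Streamiz.Kafka.Net.Metrics.OpenTelemetry;
    using Streamiz.Kafka.Net.SerDes;
    using Streamiz.Kafka.Net.Stream;

    public static class MetricsDemo
    {
        public static async Task Main()
        {
            var config = new StreamConfig<StringSerDes, StringSerDes>
            {
                ApplicationId = "metrics-demo-app",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                MetricsRecording = MetricsRecordingLevel.DEBUG // report all sensors, as in the sample
            };

            // From this series: export the Streamiz meter over OTLP/gRPC;
            // the otel-collector in docker-compose listens on localhost:4317.
            config.UseOpenTelemetryReporter();

            var builder = new StreamBuilder();
            builder.Stream<string, string>("input").To("output"); // illustrative topology

            var stream = new KafkaStream(builder.Build(), config);
            Console.CancelKeyPress += (_, _) => stream.Dispose();
            await stream.StartAsync();
        }
    }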