From ab6df11f0d3d1cde0459724fdd56e3c38c5c3954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Por=C4=99bski?= Date: Mon, 18 Nov 2024 16:49:16 +0100 Subject: [PATCH 1/3] allow to pass logger factory to stream config so that new one isn't created each time --- core/StreamConfig.cs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/core/StreamConfig.cs b/core/StreamConfig.cs index c92ab851..664f06c9 100644 --- a/core/StreamConfig.cs +++ b/core/StreamConfig.cs @@ -2282,7 +2282,7 @@ public StreamConfig() /// /// /// Dictionary of stream properties - public StreamConfig(IDictionary properties) + public StreamConfig(IDictionary properties, ILoggerFactory loggerFactory = null) { InitializeReflectedProperties(); @@ -2329,17 +2329,22 @@ public StreamConfig(IDictionary properties) PartitionAssignmentStrategy = Confluent.Kafka.PartitionAssignmentStrategy.Range; Partitioner = Confluent.Kafka.Partitioner.Murmur2Random; - Logger = LoggerFactory.Create(builder => - { - builder.SetMinimumLevel(LogLevel.Information); - builder.AddConsole(); - }); - + Logger = loggerFactory; + if (properties != null) { foreach (var k in properties) AddConfig(k.Key, k.Value); } + + if (Logger != null) + return; + + Logger = LoggerFactory.Create(builder => + { + builder.SetMinimumLevel(LogLevel.Information); + builder.AddConsole(); + }); } #endregion @@ -3403,8 +3408,9 @@ public StreamConfig() : this(null) { } /// /// /// Dictionary of stream properties - public StreamConfig(IDictionary properties) - : base(properties) + /// LoggerFactory for creating loggers. If not provided new instance will be created + public StreamConfig(IDictionary properties, ILoggerFactory loggerFactory = null) + : base(properties, loggerFactory) { DefaultKeySerDes = new KS(); DefaultValueSerDes = new VS(); From d0f0c2c5f0c5c845add4bb99f6e48d75984775af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Por=C4=99bski?= Date: Tue, 19 Nov 2024 10:32:15 +0100 Subject: [PATCH 2/3] clear changelogReader on thread shutdown --- core/Processors/StreamThread.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/Processors/StreamThread.cs b/core/Processors/StreamThread.cs index 680e8362..ae99e8db 100644 --- a/core/Processors/StreamThread.cs +++ b/core/Processors/StreamThread.cs @@ -566,6 +566,8 @@ private void CompleteShutdown() consumer.Close(); consumer.Dispose(); + changelogReader.Clear(); + streamMetricsRegistry.RemoveThreadSensors(threadId); log.LogInformation($"{logPrefix}Shutdown complete"); IsDisposable = true; From 57a4e4235e789097dd63b962499720779007fc24 Mon Sep 17 00:00:00 2001 From: maciejniepokoj <168194780+maciejniepokoj@users.noreply.github.com> Date: Thu, 12 Dec 2024 23:08:38 +0000 Subject: [PATCH 3/3] remove reduntant call --- core/Processors/StreamThread.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/Processors/StreamThread.cs b/core/Processors/StreamThread.cs index ae99e8db..680e8362 100644 --- a/core/Processors/StreamThread.cs +++ b/core/Processors/StreamThread.cs @@ -566,8 +566,6 @@ private void CompleteShutdown() consumer.Close(); consumer.Dispose(); - changelogReader.Clear(); - streamMetricsRegistry.RemoveThreadSensors(threadId); log.LogInformation($"{logPrefix}Shutdown complete"); IsDisposable = true;