From dd57ad521ca760c0cc6309dc7c283ac1b4e7f2cf Mon Sep 17 00:00:00 2001 From: Robert Coltheart Date: Thu, 11 Apr 2024 08:07:41 +1000 Subject: [PATCH] fix: prevent hanging if stop called from multiple threads --- src/KafkaFlow/KafkaBus.cs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/KafkaFlow/KafkaBus.cs b/src/KafkaFlow/KafkaBus.cs index 246fd5f41..205f26386 100644 --- a/src/KafkaFlow/KafkaBus.cs +++ b/src/KafkaFlow/KafkaBus.cs @@ -18,6 +18,8 @@ internal class KafkaBus : IKafkaBus private readonly List _consumerManagers = new(); + private bool _stopped; + public KafkaBus( IDependencyResolver dependencyResolver, KafkaConfiguration configuration, @@ -73,12 +75,22 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default) public Task StopAsync() { - foreach (var cluster in _configuration.Clusters) + lock (_consumerManagers) { - cluster.OnStoppingHandler(_dependencyResolver); - } + if (_stopped) + { + return Task.CompletedTask; + } - return Task.WhenAll(_consumerManagers.Select(x => x.StopAsync())); + _stopped = true; + + foreach (var cluster in _configuration.Clusters) + { + cluster.OnStoppingHandler(_dependencyResolver); + } + + return Task.WhenAll(_consumerManagers.Select(x => x.StopAsync())); + } } private async Task CreateMissingClusterTopics(ClusterConfiguration cluster)