Skip to content

Commit

Permalink
fix: prevent hanging if stop called from multiple threads
Browse files Browse the repository at this point in the history
  • Loading branch information
robertcoltheart authored and ruiqbarbosa committed Apr 15, 2024
1 parent 6ede5c7 commit dd57ad5
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions src/KafkaFlow/KafkaBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ internal class KafkaBus : IKafkaBus

private readonly List<IConsumerManager> _consumerManagers = new();

private bool _stopped;

public KafkaBus(
IDependencyResolver dependencyResolver,
KafkaConfiguration configuration,
Expand Down Expand Up @@ -73,12 +75,22 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default)

public Task StopAsync()
{
foreach (var cluster in _configuration.Clusters)
lock (_consumerManagers)
{
cluster.OnStoppingHandler(_dependencyResolver);
}
if (_stopped)
{
return Task.CompletedTask;
}

return Task.WhenAll(_consumerManagers.Select(x => x.StopAsync()));
_stopped = true;

foreach (var cluster in _configuration.Clusters)
{
cluster.OnStoppingHandler(_dependencyResolver);
}

return Task.WhenAll(_consumerManagers.Select(x => x.StopAsync()));
}
}

private async Task CreateMissingClusterTopics(ClusterConfiguration cluster)
Expand Down

0 comments on commit dd57ad5

Please sign in to comment.