From 1542899ed70ffd2d61c615643bde953030e47724 Mon Sep 17 00:00:00 2001 From: Pavel Anpin Date: Tue, 14 Jan 2025 16:49:41 -0300 Subject: [PATCH 1/2] complete MergeHub Sink gracefully on NormalShutdownException --- src/core/Akka.Streams.Tests/Dsl/HubSpec.cs | 31 ++++++++++++++++++++++ src/core/Akka.Streams/Dsl/Hub.cs | 8 ++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 42b7a345aae..e362a340b46 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -24,6 +24,7 @@ using FluentAssertions.Extensions; using Xunit.Abstractions; using static FluentAssertions.FluentActions; +using Akka.Streams.Implementation; namespace Akka.Streams.Tests.Dsl { @@ -276,6 +277,36 @@ await EventFilter.Error(contains: "Upstream producer failed with exception").Exp }, Materializer); } + [Fact] + public async Task MergeHub_must_not_log_normal_shutdown_exception() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var (sink, task) = MergeHub.Source(16).Take(10).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); + + await WithinAsync(10.Seconds(), async () => + { + await EventFilter + .Custom((e) => + { + if (e.Cause?.InnerException is NormalShutdownException nse && nse == ActorPublisher.NormalShutdownReason) + return true; + else + return false; + }) + + // await EventFilter.Error(contains: ActorPublisher.NormalShutdownReasonMessage) + .ExpectAsync(0, async () => + { + Source.Failed(ActorPublisher.NormalShutdownReason).RunWith(sink, Materializer); + Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }); + }); + }, Materializer); + } + [Fact] public async Task BroadcastHub_must_work_in_the_happy_case() { diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 49dfc19359a..cdc087be1d1 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -380,8 +380,12 @@ private void PullWithDemand() // Make some noise public override void OnUpstreamFailure(Exception e) { - throw new MergeHub.ProducerFailed( - "Upstream producer failed with exception, removing from MergeHub now", e); + if(e is Implementation.NormalShutdownException) + CompleteStage(); + else { + throw new MergeHub.ProducerFailed( + "Upstream producer failed with exception, removing from MergeHub now", e); + } } private void OnDemand(long moreDemand) From 952db1d1261f26a9f008f7232f7358fa024ea6f4 Mon Sep 17 00:00:00 2001 From: Pavel Anpin Date: Tue, 21 Jan 2025 10:31:33 -0300 Subject: [PATCH 2/2] cleanup comments and lint --- src/core/Akka.Streams.Tests/Dsl/HubSpec.cs | 28 +++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 7a5a2e1291a..7c65dc603fa 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -277,32 +277,32 @@ await EventFilter.Error(contains: "Upstream producer failed with exception").Exp }, Materializer); } - [Fact] + [Fact] public async Task MergeHub_must_not_log_normal_shutdown_exception() { await this.AssertAllStagesStoppedAsync(async () => { - var (sink, task) = MergeHub.Source(16).Take(10).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); + var (sink, task) = MergeHub.Source(16).Take(10).ToMaterialized(Sink.Seq(), Keep.Both) + .Run(Materializer); await WithinAsync(10.Seconds(), async () => { await EventFilter - .Custom((e) => - { - if (e.Cause?.InnerException is NormalShutdownException nse && nse == ActorPublisher.NormalShutdownReason) + .Custom((e) => + { + if (e.Cause?.InnerException is NormalShutdownException nse && + nse == ActorPublisher.NormalShutdownReason) return true; - else + else return false; }) - - // await EventFilter.Error(contains: ActorPublisher.NormalShutdownReasonMessage) .ExpectAsync(0, async () => - { - Source.Failed(ActorPublisher.NormalShutdownReason).RunWith(sink, Materializer); - Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer); - var result = await task.ShouldCompleteWithin(3.Seconds()); - result.Should().BeEquivalentTo(Enumerable.Range(1, 10)); - }); + { + Source.Failed(ActorPublisher.NormalShutdownReason).RunWith(sink, Materializer); + Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }); }); }, Materializer); }