diff --git a/Streamistry.Core/Pipes/Aggregators/Average.cs b/Streamistry.Core/Pipes/Aggregators/Average.cs index 346cd6e..ce8bd1b 100644 --- a/Streamistry.Core/Pipes/Aggregators/Average.cs +++ b/Streamistry.Core/Pipes/Aggregators/Average.cs @@ -22,8 +22,6 @@ public AverageState Append(T? value) public readonly T? Select() => count > T.Zero ? total / count : default; - - public static readonly AverageState @Default = new(); } public class Average : Aggregator, T> where T : INumber @@ -40,7 +38,7 @@ protected Average(Expression, T>>>? complet : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , AverageState.Default + , new AverageState() , completion) { } } @@ -61,7 +59,7 @@ protected Average(Expression, U>>>? complet : base(upstream , (x, y) => x.Append(y is null ? default : U.CreateChecked(y)) , (x) => x.Select() - , AverageState.Default + , new AverageState() , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Max.cs b/Streamistry.Core/Pipes/Aggregators/Max.cs index d9d0586..58fd4d3 100644 --- a/Streamistry.Core/Pipes/Aggregators/Max.cs +++ b/Streamistry.Core/Pipes/Aggregators/Max.cs @@ -23,8 +23,6 @@ public MaxState Append(T? value) public readonly T? Select() => IsEmpty ? default : Value; - - public static readonly MaxState @Default = new(); } public class Max : Aggregator, TInput> where TInput : INumber @@ -41,7 +39,7 @@ protected Max(Expression, TInput>>>? : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , MaxState.Default + , new MaxState() , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Median.cs b/Streamistry.Core/Pipes/Aggregators/Median.cs index 470fcfb..61eaaf3 100644 --- a/Streamistry.Core/Pipes/Aggregators/Median.cs +++ b/Streamistry.Core/Pipes/Aggregators/Median.cs @@ -40,8 +40,6 @@ public MedianState Append(T? value) return left; return ((left / T.CreateChecked(2)) + (right / T.CreateChecked(2)) + (T.IsOddInteger(left) && T.IsOddInteger(right) ? T.One : T.Zero)); } - - public static readonly MedianState @Default = new(); } public class Median : Aggregator, TInput> where TInput : INumber @@ -54,7 +52,7 @@ protected Median(Expression, TInpu : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , MedianState.Default + , new MedianState() , completion) { } } @@ -71,7 +69,7 @@ protected Median(Expression, TOupu : base(upstream , (x, y) => x.Append(y is null ? default : TOuput.CreateChecked(y)) , (x) => x.Select() - , MedianState.Default + , new MedianState() , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Min.cs b/Streamistry.Core/Pipes/Aggregators/Min.cs index ca7c158..8a83b8e 100644 --- a/Streamistry.Core/Pipes/Aggregators/Min.cs +++ b/Streamistry.Core/Pipes/Aggregators/Min.cs @@ -23,8 +23,6 @@ public MinState Append(T? value) public readonly T? Select() => IsEmpty ? default : Value; - - public static readonly MinState @Default = new(); } public class Min : Aggregator, TInput> where TInput : INumber @@ -41,7 +39,7 @@ protected Min(Expression, TInput>>>? : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , MinState.Default + , new MinState() , completion) { } } diff --git a/Streamistry.Core/Streamistry.csproj b/Streamistry.Core/Streamistry.csproj index 19f02a9..5152279 100644 --- a/Streamistry.Core/Streamistry.csproj +++ b/Streamistry.Core/Streamistry.csproj @@ -30,4 +30,8 @@ runtime; build; native; contentfiles; analyzers + + + + diff --git a/Streamistry.Testing/Pipes/Aggregators/MaxTests.cs b/Streamistry.Testing/Pipes/Aggregators/MaxTests.cs index 9df0e3e..c6fcd19 100644 --- a/Streamistry.Testing/Pipes/Aggregators/MaxTests.cs +++ b/Streamistry.Testing/Pipes/Aggregators/MaxTests.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using NUnit.Framework; using Streamistry.Pipes.Aggregators; +using Streamistry.Pipes.Sources; using Streamistry.Testability; namespace Streamistry.Testing.Pipes.Aggregators; @@ -34,4 +35,18 @@ public void Emit_ManyElements_CorrectResults() Assert.That(aggregator.EmitAndGetOutput(22), Is.EqualTo(22)); Assert.That(aggregator.EmitAndGetOutput(10), Is.EqualTo(22)); } + + [Test] + public void Emit_TwoAggregatorsInParallel_CorrectResults() + { + var source = new EnumerableSource(new[] { 15, 22, 10 }); + var pipeline = new Pipeline(source); + var aggregator1 = new Max(source); + var filter = new Filter(source, x => x < 20); + var aggregator2 = new Max(filter); + + var data = aggregator2.GetOutputs(pipeline.Start); + Assert.That(data, Does.Not.Contain(22)); + Assert.That(data, Does.Contain(15)); + } }