Skip to content

Commit

Permalink
refactor: remove static fields for aggregators state (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 21, 2024
1 parent 1a8ab6d commit c52e07f
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 14 deletions.
6 changes: 2 additions & 4 deletions Streamistry.Core/Pipes/Aggregators/Average.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ public AverageState<T> Append(T? value)

public readonly T? Select()
=> count > T.Zero ? total / count : default;

public static readonly AverageState<T> @Default = new();
}

public class Average<T> : Aggregator<T, AverageState<T>, T> where T : INumber<T>
Expand All @@ -40,7 +38,7 @@ protected Average(Expression<Action<Aggregator<T, AverageState<T>, T>>>? complet
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, AverageState<T>.Default
, new AverageState<T>()
, completion)
{ }
}
Expand All @@ -61,7 +59,7 @@ protected Average(Expression<Action<Aggregator<T, AverageState<U>, U>>>? complet
: base(upstream
, (x, y) => x.Append(y is null ? default : U.CreateChecked(y))
, (x) => x.Select()
, AverageState<U>.Default
, new AverageState<U>()
, completion)
{ }
}
4 changes: 1 addition & 3 deletions Streamistry.Core/Pipes/Aggregators/Max.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public MaxState<T> Append(T? value)

public readonly T? Select()
=> IsEmpty ? default : Value;

public static readonly MaxState<T> @Default = new();
}

public class Max<TInput> : Aggregator<TInput, MaxState<TInput>, TInput> where TInput : INumber<TInput>
Expand All @@ -41,7 +39,7 @@ protected Max(Expression<Action<Aggregator<TInput, MaxState<TInput>, TInput>>>?
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, MaxState<TInput>.Default
, new MaxState<TInput>()
, completion)
{ }
}
6 changes: 2 additions & 4 deletions Streamistry.Core/Pipes/Aggregators/Median.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public MedianState<T> Append(T? value)
return left;
return ((left / T.CreateChecked(2)) + (right / T.CreateChecked(2)) + (T.IsOddInteger(left) && T.IsOddInteger(right) ? T.One : T.Zero));
}

public static readonly MedianState<T> @Default = new();
}

public class Median<TInput> : Aggregator<TInput, MedianState<TInput>, TInput> where TInput : INumber<TInput>
Expand All @@ -54,7 +52,7 @@ protected Median(Expression<Action<Aggregator<TInput, MedianState<TInput>, TInpu
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, MedianState<TInput>.Default
, new MedianState<TInput>()
, completion)
{ }
}
Expand All @@ -71,7 +69,7 @@ protected Median(Expression<Action<Aggregator<TInput, MedianState<TOuput>, TOupu
: base(upstream
, (x, y) => x.Append(y is null ? default : TOuput.CreateChecked(y))
, (x) => x.Select()
, MedianState<TOuput>.Default
, new MedianState<TOuput>()
, completion)
{ }
}
Expand Down
4 changes: 1 addition & 3 deletions Streamistry.Core/Pipes/Aggregators/Min.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public MinState<T> Append(T? value)

public readonly T? Select()
=> IsEmpty ? default : Value;

public static readonly MinState<T> @Default = new();
}

public class Min<TInput> : Aggregator<TInput, MinState<TInput>, TInput> where TInput : INumber<TInput>
Expand All @@ -41,7 +39,7 @@ protected Min(Expression<Action<Aggregator<TInput, MinState<TInput>, TInput>>>?
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, MinState<TInput>.Default
, new MinState<TInput>()
, completion)
{ }
}
4 changes: 4 additions & 0 deletions Streamistry.Core/Streamistry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<Folder Include="Pipes\Windows\" />
</ItemGroup>
</Project>
15 changes: 15 additions & 0 deletions Streamistry.Testing/Pipes/Aggregators/MaxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Aggregators;
using Streamistry.Pipes.Sources;
using Streamistry.Testability;

namespace Streamistry.Testing.Pipes.Aggregators;
Expand Down Expand Up @@ -34,4 +35,18 @@ public void Emit_ManyElements_CorrectResults()
Assert.That(aggregator.EmitAndGetOutput(22), Is.EqualTo(22));
Assert.That(aggregator.EmitAndGetOutput(10), Is.EqualTo(22));
}

[Test]
public void Emit_TwoAggregatorsInParallel_CorrectResults()
{
var source = new EnumerableSource<int>(new[] { 15, 22, 10 });
var pipeline = new Pipeline(source);
var aggregator1 = new Max<int>(source);
var filter = new Filter<int>(source, x => x < 20);
var aggregator2 = new Max<int>(filter);

var data = aggregator2.GetOutputs(pipeline.Start);
Assert.That(data, Does.Not.Contain(22));
Assert.That(data, Does.Contain(15));
}
}

0 comments on commit c52e07f

Please sign in to comment.