diff --git a/src/R3/Operators/Chunk.cs b/src/R3/Operators/Chunk.cs index 0c456b29..e037a4f5 100644 --- a/src/R3/Operators/Chunk.cs +++ b/src/R3/Operators/Chunk.cs @@ -1,4 +1,6 @@ -namespace R3; +using System; + +namespace R3; public static partial class ObservableExtensions { @@ -8,6 +10,13 @@ public static Observable Chunk(this Observable source, int count) return new Chunk(source, count); } + public static Observable Chunk(this Observable source, int count, int skip) + { + if (count <= 0) throw new ArgumentOutOfRangeException("count <= 0"); + if (skip <= 0) return Chunk(source, count); + return new ChunkCountSkip(source, count, skip); + } + public static Observable Chunk(this Observable source, TimeSpan timeSpan) { return Chunk(source, timeSpan, ObservableSystem.DefaultTimeProvider); @@ -80,6 +89,63 @@ protected override void OnCompletedCore(Result result) } } +// count + skip +internal sealed class ChunkCountSkip(Observable source, int count, int skip) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _Chunk(observer, count, skip)); + } + + sealed class _Chunk(Observer observer, int count, int skip) : Observer + { + Queue<(int, T[])> q = new(); + int queueIndex = -1; // start is -1. + + protected override void OnNextCore(T value) + { + queueIndex++; + + if (queueIndex % skip == 0) + { + q.Enqueue((0, new T[count])); + } + + var len = q.Count; + for (int i = 0; i < len; i++) + { + var (index, array) = q.Dequeue(); + array[index] = value; + index++; + if (index == count) + { + observer.OnNext(array); + } + else + { + q.Enqueue((index, array)); + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + foreach (var (index, array) in q) + { + observer.OnNext(array.AsSpan(0, index).ToArray()); + } + q.Clear(); + + observer.OnCompleted(result); + } + } +} + // Time internal sealed class ChunkTime(Observable source, TimeSpan timeSpan, TimeProvider timeProvider) : Observable { diff --git a/tests/R3.Tests/OperatorTests/ChunkTest.cs b/tests/R3.Tests/OperatorTests/ChunkTest.cs index 970372bf..aa4e619a 100644 --- a/tests/R3.Tests/OperatorTests/ChunkTest.cs +++ b/tests/R3.Tests/OperatorTests/ChunkTest.cs @@ -302,6 +302,48 @@ public void ChunkAsync() list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200], [500]]); list.AssertIsCompleted(); } + + // count + skip + [Fact] + public async Task ChunkCountSkip() + { + SynchronizationContext.SetSynchronizationContext(null); + + { + var xs = await Observable.Range(1, 10).Chunk(3, 1).ToArrayAsync(); + + xs[0].Should().Equal(1, 2, 3); + xs[1].Should().Equal(2, 3, 4); + xs[2].Should().Equal(3, 4, 5); + xs[3].Should().Equal(4, 5, 6); + xs[4].Should().Equal(5, 6, 7); + xs[5].Should().Equal(6, 7, 8); + xs[6].Should().Equal(7, 8, 9); + xs[7].Should().Equal(8, 9, 10); + xs[8].Should().Equal(9, 10); + xs[9].Should().Equal(10); + } + + // count == skip + { + var xs = await Observable.Range(1, 10).Chunk(3, 3).ToArrayAsync(); + + xs[0].Should().Equal(1, 2, 3); + xs[1].Should().Equal(4, 5, 6); + xs[2].Should().Equal(7, 8, 9); + xs[3].Should().Equal(10); + } + + // count < skip + { + var xs = await Observable.Range(1, 20).Chunk(3, 5).ToArrayAsync(); + + xs[0].Should().Equal(1, 2, 3); + xs[1].Should().Equal(6, 7, 8); + xs[2].Should().Equal(11, 12, 13); + xs[3].Should().Equal(16, 17, 18); + } + } }