Skip to content

Commit

Permalink
Merge pull request #148 from Cysharp/chunk-count-skip
Browse files Browse the repository at this point in the history
Add Chunk(count, skip)
  • Loading branch information
neuecc authored Feb 29, 2024
2 parents d585f50 + dc203da commit e823bde
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 1 deletion.
68 changes: 67 additions & 1 deletion src/R3/Operators/Chunk.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace R3;
using System;

namespace R3;

public static partial class ObservableExtensions
{
Expand All @@ -8,6 +10,13 @@ public static Observable<T[]> Chunk<T>(this Observable<T> source, int count)
return new Chunk<T>(source, count);
}

public static Observable<T[]> Chunk<T>(this Observable<T> source, int count, int skip)
{
if (count <= 0) throw new ArgumentOutOfRangeException("count <= 0");
if (skip <= 0) return Chunk(source, count);
return new ChunkCountSkip<T>(source, count, skip);
}

public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan)
{
return Chunk(source, timeSpan, ObservableSystem.DefaultTimeProvider);
Expand Down Expand Up @@ -80,6 +89,63 @@ protected override void OnCompletedCore(Result result)
}
}

// count + skip
internal sealed class ChunkCountSkip<T>(Observable<T> source, int count, int skip) : Observable<T[]>
{
protected override IDisposable SubscribeCore(Observer<T[]> observer)
{
return source.Subscribe(new _Chunk(observer, count, skip));
}

sealed class _Chunk(Observer<T[]> observer, int count, int skip) : Observer<T>
{
Queue<(int, T[])> q = new();
int queueIndex = -1; // start is -1.

protected override void OnNextCore(T value)
{
queueIndex++;

if (queueIndex % skip == 0)
{
q.Enqueue((0, new T[count]));
}

var len = q.Count;
for (int i = 0; i < len; i++)
{
var (index, array) = q.Dequeue();
array[index] = value;
index++;
if (index == count)
{
observer.OnNext(array);
}
else
{
q.Enqueue((index, array));
}
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
foreach (var (index, array) in q)
{
observer.OnNext(array.AsSpan(0, index).ToArray());
}
q.Clear();

observer.OnCompleted(result);
}
}
}

// Time
internal sealed class ChunkTime<T>(Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider) : Observable<T[]>
{
Expand Down
42 changes: 42 additions & 0 deletions tests/R3.Tests/OperatorTests/ChunkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,48 @@ public void ChunkAsync()
list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200], [500]]);
list.AssertIsCompleted();
}

// count + skip
[Fact]
public async Task ChunkCountSkip()
{
SynchronizationContext.SetSynchronizationContext(null);

{
var xs = await Observable.Range(1, 10).Chunk(3, 1).ToArrayAsync();

xs[0].Should().Equal(1, 2, 3);
xs[1].Should().Equal(2, 3, 4);
xs[2].Should().Equal(3, 4, 5);
xs[3].Should().Equal(4, 5, 6);
xs[4].Should().Equal(5, 6, 7);
xs[5].Should().Equal(6, 7, 8);
xs[6].Should().Equal(7, 8, 9);
xs[7].Should().Equal(8, 9, 10);
xs[8].Should().Equal(9, 10);
xs[9].Should().Equal(10);
}

// count == skip
{
var xs = await Observable.Range(1, 10).Chunk(3, 3).ToArrayAsync();

xs[0].Should().Equal(1, 2, 3);
xs[1].Should().Equal(4, 5, 6);
xs[2].Should().Equal(7, 8, 9);
xs[3].Should().Equal(10);
}

// count < skip
{
var xs = await Observable.Range(1, 20).Chunk(3, 5).ToArrayAsync();

xs[0].Should().Equal(1, 2, 3);
xs[1].Should().Equal(6, 7, 8);
xs[2].Should().Equal(11, 12, 13);
xs[3].Should().Equal(16, 17, 18);
}
}
}


0 comments on commit e823bde

Please sign in to comment.