Skip to content

Commit

Permalink
Merge pull request #143 from Cysharp/chunk-async
Browse files Browse the repository at this point in the history
Add Chunk(Func<ValueTask>)
  • Loading branch information
neuecc authored Feb 29, 2024
2 parents bedbe00 + 9840720 commit ab76681
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 21 deletions.
31 changes: 10 additions & 21 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Time.Testing;
using R3;
using System.ComponentModel.DataAnnotations;
using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text.Json;
Expand All @@ -9,10 +10,6 @@
using System.Threading.Channels;


SynchronizationContext.SetSynchronizationContext(new MySyncContext());


var channel = ChannelUtility.CreateSingleReadeWriterUnbounded<int>();

//var t = Foo();

Expand All @@ -23,31 +20,23 @@


//t.Wait();
var timeProvider = new FakeTimeProvider();

var subject = new Subject<int>();

subject
.Do(x => Console.WriteLine($"Do:{Thread.CurrentThread.ManagedThreadId}"))
.SubscribeAwait(async (_, ct) =>
Observable.Interval(TimeSpan.FromSeconds(1))
.Index()
.Chunk(async (_, ct) =>
{
Console.WriteLine($"Before Await:{Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(TimeSpan.FromSeconds(1), timeProvider, ct);
Console.WriteLine($"After Yield:{Thread.CurrentThread.ManagedThreadId}");
}, AwaitOperation.Sequential/*, configureAwait: false*/);

await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct);
})
.Subscribe(x =>
{
Console.WriteLine(string.Join(", ", x));
});

subject.OnNext(10);
subject.OnNext(20);
subject.OnNext(30);

timeProvider.Advance(TimeSpan.FromSeconds(1));
Console.ReadLine();





internal static class ChannelUtility
{
static readonly UnboundedChannelOptions options = new UnboundedChannelOptions
Expand Down
85 changes: 85 additions & 0 deletions src/R3/Operators/Chunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public static Observable<TSource[]> Chunk<TSource, TWindowBoundary>(this Observa
{
return new ChunkWindow<TSource, TWindowBoundary>(source, windowBoundaries);
}

public static Observable<T[]> Chunk<T>(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncWindow, bool configureAwait = true)
{
return new ChunkAsync<T>(source, asyncWindow, configureAwait);
}
}

// Count
Expand Down Expand Up @@ -341,3 +346,83 @@ protected override void OnCompletedCore(Result result)
}
}
}

// Async
internal sealed class ChunkAsync<T>(Observable<T> source, Func<T, CancellationToken, ValueTask> asyncWindow, bool configureAwait) : Observable<T[]>
{
protected override IDisposable SubscribeCore(Observer<T[]> observer)
{
return source.Subscribe(new _Chunk(observer, asyncWindow, configureAwait));
}

sealed class _Chunk(Observer<T[]> observer, Func<T, CancellationToken, ValueTask> asyncWindow, bool configureAwait) : Observer<T>
{
readonly List<T> list = new List<T>();
CancellationTokenSource cancellationTokenSource = new();
bool isRunning;

protected override void OnNextCore(T value)
{
lock (list)
{
list.Add(value);
if (!isRunning)
{
isRunning = true;
StartWindow(value);
}
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
cancellationTokenSource.Cancel();

lock (list)
{
if (list.Count > 0)
{
observer.OnNext(list.ToArray());
list.Clear();
}
}

observer.OnCompleted(result);
}

protected override void DisposeCore()
{
cancellationTokenSource.Cancel();
}

async void StartWindow(T value)
{
try
{
await asyncWindow(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait);
}
catch (Exception ex)
{
if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token)
{
return;
}
OnErrorResume(ex);
}
finally
{
lock (list)
{
observer.OnNext(list.ToArray());
list.Clear();
isRunning = false;
}
}
}
}
}
50 changes: 50 additions & 0 deletions tests/R3.Tests/OperatorTests/ChunkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,54 @@ public void ChunkFrameAndCount()

list.AssertIsCompleted();
}

// Async
[Fact]
public void ChunkAsync()
{
var publisher = new Subject<int>();
var tp = new FakeTimeProvider();
var list = publisher.Chunk(async (x, ct) =>
{
await Task.Delay(TimeSpan.FromSeconds(3), tp);

}).ToLiveList();

publisher.OnNext(1);
publisher.OnNext(10);
publisher.OnNext(100);
list.AssertEqual([]);

tp.Advance(3);

list.AssertEqual([[1, 10, 100]]);

publisher.OnNext(1000);
publisher.OnNext(10000);
list.AssertEqual([[1, 10, 100]]);

tp.Advance(3);
list.AssertEqual([[1, 10, 100], [1000, 10000]]);

publisher.OnNext(2);
publisher.OnNext(20);
publisher.OnNext(200);

list.AssertEqual([[1, 10, 100], [1000, 10000]]);

tp.Advance(1);
list.AssertEqual([[1, 10, 100], [1000, 10000]]);

tp.Advance(2);
list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200]]);

publisher.OnNext(500);

publisher.OnCompleted();

list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200], [500]]);
list.AssertIsCompleted();
}
}


0 comments on commit ab76681

Please sign in to comment.