From 6850c4ca2e8dad83802491538e2c9d112df2f32a Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 29 Feb 2024 02:42:43 +0900 Subject: [PATCH 1/2] WIP async chunk --- sandbox/ConsoleApp1/Program.cs | 31 +++------ src/R3/Operators/Chunk.cs | 84 +++++++++++++++++++++++ tests/R3.Tests/OperatorTests/ChunkTest.cs | 50 ++++++++++++++ 3 files changed, 144 insertions(+), 21 deletions(-) diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 07023b95..2b1d35ac 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Time.Testing; using R3; using System.ComponentModel.DataAnnotations; +using System.Reactive.Concurrency; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text.Json; @@ -9,10 +10,6 @@ using System.Threading.Channels; -SynchronizationContext.SetSynchronizationContext(new MySyncContext()); - - -var channel = ChannelUtility.CreateSingleReadeWriterUnbounded(); //var t = Foo(); @@ -23,31 +20,23 @@ //t.Wait(); -var timeProvider = new FakeTimeProvider(); -var subject = new Subject(); -subject - .Do(x => Console.WriteLine($"Do:{Thread.CurrentThread.ManagedThreadId}")) - .SubscribeAwait(async (_, ct) => +Observable.Interval(TimeSpan.FromSeconds(1)) + .Index() + .Chunk(async (_, ct) => { - Console.WriteLine($"Before Await:{Thread.CurrentThread.ManagedThreadId}"); - await Task.Delay(TimeSpan.FromSeconds(1), timeProvider, ct); - Console.WriteLine($"After Yield:{Thread.CurrentThread.ManagedThreadId}"); - }, AwaitOperation.Sequential/*, configureAwait: false*/); - + await Task.Delay(TimeSpan.FromSeconds(0)); + }) + .Subscribe(x => + { + Console.WriteLine(string.Join(", ", x)); + }); -subject.OnNext(10); -subject.OnNext(20); -subject.OnNext(30); -timeProvider.Advance(TimeSpan.FromSeconds(1)); Console.ReadLine(); - - - internal static class ChannelUtility { static readonly UnboundedChannelOptions options = new UnboundedChannelOptions diff --git a/src/R3/Operators/Chunk.cs b/src/R3/Operators/Chunk.cs index 5f972e9f..f53142eb 100644 --- a/src/R3/Operators/Chunk.cs +++ b/src/R3/Operators/Chunk.cs @@ -32,6 +32,11 @@ public static Observable Chunk(this Observa { return new ChunkWindow(source, windowBoundaries); } + + public static Observable Chunk(this Observable source, Func asyncWindow, bool configureAwait = true) + { + return new ChunkAsync(source, asyncWindow, configureAwait); + } } // Count @@ -341,3 +346,82 @@ protected override void OnCompletedCore(Result result) } } } + +// Async +internal sealed class ChunkAsync(Observable source, Func asyncWindow, bool configureAwait) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _Chunk(observer, asyncWindow, configureAwait)); + } + + sealed class _Chunk(Observer observer, Func asyncWindow, bool configureAwait) : Observer + { + readonly List list = new List(); + CancellationTokenSource cancellationTokenSource = new(); + Task? runningTask; + + protected override void OnNextCore(T value) + { + lock (list) + { + list.Add(value); + if (runningTask == null) + { + runningTask = StartWindow(value); + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + cancellationTokenSource.Cancel(); + + lock (list) + { + if (list.Count > 0) + { + observer.OnNext(list.ToArray()); + list.Clear(); + } + } + + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + cancellationTokenSource.Cancel(); + } + + async Task StartWindow(T value) + { + try + { + await asyncWindow(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait); + } + catch (Exception ex) + { + if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token) + { + return; + } + OnErrorResume(ex); + } + finally + { + lock (list) + { + observer.OnNext(list.ToArray()); + list.Clear(); + runningTask = null; + } + } + } + } +} diff --git a/tests/R3.Tests/OperatorTests/ChunkTest.cs b/tests/R3.Tests/OperatorTests/ChunkTest.cs index e0926cb1..7862c6c8 100644 --- a/tests/R3.Tests/OperatorTests/ChunkTest.cs +++ b/tests/R3.Tests/OperatorTests/ChunkTest.cs @@ -252,4 +252,54 @@ public void ChunkFrameAndCount() list.AssertIsCompleted(); } + + // Async + [Fact] + public void ChunkAsync() + { + var publisher = new Subject(); + var tp = new FakeTimeProvider(); + var list = publisher.Chunk(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(3), tp); + + }).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + tp.Advance(3); + + list.AssertEqual([[1, 10, 100]]); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([[1, 10, 100]]); + + tp.Advance(3); + list.AssertEqual([[1, 10, 100], [1000, 10000]]); + + publisher.OnNext(2); + publisher.OnNext(20); + publisher.OnNext(200); + + list.AssertEqual([[1, 10, 100], [1000, 10000]]); + + tp.Advance(1); + list.AssertEqual([[1, 10, 100], [1000, 10000]]); + + tp.Advance(2); + list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200]]); + + publisher.OnNext(500); + + publisher.OnCompleted(); + + list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200], [500]]); + list.AssertIsCompleted(); + } } + + From 98407204d83cea98dcb51b545c319e523d8a4844 Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 29 Feb 2024 14:15:00 +0900 Subject: [PATCH 2/2] done --- sandbox/ConsoleApp1/Program.cs | 2 +- src/R3/Operators/Chunk.cs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 2b1d35ac..5b1ceaca 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -26,7 +26,7 @@ .Index() .Chunk(async (_, ct) => { - await Task.Delay(TimeSpan.FromSeconds(0)); + await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct); }) .Subscribe(x => { diff --git a/src/R3/Operators/Chunk.cs b/src/R3/Operators/Chunk.cs index f53142eb..6c572035 100644 --- a/src/R3/Operators/Chunk.cs +++ b/src/R3/Operators/Chunk.cs @@ -359,16 +359,17 @@ sealed class _Chunk(Observer observer, Func list = new List(); CancellationTokenSource cancellationTokenSource = new(); - Task? runningTask; + bool isRunning; protected override void OnNextCore(T value) { lock (list) { list.Add(value); - if (runningTask == null) + if (!isRunning) { - runningTask = StartWindow(value); + isRunning = true; + StartWindow(value); } } } @@ -399,7 +400,7 @@ protected override void DisposeCore() cancellationTokenSource.Cancel(); } - async Task StartWindow(T value) + async void StartWindow(T value) { try { @@ -419,7 +420,7 @@ async Task StartWindow(T value) { observer.OnNext(list.ToArray()); list.Clear(); - runningTask = null; + isRunning = false; } } }