From 45904b00ec14aba7eee4bb10f03c03277b9db3b3 Mon Sep 17 00:00:00 2001 From: sakno Date: Tue, 25 Jun 2024 12:54:08 +0300 Subject: [PATCH] Release of DotNext.Threading 5.8.0 --- CHANGELOG.md | 5 + README.md | 28 +--- .../Threading/AsyncBridgeTests.cs | 53 ++++++++ .../DotNext.Threading.csproj | 2 +- .../AsyncBridge.CancellationToken.cs | 123 ++++++++++++++++++ .../Threading/AsyncBridge.WaitHandle.cs | 7 - .../Threading/AsyncBridge.cs | 96 +++++++++++--- .../Tasks/ValueTaskCompletionSource.T.cs | 2 +- 8 files changed, 268 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b6f10ac81..50dae742ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ Release Notes ==== +# 06-25-2024 +DotNext.Threading 5.8.0 +* Introduced `WaitAnyAsync` method to wait on a group of cancellation tokens +* Added cancellation support for `WaitAsync` extension method for [WaitHandle](https://learn.microsoft.com/en-us/dotnet/api/system.threading.waithandle) class + # 06-20-2024 DotNext 5.7.0 * `Timestamp.ElapsedTicks` returns a value that is always greater than zero diff --git a/README.md b/README.md index 1a775ee2b7..15a141b4e4 100644 --- a/README.md +++ b/README.md @@ -44,31 +44,11 @@ All these things are implemented in 100% managed code on top of existing .NET AP * [NuGet Packages](https://www.nuget.org/profiles/rvsakno) # What's new -Release Date: 06-20-2024 +Release Date: 06-25-2024 -DotNext 5.7.0 -* `Timestamp.ElapsedTicks` returns a value that is always greater than zero -* Fixed hidden copies of value types caused by **in** modifier -* Added support of [TimeProvider](https://learn.microsoft.com/en-us/dotnet/api/system.timeprovider) to `Timestamp` and `Timeout` types - -DotNext.Metaprogramming 5.7.0 -* Updated dependencies - -DotNext.Unsafe 5.7.0 -* Updated dependencies - -DotNext.Threading 5.7.0 -* Fixed [241](https://github.com/dotnet/dotNext/issues/241) -* Introduced API for implementing leases, see `DotNext.Threading.Leases` namespace - -DotNext.IO 5.7.0 -* Various performance improvements - -DotNext.Net.Cluster 5.7.0 -* Fixed [242](https://github.com/dotnet/dotNext/issues/242) - -DotNext.AspNetCore.Cluster 5.7.0 -* Fixed [242](https://github.com/dotnet/dotNext/issues/242) +DotNext.Threading 5.8.0 +* Introduced `WaitAnyAsync` method to wait on a group of cancellation tokens +* Added cancellation support for `WaitAsync` extension method for [WaitHandle](https://learn.microsoft.com/en-us/dotnet/api/system.threading.waithandle) class Changelog for previous versions located [here](./CHANGELOG.md). diff --git a/src/DotNext.Tests/Threading/AsyncBridgeTests.cs b/src/DotNext.Tests/Threading/AsyncBridgeTests.cs index 38010dd227..de479e93b5 100644 --- a/src/DotNext.Tests/Threading/AsyncBridgeTests.cs +++ b/src/DotNext.Tests/Threading/AsyncBridgeTests.cs @@ -26,11 +26,25 @@ public static async Task WaitForSignal() await ev.WaitAsync(DefaultTimeout); } + [Fact] + public static async Task CancelWaitForSignal() + { + using var ev = new ManualResetEvent(false); + using var cts = new CancellationTokenSource(); + + var task = ev.WaitAsync(cts.Token).AsTask(); + cts.Cancel(); + + var e = await ThrowsAsync(Func.Constant(task)); + Equal(cts.Token, e.CancellationToken); + } + [Fact] public static async Task AlreadySignaled() { using var ev = new ManualResetEvent(true); True(await ev.WaitAsync(DefaultTimeout)); + True(ev.WaitAsync().IsCompletedSuccessfully); } [Fact] @@ -68,4 +82,43 @@ public static async Task CancellationTokenAwaitCornerCases() await new CancellationToken(true).WaitAsync(); await ThrowsAsync(new CancellationToken(false).WaitAsync().AsTask); } + + [Fact] + public static async Task WaitForCancellationSingleToken() + { + using var cts = new CancellationTokenSource(); + var task = AsyncBridge.WaitAnyAsync([cts.Token]); + False(task.IsCompletedSuccessfully); + + cts.Cancel(); + Equal(cts.Token, await task); + } + + [Fact] + public static async Task WaitForCancellationTwoTokens() + { + using var cts1 = new CancellationTokenSource(); + using var cts2 = new CancellationTokenSource(); + var task = AsyncBridge.WaitAnyAsync([cts1.Token, cts2.Token]); + False(task.IsCompletedSuccessfully); + + cts2.Cancel(); + cts1.Cancel(); + Equal(cts2.Token, await task); + } + + [Fact] + public static async Task WaitForCancellationMultipleTokens() + { + using var cts1 = new CancellationTokenSource(); + using var cts2 = new CancellationTokenSource(); + using var cts3 = new CancellationTokenSource(); + var task = AsyncBridge.WaitAnyAsync([cts1.Token, cts2.Token, cts3.Token]); + False(task.IsCompletedSuccessfully); + + cts3.Cancel(); + cts2.Cancel(); + cts1.Cancel(); + Equal(cts3.Token, await task); + } } \ No newline at end of file diff --git a/src/DotNext.Threading/DotNext.Threading.csproj b/src/DotNext.Threading/DotNext.Threading.csproj index b6f6e7417b..7adf1a7888 100644 --- a/src/DotNext.Threading/DotNext.Threading.csproj +++ b/src/DotNext.Threading/DotNext.Threading.csproj @@ -7,7 +7,7 @@ true true nullablePublicOnly - 5.7.0 + 5.8.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs b/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs index 33f338e671..e6bce1c208 100644 --- a/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs +++ b/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs @@ -1,4 +1,6 @@ +using System.Buffers; using System.Collections.Concurrent; +using System.Runtime.InteropServices; using Debug = System.Diagnostics.Debug; using Unsafe = System.Runtime.CompilerServices.Unsafe; @@ -38,6 +40,127 @@ internal void Return(CancellationTokenValueTask vt) Add(vt); } } + + private abstract class CancellationTokenCompletionSource : TaskCompletionSource + { + protected static readonly Action Callback = OnCanceled; + + private bool initialized; // volatile + + protected CancellationTokenCompletionSource(out InitializationFlag flag) + : base(TaskCreationOptions.RunContinuationsAsynchronously) + => flag = new(ref initialized); + + private static void OnCanceled(object? source, CancellationToken token) + { + Debug.Assert(source is CancellationTokenCompletionSource); + + Unsafe.As(source).OnCanceled(token); + } + + private void OnCanceled(CancellationToken token) + { + if (Volatile.Read(ref initialized) && TrySetResult(token)) + { + Cleanup(); + } + } + + private static void Unregister(ReadOnlySpan registrations) + { + foreach (ref readonly var registration in registrations) + { + registration.Unregister(); + } + } + + private protected virtual void Cleanup() => Unregister(Registrations); + + private protected abstract ReadOnlySpan Registrations { get; } + + [StructLayout(LayoutKind.Auto)] + protected readonly ref struct InitializationFlag + { + private readonly ref bool flag; + + internal InitializationFlag(ref bool flag) => this.flag = ref flag; + + internal CancellationToken InitializationCompleted(ReadOnlySpan registrations) + { + Volatile.Write(ref flag, true); + + foreach (ref readonly var registration in registrations) + { + if (registration.Token.IsCancellationRequested) + { + return registration.Token; + } + } + + return new(canceled: false); + } + } + } + + private sealed class CancellationTokenCompletionSource1 : CancellationTokenCompletionSource + { + private readonly CancellationTokenRegistration registration; + + internal CancellationTokenCompletionSource1(CancellationToken token) + : base(out var flag) + { + registration = token.UnsafeRegister(Callback, this); + + if (flag.InitializationCompleted(new(in registration)) is { IsCancellationRequested: true } canceledToken) + Callback(this, canceledToken); + } + + private protected override ReadOnlySpan Registrations => new(in registration); + } + + private sealed class CancellationTokenCompletionSource2 : CancellationTokenCompletionSource + { + private readonly (CancellationTokenRegistration, CancellationTokenRegistration) registrations; + + internal CancellationTokenCompletionSource2(CancellationToken token1, CancellationToken token2) + : base(out var flag) + { + registrations.Item1 = token1.UnsafeRegister(Callback, this); + registrations.Item2 = token2.UnsafeRegister(Callback, this); + + if (flag.InitializationCompleted(registrations.AsReadOnlySpan()) is { IsCancellationRequested: true } canceledToken) + Callback(this, canceledToken); + } + + private protected override ReadOnlySpan Registrations => registrations.AsReadOnlySpan(); + } + + private sealed class CancellationTokenCompletionSourceN : CancellationTokenCompletionSource + { + private readonly CancellationTokenRegistration[] registrations; + + internal CancellationTokenCompletionSourceN(ReadOnlySpan tokens) + : base(out var flag) + { + registrations = ArrayPool.Shared.Rent(tokens.Length); + + for (var i = 0; i < tokens.Length; i++) + { + registrations[i] = tokens[i].UnsafeRegister(Callback, this); + } + + if (flag.InitializationCompleted(registrations) is { IsCancellationRequested: true } canceledToken) + Callback(this, canceledToken); + } + + private protected override ReadOnlySpan Registrations => new(registrations); + + private protected override void Cleanup() + { + ArrayPool.Shared.Return(registrations, clearArray: true); + base.Cleanup(); + } + } private static readonly Action CancellationTokenValueTaskCompletionCallback = new CancellationTokenValueTaskPool().Return; diff --git a/src/DotNext.Threading/Threading/AsyncBridge.WaitHandle.cs b/src/DotNext.Threading/Threading/AsyncBridge.WaitHandle.cs index ed417ce07d..d7a96e4c3c 100644 --- a/src/DotNext.Threading/Threading/AsyncBridge.WaitHandle.cs +++ b/src/DotNext.Threading/Threading/AsyncBridge.WaitHandle.cs @@ -30,13 +30,6 @@ protected override void AfterConsumed() Interlocked.Decrement(ref instantiatedTasks); backToPool(this); } - - internal void Complete(object? token, bool timedOut) - { - Debug.Assert(token is short); - - TrySetResult(Unsafe.Unbox(token), !timedOut); - } } private sealed class WaitHandleValueTaskPool : ConcurrentBag diff --git a/src/DotNext.Threading/Threading/AsyncBridge.cs b/src/DotNext.Threading/Threading/AsyncBridge.cs index 4c9ec5ad9b..3ad215d941 100644 --- a/src/DotNext.Threading/Threading/AsyncBridge.cs +++ b/src/DotNext.Threading/Threading/AsyncBridge.cs @@ -1,4 +1,6 @@ -using static System.Threading.Timeout; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using static System.Threading.Timeout; namespace DotNext.Threading; @@ -48,19 +50,36 @@ public static ValueTask WaitAsync(this CancellationToken token, bool completeAsC } /// - /// Obtains a task that can be used to await handle completion. + /// Creates a task that will complete when any of the supplied tokens have canceled. /// - /// The handle to await. - /// The timeout used to await completion. - /// if handle is signaled; otherwise, if timeout occurred. - public static ValueTask WaitAsync(this WaitHandle handle, TimeSpan timeout) + /// The tokens to wait on for cancellation. + /// The canceled token. + /// is empty. + public static Task WaitAnyAsync(this ReadOnlySpan tokens) { - if (handle.WaitOne(0)) - return new(true); + Task result; + try + { + CancellationTokenCompletionSource source = tokens switch + { + [] => throw new InvalidOperationException(), + [var token] => new CancellationTokenCompletionSource1(token), + [var token1, var token2] => new CancellationTokenCompletionSource2(token1, token2), + _ => new CancellationTokenCompletionSourceN(tokens), + }; + + result = source.Task; + } + catch (Exception e) + { + result = Task.FromException(e); + } - if (timeout == TimeSpan.Zero) - return new(false); + return result; + } + private static WaitHandleValueTask GetCompletionSource(WaitHandle handle, TimeSpan timeout) + { WaitHandleValueTask? result; // do not keep long references when limit is reached @@ -69,8 +88,13 @@ public static ValueTask WaitAsync(this WaitHandle handle, TimeSpan timeout else if (!HandlePool.TryTake(out result)) result = new(WaitHandleTaskCompletionCallback); - IEquatable token = result.Reset(); - var registration = ThreadPool.UnsafeRegisterWaitForSingleObject(handle, result.Complete, token, timeout, executeOnlyOnce: true); + var token = result.Reset(); + var registration = ThreadPool.UnsafeRegisterWaitForSingleObject( + handle, + Complete, + new Tuple(result, token), + timeout, + executeOnlyOnce: true); if (result.IsCompleted) { @@ -81,7 +105,42 @@ public static ValueTask WaitAsync(this WaitHandle handle, TimeSpan timeout result.Registration = registration; } - return result.CreateTask(InfiniteTimeSpan, CancellationToken.None); + return result; + + static void Complete(object? state, bool timedOut) + { + Debug.Assert(state is Tuple); + + var (source, token) = Unsafe.As>(state); + source.TrySetResult(token, timedOut is false); + } + } + + /// + /// Obtains a task that can be used to await handle completion. + /// + /// The handle to await. + /// The timeout used to await completion. + /// The token that can be used to cancel the operation. + /// if handle is signaled; otherwise, if timeout occurred. + /// The operation has been canceled. + public static ValueTask WaitAsync(this WaitHandle handle, TimeSpan timeout, CancellationToken token = default) + { + ValueTask result; + if (handle.WaitOne(0)) + { + result = new(true); + } + else if (timeout == TimeSpan.Zero) + { + result = new(false); + } + else + { + result = GetCompletionSource(handle, timeout).CreateTask(InfiniteTimeSpan, token); + } + + return result; } private static void Reset(ManualResetCompletionSource source) => source.Reset(); @@ -90,9 +149,16 @@ public static ValueTask WaitAsync(this WaitHandle handle, TimeSpan timeout /// Obtains a task that can be used to await handle completion. /// /// The handle to await. + /// The token that can be used to cancel the operation. /// The task that will be completed . - public static ValueTask WaitAsync(this WaitHandle handle) => WaitAsync(handle, InfiniteTimeSpan); - + /// The operation has been canceled. + public static ValueTask WaitAsync(this WaitHandle handle, CancellationToken token = default) + => handle.WaitOne(0) + ? ValueTask.CompletedTask + : GetCompletionSource(handle, InfiniteTimeSpan) + .As>() + .Invoke(InfiniteTimeSpan, token); + /// /// Gets or sets the capacity of the internal pool used to create awaitable tasks returned /// from the public methods in this class. diff --git a/src/DotNext.Threading/Threading/Tasks/ValueTaskCompletionSource.T.cs b/src/DotNext.Threading/Threading/Tasks/ValueTaskCompletionSource.T.cs index 4cfa63829a..e91c949372 100644 --- a/src/DotNext.Threading/Threading/Tasks/ValueTaskCompletionSource.T.cs +++ b/src/DotNext.Threading/Threading/Tasks/ValueTaskCompletionSource.T.cs @@ -205,7 +205,7 @@ internal bool TrySetResult(object? completionData, short? completionToken, in Re /// /// The timeout associated with the task. /// The cancellation token that can be used to cancel the task. - /// A fresh incompleted task. + /// A fresh uncompleted task. /// is less than zero but not equals to . /// The source is in invalid state. public ValueTask CreateTask(TimeSpan timeout, CancellationToken token)