Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UnfoldResource ValueTask Stage and DSL methods #7028

Open
wants to merge 25 commits into
base: dev
Choose a base branch
from

Conversation

to11mtm
Copy link
Member

@to11mtm to11mtm commented Dec 27, 2023

Changes

Adds Akka.Streams Source stages:

  • UnfoldValueTaskAsync <-- VT Optimized version of UnfoldAsync
  • UnfoldResourceValueTaskAsync <-- VT Optimized version of UnfoldResourceAsync
  • SelectValueTaskAsync <-- VT Optimized version of SelectAsync

This allows all sorts of potential benefits to end users, including:

  1. A 'fast-path' successful synchronous return capability;
  2. The ability for consumers to more easily pool completion awaiters if they so choose.
  3. The ability to avoid accidental overcapture between passed lambdas.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Latest dev Benchmarks

N/A since these are 'new' APIs.

This PR's Benchmarks

See other comments, overall the new methods are 'at least' as fast with lower allocation, at worst we are 0-10% faster but in some cases we are twice as fast :D

Comment on lines 782 to 783
private readonly PooledValueTaskContinuationHelper<Option<TOut>>
_pooledContinuation;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's entirely possible that this entire pattern is overkill and we should consider using Result or Result+Holder patterns consistently throughout these implementations for consistency's sake.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or, maybe, the 'hold local' pattern we use for the UnfoldValueTaskAsync?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd stick with one implementation pattern for this - I think the Holder pattern is easier to reason about so far but I haven't had to debug any of this code yet :p

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Holder pattern is easier to reason about, Especially from a 'safety' standpoint...

But since these are internals, I would expect that unless the ActorGraphInterpreter is broken, that the debugging experience would not be overly diminished for maintainers; instead of peeking at Result property state they just need to instead look at the ValueTask state. Also it's a -little- less code to maintain.

The win, is that directly holding the VT removes indirection Result introduces

Note: We do need to keep Holder however for SelectAsync due to it's internal parallelism and logic rules.

Really though, I want to have benchmarks of Task vs this state first before I try to standardize things, just in case the difference of keeping a specific impl is worth a discussion, and/or if whichever we 'pick' shows notable regression (which means, maybe try the other one.) Kinda the other part of why I tried a few different strategies here, This may be important for Artery.

{
public Result<T> Element { get; private set; }
private readonly Action<Holder<T>> _callback;
private ValueTask<T> _pending;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed to me, that the best thing to do here was have Holder<T> itself keep the continuation and just re-use them appropriately.

Yes, it costs the size of ValueTask<T> but we'd take at least that .AsTask()ing and in this case we only pay it 'once'.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems pretty clever to me - I'd love to see a benchmark comparison between this and a normal SelectAsync

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest benchmarks are in comments, It's 5-10% faster and improved alloc!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional note, There probably are additional opts to be made here but I tried to keep it, for lack of a better term, 'debugging friendly' as much as I could. :)

Comment on lines 2649 to 2651
private readonly
ConcurrentQueue<
Holder<TOut>> _queue;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here, is that we treat used Holders as 'Lazy' instances, so this will only grow to the size of parallelism at the most, and even then only if we get to that level of demand.

Comment on lines 130 to 145
private void CompletionAction()
{
if (_currentTask.IsCompletedSuccessfully)
{
_asyncHandler.Invoke(Result.Success(_currentTask.Result));
}
else
{
_asyncHandler.Invoke(
Result.FromTask(_currentTask.AsTask()));
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be capturing, then clearing clearing the _currentTask

@Aaronontheweb Aaronontheweb self-requested a review January 2, 2024 18:02
Comment on lines 2611 to 2631
var peeker =
Unsafe.As<ValueTask<T>, ValueTaskCheatingPeeker<T>>(
ref valueTask);
if (peeker._obj == null)
{
Invoke(Result.Success(peeker._result));
}
else if (peeker._obj is Task<T> asTask)
{
asTask.ContinueWith(TaskCompletedAction, this,
TaskContinuationOptions.NotOnCanceled);
}
else
{
_pending = vt;
var source =
Unsafe.As<IValueTaskSource<T>>(peeker._obj);
source.OnCompleted(OnCompletedAction, this,
peeker._token,
ValueTaskSourceOnCompletedFlags.None);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Magic"

  • We get back a ValueTask<T>
  • Unfortunately, Polite callbacks are not exposed
    • In other words, we can't pass a state in.
  • So we just abuse Unsafe.As and roll with that.

If this pattern (b/c we do it elsewhere) is overly offensive (I get it) there are some options that come to mind:

  1. Always pull the ValueTask into a given holder/scope and use a static continuation
  2. do a 'pre-check' and branch logic accordingly
  3. I've seen this done in other libs, it's painful for whoever does it the first time but way cleaner than you would think.
  4. Fallback would be some form of compiled expression/etc.
  5. Hide all of this behind the newest .NET and rely on UnsafeAccessor to get the fields.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use Unsafe.As, do we have to set an attribute on the assembly? I think we do this for some unsafe stuff inside Akka.Remote already.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Unsafe.As (and many/most other parts of that API) does not have unsafe on the public API method signature.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some feedback; would love some benchmarks showcasing the perf gains possible using these stages vs. their Task-y equivalents.

/// <typeparam name="T"></typeparam>
/// <typeparam name="TSource"></typeparam>
/// <returns>A source of T that is created on stream start and read on backpressure</returns>
public static Source<T, NotUsed> UnfoldResourceValueTaskAsync<T,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick / question: can we call this UnfoldResourceAsync and overload the other call or is there going to be some conflict with Task / ValueTask conversion?

Copy link
Member Author

@to11mtm to11mtm Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mayyyyybe?

Biggest concern off the top of my head, is that it may be easy to accidentally wind up with the Task variant, i.e.

Source.UnfoldResourceAsync(
() => inThing.OpenAsync(), //Is Task
(r) => r.ReadAsync(), //Is ValueTask
r => r.DisposeAsync() // is ValueTask

Above should(?) give a compiler error.
But below, need to verify it would not wind up using Task variant:

Source.UnfoldResourceAsync(
async () => return await inThing.OpenAsync(), //Is Task
async (r) => return await r.ReadAsync(), //Is ValueTask
async r => return await r.DisposeAsync() // is ValueTask

@@ -2551,7 +2907,7 @@ public void Invoke(Result<T> result)
}
}

private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());
private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(NotYetThereSentinel.Instance);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
public Result<T> Element { get; private set; }
private readonly Action<Holder<T>> _callback;
private ValueTask<T> _pending;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems pretty clever to me - I'd love to see a benchmark comparison between this and a normal SelectAsync

Comment on lines 2611 to 2631
var peeker =
Unsafe.As<ValueTask<T>, ValueTaskCheatingPeeker<T>>(
ref valueTask);
if (peeker._obj == null)
{
Invoke(Result.Success(peeker._result));
}
else if (peeker._obj is Task<T> asTask)
{
asTask.ContinueWith(TaskCompletedAction, this,
TaskContinuationOptions.NotOnCanceled);
}
else
{
_pending = vt;
var source =
Unsafe.As<IValueTaskSource<T>>(peeker._obj);
source.OnCompleted(OnCompletedAction, this,
peeker._token,
ValueTaskSourceOnCompletedFlags.None);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use Unsafe.As, do we have to set an attribute on the assembly? I think we do this for some unsafe stuff inside Akka.Remote already.

/// <typeparam name="TOut">TBD</typeparam>
[InternalApi]
public sealed class
SelectValueTaskAsync<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same nitpicks about naming

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.5 points here:

  1. The class itself, will almost certainly require a different name from the original
    1.5. As for the DSL, Will see whether having the same exposed name causes issues; in this (i.e. as opposed to UnfoldResource) case I'm more worried about compiler being confused as to which overload to use and diminish dev experience.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per discord comments giving the DSL methods the same name results in compiler confusion due to inability to auto-resolve inlined async Func, Happy to give DSL a better name though and open to options. :)

Comment on lines 782 to 783
private readonly PooledValueTaskContinuationHelper<Option<TOut>>
_pooledContinuation;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd stick with one implementation pattern for this - I think the Holder pattern is easier to reason about so far but I haven't had to debug any of this code yet :p

@to11mtm
Copy link
Member Author

to11mtm commented Jan 7, 2024

ValueTask versions of SelectAsync seem to be working and I got to where I'm happy with the improvements while avoiding Unsafe.As and just using .GetAwaiter().OnCompleted(Action a):

Method Mean Error StdDev Gen0 Allocated
RunSelectAsync 44.53 us 0.874 us 1.196 us 3.9063 17.96 KB
RunSelectValueTaskAsync 40.18 us 0.589 us 0.551 us 0.7935 3.76 KB
RunSelectAsyncSync 40.14 us 0.507 us 0.450 us 3.9063 17.96 KB
RunSelectValueTaskAsyncSync 36.14 us 0.415 us 0.388 us 0.7935 3.76 KB

The key to getting a clear perf benefit was having a readonly struct SlimResult<T> in place of normal Akka.Util.Result<T>. Getting rid of the boolean and instead preferring to use the Exception field's state as a status sentinel.

@to11mtm to11mtm force-pushed the streams-valuetask-unfoldresource branch from 26a5854 to 0471a58 Compare February 21, 2024 01:53
@to11mtm
Copy link
Member Author

to11mtm commented Feb 21, 2024

New benchmarks after rebase and clearing Rider caches.

Also IDK why I couldn't reply directly, but Unsafe.As is in System.Runtime.CompilerServices (I think) but in general we are good.

Note the benchmarks are... probably imperfect, (and need far better names,) but are fair in that they compare between each in the same way:

BenchmarkDotNet v0.13.12, Windows 10 (10.0.19045.4046/22H2/2022Update)
Intel Core i7-8750H CPU 2.20GHz (Coffee Lake), 1 CPU, 12 logical and 6 physical cores
.NET SDK 8.0.101
  [Host]     : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2
  DefaultJob : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2

Select

I'm not unhappy...

Method Mean Error StdDev Gen0 Allocated
RunSelectAsync 33.21 us 0.661 us 0.989 us 3.9063 17.94 KB
RunSelectValueTaskAsync 29.68 us 0.341 us 0.319 us 0.7935 3.74 KB
RunSelectAsyncSync 32.13 us 0.637 us 1.064 us 3.9063 17.94 KB
RunSelectValueTaskAsyncSync 30.27 us 0.553 us 0.490 us 0.7935 3.74 KB

Unfold

Very happy here; we are clearly faster and with somewhat/greatly lowered allocation:

Method Mean Error StdDev Gen0 Allocated
UnfoldAsyncNoYield 176.07 us 2.020 us 1.791 us 9.7656 45.23 KB
UnfoldValueTaskAsyncNoYield 136.77 us 2.715 us 2.788 us 7.3242 33.38 KB
UnfoldAsyncWithYield 161.83 us 1.198 us 1.062 us 8.5449 38.99 KB
UnfoldValueTaskAsyncWithYield 69.99 us 0.479 us 0.448 us 1.5869 7.37 KB

Comment on lines +20 to +29
/// Inheritors are expected to utilize the <see cref="SetPooledCompletionCallback"/>
/// and call `base.PreStart()` in their `PreStart` conditions.
/// <para/>
/// Additionally, if inheritors have their own 'restart' logic,
/// They should utilize the `ResetHolder()` method,
/// to avoid callback clashes.
///
/// </summary>
/// <typeparam name="T"></typeparam>
internal abstract class PooledAwaitOutGraphStageLogic<T> : OutGraphStageLogic
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was created based on desire to consolidate logic.

It works, but it feels clunky. Not sure what we can do to make the API cleaner?

{
public Result<T> Element { get; private set; }
private readonly Action<Holder<T>> _callback;
private ValueTask<T> _pending;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest benchmarks are in comments, It's 5-10% faster and improved alloc!

@@ -3805,7 +4124,7 @@ public sealed class AsyncEnumerable<T> : GraphStage<SourceShape<T>>
{
#region internal classes

private sealed class Logic : OutGraphStageLogic
private sealed class Logic : PooledAwaitOutGraphStageLogic<bool>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the words of GL, "I May have gone too far".

I don't have benchmarks, Can try to make something and diff against dev (although if someone else wants to 😇) but:

  1. 90% sure this is better than what we had
  2. 100% sure it's less less LOC and vaguely cleaner.


namespace Akka.Streams.Implementation.Fusing;

public readonly struct SlimResult<T>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing Akka Result<T> type has some pain points AFA internals

Primarily, The structure results in poor alignment, due to the bool/<T>/Exception layout.

What this does instead, is we use Sentinel Exceptions (similar to Channel<T> internals) to track other states as required. This helps on alignment and overall buffer allocation.

Comment on lines +42 to +44
var whyMSFTwhy = _vt.ConfigureAwait(false);
_vt = Unsafe.As<ConfiguredValueTaskAwaitable<T>, ValueTask<T>>(
ref whyMSFTwhy);
Copy link
Member Author

@to11mtm to11mtm Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Someone please suggest a better name, I could not think of one after comprehending that this is all more or less legal but normally only internals are allowed to do directly =/.

Add UnfoldResourceAsync benchies
@to11mtm
Copy link
Member Author

to11mtm commented Feb 21, 2024

I'm very happy.

Method Mean Error StdDev Gen0 Allocated
UnfoldResourceAsyncNoYield 262.13 us 5.220 us 9.142 us 16.1133 74.92 KB
UnfoldResourceValueTaskAsyncNoYield 130.76 us 2.579 us 4.968 us 6.1035 28.65 KB
UnfoldResourceAsyncWithYield 233.87 us 4.499 us 6.595 us 15.1367 68.67 KB
UnfoldResourceValueTaskAsyncWithYield 70.88 us 1.378 us 1.531 us 1.5869 7.22 KB

@to11mtm to11mtm marked this pull request as ready for review February 23, 2024 21:38
@to11mtm to11mtm marked this pull request as draft March 3, 2024 03:12
to11mtm added 2 commits March 3, 2024 14:33
Fix SlimResult badness ValueType Error pass in.
Add Convenience methods for SlimResult, avoid branching.
Add guarding around SelectValueTaskAsync Holder pool usage with apology
@to11mtm
Copy link
Member Author

to11mtm commented Mar 3, 2024

'Ported' the existing FlowSelectAsyncSpec over, IDK how I didn't do that, ~~will double check that all tests are present before I mark ready for review again. 🙃 ~~ (Test cases present now)

There's still an improvement in numbers, but we lost a bit with guard code and these allocation savings are a 'best case' as they hit the happy path, we can almost certainly improve in future however. :) Alas it's a bit of weaving and don't want the API goodness to get lost in the weeds of optimizations.

SelectAsync:

Method Mean Error StdDev Gen0 Allocated
RunSelectAsync 33.41 us 0.398 us 0.332 us 3.9063 17.94 KB
RunSelectValueTaskAsync 30.25 us 0.353 us 0.330 us 0.7935 3.74 KB
RunSelectAsyncSync 31.35 us 0.440 us 0.412 us 3.9063 17.94 KB
RunSelectValueTaskAsyncSync 28.53 us 0.456 us 0.427 us 0.7935 3.74 KB

Add UnfoldValueTaskAsync test based on UnfoldAsync test.
@to11mtm to11mtm marked this pull request as ready for review March 3, 2024 23:37
@to11mtm
Copy link
Member Author

to11mtm commented Mar 4, 2024

Tests are passing locally but a couple may be flaky on CI.

Hoping to see what Linux tests will do once we solve the build errors tripping up ValueTask resolution between mscorlib and System.Threading.Tasks.Extensions. Moving the ValueTask Extensions to Akka.Streams.Util did not help overall (but I think it's worth keeping them over there.)

Copy link
Member Author

@to11mtm to11mtm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Edited due to Weird draft I couldn't get rid of, apologies!)

I think this is good short of a review and approval from others.

Fairly convinced the new failing test(s) are racy, but left it in for discussion/pushback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants