Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UnfoldResource ValueTask Stage and DSL methods #7028

Open
wants to merge 25 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5c3c94a
Add UnfoldResource ValueTask Stage and DSL methods
to11mtm Dec 27, 2023
b0969f8
Pool Continuations on ValueTasks
to11mtm Dec 31, 2023
f4725aa
Add SelectValueTaskAsync
to11mtm Dec 31, 2023
9d87f8b
UnfoldValueTaskAsync, clean up other ops
to11mtm Jan 1, 2024
7beabc3
SelectValueTaskAsync Fixes, Benchmarks
to11mtm Jan 7, 2024
1971702
More SlimResult, Standardize patterns
to11mtm Jan 7, 2024
0471a58
Consolidate and simplify pooling for simple cases. Probably many oppo…
to11mtm Feb 21, 2024
ed982eb
Fix derp on UnfoldAsync from bench local playtime
to11mtm Feb 21, 2024
6b869db
Improve benchmark names
to11mtm Feb 23, 2024
c1598c0
undo AsyncEnumerable change for now.
to11mtm Feb 23, 2024
3e4518b
API Docs
to11mtm Feb 23, 2024
026ce51
Update Stream docs
to11mtm Feb 23, 2024
14eb3eb
Fix doc linting
to11mtm Feb 23, 2024
a252f8c
Add FlowSelectValueTaskAsyncSpec
to11mtm Mar 3, 2024
1a1d428
Optimize SlimResult.ForSuccess codegen
to11mtm Mar 3, 2024
12602da
Add UnfoldResourceValueTaskAsyncSourceSpec
to11mtm Mar 3, 2024
8b7856a
Use correct Reactive Streams Exception.
to11mtm Mar 3, 2024
50bec19
Fix using derp
to11mtm Mar 3, 2024
a2a650c
APIDocs
to11mtm Mar 3, 2024
973c8b6
Do some props and proj voodoo to try to fix weird linux build issue
to11mtm Mar 3, 2024
827d76e
ApiDocs for framework
to11mtm Mar 3, 2024
25f0501
Merge branch 'dev' into streams-valuetask-unfoldresource
to11mtm Mar 3, 2024
125771a
Make TaskHelperExts internal in Akka streams
to11mtm Mar 4, 2024
aff7e32
Maybe don't do 471 on non nt?
to11mtm Mar 4, 2024
cfbab88
Merge branch 'dev' into streams-valuetask-unfoldresource
to11mtm Mar 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ Can be used to implement many stateful sources without having to touch the more

**completes** when the task returned by the unfold function completes with an null value

### UnfoldValueTaskAsync

Just like ``UnfoldAsync``, but the fold function returns a ``ValueTask``, with internal pooling to minimize allocation and improve latency.

**emits** when there is demand and unfold state returned task completes with not null value

**completes** when the task returned by the unfold function completes with an null value

### Empty

Complete right away without ever emitting any elements. Useful when you have to provide a source to
Expand Down Expand Up @@ -196,6 +204,14 @@ Functions return ``Task`` to achieve asynchronous processing

**completes** when ``Task`` from read function returns ``None``

### UnfoldResourceValueTaskAsync

Like ``UnfoldResourceAsync`` but takes ``ValueTask`` Functions instead, with amortization of allocations for the main read stage.

**emits** when there is demand and ``ValueTask`` from read function returns value

**completes** when ``ValueTask`` from read function returns ``None``

### Queue

Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains
Expand Down Expand Up @@ -846,6 +862,16 @@ If a Task fails, the stream also fails (unless a different supervision strategy

**completes** when upstream completes and all tasks has been completed and all elements has been emitted

### SelectValueTaskAsync

Version of ``SelectAsync`` that is optimized for ValueTask returns. Prefer this over ``SelectAsync`` if your work may be synchronus or is primarily waiting on ``ValueTask``

**emits** when the ``ValueTask`` returned by the provided function finishes for the next element in sequence

**backpressures** when the number of tasks reaches the configured parallelism and the downstream backpressures

**completes** when upstream completes and all tasks has been completed and all elements has been emitted

### SelectAsyncUnordered

Like ``SelectAsync`` but ``Task`` results are passed downstream as they arrive regardless of the order of the elements
Expand Down
1 change: 1 addition & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<XunitRunnerVersion>2.5.3</XunitRunnerVersion>
<TestSdkVersion>17.9.0</TestSdkVersion>
<HyperionVersion>0.12.2</HyperionVersion>
<SystemThreadingTasksExtensionsVersion>4.5.4</SystemThreadingTasksExtensionsVersion>
<NewtonsoftJsonVersion>[13.0.1,)</NewtonsoftJsonVersion>
<NBenchVersion>2.0.1</NBenchVersion>
<ProtobufVersion>3.25.3</ProtobufVersion>
Expand Down
189 changes: 189 additions & 0 deletions src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// //-----------------------------------------------------------------------
// // <copyright file="SelectAsyncBenchmarks.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Streams;
using Akka.Streams.Dsl;
using BenchmarkDotNet.Attributes;

namespace Akka.Benchmarks.Streams;

[Config(typeof(MicroBenchmarkConfig))]
public class SelectAsyncBenchmarks
{
public struct IntOrCompletion
{
public readonly int IntValue;
public readonly TaskCompletionSource? Completion;

public IntOrCompletion(int intValue, TaskCompletionSource? completion)
{
IntValue = intValue;
Completion = completion;
}
}
private ActorSystem system;
private ActorMaterializer materializer;

private IRunnableGraph<Task> simpleGraph;
private Task<Done> selectAsyncStub;
private Channel<IntOrCompletion> asyncCh;
private Task<Done> selectValueTaskAsyncStub;
private Channel<IntOrCompletion> vtAsyncCh;
private Task<Done> selectAsyncSyncStub;
private Task<Done> selectAsyncValueTaskSyncStub;
private Channel<IntOrCompletion> asyncChSync;
private Channel<IntOrCompletion> vtAsyncChSync;

[GlobalSetup]
public void Setup()
{
system = ActorSystem.Create("system");
materializer = system.Materializer();
asyncCh = Channel.CreateUnbounded<IntOrCompletion>();

asyncChSync = Channel.CreateUnbounded<IntOrCompletion>();

vtAsyncChSync = Channel.CreateUnbounded<IntOrCompletion>();

selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader)
.SelectAsync(4, a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
}
else
{
}

return Task.FromResult(NotUsed.Instance);
}).RunWith(Sink.Ignore<NotUsed>(), materializer);

selectAsyncValueTaskSyncStub = Source.ChannelReader(vtAsyncChSync.Reader)
.SelectValueTaskAsync(4, a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
}
else
{
}

return ValueTask.FromResult(NotUsed.Instance);
}).RunWith(Sink.Ignore<NotUsed>(), materializer);
selectAsyncStub = Source.ChannelReader(asyncCh.Reader)
.SelectAsync(4, async a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
}
else
{
//await Task.Yield();
await Task.Delay(0);
}

return NotUsed.Instance;
}).RunWith(Sink.Ignore<NotUsed>(), materializer);
vtAsyncCh = Channel.CreateUnbounded<IntOrCompletion>();
int vta = 0;
selectValueTaskAsyncStub = Source.ChannelReader(vtAsyncCh.Reader)
.SelectValueTaskAsync(4, async a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
//return NotUsed.Instance;
}
else
{
//await Task.Yield();
await Task.Delay(0);
//return NotUsed.Instance;
//Console.WriteLine(++vta);
//return vta;
}

return NotUsed.Instance;
}).RunWith(Sink.Ignore<NotUsed>(), materializer);
}

[GlobalCleanup]
public void Cleanup()
{
materializer.Dispose();
system.Dispose();
}

[Benchmark]
public async Task RunSelectAsync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
asyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
}

asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}


[Benchmark]
public async Task RunSelectValueTaskAsync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
}

vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}

[Benchmark]
public async Task RunSelectAsyncSync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
}

asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}


[Benchmark]
public async Task RunSelectValueTaskAsyncSync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
}

vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}
}
Loading
Loading