Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UnfoldResource ValueTask Stage and DSL methods #7028

Open
wants to merge 25 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5c3c94a
Add UnfoldResource ValueTask Stage and DSL methods
to11mtm Dec 27, 2023
b0969f8
Pool Continuations on ValueTasks
to11mtm Dec 31, 2023
f4725aa
Add SelectValueTaskAsync
to11mtm Dec 31, 2023
9d87f8b
UnfoldValueTaskAsync, clean up other ops
to11mtm Jan 1, 2024
7beabc3
SelectValueTaskAsync Fixes, Benchmarks
to11mtm Jan 7, 2024
1971702
More SlimResult, Standardize patterns
to11mtm Jan 7, 2024
0471a58
Consolidate and simplify pooling for simple cases. Probably many oppo…
to11mtm Feb 21, 2024
ed982eb
Fix derp on UnfoldAsync from bench local playtime
to11mtm Feb 21, 2024
6b869db
Improve benchmark names
to11mtm Feb 23, 2024
c1598c0
undo AsyncEnumerable change for now.
to11mtm Feb 23, 2024
3e4518b
API Docs
to11mtm Feb 23, 2024
026ce51
Update Stream docs
to11mtm Feb 23, 2024
14eb3eb
Fix doc linting
to11mtm Feb 23, 2024
a252f8c
Add FlowSelectValueTaskAsyncSpec
to11mtm Mar 3, 2024
1a1d428
Optimize SlimResult.ForSuccess codegen
to11mtm Mar 3, 2024
12602da
Add UnfoldResourceValueTaskAsyncSourceSpec
to11mtm Mar 3, 2024
8b7856a
Use correct Reactive Streams Exception.
to11mtm Mar 3, 2024
50bec19
Fix using derp
to11mtm Mar 3, 2024
a2a650c
APIDocs
to11mtm Mar 3, 2024
973c8b6
Do some props and proj voodoo to try to fix weird linux build issue
to11mtm Mar 3, 2024
827d76e
ApiDocs for framework
to11mtm Mar 3, 2024
25f0501
Merge branch 'dev' into streams-valuetask-unfoldresource
to11mtm Mar 3, 2024
125771a
Make TaskHelperExts internal in Akka streams
to11mtm Mar 4, 2024
aff7e32
Maybe don't do 471 on non nt?
to11mtm Mar 4, 2024
cfbab88
Merge branch 'dev' into streams-valuetask-unfoldresource
to11mtm Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// //-----------------------------------------------------------------------
// // <copyright file="SelectAsyncBenchmarks.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Streams;
using Akka.Streams.Dsl;
using BenchmarkDotNet.Attributes;

namespace Akka.Benchmarks.Streams;

[Config(typeof(MicroBenchmarkConfig))]
public class SelectAsyncBenchmarks
{
public struct IntOrCompletion
{
public readonly int IntValue;
public readonly TaskCompletionSource? Completion;

public IntOrCompletion(int intValue, TaskCompletionSource? completion)
{
IntValue = intValue;
Completion = completion;
}
}
private ActorSystem system;
private ActorMaterializer materializer;

private IRunnableGraph<Task> simpleGraph;
private Task<Done> selectAsyncStub;
private Channel<IntOrCompletion> asyncCh;
private Task<Done> selectValueTaskAsyncStub;
private Channel<IntOrCompletion> vtAsyncCh;
private Task<Done> selectAsyncSyncStub;
private Task<Done> selectAsyncValueTaskSyncStub;
private Channel<IntOrCompletion> asyncChSync;
private Channel<IntOrCompletion> vtAsyncChSync;

[GlobalSetup]
public void Setup()
{
system = ActorSystem.Create("system");
materializer = system.Materializer();
asyncCh = Channel.CreateUnbounded<IntOrCompletion>();

asyncChSync = Channel.CreateUnbounded<IntOrCompletion>();

vtAsyncChSync = Channel.CreateUnbounded<IntOrCompletion>();

selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader)
.SelectAsync(4, a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
}
else
{
}

return Task.FromResult(NotUsed.Instance);
}).RunWith(Sink.Ignore<NotUsed>(), materializer);

selectAsyncValueTaskSyncStub = Source.ChannelReader(vtAsyncChSync.Reader)
.SelectValueTaskAsync(4, a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
}
else
{
}

return ValueTask.FromResult(NotUsed.Instance);
}).RunWith(Sink.Ignore<NotUsed>(), materializer);
selectAsyncStub = Source.ChannelReader(asyncCh.Reader)
.SelectAsync(4, async a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
}
else
{
//await Task.Yield();
await Task.Delay(0);
}

return NotUsed.Instance;
}).RunWith(Sink.Ignore<NotUsed>(), materializer);
vtAsyncCh = Channel.CreateUnbounded<IntOrCompletion>();
int vta = 0;
selectValueTaskAsyncStub = Source.ChannelReader(vtAsyncCh.Reader)
.SelectValueTaskAsync(4, async a =>
{
if (a.Completion != null)
{
a.Completion.TrySetResult();
//return NotUsed.Instance;
}
else
{
//await Task.Yield();
await Task.Delay(0);
//return NotUsed.Instance;
//Console.WriteLine(++vta);
//return vta;
}

return NotUsed.Instance;
}).RunWith(Sink.Ignore<NotUsed>(), materializer);
}

[GlobalCleanup]
public void Cleanup()
{
materializer.Dispose();
system.Dispose();
}

[Benchmark]
public async Task RunSelectAsync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
asyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
}

asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}


[Benchmark]
public async Task RunSelectValueTaskAsync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
}

vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}

[Benchmark]
public async Task RunSelectAsyncSync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
}

asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}


[Benchmark]
public async Task RunSelectValueTaskAsyncSync()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
}

vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}
}
191 changes: 191 additions & 0 deletions src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// //-----------------------------------------------------------------------
// // <copyright file="UnfoldAsyncBenchmarks.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Streams;
using Akka.Streams.Dsl;
using BenchmarkDotNet.Attributes;

namespace Akka.Benchmarks.Streams;

[Config(typeof(MicroBenchmarkConfig))]
public class UnfoldAsyncBenchmarks
{
public struct IntOrCompletion
{
public readonly int IntValue;
public readonly TaskCompletionSource? Completion;

public IntOrCompletion(int intValue, TaskCompletionSource? completion)
{
IntValue = intValue;
Completion = completion;
}
}
private ActorSystem system;
private ActorMaterializer materializer;

private IRunnableGraph<Task> simpleGraph;
private Task<Done> selectAsyncStub;
private Channel<IntOrCompletion> asyncNoYieldCh;
private Task<Done> selectValueTaskAsyncStub;
private Channel<IntOrCompletion> vtAsyncCh;
private Task<Done> unfoldAsyncSyncStub;
private Task<Done> selectAsyncValueTaskSyncStub;
private Channel<IntOrCompletion> asyncYieldCh;
private Channel<IntOrCompletion> vtAsyncYieldCh;

[GlobalSetup]
public void Setup()
{
system = ActorSystem.Create("system");
materializer = system.Materializer();
asyncNoYieldCh = Channel.CreateUnbounded<IntOrCompletion>();

asyncYieldCh = Channel.CreateUnbounded<IntOrCompletion>();

vtAsyncYieldCh = Channel.CreateUnbounded<IntOrCompletion>();

unfoldAsyncSyncStub = Source.UnfoldAsync<ChannelReader<IntOrCompletion>,int>(asyncYieldCh.Reader, async r =>
{
var i = await r.ReadAsync();
if (i.Completion != null)
{
i.Completion.TrySetResult();
return (r, -1);
}
else
{
return (r, i.IntValue);
}
})
.RunWith(Sink.Ignore<int>(), materializer);

selectAsyncValueTaskSyncStub = Source.UnfoldValueTaskAsync<ChannelReader<IntOrCompletion>,int>(vtAsyncYieldCh.Reader, async r =>
{
var i = await r.ReadAsync();
if (i.Completion != null)
{
i.Completion.TrySetResult();
return (r, -1);
}
else
{
return (r, i.IntValue);
}
})
.RunWith(Sink.Ignore<int>(), materializer);
selectAsyncStub = Source.UnfoldAsync<ChannelReader<IntOrCompletion>,int>(asyncNoYieldCh.Reader,async r =>
{
await Task.Yield();
var a = await r.ReadAsync();
if (a.Completion != null)
{
a.Completion.TrySetResult();
return (r, -1);
}
else
{
//await Task.Yield();
// await Task.Delay(0);
return (r, a.IntValue);
}
}).RunWith(Sink.Ignore<int>(), materializer);
vtAsyncCh = Channel.CreateUnbounded<IntOrCompletion>();
int vta = 0;
selectValueTaskAsyncStub = Source.UnfoldValueTaskAsync<ChannelReader<IntOrCompletion>,int>(vtAsyncCh.Reader,async r =>
{
await Task.Yield();
var a = await r.ReadAsync();
if (a.Completion != null)
{
a.Completion.TrySetResult();
return (r, -1);
}
else
{
//await Task.Yield();
//await Task.Delay(0);
return (r, a.IntValue);
}
}).RunWith(Sink.Ignore<int>(), materializer);
}

[GlobalCleanup]
public void Cleanup()
{
materializer.Dispose();
system.Dispose();
}

[Benchmark]
public async Task UnfoldAsyncNoYield()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
}

asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}


[Benchmark]
public async Task UnfoldValueTaskAsyncNoYield()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
}

vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}

[Benchmark]
public async Task UnfoldAsyncWithYield()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
await Task.Delay(1);
}

asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}


[Benchmark]
public async Task UnfoldValueTaskAsyncWithYield()
{
var completion = new TaskCompletionSource(TaskCreationOptions
.RunContinuationsAsynchronously);
for (int i = 0; i < 100; i++)
{
vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
await Task.Delay(1);
}

vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
await completion.Task;

}
}
6 changes: 6 additions & 0 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ public static IFlow<TOut, TMat> SelectAsync<TIn, TOut, TMat>(this IFlow<TIn, TMa
{
return flow.Via(new Fusing.SelectAsync<TIn, TOut>(parallelism, asyncMapper));
}

public static IFlow<TOut, TMat> SelectValueTaskAsync<TIn, TOut, TMat>(this IFlow<TIn, TMat> flow, int parallelism,
Func<TIn, ValueTask<TOut>> asyncMapper)
{
return flow.Via(new Fusing.SelectValueTaskAsync<TIn, TOut>(parallelism, asyncMapper));
}

/// <summary>
/// Transform this stream by applying the given function <paramref name="asyncMapper"/> to each of the elements
Expand Down
Loading
Loading