Skip to content

Commit

Permalink
Use Wait without wait handle
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Dec 31, 2024
1 parent a74073c commit 7cdfbae
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 42 deletions.
11 changes: 6 additions & 5 deletions src/DotNext/IO/AsyncWriterStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace DotNext.IO;

using static Threading.Tasks.Synchronization;

internal sealed class AsyncWriterStream<TOutput>(TOutput output) : WriterStream<TOutput>(output)
where TOutput : ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFlushable
{
Expand All @@ -25,24 +27,23 @@ public override void Write(ReadOnlySpan<byte> buffer)
{
if (!buffer.IsEmpty)
{
using var rental = buffer.Copy();

var rental = buffer.Copy();
timeoutSource ??= new();
timeoutSource.CancelAfter(timeout);
var task = WriteAsync(rental.Memory, timeoutSource.Token).AsTask();
var task = WriteAsync(rental.Memory, timeoutSource.Token);
try
{
task.Wait();
}
finally
{
task.Dispose();

if (!timeoutSource.TryReset())
{
timeoutSource.Dispose();
timeoutSource = null;
}

rental.Dispose();
}
}
}
Expand Down
26 changes: 9 additions & 17 deletions src/DotNext/IO/ReadOnlyStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

namespace DotNext.IO;

using Buffers;
using static Threading.Tasks.Synchronization;

internal abstract class ReadOnlyStream : Stream
{
public sealed override bool CanRead => true;
Expand Down Expand Up @@ -84,7 +87,6 @@ internal sealed class ReadOnlyStream<TArg>(Func<Memory<byte>, TArg, Cancellation
{
private const int DefaultTimeout = 4000;
private int timeout = DefaultTimeout;
private byte[]? synchronousBuffer;
private CancellationTokenSource? timeoutSource;

public override int ReadTimeout
Expand Down Expand Up @@ -121,40 +123,30 @@ public override int Read(Span<byte> buffer)
}
else
{
var tempBuffer = RentBuffer(buffer.Length);
var tempBuffer = Memory.AllocateExactly<byte>(buffer.Length);
timeoutSource ??= new();
timeoutSource.CancelAfter(timeout);
var task = ReadAsync(tempBuffer, timeoutSource.Token).AsTask();
var task = ReadAsync(tempBuffer.Memory, timeoutSource.Token);
try
{
task.Wait();
writtenCount = task.Result;
writtenCount = task.Wait();
tempBuffer.Span.Slice(0, writtenCount).CopyTo(buffer);
}
finally
{
task.Dispose();

if (!timeoutSource.TryReset())
{
timeoutSource.Dispose();
timeoutSource = null;
}

tempBuffer.Dispose();
}

tempBuffer.AsSpan(0, writtenCount).CopyTo(buffer);
}

return writtenCount;
}

private ArraySegment<byte> RentBuffer(int length)
{
if (synchronousBuffer is null || synchronousBuffer.Length < length)
synchronousBuffer = GC.AllocateUninitializedArray<byte>(length);

return new(synchronousBuffer, 0, length);
}

protected override void Dispose(bool disposing)
{
if (disposing)
Expand Down
15 changes: 3 additions & 12 deletions src/DotNext/Runtime/CompilerServices/Scope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace DotNext.Runtime.CompilerServices;

using static Threading.Tasks.Synchronization;
using ExceptionAggregator = ExceptionServices.ExceptionAggregator;

/// <summary>
Expand Down Expand Up @@ -114,8 +115,6 @@ public void Dispose()

static void ExecuteCallbacks(ReadOnlySpan<object?> callbacks, ref ExceptionAggregator aggregator)
{
Task t;

foreach (var cb in callbacks)
{
try
Expand All @@ -128,22 +127,14 @@ static void ExecuteCallbacks(ReadOnlySpan<object?> callbacks, ref ExceptionAggre
callback();
break;
case Func<ValueTask> callback:
using (t = callback().AsTask())
{
t.Wait();
}

callback().Wait();
break;
case IDisposable disposable:
// IDisposable in synchronous implementation has higher priority than IAsyncDisposable
disposable.Dispose();
break;
case IAsyncDisposable disposable:
using (t = disposable.DisposeAsync().AsTask())
{
t.Wait();
}

disposable.DisposeAsync().Wait();
break;
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/DotNext/Threading/Tasks/Synchronization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public static Result<TResult> GetResult<TResult>(this Task<TResult> task, TimeSp
{
result = task.Wait(timeout) ? new(task.Result) : new(new TimeoutException());
}
catch (AggregateException e) when (e.InnerExceptions.Count == 1)
catch (AggregateException e) when (e.InnerExceptions is [var innerEx])
{
result = new(e.InnerExceptions[0]);
result = new(innerEx);
}
catch (Exception e)
{
Expand All @@ -49,9 +49,9 @@ public static Result<TResult> GetResult<TResult>(this Task<TResult> task, Cancel
task.Wait(token);
result = task.Result;
}
catch (AggregateException e) when (e.InnerExceptions.Count == 1)
catch (AggregateException e) when (e.InnerExceptions is [var innerEx])
{
result = new(e.InnerExceptions[0]);
result = new(innerEx);
}
catch (Exception e)
{
Expand Down Expand Up @@ -87,9 +87,9 @@ public static Result<TResult> GetResult<TResult>(this Task<TResult> task, Cancel
var awaiter = new DynamicTaskAwaitable.Awaiter(task, ConfigureAwaitOptions.None);
result = new(awaiter.GetRawResult());
}
catch (AggregateException e) when (e.InnerExceptions.Count is 1)
catch (AggregateException e) when (e.InnerExceptions is [var innerEx])
{
result = new(e.InnerExceptions[0]);
result = new(innerEx);
}
catch (Exception e)
{
Expand Down Expand Up @@ -119,9 +119,9 @@ public static Result<TResult> GetResult<TResult>(this Task<TResult> task, Cancel
var awaiter = new DynamicTaskAwaitable.Awaiter(task, ConfigureAwaitOptions.None);
result = new(awaiter.GetRawResult());
}
catch (AggregateException e) when (e.InnerExceptions.Count is 1)
catch (AggregateException e) when (e.InnerExceptions is [var innerEx])
{
result = new(e.InnerExceptions[0]);
result = new(innerEx);
}
catch (Exception e)
{
Expand Down

0 comments on commit 7cdfbae

Please sign in to comment.