Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve submitter client ext #39

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Prev Previous commit
Format + doc
  • Loading branch information
lemaitre-aneo committed Sep 19, 2022
commit 3f5f4039ce9ef960d17d66b2baa1dd0594d3bd21
20 changes: 10 additions & 10 deletions Api/csharp/ArmoniK.Api.Client.Tests/SubmitterClientExtTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ public ResultReply Current

public class TestClient : gRPC.V1.Submitter.Submitter.SubmitterClient
{
private static Task<Metadata> GetResponse()
=> Task.FromResult(new Metadata());

private readonly IAsyncStreamReader<ResultReply> streamReader_;

public TestClient(byte[] resultData,
Expand Down Expand Up @@ -240,14 +237,17 @@ public TestClient(byte[] resultData,
streamReader_ = new EnumerableAsyncStreamReader(list);
}

private static Task<Metadata> GetResponse()
=> Task.FromResult(new Metadata());

public override AsyncServerStreamingCall<ResultReply> TryGetResultStream(ResultRequest request,
CallOptions options)
=> new AsyncServerStreamingCall<ResultReply>(streamReader_,
GetResponse(),
() => Status.DefaultSuccess,
() => new Metadata(),
() =>
{
});
=> new(streamReader_,
GetResponse(),
() => Status.DefaultSuccess,
() => new Metadata(),
() =>
{
});
}
}
107 changes: 83 additions & 24 deletions Api/csharp/ArmoniK.Api.Client/Internals/AsyncEnumerableStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,96 @@

namespace ArmoniK.Api.Client.Internals
{
/// <summary>
/// IO.Stream that is read from an IAsyncEnumerable
/// </summary>
public class AsyncEnumerableStream : Stream, IAsyncDisposable

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots of hypothesis on how it will be consumed.
Many of them will likely appear to be wrong.
This should be in another PR with lots of test and with microbenchmarks corresponding to different consumption use cases. These benchmarks should include both asynchronous and synchronous scenarios. In real application, the later are more likely as legacy deserializers are synchronous. None of the existing libs are using Memory or Span to get the data from the Stream.

{
private readonly IAsyncEnumerator<ReadOnlyMemory<byte>> enumerator_;
private int inputOffset_;


/// <summary>
/// Constructs the stream from an IAsyncEnumerable
/// </summary>
/// <param name="enumerable">async enumerable read from the stream</param>
/// <param name="cancellationToken">cancellation token used to abort the enumeration</param>
public AsyncEnumerableStream(IAsyncEnumerable<ReadOnlyMemory<byte>> enumerable,
CancellationToken cancellationToken = default)
{
enumerator_ = enumerable.GetAsyncEnumerator(cancellationToken);
inputOffset_ = 0;
}

/// <inheritdoc />
public override bool CanRead
=> true;

/// <inheritdoc />
public override bool CanSeek
=> false;

/// <inheritdoc />
public override bool CanWrite
=> false;

/// <inheritdoc />
public override long Length
=> 0;

/// <inheritdoc />
public override long Position
{
get => 0;
set => throw new NotImplementedException();
}

/// <inheritdoc />
public async ValueTask DisposeAsync()
=> await enumerator_.DisposeAsync()
.ConfigureAwait(false);

/// <inheritdoc />
public override void Flush()
{
}

/// <inheritdoc />
public override int Read(byte[] buffer,
int offset,
int count)
=> Read(buffer.AsSpan(offset,
count));

/// <summary>
/// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of
/// bytes read.
/// </summary>
/// <param name="span">
/// An array of bytes. When this method returns, the buffer contains the specified byte array with the
/// values replaced by the bytes read from the current source.
/// </param>
/// <returns>
/// The total number of bytes read into the buffer. This can be less than the number of bytes requested if that
/// many bytes are not currently available, or zero (0) if the end of the stream has been reached.
/// </returns>
/// <exception cref="T:System.ArgumentNullException"><paramref name="span">buffer</paramref> is null.</exception>
/// <exception cref="T:System.IO.IOException">An I/O error occurs.</exception>
/// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed.</exception>
public int Read(Span<byte> span)
{
if (enumerator_.Current.Length == 0)
// More data is needed
while (enumerator_.Current.Length == inputOffset_)
{
enumerator_.MoveNextAsync()
.GetAwaiter()
.GetResult();
inputOffset_ = 0;
if (!enumerator_.MoveNextAsync()
.GetAwaiter()
.GetResult())
{
return 0;
}
}

// Write only what is asked for
var length = Math.Min(span.Length,
enumerator_.Current.Length - inputOffset_);
enumerator_.Current.Slice(inputOffset_,
Expand All @@ -78,6 +131,7 @@ public int Read(Span<byte> span)
return length;
}

/// <inheritdoc />
public override Task<int> ReadAsync(byte[] buffer,
int offset,
int count,
Expand All @@ -87,9 +141,29 @@ public override Task<int> ReadAsync(byte[] buffer,
cancellationToken)
.AsTask();


/// <summary>
/// Asynchronously reads a sequence of bytes from the current stream, advances the position within the stream by
/// the number of bytes read, and monitors cancellation requests.
/// </summary>
/// <param name="buffer">The buffer to write the data into.</param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is
/// <see cref="P:System.Threading.CancellationToken.None"></see>.
/// </param>
/// <returns>
/// A task that represents the asynchronous read operation. The value of the
/// <paramref name="TResult">TResult</paramref> parameter contains the total number of bytes read into the buffer. The
/// result value can be less than the number of bytes requested if the number of bytes currently available is less than
/// the requested number, or it can be 0 (zero) if the end of the stream has been reached.
/// </returns>
/// <exception cref="T:System.ArgumentNullException"><paramref name="buffer">buffer</paramref> is null.</exception>
/// <exception cref="T:System.ObjectDisposedException">The stream has been disposed.</exception>
/// <exception cref="T:System.InvalidOperationException">The stream is currently in use by a previous read operation.</exception>
public async ValueTask<int> ReadAsync(Memory<byte> buffer,
CancellationToken cancellationToken = default)
{
// More data is needed
while (enumerator_.Current.Length == inputOffset_)
{
inputOffset_ = 0;
Expand All @@ -100,6 +174,7 @@ public async ValueTask<int> ReadAsync(Memory<byte> buffer,
}
}

// Write only what is asked for
var length = Math.Min(buffer.Length,
enumerator_.Current.Length - inputOffset_);
enumerator_.Current.Slice(inputOffset_,
Expand All @@ -111,35 +186,19 @@ public async ValueTask<int> ReadAsync(Memory<byte> buffer,
return length;
}


/// <inheritdoc />
public override long Seek(long offset,
SeekOrigin origin)
=> throw new NotImplementedException();

/// <inheritdoc />
public override void SetLength(long value)
=> throw new NotImplementedException();

/// <inheritdoc />
public override void Write(byte[] buffer,
int offset,
int count)
=> throw new NotImplementedException();

public override bool CanRead
=> true;

public override bool CanSeek
=> false;

public override bool CanWrite
=> false;

public override long Length
=> 0;

public override long Position
{
get => 0;
set => throw new NotImplementedException();
}
}
}
4 changes: 2 additions & 2 deletions Api/csharp/ArmoniK.Api.Client/Submitter/SubmitterClientExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private static async IAsyncEnumerable<CreateLargeTaskRequest> ToRequestStream(th
}

/// <summary>
/// Get result without streaming
/// Get result as a stream
/// </summary>
/// <param name="client">gRPC client to the Submitter</param>
/// <param name="resultRequest">Request for result</param>
Expand All @@ -203,7 +203,7 @@ public static Task<Stream> GetResultAsStreamAsync(this gRPC.V1.Submitter.Submitt


/// <summary>
/// Get result without streaming
/// Get result from an AsyncEnumerable
/// </summary>
/// <param name="client">gRPC client to the Submitter</param>
/// <param name="resultRequest">Request for result</param>
Expand Down