Skip to content

Commit

Permalink
chore: add option in Bench to download results or not (#723)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Aug 7, 2024
2 parents 290b66c + 01b5e56 commit 0bae9f0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 44 deletions.
5 changes: 5 additions & 0 deletions Tests/Bench/Client/src/Options/BenchOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public class BenchOptions
/// </summary>
public bool PurgeData { get; set; } = true;

/// <summary>
/// Download results and check all results are properly retrieved
/// </summary>
public bool DownloadResults { get; set; } = true;

/// <summary>
/// Dictionary to put into task options
/// </summary>
Expand Down
91 changes: 47 additions & 44 deletions Tests/Bench/Client/src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,60 +478,63 @@ await stream.RequestStream.CompleteAsync()

var resultsAvailable = Stopwatch.GetTimestamp();

var countRes = 0;
if (benchOptions.DownloadResults)
{
var countRes = 0;

await results.ParallelForEach(new ParallelTaskOptions(benchOptions.DegreeOfParallelism),
async resultId =>
{
for (var i = 0; i < benchOptions.MaxRetries; i++)
await results.ParallelForEach(new ParallelTaskOptions(benchOptions.DegreeOfParallelism),
async resultId =>
{
await using var channel = await channelPool.GetAsync(CancellationToken.None)
.ConfigureAwait(false);
try
for (var i = 0; i < benchOptions.MaxRetries; i++)
{
var client = new Results.ResultsClient(channel);
await using var channel = await channelPool.GetAsync(CancellationToken.None)
.ConfigureAwait(false);
try
{
var client = new Results.ResultsClient(channel);

var result = await client.DownloadResultData(createSessionReply.SessionId,
resultId,
CancellationToken.None)
.ConfigureAwait(false);

var result = await client.DownloadResultData(createSessionReply.SessionId,
resultId,
CancellationToken.None)
.ConfigureAwait(false);
// A good a way to process results would be to process them individually as soon as they are
// retrieved. They may be stored in a ConcurrentBag or a ConcurrentDictionary but you need to
// be careful to not overload your memory. If you need to retrieve a lot of results to apply
// post-processing on, consider doing so with sub-tasking so that the client-side application
// has to do less work.

// A good a way to process results would be to process them individually as soon as they are
// retrieved. They may be stored in a ConcurrentBag or a ConcurrentDictionary but you need to
// be careful to not overload your memory. If you need to retrieve a lot of results to apply
// post-processing on, consider doing so with sub-tasking so that the client-side application
// has to do less work.
if (result.Length != benchOptions.ResultSize * 1024)
{
logger.LogInformation("Received length {received}, expected length {expected}",
result.Length,
benchOptions.ResultSize * 1024);
throw new InvalidOperationException("The result size from the task should have the same size as the one specified");
}

if (result.Length != benchOptions.ResultSize * 1024)
Interlocked.Increment(ref countRes);
// If successful, return
return;
}
catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable)
{
logger.LogInformation("Received length {received}, expected length {expected}",
result.Length,
benchOptions.ResultSize * 1024);
throw new InvalidOperationException("The result size from the task should have the same size as the one specified");
logger.LogWarning(e,
"Error during result retrieving, retrying to get {resultId}",
resultId);
}

Interlocked.Increment(ref countRes);
// If successful, return
return;
}
catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable)
{
logger.LogWarning(e,
"Error during result retrieving, retrying to get {resultId}",
resultId);
}
}

// in this case, retries are all made so we need to tell that it did not work
throw new InvalidOperationException("Too many retries");
})
.ConfigureAwait(false);

logger.LogInformation("Results retrieved {number}",
countRes);
if (countRes != results.Count)
{
throw new InvalidOperationException("All results were not retrieved");
// in this case, retries are all made so we need to tell that it did not work
throw new InvalidOperationException("Too many retries");
})
.ConfigureAwait(false);

logger.LogInformation("Results retrieved {number}",
countRes);
if (countRes != results.Count)
{
throw new InvalidOperationException("All results were not retrieved");
}
}

var resultsReceived = Stopwatch.GetTimestamp();
Expand Down

0 comments on commit 0bae9f0

Please sign in to comment.