Skip to content

Commit

Permalink
fix: Abort worker upon cancel (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
ngruelaneo authored Apr 15, 2024
2 parents 481f425 + d425fea commit 2ed7fd3
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 37 deletions.
7 changes: 7 additions & 0 deletions packages/csharp/ArmoniK.Api.Common/Options/ComputePlane.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

using JetBrains.Annotations;

namespace ArmoniK.Api.Common.Options;
Expand Down Expand Up @@ -50,4 +52,9 @@ public class ComputePlane
/// Number of messages retrieved from the queue by the Agent
/// </summary>
public int MessageBatchSize { get; set; } = 1;

/// <summary>
/// Time to wait upon cancellation before aborting the worker
/// </summary>
public TimeSpan AbortAfter { get; set; } = TimeSpan.FromSeconds(1);
}
2 changes: 2 additions & 0 deletions packages/csharp/ArmoniK.Api.Tests/WorkerServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@ public Task BuildServerAddService()
public class TestService : WorkerStreamWrapper
{
public TestService([NotNull] ILoggerFactory loggerFactory,
[NotNull] ComputePlane computePlane,
[NotNull] GrpcChannelProvider provider)
: base(loggerFactory,
computePlane,
provider)
{
}
Expand Down
59 changes: 50 additions & 9 deletions packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.gRPC.V1;
Expand Down Expand Up @@ -78,69 +79,109 @@ public interface ITaskHandler : IAsyncDisposable
/// </summary>
/// <param name="tasks">Lists the tasks to submit</param>
/// <param name="taskOptions">The task options. If no value is provided, will use the default session options</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns></returns>
Task<CreateTaskReply> CreateTasksAsync(IEnumerable<TaskRequest> tasks,
TaskOptions? taskOptions = null);
TaskOptions? taskOptions = null,
CancellationToken? cancellationToken = null);

/// <summary>
/// NOT IMPLEMENTED
/// This method is used to retrieve data available system-wide.
/// </summary>
/// <param name="key">The data key identifier</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns></returns>
Task<byte[]> RequestResource(string key);
Task<byte[]> RequestResource(string key,
CancellationToken? cancellationToken = null);

/// <summary>
/// NOT IMPLEMENTED
/// This method is used to retrieve data provided when creating the session.
/// </summary>
/// <param name="key">The da ta key identifier</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns></returns>
Task<byte[]> RequestCommonData(string key);
Task<byte[]> RequestCommonData(string key,
CancellationToken? cancellationToken = null);

/// <summary>
/// NOT IMPLEMENTED
/// This method is used to retrieve data directly from the submission client.
/// </summary>
/// <param name="key"></param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns></returns>
Task<byte[]> RequestDirectData(string key);
Task<byte[]> RequestDirectData(string key,
CancellationToken? cancellationToken = null);

/// <summary>
/// Send the results computed by the task
/// </summary>
/// <param name="key">The key identifier of the result.</param>
/// <param name="data">The data corresponding to the result</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns></returns>
Task SendResult(string key,
byte[] data);
Task SendResult(string key,
byte[] data,
CancellationToken? cancellationToken = null);

/// <summary>
/// Create results metadata
/// </summary>
/// <param name="results">The collection of results to be created</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns>
/// The result creation response
/// </returns>
Task<CreateResultsMetaDataResponse> CreateResultsMetaDataAsync(IEnumerable<CreateResultsMetaDataRequest.Types.ResultCreate> results);
Task<CreateResultsMetaDataResponse> CreateResultsMetaDataAsync(IEnumerable<CreateResultsMetaDataRequest.Types.ResultCreate> results,
CancellationToken? cancellationToken = null);

/// <summary>
/// Submit tasks with existing payloads (results)
/// </summary>
/// <param name="taskCreations">The requests to create tasks</param>
/// <param name="submissionTaskOptions">optional tasks for the whole submission</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns>
/// The task submission response
/// </returns>
Task<SubmitTasksResponse> SubmitTasksAsync(IEnumerable<SubmitTasksRequest.Types.TaskCreation> taskCreations,
TaskOptions? submissionTaskOptions);
TaskOptions? submissionTaskOptions,
CancellationToken? cancellationToken = null);

/// <summary>
/// Create results from metadata and data in an unique request
/// </summary>
/// <param name="results">The results to create</param>
/// <param name="cancellationToken">
/// Token used to cancel the execution of the method.
/// If null, the cancellation token of the task handler is used
/// </param>
/// <returns>
/// The task submission response
/// </returns>
Task<CreateResultsResponse> CreateResultsAsync(IEnumerable<CreateResultsRequest.Types.ResultCreate> results);
Task<CreateResultsResponse> CreateResultsAsync(IEnumerable<CreateResultsRequest.Types.ResultCreate> results,
CancellationToken? cancellationToken = null);
}
41 changes: 27 additions & 14 deletions packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,17 @@ public TaskHandler(ProcessRequest processRequest,

/// <inheritdoc />
public async Task<CreateTaskReply> CreateTasksAsync(IEnumerable<TaskRequest> tasks,
TaskOptions? taskOptions = null)
TaskOptions? taskOptions = null,
CancellationToken? cancellationToken = null)
{
using var stream = client_.CreateTask();

foreach (var createLargeTaskRequest in tasks.ToRequestStream(taskOptions,
Token,
Configuration!.DataChunkMaxSize))
{
await stream.RequestStream.WriteAsync(createLargeTaskRequest)
await stream.RequestStream.WriteAsync(createLargeTaskRequest,
cancellationToken ?? cancellationToken_)
.ConfigureAwait(false);
}

Expand All @@ -211,19 +213,23 @@ await stream.RequestStream.CompleteAsync()
}

/// <inheritdoc />
public Task<byte[]> RequestResource(string key)
public Task<byte[]> RequestResource(string key,
CancellationToken? cancellationToken = null)
=> throw new NotImplementedException();

/// <inheritdoc />
public Task<byte[]> RequestCommonData(string key)
public Task<byte[]> RequestCommonData(string key,
CancellationToken? cancellationToken = null)
=> throw new NotImplementedException();

/// <inheritdoc />
public Task<byte[]> RequestDirectData(string key)
public Task<byte[]> RequestDirectData(string key,
CancellationToken? cancellationToken = null)
=> throw new NotImplementedException();

/// <inheritdoc />
public async Task<CreateResultsMetaDataResponse> CreateResultsMetaDataAsync(IEnumerable<CreateResultsMetaDataRequest.Types.ResultCreate> results)
public async Task<CreateResultsMetaDataResponse> CreateResultsMetaDataAsync(IEnumerable<CreateResultsMetaDataRequest.Types.ResultCreate> results,
CancellationToken? cancellationToken = null)
=> await client_.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest
{
CommunicationToken = Token,
Expand All @@ -232,13 +238,15 @@ public async Task<CreateResultsMetaDataResponse> CreateResultsMetaDataAsync(IEnu
results,
},
SessionId = SessionId,
})
},
cancellationToken: cancellationToken ?? cancellationToken_)
.ConfigureAwait(false);


/// <inheritdoc />
public async Task SendResult(string key,
byte[] data)
public async Task SendResult(string key,
byte[] data,
CancellationToken? cancellationToken = null)
{
await using (var fs = new FileStream(Path.Combine(folder_,
key),
Expand All @@ -259,7 +267,8 @@ await client_.NotifyResultDataAsync(new NotifyResultDataRequest
ResultId = key,
},
},
})
},
cancellationToken: cancellationToken ?? cancellationToken_)
.ConfigureAwait(false);
}

Expand All @@ -269,7 +278,8 @@ public ValueTask DisposeAsync()

/// <inheritdoc />
public async Task<SubmitTasksResponse> SubmitTasksAsync(IEnumerable<SubmitTasksRequest.Types.TaskCreation> taskCreations,
TaskOptions? submissionTaskOptions)
TaskOptions? submissionTaskOptions,
CancellationToken? cancellationToken = null)
=> await client_.SubmitTasksAsync(new SubmitTasksRequest
{
CommunicationToken = Token,
Expand All @@ -279,11 +289,13 @@ public async Task<SubmitTasksResponse> SubmitTasksAsync(IEnumerable<SubmitTasksR
taskCreations,
},
TaskOptions = submissionTaskOptions,
})
},
cancellationToken: cancellationToken ?? cancellationToken_)
.ConfigureAwait(false);

/// <inheritdoc />
public async Task<CreateResultsResponse> CreateResultsAsync(IEnumerable<CreateResultsRequest.Types.ResultCreate> results)
public async Task<CreateResultsResponse> CreateResultsAsync(IEnumerable<CreateResultsRequest.Types.ResultCreate> results,
CancellationToken? cancellationToken = null)
=> await client_.CreateResultsAsync(new CreateResultsRequest
{
CommunicationToken = Token,
Expand All @@ -292,6 +304,7 @@ public async Task<CreateResultsResponse> CreateResultsAsync(IEnumerable<CreateRe
{
results,
},
})
},
cancellationToken: cancellationToken ?? cancellationToken_)
.ConfigureAwait(false);
}
Loading

0 comments on commit 2ed7fd3

Please sign in to comment.