From 46cab83147a21fe5a7a5bc73d3010c07f13ca374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 30 Mar 2023 08:21:26 +0200 Subject: [PATCH] feat: Add alternative methods to create tasks and results separately --- Protos/V1/agent_common.proto | 153 ++++++++++++++++-- Protos/V1/agent_service.proto | 21 +++ Protos/V1/objects.proto | 11 +- Protos/V1/results_common.proto | 141 +++++++++++++++- Protos/V1/results_service.proto | 32 ++++ Protos/V1/submitter_common.proto | 7 +- Protos/V1/tasks_common.proto | 49 ++++++ Protos/V1/tasks_service.proto | 10 ++ .../ArmoniK.Api.Client/ResultsClientExt.cs | 129 +++++++++++++++ .../ArmoniK.Api.Worker/Worker/ITaskHandler.cs | 40 +++++ .../ArmoniK.Api.Worker/Worker/TaskHandler.cs | 82 ++++++++++ 11 files changed, 652 insertions(+), 23 deletions(-) create mode 100644 packages/csharp/ArmoniK.Api.Client/ResultsClientExt.cs diff --git a/Protos/V1/agent_common.proto b/Protos/V1/agent_common.proto index c3dcb1cd1..e5b5ae66e 100644 --- a/Protos/V1/agent_common.proto +++ b/Protos/V1/agent_common.proto @@ -2,7 +2,9 @@ syntax = "proto3"; package armonik.api.grpc.v1.agent; +import "google/protobuf/timestamp.proto"; import "objects.proto"; +import "result_status.proto"; option csharp_namespace = "ArmoniK.Api.gRPC.V1.Agent"; @@ -16,14 +18,15 @@ message CreateTaskRequest { DataChunk task_payload = 3; } - string communication_token = 4; + string communication_token = 4; /** Communication token received by the worker during task processing */ } message CreateTaskReply { message TaskInfo { - string task_id = 1; - repeated string expected_output_keys = 2; - repeated string data_dependencies = 3; + string task_id = 1; /** The task ID. */ + repeated string expected_output_keys = 2; /** The expected output IDs. A task have expected output IDs. */ + repeated string data_dependencies = 3; /** The data dependencies IDs (inputs). A task have data dependencies. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Results are created implicitly. */ } message CreationStatus { @@ -41,11 +44,11 @@ message CreateTaskReply { CreationStatusList creation_status_list = 1; string error = 2; } - string communication_token = 4; + string communication_token = 4; /** Communication token received by the worker during task processing */ } message DataRequest { - string communication_token = 1; + string communication_token = 1; /** Communication token received by the worker during task processing */ string key = 2; } @@ -57,7 +60,7 @@ message DataReply { string error = 3; } } - string communication_token = 1; + string communication_token = 1; /** Communication token received by the worker during task processing */ oneof type { Init init = 2; DataChunk data = 3; @@ -70,13 +73,145 @@ message Result { InitKeyedDataStream init = 1; DataChunk data = 2; } - string communication_token = 3; + string communication_token = 3; /** Communication token received by the worker during task processing */ } message ResultReply { - string communication_token = 3; + string communication_token = 3; /** Communication token received by the worker during task processing */ oneof type { Empty Ok = 1; string Error = 2; } } + +/* +* Request for creating results without data +*/ +message CreateResultsMetaDataRequest { + /** + * A result to create. + */ + message ResultCreate { + string name = 1; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** The list of results to create. */ + string session_id = 2; /** The session in which create results. */ + string communication_token = 3; /** Communication token received by the worker during task processing */ +} + +/** +* Result metadata +*/ +message ResultMetaData { + string session_id = 1; /** The session ID. */ + string result_id = 2; /** The result ID. */ + string name = 3; /** The result name. */ + result_status.ResultStatus status = 4; /** The result status. */ + google.protobuf.Timestamp created_at = 5; /** The result creation date. */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsMetaDataResponse { + repeated ResultMetaData results = 1; /** The list of metadata results that were created. */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} + +/** +* Request to create tasks. +*/ +message SubmitTasksRequest { + message TaskCreation { + repeated string expected_output_keys = 1; /** Unique ID of the results that will be produced by the task. Results are created using ResultsService. */ + repeated string data_dependencies = 2; /** Unique ID of the results that will be used as datadependencies. Results are created using ResultsService. */ + string payload_id = 3; /** Unique ID of the result that will be used as payload. Results are created using ResultsService. */ + TaskOptions task_options = 4; /** Optionnal task options. */ + } + + string session_id = 1; /** The session ID. */ + TaskOptions task_options = 2; /** The options for the tasks. Each task will have the same. Options are merged with the one from the session. */ + repeated TaskCreation task_creations = 3; /** Task creation requests. */ + string communication_token = 4; /** Communication token received by the worker during task processing */ +} + +/** +* Response to create tasks. +* +* expected_output_ids and data_dependencies must be created through ResultsService. +* +* Remark : this may have to be enriched to a better management of errors but +* will the client application be able to manage a missing data dependency or expected output ? +*/ +message SubmitTasksResponse { + message TaskInfo { + string task_id = 1; /** The task ID. */ + repeated string expected_output_ids = 2; /** The expected output IDs. A task has expected output IDs. */ + repeated string data_dependencies = 3; /** The data dependencies IDs (inputs). A task has data dependencies. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Results are created implicitly. */ + } + + repeated TaskInfo task_infos = 1; /** List of task infos if submission successful, else throw gRPC exception. */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} + +/* +* Request for creating results without data +*/ +message CreateResultsRequest { + /** + * A result to create. + */ + message ResultCreate { + bytes data = 1; /** The actual data of the result. */ + string name = 2; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** The results to create. */ + string session_id = 2; /** The session in which create results. */ + string communication_token = 3; /** Communication token received by the worker during task processing */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsResponse { + repeated ResultMetaData results = 1; /** The raw results that were created. */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} + +/* +* Request for uploading results data through stream. +* Data must be sent in multiple chunks. +* Only one result can be uploaded. +*/ +message UploadResultDataRequest { + /** + * The metadata to identify the result to update. + */ + message ResultIdentifier { + string session_id = 1; /** The session of the result. */ + string result_id = 2; /** The ID of the result. */ + } + + /** + * The possible messages that constitute a UploadResultDataRequest + * They should be sent in the following order: + * - id + * - data_chunk (stream can have multiple data_chunk messages that represent data divided in several parts) + * + * Data chunk cannot exceed the size returned by the GetServiceConfiguration rpc method + */ + oneof type { + ResultIdentifier id = 1; /** The identifier of the result to which add data. */ + bytes data_chunk = 2; /** A chunk of data. */ + } + string communication_token = 4; /** Communication token received by the worker during task processing */ +} + +/* +* Response for uploading data with stream for result +*/ +message UploadResultDataResponse { + string result_id = 1; /** The Id of the result to which data were added */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} diff --git a/Protos/V1/agent_service.proto b/Protos/V1/agent_service.proto index 26f380143..7badffc2b 100644 --- a/Protos/V1/agent_service.proto +++ b/Protos/V1/agent_service.proto @@ -7,6 +7,27 @@ import "agent_common.proto"; option csharp_namespace = "ArmoniK.Api.gRPC.V1.Agent"; service Agent { + /** + * Create the metadata of multiple results at once + * Data have to be uploaded separately + */ + rpc CreateResultsMetaData(CreateResultsMetaDataRequest) returns (CreateResultsMetaDataResponse) {} + + /** + * Create one result with data included in the request + */ + rpc CreateResults(CreateResultsRequest) returns (CreateResultsResponse) {} + + /** + * Upload data for result with stream + */ + rpc UploadResultData(stream UploadResultDataRequest) returns (UploadResultDataResponse) {} + + /** + * Create tasks metadata and submit task for processing. + */ + rpc SubmitTasks(SubmitTasksRequest) returns (SubmitTasksResponse) {} + rpc CreateTask(stream CreateTaskRequest) returns (CreateTaskReply); rpc GetResourceData(DataRequest) returns (stream DataReply); rpc GetCommonData(DataRequest) returns (stream DataReply); diff --git a/Protos/V1/objects.proto b/Protos/V1/objects.proto index 97922de45..cb937b120 100644 --- a/Protos/V1/objects.proto +++ b/Protos/V1/objects.proto @@ -41,9 +41,10 @@ message Output { } message TaskRequest { - repeated string expected_output_keys = 1; - repeated string data_dependencies = 2; - bytes payload = 3; + repeated string expected_output_keys = 1; /** Given names to the expected outputs that will be created implicitly. IDs are returned after task creation */ + repeated string data_dependencies = 2; /** IDs of the results that will be used as data dependency. */ + bytes payload = 3; /** Content of the payload for the task. */ + string payload_name = 4; /** Name that will be associated to the result created for the payload. Optionnal */ } message InitKeyedDataStream { @@ -61,8 +62,8 @@ message DataChunk { } message TaskRequestHeader { - repeated string expected_output_keys = 1; - repeated string data_dependencies = 2; + repeated string expected_output_keys = 1; /** Given names to the expected outputs that will be created implicitly. IDs are returned after task creation */ + repeated string data_dependencies = 2; /** IDs of the results that will be used as data dependency. */ } message InitTaskRequest { diff --git a/Protos/V1/results_common.proto b/Protos/V1/results_common.proto index e9f84cf32..9fd960156 100644 --- a/Protos/V1/results_common.proto +++ b/Protos/V1/results_common.proto @@ -14,10 +14,13 @@ option csharp_namespace = "ArmoniK.Api.gRPC.V1.Results"; */ message ResultRaw { string session_id = 1; /** The session ID. */ - string name = 2; /** The result name. */ + string name = 2; /** The result name. Given by the client. */ string owner_task_id = 3; /** The owner task ID. */ result_status.ResultStatus status = 4; /** The result status. */ google.protobuf.Timestamp created_at = 5; /** The result creation date. */ + google.protobuf.Timestamp completed_at = 6; /** The result completion date. */ + google.protobuf.Timestamp wait_for_data_at = 7; /** The result passing to status wait_for_data date. */ + string result_id = 8; /** The result ID. Uniquely generated by the server. */ } /** @@ -41,6 +44,11 @@ message ListResultsRequest { result_status.ResultStatus status = 4; /** The result status. */ google.protobuf.Timestamp created_after = 5; /** Use the creation date of a result to filter results created after the input. */ google.protobuf.Timestamp created_before = 6; /** Use the creation date of a result to filter results created before the input. */ + google.protobuf.Timestamp completed_after = 7; /** Use the creation date of a result to filter results completed after the input. */ + google.protobuf.Timestamp completed_before = 8; /** Use the creation date of a result to filter results completed before the input. */ + google.protobuf.Timestamp wait_for_data_after = 9; /** Use the creation date of a result to filter results passed in status wait_for_data after the input. */ + google.protobuf.Timestamp wait_for_data_before = 10; /** Use the creation date of a result to filter results passed in status wait_for_data before the input. */ + string result_id = 11; /** The result ID. Uniquely generated by the server. */ } /** @@ -60,6 +68,7 @@ message ListResultsRequest { ORDER_BY_FIELD_OWNER_TASK_ID = 3; /** The owner task ID. */ ORDER_BY_FIELD_STATUS = 4; /** The result status. */ ORDER_BY_FIELD_CREATED_AT = 5; /** The result creation date. */ + ORDER_BY_FIELD_RESULT_ID = 6; /** The result ID. */ } /** @@ -104,8 +113,8 @@ message ListResultsResponse { * Request for getting the id of the task that should create this result */ message GetOwnerTaskIdRequest { - string session_id = 1; - repeated string result_id = 2; + string session_id = 1; /** The session ID. */ + repeated string result_id = 2; /** The list of result ID/name. */ } /* @@ -113,9 +122,129 @@ message GetOwnerTaskIdRequest { */ message GetOwnerTaskIdResponse { message MapResultTask { - string result_id = 1; - string task_id = 2; + string result_id = 1; /** The result ID/name. */ + string task_id = 2; /** The owner task ID associated to the result. */ } repeated MapResultTask result_task = 1; - string session_id = 2; + string session_id = 2; /** The session ID. */ +} + +/* +* Request for creating results without data +*/ +message CreateResultsMetaDataRequest { + /** + * A result to create. + */ + message ResultCreate { + string name = 1; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** The list of results to create. */ + string session_id = 2; /** The session in which create results. */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsMetaDataResponse { + repeated ResultRaw results = 1; /** The list of raw results that were created. */ +} + +/* +* Request for creating results without data +*/ +message CreateResultsRequest { + /** + * A result to create. + */ + message ResultCreate { + bytes data = 1; /** The actual data of the result. */ + string name = 2; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** Results to create. */ + string session_id = 2; /** The session in which create results. */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsResponse { + repeated ResultRaw results = 1; /** The raw results that were created. */ +} + +/* +* Request for uploading results data through stream. +* Data must be sent in multiple chunks. +* Only one result can be uploaded. +*/ +message UploadResultDataRequest { + /** + * The metadata to identify the result to update. + */ + message ResultIdentifier { + string session_id = 1; /** The session of the result. */ + string result_id = 2; /** The ID of the result. */ + } + + /** + * The possible messages that constitute a UploadResultDataRequest + * They should be sent in the following order: + * - id + * - data_chunk (stream can have multiple data_chunk messages that represent data divided in several parts) + * + * Data chunk cannot exceed the size returned by the GetServiceConfiguration rpc method + */ + oneof type { + ResultIdentifier id = 1; /** The identifier of the result to which add data. */ + bytes data_chunk = 2; /** A chunk of data. */ + } +} + +/* +* Response for creating results without data +*/ +message UploadResultDataResponse { + ResultRaw result = 1; /** The metadata of the updated result that was updated. */ +} + +/* +* Response for obtaining results service configuration +*/ +message ResultsServiceConfigurationResponse { + int32 data_chunk_max_size = 1; /** Maximum size supported by a data chunk for the result service*/ +} + +/* +* Request for getting a result +*/ +message DownloadResultDataRequest { + string session_id = 1; /** The session of the result. */ + string result_id = 2; /** The ID of the result. */ +} + +/* +* Response for creating results without data +*/ +message DownloadResultDataResponse { + /** + * The possible messages that constitute a UploadResultDataRequest + * Get the data chunks of the result + */ + bytes data_chunk = 1; /** A chunk of data. */ +} + +/* +* Request deleting data from results results but keeping metadata +*/ +message DeleteResultsDataRequest { + string session_id = 1; /** The session of the results. */ + repeated string result_id = 2; /** The ID of the results to delete. */ +} + +/* +* Response deleting data from results results but keeping metadata +*/ +message DeleteResultsDataResponse { + string session_id = 1; /** The session of the results. */ + repeated string result_id = 2; /** The ID of the deleted results. */ } diff --git a/Protos/V1/results_service.proto b/Protos/V1/results_service.proto index 77bd8c7f6..e9e1e67e0 100644 --- a/Protos/V1/results_service.proto +++ b/Protos/V1/results_service.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package armonik.api.grpc.v1.results; +import "objects.proto"; import "results_common.proto"; option csharp_namespace = "ArmoniK.Api.gRPC.V1.Results"; @@ -19,4 +20,35 @@ service Results { * Get the id of the task that should produce the result */ rpc GetOwnerTaskId(GetOwnerTaskIdRequest) returns (GetOwnerTaskIdResponse); + + /** + * Create the metadata of multiple results at once + * Data have to be uploaded separately + */ + rpc CreateResultsMetaData(CreateResultsMetaDataRequest) returns (CreateResultsMetaDataResponse) {} + + /** + * Create one result with data included in the request + */ + rpc CreateResults(CreateResultsRequest) returns (CreateResultsResponse) {} + + /** + * Upload data for result with stream + */ + rpc UploadResultData(stream UploadResultDataRequest) returns (UploadResultDataResponse) {} + + /** + * Retrieve data + */ + rpc DownloadResultData(DownloadResultDataRequest) returns (stream DownloadResultDataResponse) {} + + /** + * Delete data from multiple results + */ + rpc DeleteResultsData(DeleteResultsDataRequest) returns (DeleteResultsDataResponse) {} + + /** + * Get the configuration of the service + */ + rpc GetServiceConfiguration(Empty) returns (ResultsServiceConfigurationResponse); } diff --git a/Protos/V1/submitter_common.proto b/Protos/V1/submitter_common.proto index cbe166c28..8d18b05fc 100644 --- a/Protos/V1/submitter_common.proto +++ b/Protos/V1/submitter_common.proto @@ -53,9 +53,10 @@ message CreateLargeTaskRequest { message CreateTaskReply { message TaskInfo { - string task_id = 1; - repeated string expected_output_keys = 2; - repeated string data_dependencies = 3; + string task_id = 1; /** Unique ID of the created task. */ + repeated string expected_output_keys = 2; /** Unique ID of the result that will be used as expected output. Results should already exist. */ + repeated string data_dependencies = 3; /** Unique ID of the result that will be used as data dependency. Results should already exist. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Result associated to the payload is created implicitly. */ } message CreationStatus { diff --git a/Protos/V1/tasks_common.proto b/Protos/V1/tasks_common.proto index 9455199e0..7eb33f899 100644 --- a/Protos/V1/tasks_common.proto +++ b/Protos/V1/tasks_common.proto @@ -156,6 +156,20 @@ message ListTasksResponse { int32 total = 4; /** The total number of tasks. */ } +/** + * Response to list tasks. + * + * Use pagination, filtering and sorting from the request. + * Return a list of formated tasks. + */ +message ListTasksRawResponse { + repeated TaskRaw tasks = 1; /** The list of tasks. */ + + int32 page = 2; /** The page number. Start at 0. */ + int32 page_size = 3; /** The page size. */ + int32 total = 4; /** The total number of tasks. */ +} + /** * Request for getting a single task. */ @@ -221,3 +235,38 @@ message CountTasksByStatusRequest {} message CountTasksByStatusResponse { repeated StatusCount status = 1; /** Number of tasks by status. Expected to have only 1 objct by tasks status. */ } + +/** +* Request to create tasks. +*/ +message SubmitTasksRequest { + message TaskCreation { + repeated string expected_output_keys = 1; /** Unique ID of the results that will be produced by the task. Results should be created using ResultsService. */ + repeated string data_dependencies = 2; /** Unique ID of the results that will be used as datadependencies. Results should be created using ResultsService. */ + string payload_id = 3; /** Unique ID of the result that will be used as payload. Result should created using ResultsService. */ + TaskOptions task_options = 4; /** Optionnal task options. */ + } + + string session_id = 1; /** The session ID. */ + TaskOptions task_options = 2; /** The options for the tasks. Each task will have the same. Options are merged with the one from the session. */ + repeated TaskCreation task_creations = 3; /** Task creation requests. */ +} + +/** +* Response to create tasks. +* +* expected_output_ids and data_dependencies must be created through ResultsService. +* +* Remark : this may have to be enriched to a better management of errors but +* will the client application be able to manage a missing data dependency or expected output ? +*/ +message SubmitTasksResponse { + message TaskInfo { + string task_id = 1; /** The task ID. */ + repeated string expected_output_ids = 2; /** The expected output IDs. A task have expected output IDs. */ + repeated string data_dependencies = 3; /** The data dependencies IDs (inputs). A task have data dependencies. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Result should created using ResultsService. */ + } + + repeated TaskInfo task_infos = 1; /** List of task infos if submission successful, else throw gRPC exception. */ +} diff --git a/Protos/V1/tasks_service.proto b/Protos/V1/tasks_service.proto index 3ffc9c2fb..abd79c8a4 100644 --- a/Protos/V1/tasks_service.proto +++ b/Protos/V1/tasks_service.proto @@ -18,6 +18,11 @@ service Tasks { */ rpc ListTasks(ListTasksRequest) returns (ListTasksResponse) {} + /** + * Get a tasks list using pagination, filters and sorting with complete metada. + */ + rpc ListTasksRaw(ListTasksRequest) returns (ListTasksRawResponse) {} + /** * Get a task by its id. */ @@ -37,4 +42,9 @@ service Tasks { * Get count from tasks status. */ rpc CountTasksByStatus(CountTasksByStatusRequest) returns (CountTasksByStatusResponse) {} + + /** + * Create tasks metadata and submit task for processing. + */ + rpc SubmitTasks(SubmitTasksRequest) returns (SubmitTasksResponse) {} } diff --git a/packages/csharp/ArmoniK.Api.Client/ResultsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/ResultsClientExt.cs new file mode 100644 index 000000000..67af37fa7 --- /dev/null +++ b/packages/csharp/ArmoniK.Api.Client/ResultsClientExt.cs @@ -0,0 +1,129 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2022. All rights reserved. +// W. Kirschenmann +// J. Gurhem +// D. Dubuc +// L. Ziane Khodja +// F. Lemaitre +// S. Djebbar +// J. Fonseca +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.Results; + +using Google.Protobuf; + +using JetBrains.Annotations; + +namespace ArmoniK.Api.Client +{ + /// + /// Extension to simplify usage + /// + [PublicAPI] + public static class ResultsClientExt + { + /// + /// Upload data to populate an existing result + /// + /// gRPC result client + /// The id of the session + /// The id of the result + /// The data to send + /// + /// The upload result response + /// + [PublicAPI] + public static async Task UploadResultData(this Results.ResultsClient client, + string sessionId, + string resultId, + byte[] data) + { + var configuration = await client.GetServiceConfigurationAsync(new Empty()); + + var stream = client.UploadResultData(); + + await stream.RequestStream.WriteAsync(new UploadResultDataRequest + { + Id = new UploadResultDataRequest.Types.ResultIdentifier + { + ResultId = resultId, + SessionId = sessionId, + }, + }) + .ConfigureAwait(false); + + var start = 0; + while (start < data.Length) + { + var chunkSize = Math.Min(configuration.DataChunkMaxSize, + data.Length - start); + + await stream.RequestStream.WriteAsync(new UploadResultDataRequest + { + DataChunk = UnsafeByteOperations.UnsafeWrap(data.AsMemory() + .Slice(start, + chunkSize)), + }) + .ConfigureAwait(false); + + start += chunkSize; + } + + await stream.RequestStream.CompleteAsync() + .ConfigureAwait(false); + + return await stream.ResponseAsync.ConfigureAwait(false); + } + + /// + /// Download a result + /// + /// gRPC result client + /// The id of the session + /// The id of the result + /// Token used to cancel the execution of the method + /// + /// A byte array containing the data associated to the result + /// + [PublicAPI] + public static async Task DownloadResultData(this Results.ResultsClient client, + string sessionId, + string resultId, + CancellationToken cancellationToken) + { + var stream = client.DownloadResultData(new DownloadResultDataRequest + { + ResultId = resultId, + SessionId = sessionId, + }); + + var result = new List(); + + while (await stream.ResponseStream.MoveNext(cancellationToken)) + { + result.AddRange(stream.ResponseStream.Current.DataChunk.ToByteArray()); + } + + return result.ToArray(); + } + } +} diff --git a/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs b/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs index 6bff4609c..91abcef06 100644 --- a/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs +++ b/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs @@ -111,4 +111,44 @@ Task CreateTasksAsync(IEnumerable tasks, /// Task SendResult(string key, byte[] data); + + /// + /// Create results metadata + /// + /// The collection of results to be created + /// + /// The result creation response + /// + Task CreateResultsMetaDataAsync(IEnumerable results); + + /// + /// Submit tasks with existing payloads (results) + /// + /// The requests to create tasks + /// optional tasks for the whole submission + /// + /// The task submission response + /// + Task SubmitTasksAsync(IEnumerable taskCreations, + TaskOptions? submissionTaskOptions); + + /// + /// Create results from metadata and data in an unique request + /// + /// The results to create + /// + /// The task submission response + /// + Task CreateResultsAsync(IEnumerable results); + + /// + /// Upload data to an existing result + /// + /// The result Id + /// The data to submit for the given result + /// + /// The upload data response + /// + Task UploadResultData(string key, + byte[] data); } diff --git a/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs b/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs index 93d4c698b..62158608a 100644 --- a/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs +++ b/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs @@ -135,6 +135,20 @@ public Task RequestCommonData(string key) public Task RequestDirectData(string key) => throw new NotImplementedException(); + /// + public async Task CreateResultsMetaDataAsync(IEnumerable results) + => await client_.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest + { + CommunicationToken = Token, + Results = + { + results, + }, + SessionId = sessionId_, + }) + .ConfigureAwait(false); + + /// public async Task SendResult(string key, byte[] data) @@ -206,6 +220,74 @@ await stream.RequestStream.CompleteAsync() public ValueTask DisposeAsync() => ValueTask.CompletedTask; + /// + public async Task SubmitTasksAsync(IEnumerable taskCreations, + TaskOptions? submissionTaskOptions) + => await client_.SubmitTasksAsync(new SubmitTasksRequest + { + CommunicationToken = Token, + SessionId = sessionId_, + TaskCreations = + { + taskCreations, + }, + TaskOptions = submissionTaskOptions, + }) + .ConfigureAwait(false); + + /// + public async Task CreateResultsAsync(IEnumerable results) + => await client_.CreateResultsAsync(new CreateResultsRequest + { + CommunicationToken = Token, + SessionId = sessionId_, + Results = + { + results, + }, + }) + .ConfigureAwait(false); + + public async Task UploadResultData(string key, + byte[] data) + { + var stream = client_.UploadResultData(); + + await stream.RequestStream.WriteAsync(new UploadResultDataRequest + { + Id = new UploadResultDataRequest.Types.ResultIdentifier + { + ResultId = key, + SessionId = sessionId_, + }, + CommunicationToken = Token, + }) + .ConfigureAwait(false); + + var start = 0; + while (start < data.Length) + { + var chunkSize = Math.Min(Configuration!.DataChunkMaxSize, + data.Length - start); + + await stream.RequestStream.WriteAsync(new UploadResultDataRequest + { + CommunicationToken = Token, + DataChunk = UnsafeByteOperations.UnsafeWrap(data.AsMemory() + .Slice(start, + chunkSize)), + }) + .ConfigureAwait(false); + + start += chunkSize; + } + + await stream.RequestStream.CompleteAsync() + .ConfigureAwait(false); + + return await stream.ResponseAsync.ConfigureAwait(false); + } + public static async Task Create(IAsyncStreamReader requestStream, Agent.AgentClient agentClient, ILoggerFactory loggerFactory,