From 25242970c7848c5822e4f588c2791e20aa98a37c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 30 Mar 2023 08:21:26 +0200 Subject: [PATCH] feat: Add alternative methods to create tasks and results separately --- Protos/V1/agent_common.proto | 152 ++++++++++++++++-- Protos/V1/agent_service.proto | 21 +++ Protos/V1/objects.proto | 11 +- Protos/V1/result_status.proto | 1 + Protos/V1/results_common.proto | 140 +++++++++++++++- Protos/V1/results_service.proto | 32 ++++ Protos/V1/submitter_common.proto | 7 +- Protos/V1/tasks_common.proto | 49 ++++++ Protos/V1/tasks_service.proto | 10 ++ .../ArmoniK.Api.Worker/Worker/ITaskHandler.cs | 9 ++ .../ArmoniK.Api.Worker/Worker/TaskHandler.cs | 14 ++ 11 files changed, 423 insertions(+), 23 deletions(-) diff --git a/Protos/V1/agent_common.proto b/Protos/V1/agent_common.proto index c3dcb1cd1..9031e73a0 100644 --- a/Protos/V1/agent_common.proto +++ b/Protos/V1/agent_common.proto @@ -2,7 +2,9 @@ syntax = "proto3"; package armonik.api.grpc.v1.agent; +import "google/protobuf/timestamp.proto"; import "objects.proto"; +import "result_status.proto"; option csharp_namespace = "ArmoniK.Api.gRPC.V1.Agent"; @@ -16,14 +18,15 @@ message CreateTaskRequest { DataChunk task_payload = 3; } - string communication_token = 4; + string communication_token = 4; /** Communication token received by the worker during task processing */ } message CreateTaskReply { message TaskInfo { - string task_id = 1; - repeated string expected_output_keys = 2; - repeated string data_dependencies = 3; + string task_id = 1; /** The task ID. */ + repeated string expected_output_keys = 2; /** The expected output IDs. A task have expected output IDs. */ + repeated string data_dependencies = 3; /** The data dependencies IDs (inputs). A task have data dependencies. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Results are created implicitly. */ } message CreationStatus { @@ -41,11 +44,11 @@ message CreateTaskReply { CreationStatusList creation_status_list = 1; string error = 2; } - string communication_token = 4; + string communication_token = 4; /** Communication token received by the worker during task processing */ } message DataRequest { - string communication_token = 1; + string communication_token = 1; /** Communication token received by the worker during task processing */ string key = 2; } @@ -57,7 +60,7 @@ message DataReply { string error = 3; } } - string communication_token = 1; + string communication_token = 1; /** Communication token received by the worker during task processing */ oneof type { Init init = 2; DataChunk data = 3; @@ -70,13 +73,144 @@ message Result { InitKeyedDataStream init = 1; DataChunk data = 2; } - string communication_token = 3; + string communication_token = 3; /** Communication token received by the worker during task processing */ } message ResultReply { - string communication_token = 3; + string communication_token = 3; /** Communication token received by the worker during task processing */ oneof type { Empty Ok = 1; string Error = 2; } } + +/* +* Request for creating results without data +*/ +message CreateResultsMetaDataRequest { + /** + * A result to create. + */ + message ResultCreate { + string name = 1; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** The list of results to create. */ + string session_id = 2; /** The session in which create results. */ + string communication_token = 3; /** Communication token received by the worker during task processing */ +} + +/** +* Result metadata +*/ +message ResultMetaData { + string session_id = 1; /** The session ID. */ + string result_id = 2; /** The result ID. */ + string name = 3; /** The result name. */ + result_status.ResultStatus status = 4; /** The result status. */ + google.protobuf.Timestamp created_at = 5; /** The result creation date. */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsMetaDataResponse { + repeated ResultMetaData results = 1; /** The list of metadata results that were created. */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} + +/** +* Request to create tasks. +*/ +message SubmitTasksRequest { + message TaskCreation { + repeated string expected_output_keys = 1; /** Unique ID of the results that will be produced by the task. Results are created using ResultsService. */ + repeated string data_dependencies = 2; /** Unique ID of the results that will be used as datadependencies. Results are created using ResultsService. */ + string payload_id = 3; /** Unique ID of the result that will be used as payload. Results are created using ResultsService. */ + TaskOptions task_options = 4; /** Optionnal task options. */ + } + + string session_id = 1; /** The session ID. */ + TaskOptions task_options = 2; /** The options for the tasks. Each task will have the same. Options are merged with the one from the session. */ + repeated TaskCreation task_creations = 3; /** Task creation requests. */ + string communication_token = 4; /** Communication token received by the worker during task processing */ +} + +/** +* Response to create tasks. +* +* expected_output_ids and data_dependencies must be created through ResultsService. +* +* Remark : this may have to be enriched to a better management of errors but +* will the client application be able to manage a missing data dependency or expected output ? +*/ +message SubmitTasksResponse { + message TaskInfo { + string task_id = 1; /** The task ID. */ + repeated string expected_output_ids = 2; /** The expected output IDs. A task has expected output IDs. */ + repeated string data_dependencies = 3; /** The data dependencies IDs (inputs). A task has data dependencies. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Results are created implicitly. */ + } + + repeated TaskInfo task_infos = 1; /** List of task infos if submission successful, else throw gRPC exception. */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} + +/* +* Request for creating results without data +*/ +message CreateResultsRequest { + /** + * A result to create. + */ + message ResultCreate { + bytes data = 1; /** The actual data of the result. */ + string name = 2; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** The results to create. */ + string session_id = 2; /** The session in which create results. */ + string communication_token = 3; /** Communication token received by the worker during task processing */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsResponse { + repeated ResultMetaData results = 1; /** The raw results that were created. */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} + +/* +* Request for uploading results data through stream. +* Data must be sent in multiple chunks +*/ +message UploadResultDataRequest { + /** + * The metadata to identify the result to update. + */ + message ResultIdentifier { + string session_id = 1; /** The session of the result. */ + string result_id = 2; /** The ID of the result. */ + } + + /** + * The possible messages that constitute a UploadResultDataRequest + * They should be sent in the following order: + * - id + * - data_chunk (stream can have multiple data_chunk messages that represent data divided in several parts) + * + * Data chunk cannot exceed the size returned by the GetServiceConfiguration rpc method + */ + oneof type { + ResultIdentifier id = 1; /** The identifier of the result to which add data. */ + bytes data_chunk = 2; /** A chunk of data. */ + } + string communication_token = 4; /** Communication token received by the worker during task processing */ +} + +/* +* Response for uploading data with stream for result +*/ +message UploadResultDataResponse { + string result_id = 1; /** The Id of the result to which data were added */ + string communication_token = 2; /** Communication token received by the worker during task processing */ +} diff --git a/Protos/V1/agent_service.proto b/Protos/V1/agent_service.proto index 26f380143..7badffc2b 100644 --- a/Protos/V1/agent_service.proto +++ b/Protos/V1/agent_service.proto @@ -7,6 +7,27 @@ import "agent_common.proto"; option csharp_namespace = "ArmoniK.Api.gRPC.V1.Agent"; service Agent { + /** + * Create the metadata of multiple results at once + * Data have to be uploaded separately + */ + rpc CreateResultsMetaData(CreateResultsMetaDataRequest) returns (CreateResultsMetaDataResponse) {} + + /** + * Create one result with data included in the request + */ + rpc CreateResults(CreateResultsRequest) returns (CreateResultsResponse) {} + + /** + * Upload data for result with stream + */ + rpc UploadResultData(stream UploadResultDataRequest) returns (UploadResultDataResponse) {} + + /** + * Create tasks metadata and submit task for processing. + */ + rpc SubmitTasks(SubmitTasksRequest) returns (SubmitTasksResponse) {} + rpc CreateTask(stream CreateTaskRequest) returns (CreateTaskReply); rpc GetResourceData(DataRequest) returns (stream DataReply); rpc GetCommonData(DataRequest) returns (stream DataReply); diff --git a/Protos/V1/objects.proto b/Protos/V1/objects.proto index 97922de45..cb937b120 100644 --- a/Protos/V1/objects.proto +++ b/Protos/V1/objects.proto @@ -41,9 +41,10 @@ message Output { } message TaskRequest { - repeated string expected_output_keys = 1; - repeated string data_dependencies = 2; - bytes payload = 3; + repeated string expected_output_keys = 1; /** Given names to the expected outputs that will be created implicitly. IDs are returned after task creation */ + repeated string data_dependencies = 2; /** IDs of the results that will be used as data dependency. */ + bytes payload = 3; /** Content of the payload for the task. */ + string payload_name = 4; /** Name that will be associated to the result created for the payload. Optionnal */ } message InitKeyedDataStream { @@ -61,8 +62,8 @@ message DataChunk { } message TaskRequestHeader { - repeated string expected_output_keys = 1; - repeated string data_dependencies = 2; + repeated string expected_output_keys = 1; /** Given names to the expected outputs that will be created implicitly. IDs are returned after task creation */ + repeated string data_dependencies = 2; /** IDs of the results that will be used as data dependency. */ } message InitTaskRequest { diff --git a/Protos/V1/result_status.proto b/Protos/V1/result_status.proto index d041a6294..3ca5e19cc 100644 --- a/Protos/V1/result_status.proto +++ b/Protos/V1/result_status.proto @@ -9,6 +9,7 @@ enum ResultStatus { RESULT_STATUS_CREATED = 1; /** Result is created and task is created, submitted or dispatched. */ RESULT_STATUS_COMPLETED = 2; /** Result is completed with a completed task. */ RESULT_STATUS_ABORTED = 3; /** Result is aborted. */ + RESULT_STATUS_WAIT_FOR_DATA = 4; /** Result is created and needs data to be added. */ /** NOTFOUND is encoded as 127 to make it small while still leaving enough room for future status extensions * diff --git a/Protos/V1/results_common.proto b/Protos/V1/results_common.proto index e9f84cf32..3c31c23c8 100644 --- a/Protos/V1/results_common.proto +++ b/Protos/V1/results_common.proto @@ -14,10 +14,13 @@ option csharp_namespace = "ArmoniK.Api.gRPC.V1.Results"; */ message ResultRaw { string session_id = 1; /** The session ID. */ - string name = 2; /** The result name. */ + string name = 2; /** The result name. Given by the client. */ string owner_task_id = 3; /** The owner task ID. */ result_status.ResultStatus status = 4; /** The result status. */ google.protobuf.Timestamp created_at = 5; /** The result creation date. */ + google.protobuf.Timestamp completed_at = 6; /** The result completion date. */ + google.protobuf.Timestamp wait_for_data_at = 7; /** The result passing to status wait_for_data date. */ + string result_id = 8; /** The result ID. Uniquely generated by the server. */ } /** @@ -41,6 +44,11 @@ message ListResultsRequest { result_status.ResultStatus status = 4; /** The result status. */ google.protobuf.Timestamp created_after = 5; /** Use the creation date of a result to filter results created after the input. */ google.protobuf.Timestamp created_before = 6; /** Use the creation date of a result to filter results created before the input. */ + google.protobuf.Timestamp completed_after = 7; /** Use the creation date of a result to filter results completed after the input. */ + google.protobuf.Timestamp completed_before = 8; /** Use the creation date of a result to filter results completed before the input. */ + google.protobuf.Timestamp wait_for_data_after = 9; /** Use the creation date of a result to filter results passed in status wait_for_data after the input. */ + google.protobuf.Timestamp wait_for_data_before = 10; /** Use the creation date of a result to filter results passed in status wait_for_data before the input. */ + string result_id = 11; /** The result ID. Uniquely generated by the server. */ } /** @@ -60,6 +68,7 @@ message ListResultsRequest { ORDER_BY_FIELD_OWNER_TASK_ID = 3; /** The owner task ID. */ ORDER_BY_FIELD_STATUS = 4; /** The result status. */ ORDER_BY_FIELD_CREATED_AT = 5; /** The result creation date. */ + ORDER_BY_FIELD_RESULT_ID = 6; /** The result ID. */ } /** @@ -104,8 +113,8 @@ message ListResultsResponse { * Request for getting the id of the task that should create this result */ message GetOwnerTaskIdRequest { - string session_id = 1; - repeated string result_id = 2; + string session_id = 1; /** The session ID. */ + repeated string result_id = 2; /** The list of result ID/name. */ } /* @@ -113,9 +122,128 @@ message GetOwnerTaskIdRequest { */ message GetOwnerTaskIdResponse { message MapResultTask { - string result_id = 1; - string task_id = 2; + string result_id = 1; /** The result ID/name. */ + string task_id = 2; /** The owner task ID associated to the result. */ } repeated MapResultTask result_task = 1; - string session_id = 2; + string session_id = 2; /** The session ID. */ +} + +/* +* Request for creating results without data +*/ +message CreateResultsMetaDataRequest { + /** + * A result to create. + */ + message ResultCreate { + string name = 1; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** The list of results to create. */ + string session_id = 2; /** The session in which create results. */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsMetaDataResponse { + repeated ResultRaw results = 1; /** The list of raw results that were created. */ +} + +/* +* Request for creating results without data +*/ +message CreateResultsRequest { + /** + * A result to create. + */ + message ResultCreate { + bytes data = 1; /** The actual data of the result. */ + string name = 2; /** The result name. Given by the client. */ + } + repeated ResultCreate results = 1; /** Results to create. */ + string session_id = 2; /** The session in which create results. */ +} + +/* +* Response for creating results without data +*/ +message CreateResultsResponse { + repeated ResultRaw results = 1; /** The raw results that were created. */ +} + +/* +* Request for uploading results data through stream. +* Data must be sent in multiple chunks +*/ +message UploadResultDataRequest { + /** + * The metadata to identify the result to update. + */ + message ResultIdentifier { + string session_id = 1; /** The session of the result. */ + string result_id = 2; /** The ID of the result. */ + } + + /** + * The possible messages that constitute a UploadResultDataRequest + * They should be sent in the following order: + * - id + * - data_chunk (stream can have multiple data_chunk messages that represent data divided in several parts) + * + * Data chunk cannot exceed the size returned by the GetServiceConfiguration rpc method + */ + oneof type { + ResultIdentifier id = 1; /** The identifier of the result to which add data. */ + bytes data_chunk = 2; /** A chunk of data. */ + } +} + +/* +* Response for creating results without data +*/ +message UploadResultDataResponse { + ResultRaw result = 1; /** The metadata of the updated result that was updated. */ +} + +/* +* Response for obtaining results service configuration +*/ +message ResultsServiceConfigurationResponse { + int32 data_chunk_max_size = 1; /** Maximum size supported by a data chunk for the result service*/ +} + +/* +* Request for getting a result +*/ +message DownloadResultDataRequest { + string session_id = 1; /** The session of the result. */ + string result_id = 2; /** The ID of the result. */ +} + +/* +* Response for creating results without data +*/ +message DownloadResultDataResponse { + /** + * The possible messages that constitute a UploadResultDataRequest + * Get the data chunks of the result + */ + bytes data_chunk = 1; /** A chunk of data. */ +} + +/* +* Request deleting data from results results but keeping metadata +*/ +message DeleteResultsDataRequest { + string session_id = 1; /** The session of the results. */ + repeated string result_id = 2; /** The ID of the results to delete. */ +} + +/* +* Response deleting data from results results but keeping metadata +*/ +message DeleteResultsDataResponse { + string session_id = 1; /** The session of the results. */ + repeated string result_id = 2; /** The ID of the deleted results. */ } diff --git a/Protos/V1/results_service.proto b/Protos/V1/results_service.proto index 77bd8c7f6..e9e1e67e0 100644 --- a/Protos/V1/results_service.proto +++ b/Protos/V1/results_service.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package armonik.api.grpc.v1.results; +import "objects.proto"; import "results_common.proto"; option csharp_namespace = "ArmoniK.Api.gRPC.V1.Results"; @@ -19,4 +20,35 @@ service Results { * Get the id of the task that should produce the result */ rpc GetOwnerTaskId(GetOwnerTaskIdRequest) returns (GetOwnerTaskIdResponse); + + /** + * Create the metadata of multiple results at once + * Data have to be uploaded separately + */ + rpc CreateResultsMetaData(CreateResultsMetaDataRequest) returns (CreateResultsMetaDataResponse) {} + + /** + * Create one result with data included in the request + */ + rpc CreateResults(CreateResultsRequest) returns (CreateResultsResponse) {} + + /** + * Upload data for result with stream + */ + rpc UploadResultData(stream UploadResultDataRequest) returns (UploadResultDataResponse) {} + + /** + * Retrieve data + */ + rpc DownloadResultData(DownloadResultDataRequest) returns (stream DownloadResultDataResponse) {} + + /** + * Delete data from multiple results + */ + rpc DeleteResultsData(DeleteResultsDataRequest) returns (DeleteResultsDataResponse) {} + + /** + * Get the configuration of the service + */ + rpc GetServiceConfiguration(Empty) returns (ResultsServiceConfigurationResponse); } diff --git a/Protos/V1/submitter_common.proto b/Protos/V1/submitter_common.proto index cbe166c28..8d18b05fc 100644 --- a/Protos/V1/submitter_common.proto +++ b/Protos/V1/submitter_common.proto @@ -53,9 +53,10 @@ message CreateLargeTaskRequest { message CreateTaskReply { message TaskInfo { - string task_id = 1; - repeated string expected_output_keys = 2; - repeated string data_dependencies = 3; + string task_id = 1; /** Unique ID of the created task. */ + repeated string expected_output_keys = 2; /** Unique ID of the result that will be used as expected output. Results should already exist. */ + repeated string data_dependencies = 3; /** Unique ID of the result that will be used as data dependency. Results should already exist. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Result associated to the payload is created implicitly. */ } message CreationStatus { diff --git a/Protos/V1/tasks_common.proto b/Protos/V1/tasks_common.proto index 9455199e0..7eb33f899 100644 --- a/Protos/V1/tasks_common.proto +++ b/Protos/V1/tasks_common.proto @@ -156,6 +156,20 @@ message ListTasksResponse { int32 total = 4; /** The total number of tasks. */ } +/** + * Response to list tasks. + * + * Use pagination, filtering and sorting from the request. + * Return a list of formated tasks. + */ +message ListTasksRawResponse { + repeated TaskRaw tasks = 1; /** The list of tasks. */ + + int32 page = 2; /** The page number. Start at 0. */ + int32 page_size = 3; /** The page size. */ + int32 total = 4; /** The total number of tasks. */ +} + /** * Request for getting a single task. */ @@ -221,3 +235,38 @@ message CountTasksByStatusRequest {} message CountTasksByStatusResponse { repeated StatusCount status = 1; /** Number of tasks by status. Expected to have only 1 objct by tasks status. */ } + +/** +* Request to create tasks. +*/ +message SubmitTasksRequest { + message TaskCreation { + repeated string expected_output_keys = 1; /** Unique ID of the results that will be produced by the task. Results should be created using ResultsService. */ + repeated string data_dependencies = 2; /** Unique ID of the results that will be used as datadependencies. Results should be created using ResultsService. */ + string payload_id = 3; /** Unique ID of the result that will be used as payload. Result should created using ResultsService. */ + TaskOptions task_options = 4; /** Optionnal task options. */ + } + + string session_id = 1; /** The session ID. */ + TaskOptions task_options = 2; /** The options for the tasks. Each task will have the same. Options are merged with the one from the session. */ + repeated TaskCreation task_creations = 3; /** Task creation requests. */ +} + +/** +* Response to create tasks. +* +* expected_output_ids and data_dependencies must be created through ResultsService. +* +* Remark : this may have to be enriched to a better management of errors but +* will the client application be able to manage a missing data dependency or expected output ? +*/ +message SubmitTasksResponse { + message TaskInfo { + string task_id = 1; /** The task ID. */ + repeated string expected_output_ids = 2; /** The expected output IDs. A task have expected output IDs. */ + repeated string data_dependencies = 3; /** The data dependencies IDs (inputs). A task have data dependencies. */ + string payload_id = 4; /** Unique ID of the result that will be used as payload. Result should created using ResultsService. */ + } + + repeated TaskInfo task_infos = 1; /** List of task infos if submission successful, else throw gRPC exception. */ +} diff --git a/Protos/V1/tasks_service.proto b/Protos/V1/tasks_service.proto index 3ffc9c2fb..abd79c8a4 100644 --- a/Protos/V1/tasks_service.proto +++ b/Protos/V1/tasks_service.proto @@ -18,6 +18,11 @@ service Tasks { */ rpc ListTasks(ListTasksRequest) returns (ListTasksResponse) {} + /** + * Get a tasks list using pagination, filters and sorting with complete metada. + */ + rpc ListTasksRaw(ListTasksRequest) returns (ListTasksRawResponse) {} + /** * Get a task by its id. */ @@ -37,4 +42,9 @@ service Tasks { * Get count from tasks status. */ rpc CountTasksByStatus(CountTasksByStatusRequest) returns (CountTasksByStatusResponse) {} + + /** + * Create tasks metadata and submit task for processing. + */ + rpc SubmitTasks(SubmitTasksRequest) returns (SubmitTasksResponse) {} } diff --git a/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs b/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs index 6bff4609c..d2a17bd70 100644 --- a/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs +++ b/packages/csharp/ArmoniK.Api.Worker/Worker/ITaskHandler.cs @@ -111,4 +111,13 @@ Task CreateTasksAsync(IEnumerable tasks, /// Task SendResult(string key, byte[] data); + + /// + /// Create results metadata + /// + /// The collection of results to be created + /// + /// The result creation response + /// + Task CreateResultsMetaDataAsync(IEnumerable results); } diff --git a/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs b/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs index 93d4c698b..bda1eef48 100644 --- a/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs +++ b/packages/csharp/ArmoniK.Api.Worker/Worker/TaskHandler.cs @@ -135,6 +135,20 @@ public Task RequestCommonData(string key) public Task RequestDirectData(string key) => throw new NotImplementedException(); + /// + public async Task CreateResultsMetaDataAsync(IEnumerable results) + => await client_.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest + { + CommunicationToken = Token, + Results = + { + results, + }, + SessionId = sessionId_, + }) + .ConfigureAwait(false); + + /// public async Task SendResult(string key, byte[] data)