Skip to content

Commit

Permalink
feat: send data between agent and worker through files instead of str…
Browse files Browse the repository at this point in the history
…eam-based requests (#414)
  • Loading branch information
aneojgurhem authored Sep 26, 2023
2 parents 2d038ad + a36e5c1 commit 9fd5060
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 961 deletions.
64 changes: 15 additions & 49 deletions Protos/V1/agent_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,41 +47,17 @@ message CreateTaskReply {
string communication_token = 4; /** Communication token received by the worker during task processing */
}

// Request to retrieve data
message DataRequest {
string communication_token = 1; /** Communication token received by the worker during task processing */
string key = 2;
// Id of the result that will be retrieved
string result_id = 2;
}

message DataReply {
message Init {
string key = 1;
oneof has_result {
DataChunk data = 2;
string error = 3;
}
}
string communication_token = 1; /** Communication token received by the worker during task processing */
oneof type {
Init init = 2;
DataChunk data = 3;
string error = 4;
}
}

message Result {
oneof type {
InitKeyedDataStream init = 1;
DataChunk data = 2;
}
string communication_token = 3; /** Communication token received by the worker during task processing */
}

message ResultReply {
string communication_token = 3; /** Communication token received by the worker during task processing */
oneof type {
Empty Ok = 1;
string Error = 2;
}
// Response when data is available in the shared folder
message DataResponse {
// Id of the result that will be retrieved
string result_id = 2;
}

/*
Expand Down Expand Up @@ -156,7 +132,7 @@ message SubmitTasksResponse {
}

/*
* Request for creating results without data
* Request for creating results with data
*/
message CreateResultsRequest {
/**
Expand All @@ -180,11 +156,9 @@ message CreateResultsResponse {
}

/*
* Request for uploading results data through stream.
* Data must be sent in multiple chunks.
* Only one result can be uploaded.
* Request for notifying results data are available in files.
*/
message UploadResultDataRequest {
message NotifyResultDataRequest {
/**
* The metadata to identify the result to update.
*/
Expand All @@ -195,23 +169,15 @@ message UploadResultDataRequest {

/**
* The possible messages that constitute a UploadResultDataRequest
* They should be sent in the following order:
* - id
* - data_chunk (stream can have multiple data_chunk messages that represent data divided in several parts)
*
* Data chunk cannot exceed the size returned by the GetServiceConfiguration rpc method
*/
oneof type {
ResultIdentifier id = 1; /** The identifier of the result to which add data. */
bytes data_chunk = 2; /** A chunk of data. */
}
repeated ResultIdentifier ids = 1; /** The identifier of the result to which add data. */
string communication_token = 4; /** Communication token received by the worker during task processing */
}

/*
* Response for uploading data with stream for result
* Response for notifying data file availability for result
* Received when data are successfully copied to the ObjectStorage
*/
message UploadResultDataResponse {
string result_id = 1; /** The Id of the result to which data were added */
string communication_token = 2; /** Communication token received by the worker during task processing */
message NotifyResultDataResponse {
repeated string result_ids = 1; /** The Id of the result to which data were added */
}
33 changes: 26 additions & 7 deletions Protos/V1/agent_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import "agent_common.proto";
option csharp_namespace = "ArmoniK.Api.gRPC.V1.Agent";

service Agent {
rpc CreateTask(stream CreateTaskRequest) returns (CreateTaskReply);

/**
* Create the metadata of multiple results at once
* Data have to be uploaded separately
Expand All @@ -19,18 +21,35 @@ service Agent {
rpc CreateResults(CreateResultsRequest) returns (CreateResultsResponse) {}

/**
* Upload data for result with stream
* Notify Agent that a data file representing the Result to upload is available in the shared folder
* The name of the file should be the result id
* Blocks until data are stored in Object Storage
*/
rpc UploadResultData(stream UploadResultDataRequest) returns (UploadResultDataResponse) {}
rpc NotifyResultData(NotifyResultDataRequest) returns (NotifyResultDataResponse) {}

/**
* Create tasks metadata and submit task for processing.
*/
rpc SubmitTasks(SubmitTasksRequest) returns (SubmitTasksResponse) {}

rpc CreateTask(stream CreateTaskRequest) returns (CreateTaskReply);
rpc GetResourceData(DataRequest) returns (stream DataReply);
rpc GetCommonData(DataRequest) returns (stream DataReply);
rpc GetDirectData(DataRequest) returns (stream DataReply);
rpc SendResult(stream Result) returns (ResultReply);
/**
* Retrieve Resource Data from the Agent
* Data is stored in the shared folder between Agent and Worker as a file with the result id as name
* Blocks until data are available in the shared folder
*/
rpc GetResourceData(DataRequest) returns (DataResponse);

/**
* Retrieve Resource Data from the Agent
* Data is stored in the shared folder between Agent and Worker as a file with the result id as name
* Blocks until data are available in the shared folder
*/
rpc GetCommonData(DataRequest) returns (DataResponse);

/**
* Retrieve Resource Data from the Agent
* Data is stored in the shared folder between Agent and Worker as a file with the result id as name
* Blocks until data are available in the shared folder
*/
rpc GetDirectData(DataRequest) returns (DataResponse);
}
34 changes: 9 additions & 25 deletions Protos/V1/worker_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,19 @@ import "objects.proto";
option csharp_namespace = "ArmoniK.Api.gRPC.V1.Worker";

message ProcessRequest {
message ComputeRequest {
message InitRequest {
Configuration configuration = 1;
string session_id = 2;
string task_id = 3;
TaskOptions task_options = 4;
repeated string expected_output_keys = 5;
DataChunk payload = 6;
}
message InitData {
oneof type {
string key = 1;
bool last_data = 2;
}
}
oneof type {
InitRequest init_request = 1;
DataChunk payload = 2;
InitData init_data = 3;
DataChunk data = 4;
}
}
string communication_token = 1;
ComputeRequest compute = 2;
string session_id = 2;
string task_id = 3;
TaskOptions task_options = 4;
repeated string expected_output_keys = 5;
string payload_id = 6;
repeated string data_dependencies = 7;
string data_folder = 8;
Configuration configuration = 9;
}

message ProcessReply {
string communication_token = 1;
Output output = 2;
Output output = 1;
}

message HealthCheckReply {
Expand Down
2 changes: 1 addition & 1 deletion Protos/V1/worker_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import "worker_common.proto";
option csharp_namespace = "ArmoniK.Api.gRPC.V1.Worker";

service Worker {
rpc Process(stream ProcessRequest) returns (ProcessReply);
rpc Process(ProcessRequest) returns (ProcessReply);
rpc HealthCheck(Empty) returns (HealthCheckReply);
}
113 changes: 40 additions & 73 deletions packages/csharp/ArmoniK.Api.Mock/Services/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Linq;
using System.Threading.Tasks;

using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Agent;

using Grpc.Core;
Expand All @@ -36,63 +36,6 @@ public override Task<CreateTaskReply> CreateTask(IAsyncStreamReader<CreateTaskRe
CreationStatusList = new CreateTaskReply.Types.CreationStatusList(),
});

/// <inheritdocs />
[Count]
public override async Task GetCommonData(DataRequest request,
IServerStreamWriter<DataReply> responseStream,
ServerCallContext context)
=> await responseStream.WriteAsync(new DataReply
{
Data = new DataChunk
{
DataComplete = true,
},
})
.ConfigureAwait(false);

/// <inheritdocs />
[Count]
public override async Task GetDirectData(DataRequest request,
IServerStreamWriter<DataReply> responseStream,
ServerCallContext context)
=> await responseStream.WriteAsync(new DataReply
{
Data = new DataChunk
{
DataComplete = true,
},
})
.ConfigureAwait(false);

/// <inheritdocs />
[Count]
public override async Task GetResourceData(DataRequest request,
IServerStreamWriter<DataReply> responseStream,
ServerCallContext context)
=> await responseStream.WriteAsync(new DataReply
{
Data = new DataChunk
{
DataComplete = true,
},
})
.ConfigureAwait(false);

/// <inheritdocs />
[Count]
public override async Task<ResultReply> SendResult(IAsyncStreamReader<Result> requestStream,
ServerCallContext context)
{
await foreach (var _ in requestStream.ReadAllAsync())
{
}

return new ResultReply
{
Ok = new Empty(),
};
}

/// <inheritdocs />
[Count]
public override Task<CreateResultsMetaDataResponse> CreateResultsMetaData(CreateResultsMetaDataRequest request,
Expand All @@ -111,21 +54,6 @@ public override Task<SubmitTasksResponse> SubmitTasks(SubmitTasksRequest request
CommunicationToken = request.CommunicationToken,
});

/// <inheritdocs />
[Count]
public override async Task<UploadResultDataResponse> UploadResultData(IAsyncStreamReader<UploadResultDataRequest> requestStream,
ServerCallContext context)
{
await foreach (var _ in requestStream.ReadAllAsync())
{
}

return new UploadResultDataResponse
{
ResultId = "result-id",
CommunicationToken = "communication-token",
};
}

/// <inheritdocs />
[Count]
Expand All @@ -135,4 +63,43 @@ public override Task<CreateResultsResponse> CreateResults(CreateResultsRequest r
{
CommunicationToken = request.CommunicationToken,
});

/// <inheritdocs />
[Count]
public override Task<DataResponse> GetCommonData(DataRequest request,
ServerCallContext context)
=> Task.FromResult(new DataResponse
{
ResultId = request.ResultId,
});

/// <inheritdocs />
[Count]
public override Task<DataResponse> GetDirectData(DataRequest request,
ServerCallContext context)
=> Task.FromResult(new DataResponse
{
ResultId = request.ResultId,
});

/// <inheritdocs />
[Count]
public override Task<DataResponse> GetResourceData(DataRequest request,
ServerCallContext context)
=> Task.FromResult(new DataResponse
{
ResultId = request.ResultId,
});

/// <inheritdocs />
[Count]
public override Task<NotifyResultDataResponse> NotifyResultData(NotifyResultDataRequest request,
ServerCallContext context)
=> Task.FromResult(new NotifyResultDataResponse
{
ResultIds =
{
request.Ids.Select(identifier => identifier.ResultId),
},
});
}
Loading

0 comments on commit 9fd5060

Please sign in to comment.