Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agents protocol updates #4775

Closed
wants to merge 10 commits into from
75 changes: 62 additions & 13 deletions flyteidl/protos/flyteidl/admin/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ option go_package = "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin

import "flyteidl/core/literals.proto";
import "flyteidl/core/tasks.proto";
import "flyteidl/core/workflow.proto";
import "flyteidl/core/identifier.proto";
import "flyteidl/core/execution.proto";
import "flyteidl/core/metrics.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";


// The state of the execution is used to control its visibility in the UI/CLI.
enum State {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
RETRYABLE_FAILURE = 0;
Expand All @@ -35,6 +35,10 @@ message TaskExecutionMetadata {
string k8s_service_account = 5;
// Environment variables attached to the task execution
map<string, string> environment_variables = 6;
int32 max_attempts = 7;
bool interruptible = 8;
int32 interruptible_failure_threshold = 9;
core.TaskNodeOverrides overrides = 10;
}

// Represents a request structure to create task.
Expand All @@ -53,18 +57,46 @@ message CreateTaskRequest {

// Represents a create response structure.
message CreateTaskResponse {
// ResourceMeta is created by the agent. It could be a string (jobId) or a dict (more complex metadata).
bytes resource_meta = 1;
}

message CreateRequestHeader {
// Template of the task that encapsulates all the metadata of the task.
core.TaskTemplate template = 1;
// Prefix for where task output data will be written. (e.g. s3://my-bucket/randomstring)
string output_prefix = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkpoint?

// subset of runtime task execution metadata.
TaskExecutionMetadata task_execution_metadata = 3;
// MaxDatasetSizeBytes is the maximum size of the dataset that can be generated by the task.
int64 max_dataset_size_bytes = 4;
}


message ExecuteTaskSyncRequest {
oneof part {
CreateRequestHeader header = 1;
core.LiteralMap inputs = 2;
}
}

message ExecuteTaskSyncResponseHeader {
Resource resource = 1;
}

message ExecuteTaskSyncResponse {
// Metadata is created by the agent. It could be a string (jobId) or a dict (more complex metadata).
// Resource is for synchronous task execution.
oneof res {
bytes resource_meta = 1;
Resource resource = 2;
ExecuteTaskSyncResponseHeader header = 1;
core.LiteralMap outputs = 2;
}
}

// A message used to fetch a job resource from flyte agent server.
message GetTaskRequest {
// A predefined yet extensible Task type identifier.
string task_type = 1;
TaskType task_type = 1;
// Metadata about the resource to be pass to the agent.
bytes resource_meta = 2;
}
Expand Down Expand Up @@ -95,7 +127,7 @@ message Resource {
// A message used to delete a task.
message DeleteTaskRequest {
// A predefined yet extensible Task type identifier.
string task_type = 1;
TaskType task_type = 1;
// Metadata about the resource to be pass to the agent.
bytes resource_meta = 2;
}
Expand All @@ -109,7 +141,14 @@ message Agent {
string name = 1;

// SupportedTaskTypes are the types of the tasks that the agent can handle.
repeated string supported_task_types = 2;
repeated TaskType supported_task_types = 2;
}

message TaskType {
// The name of the task type.
string name = 1;
// The version of the task type.
int32 version = 2;
}

// A request to get an agent.
Expand All @@ -134,7 +173,7 @@ message ListAgentsResponse {
// A request to get the metrics from a task execution.
message GetTaskMetricsRequest {
// A predefined yet extensible Task type identifier.
string task_type = 1;
TaskType task_type = 1;
// Metadata is created by the agent. It could be a string (jobId) or a dict (more complex metadata).
bytes resource_meta = 2;
// The metrics to query. If empty, will return a default set of metrics.
Expand All @@ -157,7 +196,7 @@ message GetTaskMetricsResponse {
// A request to get the log from a task execution.
message GetTaskLogsRequest {
// A predefined yet extensible Task type identifier.
string task_type = 1;
TaskType task_type = 1;
// Metadata is created by the agent. It could be a string (jobId) or a dict (more complex metadata).
bytes resource_meta = 2;
// Number of lines to return.
Expand All @@ -167,11 +206,21 @@ message GetTaskLogsRequest {
string token = 4;
}

// A response containing the logs for a task execution.
message GetTaskLogsResponse {
// The execution log results.
repeated string results = 1;
message GetTaskLogsResponseHeader {
// In the case of multiple pages of results, the server-provided token can be used to fetch the next page
// in a query. If there are no more results, this value will be empty.
string token = 2;
string token = 1;
}

message GetTaskLogsResponseBody {
// The execution log results.
repeated string results = 1;
}

// A response containing the logs for a task execution.
message GetTaskLogsResponse {
oneof part {
GetTaskLogsResponseHeader header = 1;
GetTaskLogsResponseBody body = 2;
}
}
43 changes: 37 additions & 6 deletions flyteidl/protos/flyteidl/service/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,53 @@ import "flyteidl/admin/agent.proto";

// AsyncAgentService defines an RPC Service that allows propeller to send the request to the agent server.
service AsyncAgentService {
// Send a task create request to the agent server.
rpc CreateTask (flyteidl.admin.CreateTaskRequest) returns (flyteidl.admin.CreateTaskResponse){};
// ExecuteTaskSync streams the create request and inputs to the agent service and streams the outputs back.
rpc ExecuteTaskSync (stream flyteidl.admin.ExecuteTaskSyncRequest) returns (stream flyteidl.admin.ExecuteTaskSyncResponse){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like this stream

option (google.api.http) = {
post: "/api/v1/agent/task/stream"
body: "*"
};
};

// CreateTask sends a task create request to the agent service.
rpc CreateTask (stream flyteidl.admin.CreateTaskRequest) returns (flyteidl.admin.CreateTaskResponse){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why should this be a stream? isnt that potentially dangerous?

option (google.api.http) = {
post: "/api/v1/agent/task"
body: "*"
};
};

// Get job status.
rpc GetTask (flyteidl.admin.GetTaskRequest) returns (flyteidl.admin.GetTaskResponse){};
rpc GetTask (flyteidl.admin.GetTaskRequest) returns (stream flyteidl.admin.GetTaskResponse){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this stream also seems odd?
Maybe this is the problem with async vs sync.
Async being a stream is really odd?

option (google.api.http) = {
get: "/api/v1/agent/task/{task_type.name}/{task_type.version}/{resource_meta}"
};
};

// Delete the task resource.
rpc DeleteTask (flyteidl.admin.DeleteTaskRequest) returns (flyteidl.admin.DeleteTaskResponse){};
rpc DeleteTask (flyteidl.admin.DeleteTaskRequest) returns (flyteidl.admin.DeleteTaskResponse){
option (google.api.http) = {
delete: "/api/v1/agent/task_executions/{task_type.name}/{task_type.version}/{resource_meta}"
};
};

// GetTaskMetrics returns one or more task execution metrics, if available.
//
// Errors include
// * OutOfRange if metrics are not available for the specified task time range
// * various other errors
rpc GetTaskMetrics(flyteidl.admin.GetTaskMetricsRequest) returns (flyteidl.admin.GetTaskMetricsResponse){};
rpc GetTaskMetrics(flyteidl.admin.GetTaskMetricsRequest) returns (flyteidl.admin.GetTaskMetricsResponse){
option (google.api.http) = {
get: "/api/v1/agent/task/metrics/{task_type.name}/{task_type.version}/{resource_meta}"
};
};

// GetTaskLogs returns task execution logs, if available.
rpc GetTaskLogs(flyteidl.admin.GetTaskLogsRequest) returns (flyteidl.admin.GetTaskLogsResponse){};
rpc GetTaskLogs(flyteidl.admin.GetTaskLogsRequest) returns (stream flyteidl.admin.GetTaskLogsResponse){
option (google.api.http) = {
get: "/api/v1/agent/task/logs/{task_type.name}/{task_type.version}/{resource_meta}"
};
};
}

// AgentMetadataService defines an RPC service that is also served over HTTP via grpc-gateway.
Expand Down
Loading