diff --git a/README.md b/README.md
index eaaf77346..836af3d8e 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,7 @@
Java/Scala library for easily authoring Flyte tasks and workflows.
Current development status:
+
- MVP features are developed
- Missing user documentation
- Project being tested, and collecting feedback
@@ -29,8 +30,8 @@ Current development status:
To learn more about Flyte refer to:
- - [Flyte homepage](https://flyte.org)
- - [Flyte master repository](https://github.com/lyft/flyte)
+- [Flyte homepage](https://flyte.org)
+- [Flyte master repository](https://github.com/lyft/flyte)
## Build from source
@@ -49,7 +50,7 @@ mvn dependency:resolve-plugins
## How to run examples
-You can build und run examples yourself.
+You can build und run examples yourself.
Create `.env.local` with:
@@ -62,7 +63,9 @@ FLYTE_STAGING_LOCATION=s3://my-s3-bucket
FLYTE_PLATFORM_INSECURE=True
```
-Package and run:
+**Note**: If you're registering against [the local Demo Flyte Cluster](https://docs.flyte.org/en/latest/user_guide/environment_setup.html#create-a-local-demo-flyte-cluster), you'll need to adjust the ports to align with it.
+
+Package and register:
```bash
$ mvn package
@@ -73,18 +76,17 @@ $ scripts/jflyte register workflows \
-cp=flytekit-examples/target/lib
```
-**Note**: `scripts/jflyte` requires `jq` to run, in adition to `docker`
+**Note**: `scripts/jflyte` requires `jq` to run, in addition to `docker`
## Usage
-
### Maven
```
org.flyteflytekit-java
- 0.3.15
+ 0.4.58
```
@@ -94,20 +96,20 @@ Scala 2.12 and Scala 2.13 are supported.
```scala
libraryDependencies ++= Seq(
- "org.flyte" % "flytekit-java" % "0.4.35",
- "org.flyte" %% "flytekit-scala" % "0.4.35"
+ "org.flyte" % "flytekit-java" % "0.4.58",
+ "org.flyte" %% "flytekit-scala" % "0.4.58"
)
```
-## Contributing
+## Contributing
-Run `mvn spotless:apply` before committing.
+Run `mvn spotless:apply` before committing.
-Also use `git commit --signoff "Commit message"` to comply with DCO.
+Also use `git commit --signoff "Commit message"` to comply with DCO.
## Releasing
-* Go to [Actions: Create flytekit-java release](https://github.com/flyteorg/flytekit-java/actions/workflows/release.yaml) and click "Run workflow"
-* Wait until the workflow finishes; in the meanwhile prepare a release note
-* Making sure the new release is visible in [Maven central](https://repo1.maven.org/maven2/org/flyte/flytekit-java/)
-* Publish the release note associating with the latest tag created by the release workflow
+- Go to [Actions: Create flytekit-java release](https://github.com/flyteorg/flytekit-java/actions/workflows/release.yaml) and click "Run workflow"
+- Wait until the workflow finishes; in the meanwhile prepare a release note
+- Making sure the new release is visible in [Maven central](https://repo1.maven.org/maven2/org/flyte/flytekit-java/)
+- Publish the release note associating with the latest tag created by the release workflow
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/agent.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/agent.proto
new file mode 100644
index 000000000..e9a484917
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/agent.proto
@@ -0,0 +1,88 @@
+syntax = "proto3";
+
+package flyteidl.admin;
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
+
+import "flyteidl/core/literals.proto";
+import "flyteidl/core/tasks.proto";
+import "flyteidl/core/interface.proto";
+import "flyteidl/core/identifier.proto";
+
+// The state of the execution is used to control its visibility in the UI/CLI.
+enum State {
+ RETRYABLE_FAILURE = 0;
+ PERMANENT_FAILURE = 1;
+ PENDING = 2;
+ RUNNING = 3;
+ SUCCEEDED = 4;
+}
+
+// Represents a subset of runtime task execution metadata that are relevant to external plugins.
+message TaskExecutionMetadata {
+ // ID of the task execution
+ core.TaskExecutionIdentifier task_execution_id = 1;
+ // k8s namespace where the task is executed in
+ string namespace = 2;
+ // Labels attached to the task execution
+ map labels = 3;
+ // Annotations attached to the task execution
+ map annotations = 4;
+ // k8s service account associated with the task execution
+ string k8s_service_account = 5;
+ // Environment variables attached to the task execution
+ map environment_variables = 6;
+}
+
+// Represents a request structure to create task.
+message CreateTaskRequest {
+ // The inputs required to start the execution. All required inputs must be
+ // included in this map. If not required and not provided, defaults apply.
+ // +optional
+ core.LiteralMap inputs = 1;
+ // Template of the task that encapsulates all the metadata of the task.
+ core.TaskTemplate template = 2;
+ // Prefix for where task output data will be written. (e.g. s3://my-bucket/randomstring)
+ string output_prefix = 3;
+ // subset of runtime task execution metadata.
+ TaskExecutionMetadata task_execution_metadata = 4;
+}
+
+// Represents a create response structure.
+message CreateTaskResponse {
+ // Metadata is created by the agent. It could be a string (jobId) or a dict (more complex metadata).
+ bytes resource_meta = 1;
+}
+
+// A message used to fetch a job resource from flyte agent server.
+message GetTaskRequest {
+ // A predefined yet extensible Task type identifier.
+ string task_type = 1;
+ // Metadata about the resource to be pass to the agent.
+ bytes resource_meta = 2;
+}
+
+// Response to get an individual task resource.
+message GetTaskResponse {
+ Resource resource = 1;
+}
+
+message Resource {
+ // The state of the execution is used to control its visibility in the UI/CLI.
+ State state = 1;
+ // The outputs of the execution. It's typically used by sql task. Agent service will create a
+ // Structured dataset pointing to the query result table.
+ // +optional
+ core.LiteralMap outputs = 2;
+}
+
+// A message used to delete a task.
+message DeleteTaskRequest {
+ // A predefined yet extensible Task type identifier.
+ string task_type = 1;
+ // Metadata about the resource to be pass to the agent.
+ bytes resource_meta = 2;
+}
+
+// Response to delete a task.
+message DeleteTaskResponse {
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto
index 3f279424a..85a6a4ef8 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto
@@ -6,45 +6,6 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
// Encapsulates specifications for routing an execution onto a specific cluster.
message ClusterAssignment {
- Affinity affinity = 1;
-
- Toleration toleration = 2;
-}
-
-// Defines a set of constraints used to select eligible objects based on labels they possess.
-message Affinity {
- // Multiples selectors are 'and'-ed together to produce the list of matching, eligible objects.
- repeated Selector selectors = 1;
-}
-
-// Defines a set of specific label selectors that the execution can tolerate on a cluster.
-message Toleration {
-
- // A toleration selector is similar to that of an affinity but the only valid operators are EQUALS AND EXISTS.
- repeated Selector selectors = 1;
+ reserved 1, 2;
+ string cluster_pool_name = 3;
}
-
-// A Selector is a specification for identifying a set of objects with corresponding labels.
-message Selector {
-
- // The label key.
- string key = 1;
-
- // One or more values used to match labels.
- // For equality (or inequality) requirements, values must contain a single element.
- // For set-based requirements, values may contain one or more elements.
- repeated string value = 2;
-
- // Defines how a label with a corresponding key and value is selected or excluded.
- enum Operator {
- EQUALS = 0;
- NOT_EQUALS = 1;
- IN = 2;
- NOT_IN = 3;
- EXISTS = 4; // A label key with any value
-
- // K8s supports more operators, we can consider adding them if necessary
- }
- Operator operator = 3;
-}
-
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto
index 69e02e710..dbfb41285 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto
@@ -5,6 +5,8 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
import "flyteidl/core/execution.proto";
import "flyteidl/core/identifier.proto";
+import "flyteidl/core/literals.proto";
+import "google/protobuf/timestamp.proto";
// Encapsulation of fields that identifies a Flyte resource.
// A Flyte resource can be a task, workflow or launch plan.
@@ -279,6 +281,14 @@ message Annotations {
map values = 1;
}
+// Environment variable values to be applied to an execution resource.
+// In the future a mode (e.g. OVERRIDE, APPEND, etc) can be defined
+// to specify how to merge environment variables defined at registration and execution time.
+message Envs {
+ // Map of custom environment variables to be applied to the execution resource.
+ repeated flyteidl.core.KeyValuePair values = 1;
+}
+
// Defines permissions associated with executions created by this launch plan spec.
// Use either of these roles when they have permissions required by your workflow execution.
// Deprecated.
@@ -300,3 +310,10 @@ message RawOutputDataConfig {
// e.g. s3://bucket/key or s3://bucket/
string output_location_prefix = 1;
}
+
+// These URLs are returned as part of node and task execution data requests.
+message FlyteURLs {
+ string inputs = 1;
+ string outputs = 2;
+ string deck = 3;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/description_entity.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/description_entity.proto
new file mode 100644
index 000000000..fcf4e1a46
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/description_entity.proto
@@ -0,0 +1,95 @@
+syntax = "proto3";
+
+package flyteidl.admin;
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
+
+import "flyteidl/core/identifier.proto";
+import "flyteidl/admin/common.proto";
+
+// DescriptionEntity contains detailed description for the task/workflow.
+// Documentation could provide insight into the algorithms, business use case, etc.
+message DescriptionEntity {
+ // id represents the unique identifier of the description entity.
+ core.Identifier id = 1;
+ // One-liner overview of the entity.
+ string short_description = 2;
+ // Full user description with formatting preserved.
+ Description long_description = 3;
+ // Optional link to source code used to define this entity.
+ SourceCode source_code = 4;
+ // User-specified tags. These are arbitrary and can be used for searching
+ // filtering and discovering tasks.
+ repeated string tags = 5;
+}
+
+// The format of the long description
+enum DescriptionFormat {
+ DESCRIPTION_FORMAT_UNKNOWN = 0;
+ DESCRIPTION_FORMAT_MARKDOWN = 1;
+ DESCRIPTION_FORMAT_HTML = 2;
+ // python default documentation - comments is rst
+ DESCRIPTION_FORMAT_RST = 3;
+}
+
+// Full user description with formatting preserved. This can be rendered
+// by clients, such as the console or command line tools with in-tact
+// formatting.
+message Description {
+ oneof content {
+ // long description - no more than 4KB
+ string value = 1;
+ // if the description sizes exceed some threshold we can offload the entire
+ // description proto altogether to an external data store, like S3 rather than store inline in the db
+ string uri = 2;
+ }
+
+ // Format of the long description
+ DescriptionFormat format = 3;
+ // Optional link to an icon for the entity
+ string icon_link = 4;
+}
+
+// Link to source code used to define this entity
+message SourceCode {
+ string link = 1;
+}
+
+// Represents a list of DescriptionEntities returned from the admin.
+// See :ref:`ref_flyteidl.admin.DescriptionEntity` for more details
+message DescriptionEntityList {
+ // A list of DescriptionEntities returned based on the request.
+ repeated DescriptionEntity descriptionEntities = 1;
+
+ // In the case of multiple pages of results, the server-provided token can be used to fetch the next page
+ // in a query. If there are no more results, this value will be empty.
+ string token = 2;
+}
+
+// Represents a request structure to retrieve a list of DescriptionEntities.
+// See :ref:`ref_flyteidl.admin.DescriptionEntity` for more details
+message DescriptionEntityListRequest {
+ // Identifies the specific type of resource that this identifier corresponds to.
+ flyteidl.core.ResourceType resource_type = 1;
+
+ // The identifier for the description entity.
+ // +required
+ NamedEntityIdentifier id = 2;
+
+ // Indicates the number of resources to be returned.
+ // +required
+ uint32 limit = 3;
+
+ // In the case of multiple pages of results, the server-provided token can be used to fetch the next page
+ // in a query.
+ // +optional
+ string token = 4;
+
+ // Indicates a list of filters passed as string.
+ // More info on constructing filters :
+ // +optional
+ string filters = 5;
+
+ // Sort ordering for returned list.
+ // +optional
+ Sort sort_by = 6;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto
index a08c9bbd1..55933d470 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto
@@ -8,6 +8,7 @@ import "flyteidl/admin/common.proto";
import "flyteidl/core/literals.proto";
import "flyteidl/core/execution.proto";
import "flyteidl/core/identifier.proto";
+import "flyteidl/core/metrics.proto";
import "flyteidl/core/security.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
@@ -45,10 +46,18 @@ message ExecutionRelaunchRequest {
// +required
core.WorkflowExecutionIdentifier id = 1;
+ // Deprecated field, do not use.
+ reserved 2;
+
// User provided value for the relaunched execution.
// If none is provided the system will generate a unique string.
// +optional
string name = 3;
+
+ // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution.
+ // If enabled, all calculations are performed even if cached results would be available, overwriting the stored
+ // data once execution finishes successfully.
+ bool overwrite_cache = 4;
}
// Request to recover the referenced execution.
@@ -181,6 +190,9 @@ message SystemMetadata {
// Which execution cluster this execution ran on.
string execution_cluster = 1;
+
+ // Which kubernetes namespace the execution ran under.
+ string namespace = 2;
}
// Represents attributes about an execution which are not required to launch the execution but are useful to record.
@@ -296,6 +308,16 @@ message ExecutionSpec {
// As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper
// around the bool field.
google.protobuf.BoolValue interruptible = 21;
+
+ // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution.
+ // If enabled, all calculations are performed even if cached results would be available, overwriting the stored
+ // data once execution finishes successfully.
+ bool overwrite_cache = 22;
+
+ // Environment variables to be set for the execution.
+ Envs envs = 23;
+ // Tags to be set for the execution.
+ repeated string tags = 24;
}
// Request to terminate an in-progress execution. This action is irreversible.
@@ -368,3 +390,19 @@ message ExecutionStateChangeDetails {
}
message ExecutionUpdateResponse {}
+
+// WorkflowExecutionGetMetricsRequest represents a request to retrieve metrics for the specified workflow execution.
+message WorkflowExecutionGetMetricsRequest {
+ // id defines the workflow execution to query for.
+ core.WorkflowExecutionIdentifier id = 1;
+
+ // depth defines the number of Flyte entity levels to traverse when breaking down execution details.
+ int32 depth = 2;
+}
+
+// WorkflowExecutionGetMetricsResponse represents the response containing metrics for the specified workflow execution.
+message WorkflowExecutionGetMetricsResponse {
+ // Span defines the top-level breakdown of the workflows execution. More precise information is nested in a
+ // hierarchical structure using Flyte entity references.
+ core.Span span = 1;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto
index 13d87fd97..2164be31f 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto
@@ -125,6 +125,14 @@ message LaunchPlanSpec {
// As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper
// around the bool field.
google.protobuf.BoolValue interruptible = 19;
+
+ // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution.
+ // If enabled, all calculations are performed even if cached results would be available, overwriting the stored
+ // data once execution finishes successfully.
+ bool overwrite_cache = 20;
+
+ // Environment variables to be set for the execution.
+ Envs envs = 21;
}
// Values computed by the flyte platform after launch plan registration.
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto
index bba83a1b9..4ab6be6aa 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto
@@ -123,6 +123,14 @@ message WorkflowExecutionConfig {
// As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper
// around the bool field.
google.protobuf.BoolValue interruptible = 6;
+
+ // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution.
+ // If enabled, all calculations are performed even if cached results would be available, overwriting the stored
+ // data once execution finishes successfully.
+ bool overwrite_cache = 7;
+
+ // Environment variables to be set for the execution.
+ Envs envs = 8;
}
// Generic container for encapsulating all types of the above attributes messages.
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto
index 618284948..fe71699a8 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto
@@ -167,6 +167,10 @@ message NodeExecutionClosure {
// String location uniquely identifying where the deck HTML file is.
// NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar)
string deck_uri = 11;
+
+ // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for a DynamicWorkflow. This is required
+ // to correctly recover partially completed executions where the subworkflow has already been compiled.
+ string dynamic_job_spec_uri = 12;
}
// Metadata for a WorkflowNode
@@ -181,6 +185,8 @@ message TaskNodeMetadata {
core.CatalogCacheStatus cache_status = 1;
// This structure carries the catalog artifact information
core.CatalogMetadata catalog_key = 2;
+ // The latest checkpoint location
+ string checkpoint_uri = 4;
}
// For dynamic workflow nodes we capture information about the dynamic workflow definition that gets generated.
@@ -190,6 +196,10 @@ message DynamicWorkflowNodeMetadata {
// Represents the compiled representation of the embedded dynamic workflow.
core.CompiledWorkflowClosure compiled_workflow = 2;
+
+ // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for this DynamicWorkflow. This is
+ // required to correctly recover partially completed executions where the subworkflow has already been compiled.
+ string dynamic_job_spec_uri = 3;
}
// Request structure to fetch inputs and output for a node execution.
@@ -217,5 +227,7 @@ message NodeExecutionGetDataResponse {
// Optional Workflow closure for a dynamically generated workflow, in the case this node yields a dynamic workflow we return its structure here.
DynamicWorkflowNodeMetadata dynamic_workflow = 16;
-}
+ FlyteURLs flyte_urls = 17;
+
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto
index 80c3da86a..8d1d02959 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto
@@ -29,7 +29,7 @@ message Project {
string description = 4;
- // Leverage Labels from flyteidel.admin.common.proto to
+ // Leverage Labels from flyteidl.admin.common.proto to
// tag projects with ownership information.
Labels labels = 5;
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/project_attributes.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/project_attributes.proto
new file mode 100644
index 000000000..de6f7a17e
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/project_attributes.proto
@@ -0,0 +1,60 @@
+syntax = "proto3";
+
+package flyteidl.admin;
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
+
+import "flyteidl/admin/matchable_resource.proto";
+
+// Defines a set of custom matching attributes at the project level.
+// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration`
+message ProjectAttributes {
+ // Unique project id for which this set of attributes will be applied.
+ string project = 1;
+
+ MatchingAttributes matching_attributes = 2;
+}
+
+// Sets custom attributes for a project
+// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration`
+message ProjectAttributesUpdateRequest {
+ // +required
+ ProjectAttributes attributes = 1;
+}
+
+// Purposefully empty, may be populated in the future.
+message ProjectAttributesUpdateResponse {
+}
+
+// Request to get an individual project level attribute override.
+// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration`
+message ProjectAttributesGetRequest {
+ // Unique project id which this set of attributes references.
+ // +required
+ string project = 1;
+
+ // Which type of matchable attributes to return.
+ // +required
+ MatchableResource resource_type = 2;
+}
+
+// Response to get an individual project level attribute override.
+// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration`
+message ProjectAttributesGetResponse {
+ ProjectAttributes attributes = 1;
+}
+
+// Request to delete a set matchable project level attribute override.
+// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration`
+message ProjectAttributesDeleteRequest {
+ // Unique project id which this set of attributes references.
+ // +required
+ string project = 1;
+
+ // Which type of matchable attributes to delete.
+ // +required
+ MatchableResource resource_type = 2;
+}
+
+// Purposefully empty, may be populated in the future.
+message ProjectAttributesDeleteResponse {
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/signal.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/signal.proto
new file mode 100644
index 000000000..8fc1c83e5
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/signal.proto
@@ -0,0 +1,86 @@
+syntax = "proto3";
+
+package flyteidl.admin;
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
+
+import "flyteidl/admin/common.proto";
+import "flyteidl/core/identifier.proto";
+import "flyteidl/core/literals.proto";
+import "flyteidl/core/types.proto";
+
+// SignalGetOrCreateRequest represents a request structure to retrive or create a signal.
+// See :ref:`ref_flyteidl.admin.Signal` for more details
+message SignalGetOrCreateRequest {
+ // A unique identifier for the requested signal.
+ core.SignalIdentifier id = 1;
+
+ // A type denoting the required value type for this signal.
+ core.LiteralType type = 2;
+}
+
+// SignalListRequest represents a request structure to retrieve a collection of signals.
+// See :ref:`ref_flyteidl.admin.Signal` for more details
+message SignalListRequest {
+ // Indicates the workflow execution to filter by.
+ // +required
+ core.WorkflowExecutionIdentifier workflow_execution_id = 1;
+
+ // Indicates the number of resources to be returned.
+ // +required
+ uint32 limit = 2;
+
+ // In the case of multiple pages of results, the, server-provided token can be used to fetch the next page
+ // in a query.
+ // +optional
+ string token = 3;
+
+ // Indicates a list of filters passed as string.
+ // +optional
+ string filters = 4;
+
+ // Sort ordering.
+ // +optional
+ Sort sort_by = 5;
+}
+
+// SignalList represents collection of signals along with the token of the last result.
+// See :ref:`ref_flyteidl.admin.Signal` for more details
+message SignalList {
+ // A list of signals matching the input filters.
+ repeated Signal signals = 1;
+
+ // In the case of multiple pages of results, the server-provided token can be used to fetch the next page
+ // in a query. If there are no more results, this value will be empty.
+ string token = 2;
+}
+
+// SignalSetRequest represents a request structure to set the value on a signal. Setting a signal
+// effetively satisfies the signal condition within a Flyte workflow.
+// See :ref:`ref_flyteidl.admin.Signal` for more details
+message SignalSetRequest {
+ // A unique identifier for the requested signal.
+ core.SignalIdentifier id = 1;
+
+ // The value of this signal, must match the defining signal type.
+ core.Literal value = 2;
+}
+
+// SignalSetResponse represents a response structure if signal setting succeeds.
+message SignalSetResponse {
+ // Purposefully empty, may be populated in the future.
+}
+
+// Signal encapsulates a unique identifier, associated metadata, and a value for a single Flyte
+// signal. Signals may exist either without a set value (representing a signal request) or with a
+// populated value (indicating the signal has been given).
+message Signal {
+ // A unique identifier for the requested signal.
+ core.SignalIdentifier id = 1;
+
+ // A type denoting the required value type for this signal.
+ core.LiteralType type = 2;
+
+ // The value of the signal. This is only available if the signal has been "set" and must match
+ // the defined the type.
+ core.Literal value = 3;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto
index 1a5ea8dce..b768bc010 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto
@@ -6,6 +6,7 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
import "flyteidl/core/identifier.proto";
import "flyteidl/core/tasks.proto";
import "flyteidl/core/compiler.proto";
+import "flyteidl/admin/description_entity.proto";
import "google/protobuf/timestamp.proto";
// Represents a request structure to create a revision of a task.
@@ -34,6 +35,9 @@ message Task {
// closure encapsulates all the fields that maps to a compiled version of the task.
TaskClosure closure = 2;
+
+ // One-liner overview of the entity.
+ string short_description = 3;
}
// Represents a list of tasks returned from the admin.
@@ -51,6 +55,9 @@ message TaskList {
message TaskSpec {
// Template of the task that encapsulates all the metadata of the task.
core.TaskTemplate template = 1;
+
+ // Represents the specification for description entity.
+ DescriptionEntity description = 2;
}
// Compute task attributes which include values derived from the TaskSpec, as well as plugin-specific data
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto
index 36d9b77e1..6706a1283 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto
@@ -123,6 +123,19 @@ message TaskExecutionClosure {
// TaskExecutionMetadata ExternalResourceInfo fields for each subtask rather than the TaskLog
// in this message.
int32 event_version = 17;
+
+ // A time-series of the phase transition or update explanations. This, when compared to storing a singular reason
+ // as previously done, is much more valuable in visualizing and understanding historical evaluations.
+ repeated Reason reasons = 18;
+}
+
+// Reason is a single message annotated with a timestamp to indicate the instant the reason occurred.
+message Reason {
+ // occurred_at is the timestamp indicating the instant that this reason happened.
+ google.protobuf.Timestamp occurred_at = 1;
+
+ // message is the explanation for the most recent phase transition or status update.
+ string message = 2;
}
// Request structure to fetch inputs and output for a task execution.
@@ -148,4 +161,8 @@ message TaskExecutionGetDataResponse {
// Full_outputs will only be populated if they are under a configured size threshold.
core.LiteralMap full_outputs = 4;
+
+ // flyte tiny url to fetch a core.LiteralMap of task execution's IO
+ // Deck will be empty for task
+ FlyteURLs flyte_urls = 5;
}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto
index 895e33b77..b768cf960 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto
@@ -6,6 +6,7 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin";
import "flyteidl/core/compiler.proto";
import "flyteidl/core/identifier.proto";
import "flyteidl/core/workflow.proto";
+import "flyteidl/admin/description_entity.proto";
import "google/protobuf/timestamp.proto";
// Represents a request structure to create a revision of a workflow.
@@ -33,6 +34,9 @@ message Workflow {
// closure encapsulates all the fields that maps to a compiled version of the workflow.
WorkflowClosure closure = 2;
+
+ // One-liner overview of the entity.
+ string short_description = 3;
}
// Represents a list of workflows returned from the admin.
@@ -55,6 +59,9 @@ message WorkflowSpec {
// propeller compiler (since the compiler doesn't have any knowledge of other workflows - ie, it doesn't reach out
// to Admin to see other registered workflows). In fact, subworkflows do not even need to be registered.
repeated core.WorkflowTemplate sub_workflows = 2;
+
+ // Represents the specification for description entity.
+ DescriptionEntity description = 3;
}
// A container holding the compiled workflow produced from the WorkflowSpec and additional metadata.
@@ -65,3 +72,21 @@ message WorkflowClosure {
// Time at which the workflow was created.
google.protobuf.Timestamp created_at = 2;
}
+
+// The workflow id is already used and the structure is different
+message WorkflowErrorExistsDifferentStructure {
+ core.Identifier id = 1;
+}
+
+// The workflow id is already used with an identical sctructure
+message WorkflowErrorExistsIdenticalStructure {
+ core.Identifier id = 1;
+}
+
+// When a CreateWorkflowRequest failes due to matching id
+message CreateWorkflowFailureReason {
+ oneof reason {
+ WorkflowErrorExistsDifferentStructure exists_different_structure = 1;
+ WorkflowErrorExistsIdenticalStructure exists_identical_structure = 2;
+ }
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto b/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto
index 945c3334b..80cc04432 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto
@@ -20,6 +20,8 @@ enum CatalogCacheStatus {
CACHE_LOOKUP_FAILURE = 4;
// Used to indicate that cache lookup failed because of an error
CACHE_PUT_FAILURE = 5;
+ // Used to indicate the cache lookup was skipped
+ CACHE_SKIPPED = 6;
};
message CatalogArtifactTag {
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto b/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto
index 36e5c7041..247618713 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto
@@ -11,17 +11,13 @@ import "flyteidl/core/literals.proto";
message ComparisonExpression {
// Binary Operator for each expression
enum Operator {
- // Equal to
EQ = 0;
- // Not equal to
NEQ = 1;
// Greater Than
GT = 2;
- // Greater than or equal to
GTE = 3;
// Less Than
LT = 4;
- // Less than or equal to
LTE = 5;
}
@@ -34,9 +30,11 @@ message ComparisonExpression {
message Operand {
oneof val {
// Can be a constant
- core.Primitive primitive = 1;
+ core.Primitive primitive = 1 [deprecated = true];
// Or one of this node's input variables
string var = 2;
+ // Replace the primitive field
+ core.Scalar scalar = 3;
}
}
@@ -56,7 +54,6 @@ message ConjunctionExpression {
enum LogicalOperator {
// Conjunction
AND = 0;
- // Disjunction
OR = 1;
}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto b/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto
index c5fe07601..05d0731a1 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto
@@ -27,6 +27,6 @@ message DynamicJobSpec {
// [Optional] A complete list of task specs referenced in nodes.
repeated TaskTemplate tasks = 4;
- // [Optional] A complete list of sub-workflows templates.
+ // [Optional] A complete list of task specs referenced in nodes.
repeated WorkflowTemplate subworkflows = 5;
}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto b/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto
index d39024a21..0c3787b66 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto
@@ -22,7 +22,7 @@ message WorkflowExecution {
}
}
-// Indicates various phases of Node Execution
+// Indicates various phases of Node Execution that only include the time spent to run the nodes/workflows
message NodeExecution {
enum Phase {
UNDEFINED = 0;
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto b/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto
index df375f44a..ef8ca4494 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto
@@ -19,7 +19,7 @@ enum ResourceType {
// Encapsulation of fields that uniquely identifies a Flyte resource.
message Identifier {
// Identifies the specific type of resource that this identifier corresponds to.
- ResourceType resource_type = 1;
+ core.ResourceType resource_type = 1;
// Name of the project the resource belongs to.
string project = 2;
@@ -63,3 +63,12 @@ message TaskExecutionIdentifier {
uint32 retry_attempt = 3;
}
+
+// Encapsulation of fields the uniquely identify a signal.
+message SignalIdentifier {
+ // Unique identifier for a signal.
+ string signal_id = 1;
+
+ // Identifies the Flyte workflow execution this signal belongs to.
+ WorkflowExecutionIdentifier execution_id = 2;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto b/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto
index 5df56db04..06af80335 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto
@@ -77,7 +77,6 @@ message StructuredDataset {
StructuredDatasetMetadata metadata = 2;
}
-// A simple value.
message Scalar {
oneof value {
Primitive primitive = 1;
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/metrics.proto b/flyteidl-protos/src/main/proto/flyteidl/core/metrics.proto
new file mode 100644
index 000000000..c96a59988
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/metrics.proto
@@ -0,0 +1,36 @@
+syntax = "proto3";
+
+package flyteidl.core;
+
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core";
+
+import "flyteidl/core/identifier.proto";
+import "google/protobuf/timestamp.proto";
+
+// Span represents a duration trace of Flyte execution. The id field denotes a Flyte execution entity or an operation
+// which uniquely identifies the Span. The spans attribute allows this Span to be further broken down into more
+// precise definitions.
+message Span {
+ // start_time defines the instance this span began.
+ google.protobuf.Timestamp start_time = 1;
+
+ // end_time defines the instance this span completed.
+ google.protobuf.Timestamp end_time = 2;
+
+ oneof id {
+ // workflow_id is the id of the workflow execution this Span represents.
+ flyteidl.core.WorkflowExecutionIdentifier workflow_id = 3;
+
+ // node_id is the id of the node execution this Span represents.
+ flyteidl.core.NodeExecutionIdentifier node_id = 4;
+
+ // task_id is the id of the task execution this Span represents.
+ flyteidl.core.TaskExecutionIdentifier task_id = 5;
+
+ // operation_id is the id of a unique operation that this Span represents.
+ string operation_id = 6;
+ }
+
+ // spans defines a collection of Spans that breakdown this execution.
+ repeated Span spans = 7;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/security.proto b/flyteidl-protos/src/main/proto/flyteidl/core/security.proto
index eb0c1413a..f9830bf6b 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/security.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/security.proto
@@ -69,6 +69,9 @@ message Identity {
// oauth2_client references an oauth2 client. Backend plugins can use this information to impersonate the client when
// making external calls.
OAuth2Client oauth2_client = 3;
+
+ // execution_identity references the subject who makes the execution
+ string execution_identity = 4;
}
// OAuth2TokenRequest encapsulates information needed to request an OAuth2 token.
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto b/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto
index 48961029e..808c196d1 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto
@@ -95,6 +95,17 @@ message TaskMetadata {
// Indicates whether the system should attempt to execute discoverable instances in serial to avoid duplicate work
bool cache_serializable = 9;
+
+ // Indicates whether the task will generate a Deck URI when it finishes executing.
+ bool generates_deck = 10;
+
+ // Arbitrary tags that allow users and the platform to store small but arbitrary labels
+ map tags = 11;
+
+ // pod_template_name is the unique name of a PodTemplate k8s resource to be used as the base configuration if this
+ // task creates a k8s Pod. If this value is set, the specified PodTemplate will be used instead of, but applied
+ // identically as, the default PodTemplate configured in FlytePropeller.
+ string pod_template_name = 12;
}
// A Task structure that uniquely identifies a task in the system
@@ -104,7 +115,7 @@ message TaskTemplate {
Identifier id = 1;
// A predefined yet extensible Task type identifier. This can be used to customize any of the components. If no
- // extensions are provided in the system, Flyte will resolve this task to its TaskCategory and default the
+ // extensions are provided in the system, Flyte will resolve the this task to its TaskCategory and default the
// implementation registered for the TaskCategory.
string type = 2;
@@ -137,7 +148,6 @@ message TaskTemplate {
// to use as required.
// reserve the field numbers 1 through 15 for very frequently occurring message elements
map config = 16;
-
}
// ----------------- First class Plugins
@@ -258,24 +268,32 @@ message DataLoadingConfig {
// Defines a pod spec and additional pod metadata that is created when a task is executed.
message K8sPod {
- // Contains additional metadata for building a kubernetes pod.
- K8sObjectMetadata metadata = 1;
-
- // Defines the primary pod spec created when a task is executed.
- // This should be a JSON-marshalled pod spec, which can be defined in
- // - go, using: https://github.com/kubernetes/api/blob/release-1.21/core/v1/types.go#L2936
- // - python: using https://github.com/kubernetes-client/python/blob/release-19.0/kubernetes/client/models/v1_pod_spec.py
- google.protobuf.Struct pod_spec = 2;
+ // Contains additional metadata for building a kubernetes pod.
+ K8sObjectMetadata metadata = 1;
+
+ // Defines the primary pod spec created when a task is executed.
+ // This should be a JSON-marshalled pod spec, which can be defined in
+ // - go, using: https://github.com/kubernetes/api/blob/release-1.21/core/v1/types.go#L2936
+ // - python: using https://github.com/kubernetes-client/python/blob/release-19.0/kubernetes/client/models/v1_pod_spec.py
+ google.protobuf.Struct pod_spec = 2;
+
+ // BETA: Optional configuration for DataLoading. If not specified, then default values are used.
+ // This makes it possible to to run a completely portable container, that uses inputs and outputs
+ // only from the local file-system and without having any reference to flytekit. This is supported only on K8s at the moment.
+ // If data loading is enabled, then data will be mounted in accompanying directories specified in the DataLoadingConfig. If the directories
+ // are not specified, inputs will be mounted onto and outputs will be uploaded from a pre-determined file-system path. Refer to the documentation
+ // to understand the default paths.
+ // Only K8s
+ DataLoadingConfig data_config = 3;
}
// Metadata for building a kubernetes object when a task is executed.
message K8sObjectMetadata {
+ // Optional labels to add to the pod definition.
+ map labels = 1;
- // Optional labels to add to the pod definition.
- map labels = 1;
-
- // Optional annotations to add to the pod definition.
- map annotations = 2;
+ // Optional annotations to add to the pod definition.
+ map annotations = 2;
}
// Sql represents a generic sql workload with a statement and dialect.
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/types.proto b/flyteidl-protos/src/main/proto/flyteidl/core/types.proto
index a2babe783..a05c93d38 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/types.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/types.proto
@@ -170,6 +170,26 @@ message OutputReference {
// Variable name must refer to an output variable for the node.
string var = 2;
+
+ repeated PromiseAttribute attr_path = 3;
+}
+
+// PromiseAttribute stores the attribute path of a promise, which will be resolved at runtime.
+// The attribute path is a list of strings and integers.
+// In the following example,
+// ```
+// @workflow
+// def wf():
+// o = t1()
+// t2(o.a["b"][0])
+// ```
+// the output reference t2 binds to has a list of PromiseAttribute ["a", "b", 0]
+
+message PromiseAttribute {
+ oneof value {
+ string string_value = 1;
+ int32 int_value = 2;
+ }
}
// Represents an error thrown from a node.
diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto b/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto
index 00c621daf..37d39182e 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto
@@ -68,6 +68,71 @@ message WorkflowNode {
}
}
+// ApproveCondition represents a dependency on an external approval. During execution, this will manifest as a boolean
+// signal with the provided signal_id.
+message ApproveCondition {
+ // A unique identifier for the requested boolean signal.
+ string signal_id = 1;
+}
+
+// SignalCondition represents a dependency on an signal.
+message SignalCondition {
+ // A unique identifier for the requested signal.
+ string signal_id = 1;
+
+ // A type denoting the required value type for this signal.
+ LiteralType type = 2;
+
+ // The variable name for the signal value in this nodes outputs.
+ string output_variable_name = 3;
+}
+
+// SleepCondition represents a dependency on waiting for the specified duration.
+message SleepCondition {
+ // The overall duration for this sleep.
+ google.protobuf.Duration duration = 1;
+}
+
+// GateNode refers to the condition that is required for the gate to successfully complete.
+message GateNode {
+ oneof condition {
+ // ApproveCondition represents a dependency on an external approval provided by a boolean signal.
+ ApproveCondition approve = 1;
+
+ // SignalCondition represents a dependency on an signal.
+ SignalCondition signal = 2;
+
+ // SleepCondition represents a dependency on waiting for the specified duration.
+ SleepCondition sleep = 3;
+ }
+}
+
+// ArrayNode is a Flyte node type that simplifies the execution of a sub-node over a list of input
+// values. An ArrayNode can be executed with configurable parallelism (separate from the parent
+// workflow) and can be configured to succeed when a certain number of sub-nodes succeed.
+message ArrayNode {
+ // node is the sub-node that will be executed for each element in the array.
+ Node node = 1;
+
+ // parallelism defines the minimum number of instances to bring up concurrently at any given
+ // point. Note that this is an optimistic restriction and that, due to network partitioning or
+ // other failures, the actual number of currently running instances might be more. This has to
+ // be a positive number if assigned. Default value is size.
+ uint32 parallelism = 2;
+
+ oneof success_criteria {
+ // min_successes is an absolute number of the minimum number of successful completions of
+ // sub-nodes. As soon as this criteria is met, the ArrayNode will be marked as successful
+ // and outputs will be computed. This has to be a non-negative number if assigned. Default
+ // value is size (if specified).
+ uint32 min_successes = 3;
+
+ // If the array job size is not known beforehand, the min_success_ratio can instead be used
+ // to determine when an ArrayNode can be marked successful.
+ float min_success_ratio = 4;
+ }
+}
+
// Defines extra information about the Node.
message NodeMetadata {
// A friendly name for the Node
@@ -129,6 +194,13 @@ message Node {
// Information about the branch node to evaluate in this node.
BranchNode branch_node = 8;
+
+ // Information about the condition to evaluate in this node.
+ GateNode gate_node = 9;
+
+ // Information about the sub-node executions for each value in the list of this nodes
+ // inputs values.
+ ArrayNode array_node = 10;
}
}
@@ -154,6 +226,9 @@ message WorkflowMetadata {
// Defines how the system should behave when a failure is detected in the workflow execution.
OnFailurePolicy on_failure = 2;
+
+ // Arbitrary tags that allow users and the platform to store small but arbitrary labels
+ map tags = 3;
}
// The difference between these settings and the WorkflowMetadata ones is that these are meant to be passed down to
diff --git a/flyteidl-protos/src/main/proto/flyteidl/event/event.proto b/flyteidl-protos/src/main/proto/flyteidl/event/event.proto
index 030073390..934c9f944 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/event/event.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/event/event.proto
@@ -51,7 +51,12 @@ message NodeExecutionEvent {
// by the executor of the node.
google.protobuf.Timestamp occurred_at = 4;
- string input_uri = 5;
+ oneof input_value {
+ string input_uri = 5;
+
+ // Raw input data consumed by this node execution.
+ core.LiteralMap input_data = 20;
+ }
oneof output_result {
// URL to the output of the execution, it encodes all the information
@@ -99,6 +104,12 @@ message NodeExecutionEvent {
// String location uniquely identifying where the deck HTML file is
// NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar)
string deck_uri = 19;
+
+ // This timestamp represents the instant when the event was reported by the executing framework. For example,
+ // when first processing a node the `occurred_at` timestamp should be the instant propeller makes progress, so when
+ // literal inputs are initially copied. The event however will not be sent until after the copy completes.
+ // Extracting both of these timestamps facilitates a more accurate portrayal of the evaluation time-series.
+ google.protobuf.Timestamp reported_at = 21;
}
// For Workflow Nodes we need to send information about the workflow that's launched
@@ -113,6 +124,8 @@ message TaskNodeMetadata {
core.CatalogMetadata catalog_key = 2;
// Captures the status of cache reservations for this execution.
core.CatalogReservation.Status reservation_status = 3;
+ // The latest checkpoint location
+ string checkpoint_uri = 4;
// In the case this task launched a dynamic workflow we capture its structure here.
DynamicWorkflowNodeMetadata dynamic_workflow = 16;
@@ -125,6 +138,10 @@ message DynamicWorkflowNodeMetadata {
// Represents the compiled representation of the embedded dynamic workflow.
core.CompiledWorkflowClosure compiled_workflow = 2;
+
+ // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for this DynamicWorkflow. This is
+ // required to correctly recover partially completed executions where the workflow has already been compiled.
+ string dynamic_job_spec_uri = 3;
}
message ParentTaskExecutionMetadata {
@@ -137,6 +154,14 @@ message ParentNodeExecutionMetadata {
string node_id = 1;
}
+message EventReason {
+ // An explanation for this event
+ string reason = 1;
+
+ // The time this reason occurred
+ google.protobuf.Timestamp occurred_at = 2;
+}
+
// Plugin specific execution event information. For tasks like Python, Hive, Spark, DynamicJob.
message TaskExecutionEvent {
// ID of the task. In combination with the retryAttempt this will indicate
@@ -163,9 +188,14 @@ message TaskExecutionEvent {
// by the executor of the task.
google.protobuf.Timestamp occurred_at = 7;
- // URI of the input file, it encodes all the information
- // including Cloud source provider. ie., s3://...
- string input_uri = 8;
+ oneof input_value {
+ // URI of the input file, it encodes all the information
+ // including Cloud source provider. ie., s3://...
+ string input_uri = 8;
+
+ // Raw input data consumed by this task execution.
+ core.LiteralMap input_data = 19;
+ }
oneof output_result {
// URI to the output of the execution, it will be in a format that encodes all the information
@@ -188,8 +218,11 @@ message TaskExecutionEvent {
uint32 phase_version = 12;
// An optional explanation for the phase transition.
- string reason = 13;
+ // Deprecated: Use reasons instead.
+ string reason = 13 [deprecated = true];
+ // An optional list of explanations for the phase transition.
+ repeated EventReason reasons = 21;
// A predefined yet extensible Task type identifier. If the task definition is already registered in flyte admin
// this type will be identical, but not all task executions necessarily use pre-registered definitions and this
@@ -204,6 +237,12 @@ message TaskExecutionEvent {
// TaskExecutionMetadata ExternalResourceInfo fields for each subtask rather than the TaskLog
// in this message.
int32 event_version = 18;
+
+ // This timestamp represents the instant when the event was reported by the executing framework. For example, a k8s
+ // pod task may be marked completed at (ie. `occurred_at`) the instant the container running user code completes,
+ // but this event will not be reported until the pod is marked as completed. Extracting both of these timestamps
+ // facilitates a more accurate portrayal of the evaluation time-series.
+ google.protobuf.Timestamp reported_at = 20;
}
// This message contains metadata about external resources produced or used by a specific task execution.
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto b/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto
index ca612515b..a99a9818b 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto
@@ -6,6 +6,7 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
import "google/api/annotations.proto";
import "flyteidl/admin/project.proto";
import "flyteidl/admin/project_domain_attributes.proto";
+import "flyteidl/admin/project_attributes.proto";
import "flyteidl/admin/task.proto";
import "flyteidl/admin/workflow.proto";
import "flyteidl/admin/workflow_attributes.proto";
@@ -17,7 +18,8 @@ import "flyteidl/admin/node_execution.proto";
import "flyteidl/admin/task_execution.proto";
import "flyteidl/admin/version.proto";
import "flyteidl/admin/common.proto";
-import "protoc-gen-swagger/options/annotations.proto";
+import "flyteidl/admin/description_entity.proto";
+// import "protoc-gen-swagger/options/annotations.proto";
// The following defines an RPC service that is also served over HTTP via grpc-gateway.
// Standard response codes for both are defined here: https://github.com/grpc-ecosystem/grpc-gateway/blob/master/runtime/errors.go
@@ -28,21 +30,21 @@ service AdminService {
post: "/api/v1/tasks"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create and register a task definition."
- responses: {
- key: "400"
- value: {
- description: "Returned for bad request that may have failed validation."
- }
- }
- responses: {
- key: "409"
- value: {
- description: "Returned for a request that references an identical entity that has already been registered."
- }
- }
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create and register a task definition."
+ // responses: {
+ // key: "400"
+ // value: {
+ // description: "Returned for bad request that may have failed validation."
+ // }
+ // }
+ // responses: {
+ // key: "409"
+ // value: {
+ // description: "Returned for a request that references an identical entity that has already been registered."
+ // }
+ // }
+ // };
}
// Fetch a :ref:`ref_flyteidl.admin.Task` definition.
@@ -50,9 +52,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/tasks/{id.project}/{id.domain}/{id.name}/{id.version}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve an existing task definition."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing task definition."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.NamedEntityIdentifier` of task objects.
@@ -60,9 +62,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/task_ids/{project}/{domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing task definition identifiers matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing task definition identifiers matching input filters."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.Task` definitions.
@@ -73,9 +75,9 @@ service AdminService {
get: "/api/v1/tasks/{id.project}/{id.domain}"
}
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing task definitions matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing task definitions matching input filters."
+ // };
}
// Create and upload a :ref:`ref_flyteidl.admin.Workflow` definition
@@ -84,21 +86,21 @@ service AdminService {
post: "/api/v1/workflows"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create and register a workflow definition."
- responses: {
- key: "400"
- value: {
- description: "Returned for bad request that may have failed validation."
- }
- }
- responses: {
- key: "409"
- value: {
- description: "Returned for a request that references an identical entity that has already been registered."
- }
- }
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create and register a workflow definition."
+ // responses: {
+ // key: "400"
+ // value: {
+ // description: "Returned for bad request that may have failed validation."
+ // }
+ // }
+ // responses: {
+ // key: "409"
+ // value: {
+ // description: "Returned for a request that references an identical entity that has already been registered."
+ // }
+ // }
+ // };
}
// Fetch a :ref:`ref_flyteidl.admin.Workflow` definition.
@@ -106,9 +108,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/workflows/{id.project}/{id.domain}/{id.name}/{id.version}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve an existing workflow definition."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing workflow definition."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.NamedEntityIdentifier` of workflow objects.
@@ -116,9 +118,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/workflow_ids/{project}/{domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch an existing workflow definition identifiers matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch an existing workflow definition identifiers matching input filters."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.Workflow` definitions.
@@ -129,9 +131,9 @@ service AdminService {
get: "/api/v1/workflows/{id.project}/{id.domain}"
}
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing workflow definitions matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing workflow definitions matching input filters."
+ // };
}
// Create and upload a :ref:`ref_flyteidl.admin.LaunchPlan` definition
@@ -140,21 +142,21 @@ service AdminService {
post: "/api/v1/launch_plans"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create and register a launch plan definition."
- responses: {
- key: "400"
- value: {
- description: "Returned for bad request that may have failed validation."
- }
- }
- responses: {
- key: "409"
- value: {
- description: "Returned for a request that references an identical entity that has already been registered."
- }
- }
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create and register a launch plan definition."
+ // responses: {
+ // key: "400"
+ // value: {
+ // description: "Returned for bad request that may have failed validation."
+ // }
+ // }
+ // responses: {
+ // key: "409"
+ // value: {
+ // description: "Returned for a request that references an identical entity that has already been registered."
+ // }
+ // }
+ // };
}
// Fetch a :ref:`ref_flyteidl.admin.LaunchPlan` definition.
@@ -162,9 +164,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/launch_plans/{id.project}/{id.domain}/{id.name}/{id.version}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve an existing launch plan definition."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing launch plan definition."
+ // };
}
// Fetch the active version of a :ref:`ref_flyteidl.admin.LaunchPlan`.
@@ -172,9 +174,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/active_launch_plans/{id.project}/{id.domain}/{id.name}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve the active launch plan version specified by input request filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve the active launch plan version specified by input request filters."
+ // };
}
// List active versions of :ref:`ref_flyteidl.admin.LaunchPlan`.
@@ -182,9 +184,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/active_launch_plans/{project}/{domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch the active launch plan versions specified by input request filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch the active launch plan versions specified by input request filters."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.NamedEntityIdentifier` of launch plan objects.
@@ -192,9 +194,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/launch_plan_ids/{project}/{domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing launch plan definition identifiers matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing launch plan definition identifiers matching input filters."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.LaunchPlan` definitions.
@@ -205,9 +207,9 @@ service AdminService {
get: "/api/v1/launch_plans/{id.project}/{id.domain}"
}
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing launch plan definitions matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing launch plan definitions matching input filters."
+ // };
}
// Updates the status of a registered :ref:`ref_flyteidl.admin.LaunchPlan`.
@@ -216,14 +218,14 @@ service AdminService {
put: "/api/v1/launch_plans/{id.project}/{id.domain}/{id.name}/{id.version}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Update the status of an existing launch plan definition. "
- "At most one launch plan version for a given {project, domain, name} can be active at a time. "
- "If this call sets a launch plan to active and existing version is already active, the result of this call will be that the "
- "formerly active launch plan will be made inactive and specified launch plan in this request will be made active. "
- "In the event that the formerly active launch plan had a schedule associated it with it, this schedule will be disabled. "
- "If the reference launch plan in this request is being set to active and has a schedule associated with it, the schedule will be enabled."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update the status of an existing launch plan definition. "
+ // "At most one launch plan version for a given {project, domain, name} can be active at a time. "
+ // "If this call sets a launch plan to active and existing version is already active, the result of this call will be that the "
+ // "formerly active launch plan will be made inactive and specified launch plan in this request will be made active. "
+ // "In the event that the formerly active launch plan had a schedule associated it with it, this schedule will be disabled. "
+ // "If the reference launch plan in this request is being set to active and has a schedule associated with it, the schedule will be enabled."
+ // };
}
// Triggers the creation of a :ref:`ref_flyteidl.admin.Execution`
@@ -232,9 +234,9 @@ service AdminService {
post: "/api/v1/executions"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create a workflow execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create a workflow execution."
+ // };
}
// Triggers the creation of an identical :ref:`ref_flyteidl.admin.Execution`
@@ -243,9 +245,9 @@ service AdminService {
post: "/api/v1/executions/relaunch"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Relaunch a workflow execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Relaunch a workflow execution."
+ // };
}
// Recreates a previously-run workflow execution that will only start executing from the last known failure point.
@@ -258,13 +260,13 @@ service AdminService {
post: "/api/v1/executions/recover"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Recreates a previously-run workflow execution that will only start executing from the last known failure point. "
- "In Recover mode, users cannot change any input parameters or update the version of the execution. "
- "This is extremely useful to recover from system errors and byzantine faults like - Loss of K8s cluster, bugs in platform or instability, machine failures, "
- "downstream system failures (downstream services), or simply to recover executions that failed because of retry exhaustion and should complete if tried again."
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Recreates a previously-run workflow execution that will only start executing from the last known failure point. "
+ // "In Recover mode, users cannot change any input parameters or update the version of the execution. "
+ // "This is extremely useful to recover from system errors and byzantine faults like - Loss of K8s cluster, bugs in platform or instability, machine failures, "
+ // "downstream system failures (downstream services), or simply to recover executions that failed because of retry exhaustion and should complete if tried again."
- };
+ // };
}
// Fetches a :ref:`ref_flyteidl.admin.Execution`.
@@ -272,9 +274,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/executions/{id.project}/{id.domain}/{id.name}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve an existing workflow execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing workflow execution."
+ // };
}
// Update execution belonging to project domain :ref:`ref_flyteidl.admin.Execution`.
@@ -283,9 +285,9 @@ service AdminService {
put: "/api/v1/executions/{id.project}/{id.domain}/{id.name}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Update execution belonging to project domain."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update execution belonging to project domain."
+ // };
}
// Fetches input and output data for a :ref:`ref_flyteidl.admin.Execution`.
@@ -293,9 +295,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/data/executions/{id.project}/{id.domain}/{id.name}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve input and output data from an existing workflow execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve input and output data from an existing workflow execution."
+ // };
};
// Fetch a list of :ref:`ref_flyteidl.admin.Execution`.
@@ -303,9 +305,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/executions/{id.project}/{id.domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing workflow executions matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing workflow executions matching input filters."
+ // };
}
// Terminates an in-progress :ref:`ref_flyteidl.admin.Execution`.
@@ -314,9 +316,9 @@ service AdminService {
delete: "/api/v1/executions/{id.project}/{id.domain}/{id.name}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Terminate the active workflow execution specified in the request."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Terminate the active workflow execution specified in the request."
+ // };
}
// Fetches a :ref:`ref_flyteidl.admin.NodeExecution`.
@@ -324,9 +326,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/node_executions/{id.execution_id.project}/{id.execution_id.domain}/{id.execution_id.name}/{id.node_id}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve an existing node execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing node execution."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.NodeExecution`.
@@ -334,9 +336,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/node_executions/{workflow_execution_id.project}/{workflow_execution_id.domain}/{workflow_execution_id.name}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing node executions matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing node executions matching input filters."
+ // };
}
// Fetch a list of :ref:`ref_flyteidl.admin.NodeExecution` launched by the reference :ref:`ref_flyteidl.admin.TaskExecution`.
@@ -344,9 +346,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/children/task_executions/{task_execution_id.node_execution_id.execution_id.project}/{task_execution_id.node_execution_id.execution_id.domain}/{task_execution_id.node_execution_id.execution_id.name}/{task_execution_id.node_execution_id.node_id}/{task_execution_id.task_id.project}/{task_execution_id.task_id.domain}/{task_execution_id.task_id.name}/{task_execution_id.task_id.version}/{task_execution_id.retry_attempt}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch child node executions launched by the specified task execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch child node executions launched by the specified task execution."
+ // };
}
// Fetches input and output data for a :ref:`ref_flyteidl.admin.NodeExecution`.
@@ -354,9 +356,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/data/node_executions/{id.execution_id.project}/{id.execution_id.domain}/{id.execution_id.name}/{id.node_id}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve input and output data from an existing node execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve input and output data from an existing node execution."
+ // };
};
// Registers a :ref:`ref_flyteidl.admin.Project` with the Flyte deployment.
@@ -365,9 +367,9 @@ service AdminService {
post: "/api/v1/projects"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Register a project."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Register a project."
+ // };
}
// Updates an existing :ref:`ref_flyteidl.admin.Project`
@@ -378,9 +380,9 @@ service AdminService {
put: "/api/v1/projects/{id}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Update a project."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update a project."
+ // };
}
// Fetches a list of :ref:`ref_flyteidl.admin.Project`
@@ -388,9 +390,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/projects"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch registered projects."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch registered projects."
+ // };
}
// Indicates a :ref:`ref_flyteidl.event.WorkflowExecutionEvent` has occurred.
@@ -399,9 +401,9 @@ service AdminService {
post: "/api/v1/events/workflows"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create a workflow execution event recording a phase transition."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create a workflow execution event recording a phase transition."
+ // };
}
// Indicates a :ref:`ref_flyteidl.event.NodeExecutionEvent` has occurred.
@@ -410,9 +412,9 @@ service AdminService {
post: "/api/v1/events/nodes"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create a node execution event recording a phase transition."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create a node execution event recording a phase transition."
+ // };
}
// Indicates a :ref:`ref_flyteidl.event.TaskExecutionEvent` has occurred.
@@ -421,9 +423,9 @@ service AdminService {
post: "/api/v1/events/tasks"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Create a task execution event recording a phase transition."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Create a task execution event recording a phase transition."
+ // };
}
// Fetches a :ref:`ref_flyteidl.admin.TaskExecution`.
@@ -431,9 +433,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/task_executions/{id.node_execution_id.execution_id.project}/{id.node_execution_id.execution_id.domain}/{id.node_execution_id.execution_id.name}/{id.node_execution_id.node_id}/{id.task_id.project}/{id.task_id.domain}/{id.task_id.name}/{id.task_id.version}/{id.retry_attempt}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve an existing task execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing task execution."
+ // };
}
// Fetches a list of :ref:`ref_flyteidl.admin.TaskExecution`.
@@ -441,9 +443,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/task_executions/{node_execution_id.execution_id.project}/{node_execution_id.execution_id.domain}/{node_execution_id.execution_id.name}/{node_execution_id.node_id}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Fetch existing task executions matching input filters."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing task executions matching input filters."
+ // };
}
@@ -452,9 +454,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/data/task_executions/{id.node_execution_id.execution_id.project}/{id.node_execution_id.execution_id.domain}/{id.node_execution_id.execution_id.name}/{id.node_execution_id.node_id}/{id.task_id.project}/{id.task_id.domain}/{id.task_id.name}/{id.task_id.version}/{id.retry_attempt}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve input and output data from an existing task execution."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve input and output data from an existing task execution."
+ // };
}
// Creates or updates custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain.
@@ -463,9 +465,9 @@ service AdminService {
put: "/api/v1/project_domain_attributes/{attributes.project}/{attributes.domain}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Update the customized resource attributes associated with a project-domain combination"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update the customized resource attributes associated with a project-domain combination"
+ // };
}
// Fetches custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain.
@@ -473,9 +475,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/project_domain_attributes/{project}/{domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve the customized resource attributes associated with a project-domain combination"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve the customized resource attributes associated with a project-domain combination"
+ // };
}
// Deletes custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain.
@@ -484,20 +486,51 @@ service AdminService {
delete: "/api/v1/project_domain_attributes/{project}/{domain}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Delete the customized resource attributes associated with a project-domain combination"
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Delete the customized resource attributes associated with a project-domain combination"
+ // };
+ }
+
+ // Creates or updates custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` at the project level
+ rpc UpdateProjectAttributes (flyteidl.admin.ProjectAttributesUpdateRequest) returns (flyteidl.admin.ProjectAttributesUpdateResponse) {
+ option (google.api.http) = {
+ put: "/api/v1/project_attributes/{attributes.project}"
+ body: "*"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update the customized resource attributes associated with a project"
+ // };
+ }
+
+ // Fetches custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain.
+ rpc GetProjectAttributes (flyteidl.admin.ProjectAttributesGetRequest) returns (flyteidl.admin.ProjectAttributesGetResponse) {
+ option (google.api.http) = {
+ get: "/api/v1/project_attributes/{project}"
};
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve the customized resource attributes associated with a project"
+ // };
}
+ // Deletes custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain.
+ rpc DeleteProjectAttributes (flyteidl.admin.ProjectAttributesDeleteRequest) returns (flyteidl.admin.ProjectAttributesDeleteResponse) {
+ option (google.api.http) = {
+ delete: "/api/v1/project_attributes/{project}"
+ body: "*"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Delete the customized resource attributes associated with a project"
+ // };
+ }
// Creates or updates custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project, domain and workflow.
rpc UpdateWorkflowAttributes (flyteidl.admin.WorkflowAttributesUpdateRequest) returns (flyteidl.admin.WorkflowAttributesUpdateResponse) {
option (google.api.http) = {
put: "/api/v1/workflow_attributes/{attributes.project}/{attributes.domain}/{attributes.workflow}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Update the customized resource attributes associated with a project, domain and workflow combination"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update the customized resource attributes associated with a project, domain and workflow combination"
+ // };
}
// Fetches custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project, domain and workflow.
@@ -505,9 +538,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/workflow_attributes/{project}/{domain}/{workflow}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve the customized resource attributes associated with a project, domain and workflow combination"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve the customized resource attributes associated with a project, domain and workflow combination"
+ // };
}
// Deletes custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project, domain and workflow.
@@ -516,9 +549,9 @@ service AdminService {
delete: "/api/v1/workflow_attributes/{project}/{domain}/{workflow}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Delete the customized resource attributes associated with a project, domain and workflow combination"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Delete the customized resource attributes associated with a project, domain and workflow combination"
+ // };
}
// Lists custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a specific resource type.
@@ -526,9 +559,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/matchable_attributes"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve a list of MatchableAttributesConfiguration objects."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve a list of MatchableAttributesConfiguration objects."
+ // };
}
// Returns a list of :ref:`ref_flyteidl.admin.NamedEntity` objects.
@@ -536,9 +569,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/named_entities/{resource_type}/{project}/{domain}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve a list of NamedEntity objects sharing a common resource type, project, and domain."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve a list of NamedEntity objects sharing a common resource type, project, and domain."
+ // };
}
// Returns a :ref:`ref_flyteidl.admin.NamedEntity` object.
@@ -546,9 +579,9 @@ service AdminService {
option (google.api.http) = {
get: "/api/v1/named_entities/{resource_type}/{id.project}/{id.domain}/{id.name}"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve a NamedEntity object."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve a NamedEntity object."
+ // };
}
// Updates a :ref:`ref_flyteidl.admin.NamedEntity` object.
@@ -557,19 +590,50 @@ service AdminService {
put: "/api/v1/named_entities/{resource_type}/{id.project}/{id.domain}/{id.name}"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Update the fields associated with a NamedEntity"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Update the fields associated with a NamedEntity"
+ // };
}
rpc GetVersion (flyteidl.admin.GetVersionRequest) returns (flyteidl.admin.GetVersionResponse) {
option (google.api.http) = {
get: "/api/v1/version"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieve the Version (including the Build information) for FlyteAdmin service"
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve the Version (including the Build information) for FlyteAdmin service"
+ // };
}
-}
+ // Fetch a :ref:`ref_flyteidl.admin.DescriptionEntity` object.
+ rpc GetDescriptionEntity (flyteidl.admin.ObjectGetRequest) returns (flyteidl.admin.DescriptionEntity) {
+ option (google.api.http) = {
+ get: "/api/v1/description_entities/{id.resource_type}/{id.project}/{id.domain}/{id.name}/{id.version}"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve an existing description entity description."
+ // };
+ }
+ // Fetch a list of :ref:`ref_flyteidl.admin.DescriptionEntity` definitions.
+ rpc ListDescriptionEntities (flyteidl.admin.DescriptionEntityListRequest) returns (flyteidl.admin.DescriptionEntityList) {
+ option (google.api.http) = {
+ get: "/api/v1/description_entities/{resource_type}/{id.project}/{id.domain}/{id.name}"
+ additional_bindings {
+ get: "/api/v1/description_entities/{resource_type}/{id.project}/{id.domain}"
+ }
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing description entity definitions matching input filters."
+ // };
+ }
+
+ // Fetches runtime metrics for a :ref:`ref_flyteidl.admin.Execution`.
+ rpc GetExecutionMetrics (flyteidl.admin.WorkflowExecutionGetMetricsRequest) returns (flyteidl.admin.WorkflowExecutionGetMetricsResponse) {
+ option (google.api.http) = {
+ get: "/api/v1/metrics/executions/{id.project}/{id.domain}/{id.name}"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve metrics from an existing workflow execution."
+ // };
+ };
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/agent.proto b/flyteidl-protos/src/main/proto/flyteidl/service/agent.proto
new file mode 100644
index 000000000..2a1a14370
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/agent.proto
@@ -0,0 +1,15 @@
+syntax = "proto3";
+package flyteidl.service;
+
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
+import "flyteidl/admin/agent.proto";
+
+// AgentService defines an RPC Service that allows propeller to send the request to the agent server.
+service AsyncAgentService {
+ // Send a task create request to the agent server.
+ rpc CreateTask (flyteidl.admin.CreateTaskRequest) returns (flyteidl.admin.CreateTaskResponse){};
+ // Get job status.
+ rpc GetTask (flyteidl.admin.GetTaskRequest) returns (flyteidl.admin.GetTaskResponse){};
+ // Delete the task resource.
+ rpc DeleteTask (flyteidl.admin.DeleteTaskRequest) returns (flyteidl.admin.DeleteTaskResponse){};
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto b/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto
index defed6ecf..2d11e7fa3 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto
@@ -4,7 +4,7 @@ package flyteidl.service;
option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
import "google/api/annotations.proto";
-import "protoc-gen-swagger/options/annotations.proto";
+// import "protoc-gen-swagger/options/annotations.proto";
message OAuth2MetadataRequest {}
@@ -41,6 +41,9 @@ message OAuth2MetadataResponse {
// JSON array containing a list of the OAuth 2.0 grant type values that this authorization server supports.
repeated string grant_types_supported = 9;
+
+ // URL of the authorization server's device authorization endpoint, as defined in Section 3.1 of [RFC8628]
+ string device_authorization_endpoint = 10;
}
message PublicClientAuthConfigRequest {}
@@ -60,6 +63,8 @@ message PublicClientAuthConfigResponse {
// to configure the gRPC connection can be used for the http one respecting the insecure flag to choose between
// SSL or no SSL connections.
string service_http_endpoint = 5;
+ // audience to use when initiating OAuth2 authorization requests.
+ string audience = 6;
}
// The following defines an RPC service that is also served over HTTP via grpc-gateway.
@@ -71,9 +76,9 @@ service AuthMetadataService {
option (google.api.http) = {
get: "/.well-known/oauth-authorization-server"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieves OAuth2 authorization server metadata. This endpoint is anonymously accessible."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieves OAuth2 authorization server metadata. This endpoint is anonymously accessible."
+ // };
}
// Anonymously accessible. Retrieves the client information clients should use when initiating OAuth2 authorization
@@ -82,8 +87,8 @@ service AuthMetadataService {
option (google.api.http) = {
get: "/config/v1/flyte_client"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieves public flyte client info. This endpoint is anonymously accessible."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieves public flyte client info. This endpoint is anonymously accessible."
+ // };
}
}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto b/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto
index e82757acb..8972d4f6d 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto
@@ -4,9 +4,12 @@ package flyteidl.service;
option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
import "google/api/annotations.proto";
-import "protoc-gen-swagger/options/annotations.proto";
+// import "protoc-gen-swagger/options/annotations.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
+import "flyteidl/core/identifier.proto";
+import "flyteidl/core/literals.proto";
+
message CreateUploadLocationResponse {
// SignedUrl specifies the url to use to upload content to (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...)
@@ -20,6 +23,10 @@ message CreateUploadLocationResponse {
}
// CreateUploadLocationRequest specified request for the CreateUploadLocation API.
+// The implementation in data proxy service will create the s3 location with some server side configured prefixes,
+// and then:
+// - project/domain/(a deterministic str representation of the content_md5)/filename (if present); OR
+// - project/domain/filename_root (if present)/filename (if present).
message CreateUploadLocationRequest {
// Project to create the upload location for
// +required
@@ -42,10 +49,17 @@ message CreateUploadLocationRequest {
// generated path.
// +required
bytes content_md5 = 5;
+
+ // If present, data proxy will use this string in lieu of the md5 hash in the path. When the filename is also included
+ // this makes the upload location deterministic. The native url will still be prefixed by the upload location prefix
+ // in data proxy config. This option is useful when uploading multiple files.
+ // +optional
+ string filename_root = 6;
}
// CreateDownloadLocationRequest specified request for the CreateDownloadLocation API.
message CreateDownloadLocationRequest {
+ option deprecated = true;
// NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar)
string native_url = 1;
@@ -57,12 +71,85 @@ message CreateDownloadLocationRequest {
}
message CreateDownloadLocationResponse {
+ option deprecated = true;
// SignedUrl specifies the url to use to download content from (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...)
string signed_url = 1;
// ExpiresAt defines when will the signed URL expires.
google.protobuf.Timestamp expires_at = 2;
}
+// ArtifactType
+enum ArtifactType {
+ // ARTIFACT_TYPE_UNDEFINED is the default, often invalid, value for the enum.
+ ARTIFACT_TYPE_UNDEFINED = 0;
+
+ // ARTIFACT_TYPE_DECK refers to the deck html file optionally generated after a task, a workflow or a launch plan
+ // finishes executing.
+ ARTIFACT_TYPE_DECK = 1;
+}
+
+// CreateDownloadLinkRequest defines the request parameters to create a download link (signed url)
+message CreateDownloadLinkRequest {
+ // ArtifactType of the artifact requested.
+ ArtifactType artifact_type = 1;
+
+ // ExpiresIn defines a requested expiration duration for the generated url. The request will be rejected if this
+ // exceeds the platform allowed max.
+ // +optional. The default value comes from a global config.
+ google.protobuf.Duration expires_in = 2;
+
+ oneof source {
+ // NodeId is the unique identifier for the node execution. For a task node, this will retrieve the output of the
+ // most recent attempt of the task.
+ core.NodeExecutionIdentifier node_execution_id = 3;
+ }
+}
+
+// CreateDownloadLinkResponse defines the response for the generated links
+message CreateDownloadLinkResponse {
+ // SignedUrl specifies the url to use to download content from (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...)
+ repeated string signed_url = 1 [deprecated = true];
+
+ // ExpiresAt defines when will the signed URL expire.
+ google.protobuf.Timestamp expires_at = 2 [deprecated = true];
+
+ // New wrapper object containing the signed urls and expiration time
+ PreSignedURLs pre_signed_urls = 3;
+}
+
+// Wrapper object since the message is shared across this and the GetDataResponse
+message PreSignedURLs {
+ // SignedUrl specifies the url to use to download content from (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...)
+ repeated string signed_url = 1;
+
+ // ExpiresAt defines when will the signed URL expire.
+ google.protobuf.Timestamp expires_at = 2;
+}
+
+// General request artifact to retrieve data from a Flyte artifact url.
+message GetDataRequest {
+ // A unique identifier in the form of flyte:// that uniquely, for a given Flyte
+ // backend, identifies a Flyte artifact ([i]nput, [o]utput, flyte [d]eck, etc.).
+ // e.g. flyte://v1/proj/development/execid/n2/0/i (for 0th task execution attempt input)
+ // flyte://v1/proj/development/execid/n2/i (for node execution input)
+ // flyte://v1/proj/development/execid/n2/o/o3 (the o3 output of the second node)
+ string flyte_url = 1;
+}
+
+message GetDataResponse {
+ oneof data {
+ // literal map data will be returned
+ core.LiteralMap literal_map = 1;
+
+ // Flyte deck html will be returned as a signed url users can download
+ PreSignedURLs pre_signed_urls = 2;
+
+ // Single literal will be returned. This is returned when the user/url requests a specific output or input
+ // by name. See the o3 example above.
+ core.Literal literal = 3;
+ }
+}
+
// DataProxyService defines an RPC Service that allows access to user-data in a controlled manner.
service DataProxyService {
// CreateUploadLocation creates a signed url to upload artifacts to for a given project/domain.
@@ -71,17 +158,37 @@ service DataProxyService {
post: "/api/v1/dataproxy/artifact_urn"
body: "*"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Creates a write-only http location that is accessible for tasks at runtime."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Creates a write-only http location that is accessible for tasks at runtime."
+ // };
}
+
// CreateDownloadLocation creates a signed url to download artifacts.
rpc CreateDownloadLocation (CreateDownloadLocationRequest) returns (CreateDownloadLocationResponse) {
+ option deprecated = true;
option (google.api.http) = {
get: "/api/v1/dataproxy/artifact_urn"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Creates a read-only http location that is accessible for tasks at runtime."
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Deprecated: Please use CreateDownloadLink instead. Creates a read-only http location that is accessible for tasks at runtime."
+ // };
+ }
+
+ // CreateDownloadLocation creates a signed url to download artifacts.
+ rpc CreateDownloadLink (CreateDownloadLinkRequest) returns (CreateDownloadLinkResponse) {
+ option (google.api.http) = {
+ post: "/api/v1/dataproxy/artifact_link"
+ body: "*"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Creates a read-only http location that is accessible for tasks at runtime."
+ // };
+ }
+
+ rpc GetData (GetDataRequest) returns (GetDataResponse) {
+ // Takes an address like flyte://v1/proj/development/execid/n2/0/i and return the actual data
+ option (google.api.http) = {
+ get: "/api/v1/data"
};
}
}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/external_plugin_service.proto b/flyteidl-protos/src/main/proto/flyteidl/service/external_plugin_service.proto
new file mode 100644
index 000000000..18f60a7d9
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/external_plugin_service.proto
@@ -0,0 +1,80 @@
+syntax = "proto3";
+package flyteidl.service;
+
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
+import "flyteidl/core/literals.proto";
+import "flyteidl/core/tasks.proto";
+import "flyteidl/core/interface.proto";
+
+// ExternalPluginService defines an RPC Service that allows propeller to send the request to the backend plugin server.
+service ExternalPluginService {
+ // Send a task create request to the backend plugin server.
+ rpc CreateTask (TaskCreateRequest) returns (TaskCreateResponse){option deprecated = true;};
+ // Get job status.
+ rpc GetTask (TaskGetRequest) returns (TaskGetResponse){option deprecated = true;};
+ // Delete the task resource.
+ rpc DeleteTask (TaskDeleteRequest) returns (TaskDeleteResponse){option deprecated = true;};
+}
+
+// The state of the execution is used to control its visibility in the UI/CLI.
+enum State {
+ option deprecated = true;
+ RETRYABLE_FAILURE = 0;
+ PERMANENT_FAILURE = 1;
+ PENDING = 2;
+ RUNNING = 3;
+ SUCCEEDED = 4;
+}
+
+// Represents a request structure to create task.
+message TaskCreateRequest {
+ option deprecated = true;
+ // The inputs required to start the execution. All required inputs must be
+ // included in this map. If not required and not provided, defaults apply.
+ // +optional
+ core.LiteralMap inputs = 1;
+ // Template of the task that encapsulates all the metadata of the task.
+ core.TaskTemplate template = 2;
+ // Prefix for where task output data will be written. (e.g. s3://my-bucket/randomstring)
+ string output_prefix = 3;
+}
+
+// Represents a create response structure.
+message TaskCreateResponse {
+ option deprecated = true;
+ string job_id = 1;
+}
+
+// A message used to fetch a job state from backend plugin server.
+message TaskGetRequest {
+ option deprecated = true;
+ // A predefined yet extensible Task type identifier.
+ string task_type = 1;
+ // The unique id identifying the job.
+ string job_id = 2;
+}
+
+// Response to get an individual task state.
+message TaskGetResponse {
+ option deprecated = true;
+ // The state of the execution is used to control its visibility in the UI/CLI.
+ State state = 1;
+ // The outputs of the execution. It's typically used by sql task. Flyteplugins service will create a
+ // Structured dataset pointing to the query result table.
+ // +optional
+ core.LiteralMap outputs = 2;
+}
+
+// A message used to delete a task.
+message TaskDeleteRequest {
+ option deprecated = true;
+ // A predefined yet extensible Task type identifier.
+ string task_type = 1;
+ // The unique id identifying the job.
+ string job_id = 2;
+}
+
+// Response to delete a task.
+message TaskDeleteResponse {
+ option deprecated = true;
+}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto b/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto
index d51168cb9..e4bc5dcb0 100644
--- a/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto
@@ -4,7 +4,8 @@ package flyteidl.service;
option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
import "google/api/annotations.proto";
-import "protoc-gen-swagger/options/annotations.proto";
+import "google/protobuf/struct.proto";
+// import "protoc-gen-swagger/options/annotations.proto";
message UserInfoRequest {}
@@ -31,6 +32,9 @@ message UserInfoResponse {
// Profile picture URL
string picture = 7;
+
+ // Additional claims
+ google.protobuf.Struct additional_claims = 8;
}
// IdentityService defines an RPC Service that interacts with user/app identities.
@@ -40,8 +44,8 @@ service IdentityService {
option (google.api.http) = {
get: "/me"
};
- option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
- description: "Retrieves authenticated identity info."
- };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieves authenticated identity info."
+ // };
}
}
diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/signal.proto b/flyteidl-protos/src/main/proto/flyteidl/service/signal.proto
new file mode 100644
index 000000000..634440715
--- /dev/null
+++ b/flyteidl-protos/src/main/proto/flyteidl/service/signal.proto
@@ -0,0 +1,55 @@
+syntax = "proto3";
+package flyteidl.service;
+
+option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service";
+
+import "google/api/annotations.proto";
+import "flyteidl/admin/signal.proto";
+// import "protoc-gen-swagger/options/annotations.proto";
+
+// SignalService defines an RPC Service that may create, update, and retrieve signal(s).
+service SignalService {
+ // Fetches or creates a :ref:`ref_flyteidl.admin.Signal`.
+ rpc GetOrCreateSignal (flyteidl.admin.SignalGetOrCreateRequest) returns (flyteidl.admin.Signal) {
+ // Purposefully left out an HTTP API for this RPC call. This is meant to idempotently retrieve
+ // a signal, meaning the first call will create the signal and all subsequent calls will
+ // fetch the existing signal. This is only useful during Flyte Workflow execution and therefore
+ // is not exposed to mitigate unintended behavior.
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Retrieve a signal, creating it if it does not exist."
+ // };
+ }
+
+ // Fetch a list of :ref:`ref_flyteidl.admin.Signal` definitions.
+ rpc ListSignals (flyteidl.admin.SignalListRequest) returns (flyteidl.admin.SignalList) {
+ option (google.api.http) = {
+ get: "/api/v1/signals/{workflow_execution_id.project}/{workflow_execution_id.domain}/{workflow_execution_id.name}"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Fetch existing signal definitions matching the input signal id filters."
+ // };
+ }
+
+ // Sets the value on a :ref:`ref_flyteidl.admin.Signal` definition
+ rpc SetSignal (flyteidl.admin.SignalSetRequest) returns (flyteidl.admin.SignalSetResponse) {
+ option (google.api.http) = {
+ post: "/api/v1/signals"
+ body: "*"
+ };
+ // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
+ // description: "Set a signal value."
+ // responses: {
+ // key: "400"
+ // value: {
+ // description: "Returned for bad request that may have failed validation."
+ // }
+ // }
+ // responses: {
+ // key: "409"
+ // value: {
+ // description: "Returned for a request that references an identical entity that has already been registered."
+ // }
+ // }
+ // };
+ }
+}
diff --git a/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala b/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala
index a36a6a3c2..949550c61 100644
--- a/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala
+++ b/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala
@@ -95,7 +95,9 @@ class SdkScalaTypeTest {
datetime: SdkBindingData[Instant],
duration: SdkBindingData[Duration],
blob: SdkBindingData[Blob],
- generic: SdkBindingData[ScalarNested]
+ generic: SdkBindingData[ScalarNested],
+ none: SdkBindingData[Option[String]],
+ some: SdkBindingData[Option[String]]
)
case class CollectionInput(
@@ -105,7 +107,8 @@ class SdkScalaTypeTest {
booleans: SdkBindingData[List[Boolean]],
datetimes: SdkBindingData[List[Instant]],
durations: SdkBindingData[List[Duration]],
- generics: SdkBindingData[List[ScalarNested]]
+ generics: SdkBindingData[List[ScalarNested]],
+ options: SdkBindingData[List[Option[String]]]
)
case class MapInput(
@@ -115,7 +118,8 @@ class SdkScalaTypeTest {
booleanMap: SdkBindingData[Map[String, Boolean]],
datetimeMap: SdkBindingData[Map[String, Instant]],
durationMap: SdkBindingData[Map[String, Duration]],
- genericMap: SdkBindingData[Map[String, ScalarNested]]
+ genericMap: SdkBindingData[Map[String, ScalarNested]],
+ optionMap: SdkBindingData[Map[String, Option[String]]]
)
case class ComplexInput(
@@ -196,7 +200,9 @@ class SdkScalaTypeTest {
.literalType(LiteralType.ofBlobType(BlobType.DEFAULT))
.description("")
.build(),
- "generic" -> createVar(SimpleType.STRUCT)
+ "generic" -> createVar(SimpleType.STRUCT),
+ "none" -> createVar(SimpleType.STRUCT),
+ "some" -> createVar(SimpleType.STRUCT)
).asJava
val output = SdkScalaType[ScalarInput].getVariableMap
@@ -274,6 +280,16 @@ class SdkScalaTypeTest {
).asJava
)
)
+ ),
+ "none" -> Literal.ofScalar(
+ Scalar.ofGeneric(
+ Struct.of(Map.empty[String, Struct.Value].asJava)
+ )
+ ),
+ "some" -> Literal.ofScalar(
+ Scalar.ofGeneric(
+ Struct.of(Map("value" -> Struct.Value.ofStringValue("hello")).asJava)
+ )
)
).asJava
@@ -295,6 +311,14 @@ class SdkScalaTypeTest {
List(ScalarNestedNested("foo", Some("bar"))),
Map("foo" -> ScalarNestedNested("foo", Some("bar")))
)
+ ),
+ none = SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Option(null)
+ ),
+ some = SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Option("hello")
)
)
@@ -323,7 +347,11 @@ class SdkScalaTypeTest {
List(ScalarNestedNested("foo", Some("bar"))),
Map("foo" -> ScalarNestedNested("foo", Some("bar")))
)
- )
+ ),
+ none =
+ SdkBindingDataFactory.of(SdkLiteralTypes.generics(), Option(null)),
+ some =
+ SdkBindingDataFactory.of(SdkLiteralTypes.generics(), Option("hello"))
)
val expected = Map(
@@ -399,6 +427,23 @@ class SdkScalaTypeTest {
).asJava
)
)
+ ),
+ "none" -> Literal.ofScalar(
+ Scalar.ofGeneric(
+ Struct.of(
+ Map(__TYPE -> Struct.Value.ofStringValue("scala.None$")).asJava
+ )
+ )
+ ),
+ "some" -> Literal.ofScalar(
+ Scalar.ofGeneric(
+ Struct.of(
+ Map(
+ "value" -> Struct.Value.ofStringValue("hello"),
+ __TYPE -> Struct.Value.ofStringValue("scala.Some")
+ ).asJava
+ )
+ )
)
).asJava
@@ -416,7 +461,8 @@ class SdkScalaTypeTest {
"booleans" -> createCollectionVar(SimpleType.BOOLEAN),
"datetimes" -> createCollectionVar(SimpleType.DATETIME),
"durations" -> createCollectionVar(SimpleType.DURATION),
- "generics" -> createCollectionVar(SimpleType.STRUCT)
+ "generics" -> createCollectionVar(SimpleType.STRUCT),
+ "options" -> createCollectionVar(SimpleType.STRUCT)
).asJava
val output = SdkScalaType[CollectionInput].getVariableMap
@@ -443,6 +489,14 @@ class SdkScalaTypeTest {
List(ScalarNestedNested("foo", Some("bar"))),
Map("foo" -> ScalarNestedNested("foo", Some("bar")))
)
+ ),
+ none = SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Option(null)
+ ),
+ some = SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Option("hello")
)
)
@@ -465,6 +519,14 @@ class SdkScalaTypeTest {
List(ScalarNestedNested("foo", Some("bar"))),
Map("foo" -> ScalarNestedNested("foo", Some("bar")))
)
+ ),
+ "none" -> SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Option(null)
+ ),
+ "some" -> SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Option("hello")
)
).asJava
@@ -531,6 +593,10 @@ class SdkScalaTypeTest {
Map("foo2" -> ScalarNestedNested("foo2", Some("bar2")))
)
)
+ ),
+ options = SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ List(Option("hello"), Option(null))
)
)
@@ -550,7 +616,8 @@ class SdkScalaTypeTest {
"booleanMap" -> createMapVar(SimpleType.BOOLEAN),
"datetimeMap" -> createMapVar(SimpleType.DATETIME),
"durationMap" -> createMapVar(SimpleType.DURATION),
- "genericMap" -> createMapVar(SimpleType.STRUCT)
+ "genericMap" -> createMapVar(SimpleType.STRUCT),
+ "optionMap" -> createMapVar(SimpleType.STRUCT)
).asJava
val output = SdkScalaType[MapInput].getVariableMap
@@ -598,6 +665,10 @@ class SdkScalaTypeTest {
Map("foo2" -> ScalarNestedNested("foo2", Some("bar2")))
)
)
+ ),
+ optionMap = SdkBindingDataFactory.of(
+ SdkLiteralTypes.generics[Option[String]](),
+ Map("none" -> Option(null), "some" -> Option("hello"))
)
)
diff --git a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala
index de245a5be..d08394fe7 100644
--- a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala
+++ b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala
@@ -28,6 +28,7 @@ import scala.reflect.api.{Mirror, TypeCreator, Universe}
import scala.reflect.runtime.universe
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.{
+ ClassSymbol,
NoPrefix,
Symbol,
Type,
@@ -73,14 +74,12 @@ object SdkLiteralTypes {
blobs(BlobType.DEFAULT).asInstanceOf[SdkLiteralType[T]]
case t if t =:= typeOf[Binary] =>
binary().asInstanceOf[SdkLiteralType[T]]
-
case t if t <:< typeOf[List[Any]] =>
collections(of()(createTypeTag(typeTag[T].mirror, t.typeArgs.head)))
.asInstanceOf[SdkLiteralType[T]]
case t if t <:< typeOf[Map[String, Any]] =>
maps(of()(createTypeTag(typeTag[T].mirror, t.typeArgs.last)))
.asInstanceOf[SdkLiteralType[T]]
-
case t if t <:< typeOf[Product] && !(t =:= typeOf[Option[_]]) =>
generics().asInstanceOf[SdkLiteralType[T]]
@@ -314,11 +313,23 @@ object SdkLiteralTypes {
throw new IllegalArgumentException(
s"Map is missing required parameter named $paramName"
)
- )
- valueToParamValue(value, param.typeSignature.dealias)
- })
+ valueToParamValue(value, param.typeSignature.dealias)
+ })
+
+ constructorMirror(constructorArgs: _*).asInstanceOf[S]
+ }
+
+ val clazz = typeOf[S].typeSymbol.asClass
+ // special handling of scala.Option as it is a Product, but can't be instantiated like common
+ // case classes
+ if (clazz.name.toString == "Option")
+ map
+ .get("value")
+ .map(valueToParamValue(_, typeOf[S].typeArgs.head))
+ .asInstanceOf[S]
+ else
+ instantiateViaConstructor(clazz)
- constructorMirror(constructorArgs: _*).asInstanceOf[S]
}
def structValueToAny(value: Struct.Value): Any = {
diff --git a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala
index 00cbdea56..c4868094e 100644
--- a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala
+++ b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala
@@ -232,11 +232,8 @@ object SdkScalaType {
implicit def durationLiteralType: SdkScalaLiteralType[Duration] =
DelegateLiteralType(SdkLiteralTypes.durations())
- // more specific matching to fail the usage of SdkBindingData[Option[_]]
- implicit def optionLiteralType: SdkScalaLiteralType[Option[_]] = ???
-
// fixme: using Product is just an approximation for case class because Product
- // is also super class of, for example, Option and Tuple
+ // is also super class of, for example, Either or Try
implicit def productLiteralType[T <: Product: TypeTag: ClassTag]
: SdkScalaLiteralType[T] =
DelegateLiteralType(SdkLiteralTypes.generics())
diff --git a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala
index 47c6332b3..b5bcc208d 100644
--- a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala
+++ b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala
@@ -30,7 +30,11 @@ package object flytekitscala {
} catch {
case _: Throwable =>
// fall back to java's way, less reliable and with limitations
- product.getClass.getDeclaredFields.map(_.getName).toList
+ val methodNames = product.getClass.getDeclaredMethods.map(_.getName)
+ product.getClass.getDeclaredFields
+ .map(_.getName)
+ .filter(methodNames.contains)
+ .toList
}
}
}