From 44d52002e8d7e37062248b2cc364c94f9e573b25 Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Tue, 26 Mar 2024 14:43:07 -0500
Subject: [PATCH 1/7] Expand intro and add diagram

Signed-off-by: davidmirror-ops
---
 .../flytepropeller_architecture.rst | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index c04edbf617..bad41c89c2 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -15,13 +15,20 @@ Introduction

A Flyte :ref:`workflow ` is represented as a Directed Acyclic Graph (DAG) of interconnected Nodes. Flyte supports a robust collection of Node types to ensure diverse functionality.

- ``TaskNodes`` support a plugin system to externally add system integrations.
-- Control flow can be altered during runtime using ``BranchNodes``, which prune downstream evaluation paths based on input.
+- ``BranchNodes`` allow altering the Control flow during runtime; pruning downstream evaluation paths based on input.
- ``DynamicNodes`` add nodes to the DAG.
- ``WorkflowNodes`` allow embedding workflows within each other.

-FlytePropeller is responsible for scheduling and tracking execution of Flyte workflows. It is implemented using a K8s controller and adheres to the established K8s design principles. In this scheme, resources are periodically evaluated and the goal is to transition from the observed state to a requested state.
+FlytePropeller is responsible for scheduling and tracking execution of Flyte workflows. It is implemented using a K8s controller that follows the reconciler pattern.

-In our case, workflows are the resources and they are iteratively evaluated to transition from the current state to success. During each loop, the current workflow state is established as the phase of workflow nodes and subsequent tasks, and FlytePropeller performs operations to transition this state to success. The operations may include scheduling (or rescheduling) node executions, evaluating dynamic or branch nodes, etc. These design decisions ensure that FlytePropeller can scale to manage a large number of concurrent workflows without performance degradation.
+.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/common/reconciler-pattern.png
+
+In this scheme, resources are periodically evaluated and the goal is to transition from the observed state to a requested state.
+
+In our case, workflows are the resources, whose desired state (*workflow definition*) is expressed using Flyte's SDK. Workflows are iteratively evaluated to transition from the current state to success. During each evaluation loop, the current workflow state is established as the `phase of workflow nodes `__ and subsequent tasks, and FlytePropeller performs operations to transition this state to success.
+The operations may include scheduling (or rescheduling) node executions, evaluating dynamic or branch nodes, etc.
+
+By using a simple yet robust mechanism, FlytePropeller can scale to manage a large number of concurrent workflows without performance degradation.

This document attempts to break down the FlytePropeller architecture by tracking workflow life cycle through each internal component.
Below is a high-level illustration of the FlytePropeller architecture and a flow chart of each component's responsibilities during FlyteWorkflow execution.

@@ -33,7 +40,10 @@ Components

FlyteWorkflow CRD / K8s Integration
-----------------------------------
-Workflows in Flyte are maintained as Custom Resource Definitions (CRDs) in Kubernetes, which are stored in the backing etcd cluster. Each execution of a workflow definition results in the creation of a new FlyteWorkflow CR (Custom Resource) which maintains a state for the entirety of processing. CRDs provide variable definitions to describe both resource specifications (spec) and status (status). The FlyteWorkflow CRD uses the spec subsection to detail the workflow DAG, embodying node dependencies, etc. The status subsection tracks workflow metadata including overall workflow status, node/task phases, status/phase transition timestamps, etc.
+
+Workflows in Flyte are maintained as `Custom Resource Definitions (CRDs) `__ in Kubernetes, which are stored in the backing ``etcd`` key-value store. Each worklfow execution results in the creation of a new ``flyteworkflow`` CR (Custom Resource) which maintains its state for the duration of the execution. CRDs provide variable definitions to describe both resource specifications (``spec``) and status (``status``). The ``flyteworkflow`` CRD uses the ``spec`` subsection to detail the workflow DAG, embodying node dependencies, etc.
+
+The status subsection tracks workflow metadata including overall workflow status, node/task phases, status/phase transition timestamps, etc.

K8s exposes a powerful controller/operator API that enables entities to track creation/updates over a specific resource type. FlytePropeller uses this API to track FlyteWorkflows, meaning every time an instance of the FlyteWorkflow CR is created/updated, the FlytePropeller instance is notified. FlyteAdmin is the common entry point, where initialization of FlyteWorkflow CRs may be triggered by user workflow definition executions, automatic relaunches, or periodically scheduled workflow definition executions. However, it is conceivable to manually create FlyteWorkflow CRs, but this will have limited visibility and usability.

From eb514338e3f389e42c9f8b983c26c5e57f7a96a4 Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Tue, 26 Mar 2024 14:45:57 -0500
Subject: [PATCH 2/7] Add note to intro

Signed-off-by: davidmirror-ops
---
 .../component_architecture/flytepropeller_architecture.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index bad41c89c2..7568e6b947 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -28,7 +28,7 @@ In this scheme, resources are periodically evaluated and the goal is to transiti

In our case, workflows are the resources, whose desired state (*workflow definition*) is expressed using Flyte's SDK. Workflows are iteratively evaluated to transition from the current state to success. During each evaluation loop, the current workflow state is established as the `phase of workflow nodes `__ and subsequent tasks, and FlytePropeller performs operations to transition this state to success.
The operations may include scheduling (or rescheduling) node executions, evaluating dynamic or branch nodes, etc.
-By using a simple yet robust mechanism, FlytePropeller can scale to manage a large number of concurrent workflows without performance degradation.
+By using a simple yet robust mechanism, FlytePropeller can scale to manage a large number of concurrent workflows without significant performance degradation.

This document attempts to break down the FlytePropeller architecture by tracking workflow life cycle through each internal component.
Below is a high-level illustration of the FlytePropeller architecture and a flow chart of each component's responsibilities during FlyteWorkflow execution.

From 3e95fcfba00363f573d3f058ae1ff6aa02b1538c Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Tue, 26 Mar 2024 15:13:46 -0500
Subject: [PATCH 3/7] Add example CR contents

Signed-off-by: davidmirror-ops
---
 .../flytepropeller_architecture.rst | 88 ++++++++++++++++++-
 1 file changed, 86 insertions(+), 2 deletions(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index 7568e6b947..dbb7d598dd 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -37,15 +37,99 @@ This document attempts to break down the FlytePropeller architecture by tracking

Components
==========
+
+FlyteAdmin is the common entry point, where initialization of FlyteWorkflow CRs may be triggered by user workflow definition executions, automatic relaunches, or periodically scheduled workflow definition executions.
+
FlyteWorkflow CRD / K8s Integration
-----------------------------------
+Workflows in Flyte are maintained as `Custom Resource Definitions (CRDs) `__ in Kubernetes, which are stored in the backing ``etcd`` key-value store. Each workflow execution results in the creation of a new ``flyteworkflow`` CR (Custom Resource) which maintains its state for the duration of the execution. CRDs provide variable definitions to describe both resource specifications (``spec``) and status (``status``). The ``flyteworkflow`` CRD uses the ``spec`` subsection to detail the workflow DAG, embodying node dependencies, etc.
+
+**Example**
+
+1. Execute an `example workflow `__ on a remote Flyte cluster:
+
+.. code-block:: bash
+
+   pyflyte run --remote example.py training_workflow --hyperparameters '{"C": 0.4}'
+
+2. Verify there's a new Custom Resource in the ``flytesnacks-development`` namespace (that is, the workflow belongs to the ``flytesnacks`` project and the ``development`` domain):
+
+.. code-block:: bash
+
+   kubectl get flyteworkflows.flyte.lyft.com -n flytesnacks-development
+
+Example output:
-Workflows in Flyte are maintained as `Custom Resource Definitions (CRDs) `__ in Kubernetes, which are stored in the backing ``etcd`` key-value store. Each worklfow execution results in the creation of a new ``flyteworkflow`` CR (Custom Resource) which maintains its state for the duration of the execution. CRDs provide variable definitions to describe both resource specifications (``spec``) and status (``status``). The ``flyteworkflow`` CRD uses the ``spec`` subsection to detail the workflow DAG, embodying node dependencies, etc.
+
+.. code-block:: bash
+
+   NAME                   AGE
+   f7616dc75400f43e6920   3h42m
+
+3. Describe the contents of the CR, for example the ``spec`` section:
+
+.. code-block:: bash
+
+   kubectl describe flyteworkflows.flyte.lyft.com f7616dc75400f43e6920 -n flytesnacks-development
+
+.. code-block:: json
+
+   "spec": {
+       "connections": {
+           "n0": [
+               "n1"
+           ],
+           "n1": [
+               "n2"
+           ],
+           "n2": [
+               "end-node"
+           ],
+           "start-node": [
+               "n0",
+               "n2"
+           ]
+       },

 The status subsection tracks workflow metadata including overall workflow status, node/task phases, status/phase transition timestamps, etc.

-K8s exposes a powerful controller/operator API that enables entities to track creation/updates over a specific resource type. FlytePropeller uses this API to track FlyteWorkflows, meaning every time an instance of the FlyteWorkflow CR is created/updated, the FlytePropeller instance is notified. FlyteAdmin is the common entry point, where initialization of FlyteWorkflow CRs may be triggered by user workflow definition executions, automatic relaunches, or periodically scheduled workflow definition executions. However, it is conceivable to manually create FlyteWorkflow CRs, but this will have limited visibility and usability.
+.. code-block:: json
+
+   },
+   "status": {
+       "dataDir": "gs://flyteontf-gcp-data-116223838137/metadata/propeller/flytesnacks-development-f7616dc75400f43e6920",
+       "defVersion": 1,
+       "lastUpdatedAt": "2024-03-26T16:22:16Z",
+       "nodeStatus": {
+           "end-node": {
+               "phase": 5,
+               "stoppedAt": "2024-03-26T16:22:16Z"
+           },
+           "n0": {
+               "phase": 5,
+               "stoppedAt": "2024-03-26T16:21:46Z"
+           },
+           "n1": {
+               "phase": 5,
+               "stoppedAt": "2024-03-26T16:22:02Z"
+           },
+           "n2": {
+               "phase": 5,
+               "stoppedAt": "2024-03-26T16:22:16Z"
+           },
+           "start-node": {
+               "phase": 5,
+               "stoppedAt": "2024-03-26T16:20:39Z"
+           }
+       },
+
+
+K8s exposes a powerful controller/operator API that enables entities to track creation/updates over a specific resource type. FlytePropeller uses this API to track FlyteWorkflows, meaning every time an instance of the ``flyteworkflow`` CR is created/updated, the FlytePropeller instance is notified.
+
+.. note::
+
+   Manual creation of ``flyteworkflow`` CRs, without the intervention of ``flyteadmin``, is possible but not supported as the resulting resource will have limited visibility and usability.
+

 WorkQueue/WorkerPool
----------------------

From 3b1c9afc0f4f89b36596148ffe432c0ac8f9e0b9 Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Wed, 27 Mar 2024 07:48:46 -0500
Subject: [PATCH 4/7] Update contents of the CR

Signed-off-by: davidmirror-ops
---
 .../component_architecture/flytepropeller_architecture.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index dbb7d598dd..32ea054b5a 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -139,12 +139,12 @@ FlytePropeller supports concurrent execution of multiple, unique workflows using

The WorkQueue is a FIFO queue storing workflow ID strings that require a lookup to retrieve the FlyteWorkflow CR to ensure up-to-date status. A workflow may be added to the queue in a variety of circumstances:

#. A new FlyteWorkflow CR is created or an existing instance is updated
-#. The K8s Informer resyncs the FlyteWorkflow periodically (necessary to detect workflow timeouts and ensure liveness)
+#. The K8s Informer detects a workflow timeout or failed liveness check during its periodic resync operation on the FlyteWorkflow.
#. A FlytePropeller worker experiences an error during a processing loop
#. The WorkflowExecutor observes a completed downstream node
#. A NodeHandler observes state change and explicitly enqueues its owner (For example, K8s pod informer observes completion of a task)

-The WorkerPool is implemented as a collection of goroutines, one for each worker. Using this lightweight construct, FlytePropeller can scale to 1000s of workers on a single CPU. Workers continually poll the WorkQueue for workflows. On success, the workflow is executed (passed to WorkflowExecutor).
+The WorkerPool is implemented as a collection of ``goroutines``, one for each worker. Using this lightweight construct, FlytePropeller can scale to 1000s of workers on a single CPU. Workers continually poll the WorkQueue for workflows. On success, the workflow is passed to the WorkflowExecutor.

 WorkflowExecutor
----------------

From 49d786297ffc7ebc7675fadd8cbd885477de6ab8 Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Wed, 27 Mar 2024 09:41:01 -0500
Subject: [PATCH 5/7] Expand description of plugins diagram

Signed-off-by: davidmirror-ops
---
 .../flytepropeller_architecture.rst | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index 32ea054b5a..ccb50676a0 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -154,7 +154,8 @@ The WorkflowExecutor is responsible for handling high-level workflow operations.

 NodeExecutor
------------

-The NodeExecutor is executed on a single node, beginning with the workflow's start node. It traverses the workflow using a visitor pattern with a modified depth-first search (DFS), evaluating each node along the path. A few examples of node evaluation based on phase: successful nodes are skipped, unevaluated nodes are queued for processing, and failed nodes may be reattempted up to a configurable threshold. There are many configurable parameters to tune evaluation criteria including max parallelism which restricts the number of nodes which may be scheduled concurrently. Additionally, nodes may be retried to ensure recoverability on failure.
+The NodeExecutor is executed on a single node, beginning with the workflow's start node. It traverses the workflow using a visitor pattern with a modified depth-first search (DFS), evaluating each node along the path. A few examples of node evaluation based on phase include: successful nodes are skipped, unevaluated nodes are queued for processing, and failed nodes may be reattempted up to a configurable threshold. There are many configurable parameters to tune evaluation criteria including max parallelism which restricts the number of nodes which may be scheduled concurrently. Additionally, nodes may be retried to ensure recoverability on failure.
+Go to the `Optimizing Performance section __` for more information on how to tune Propeller parameters.

The NodeExecutor is also responsible for linking data readers/writers to facilitate data transfer between node executions. The data transfer process occurs automatically within Flyte, using efficient K8s events rather than a polling listener pattern which incurs more overhead. Relatively small amounts of data may be passed between nodes inline, but it is more common to pass data URLs to backing storage. A component of this is writing to and checking the data cache, which facilitates the reuse of previously completed evaluations.
@@ -167,7 +168,7 @@ FlytePropeller includes a robust collection of NodeHandlers to support diverse e

 1. **Pod Task**: Create a pod in the Kubernetes cluster, execute the task, and then delete the pod.

- 2. **K8s Operator Backend Plugin**: Install a specific Kubernetes Operator (e.g., Spark, Ray, and Kubeflow) in the cluster, create pods by the Kubernetes Operator, execute the task, and then delete the pods.
+ 2. **K8s Operator Backend Plugin**: Install a specific Kubernetes Operator (e.g., Spark, Ray, or Kubeflow) in the cluster, create pods via the Kubernetes Operator, execute the task, and then delete the pods.

 3. **Web API Task**: Send REST/gRPC requests to a server and return the response. Note: The Web API Task will not start a pod.

@@ -185,6 +186,8 @@ It should be noted that the WorkflowExecutor, NodeExecutor, and TaskHandlers sen

 FlytePlugins
------------

-Here is an overview architecture of FlytePlugins:
+Every operation that Propeller performs makes use of a plugin. The following diagram describes the different types of plugins available for Propeller and an example operation when using the Spark integration:

 .. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/architecture/flytepropeller_plugins_architecture.png
+
+
\ No newline at end of file

From fbdbdc689c8f4ee65c99e968fe77d6de49c99936 Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Wed, 27 Mar 2024 10:28:33 -0500
Subject: [PATCH 6/7] Fix hyperlink structure

Signed-off-by: davidmirror-ops
---
 .../component_architecture/flytepropeller_architecture.rst | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index ccb50676a0..197118ec8c 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -155,7 +155,8 @@ NodeExecutor
------------

 The NodeExecutor is executed on a single node, beginning with the workflow's start node. It traverses the workflow using a visitor pattern with a modified depth-first search (DFS), evaluating each node along the path. A few examples of node evaluation based on phase include: successful nodes are skipped, unevaluated nodes are queued for processing, and failed nodes may be reattempted up to a configurable threshold. There are many configurable parameters to tune evaluation criteria including max parallelism which restricts the number of nodes which may be scheduled concurrently. Additionally, nodes may be retried to ensure recoverability on failure.
-Go to the `Optimizing Performance section __` for more information on how to tune Propeller parameters.
+
+Go to the `Optimizing Performance `__ section for more information on how to tune Propeller parameters.

The NodeExecutor is also responsible for linking data readers/writers to facilitate data transfer between node executions. The data transfer process occurs automatically within Flyte, using efficient K8s events rather than a polling listener pattern which incurs more overhead. Relatively small amounts of data may be passed between nodes inline, but it is more common to pass data URLs to backing storage. A component of this is writing to and checking the data cache, which facilitates the reuse of previously completed evaluations.
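The WorkQueue/WorkerPool mechanics described in the patches above can be sketched in a few lines of Go. This is a minimal illustration of the pattern only, not FlytePropeller's actual implementation (which is built on the K8s controller machinery); the function names, queue size, worker count, and workflow IDs here are hypothetical:

.. code-block:: go

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // processWorkflow stands in for handing a workflow ID off to the WorkflowExecutor.
    func processWorkflow(workerID int, workflowID string) {
        fmt.Printf("worker %d evaluating workflow %s\n", workerID, workflowID)
        time.Sleep(10 * time.Millisecond) // simulate one evaluation loop
    }

    func main() {
        // The WorkQueue: a FIFO of workflow ID strings, modeled as a buffered channel.
        workQueue := make(chan string, 100)

        // The WorkerPool: one goroutine per worker, all draining the same queue.
        const numWorkers = 4
        var wg sync.WaitGroup
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                // range blocks until an ID is enqueued and exits once the queue closes.
                for workflowID := range workQueue {
                    processWorkflow(id, workflowID)
                }
            }(i)
        }

        // Enqueue workflow IDs, as an Informer event or NodeHandler notification would.
        for _, id := range []string{"f7616dc75400f43e6920", "b81d47ae51c2e04f7a13"} {
            workQueue <- id
        }
        close(workQueue)
        wg.Wait()
    }

Because goroutines are so lightweight, the pool size is a throughput knob rather than a hard resource limit, which is what allows FlytePropeller to scale to thousands of workers on a single CPU.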
From 6d948ef106dfd4193d6167edefdd5ea5b4441a00 Mon Sep 17 00:00:00 2001
From: davidmirror-ops
Date: Mon, 1 Apr 2024 07:04:16 -0500
Subject: [PATCH 7/7] Apply reviews

Signed-off-by: davidmirror-ops
---
 .../flytepropeller_architecture.rst | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/docs/concepts/component_architecture/flytepropeller_architecture.rst b/docs/concepts/component_architecture/flytepropeller_architecture.rst
index 197118ec8c..6de45566f0 100644
--- a/docs/concepts/component_architecture/flytepropeller_architecture.rst
+++ b/docs/concepts/component_architecture/flytepropeller_architecture.rst
@@ -15,7 +15,7 @@ Introduction

 A Flyte :ref:`workflow ` is represented as a Directed Acyclic Graph (DAG) of interconnected Nodes. Flyte supports a robust collection of Node types to ensure diverse functionality.

 - ``TaskNodes`` support a plugin system to externally add system integrations.
-- ``BranchNodes`` allow altering the Control flow during runtime; pruning downstream evaluation paths based on input.
+- ``BranchNodes`` allow altering the control flow during runtime, pruning downstream evaluation paths based on input.
 - ``DynamicNodes`` add nodes to the DAG.
 - ``WorkflowNodes`` allow embedding workflows within each other.

@@ -38,7 +38,7 @@ Components
==========

-FlyteAdmin is the common entry point, where initialization of FlyteWorkflow CRs may be triggered by user workflow definition executions, automatic relaunches, or periodically scheduled workflow definition executions.
+FlyteAdmin is the common entry point, where initialization of FlyteWorkflow Custom Resources may be triggered by user workflow definition executions, automatic relaunches, or periodically scheduled workflow definition executions.

 FlyteWorkflow CRD / K8s Integration
-----------------------------------

@@ -66,7 +66,7 @@ Example output:

    NAME                   AGE
    f7616dc75400f43e6920   3h42m

-3. Describe the contents of the CR, for example the ``spec`` section:
+3. Describe the contents of the Custom Resource, for example the ``spec`` section:

 .. code-block:: bash

    kubectl describe flyteworkflows.flyte.lyft.com f7616dc75400f43e6920 -n flytesnacks-development

@@ -95,7 +95,6 @@ The status subsection tracks workflow metadata including overall workflow status

 .. code-block:: json

-   },
    "status": {
        "dataDir": "gs://flyteontf-gcp-data-116223838137/metadata/propeller/flytesnacks-development-f7616dc75400f43e6920",
        "defVersion": 1,

@@ -142,7 +141,7 @@ The WorkQueue is a FIFO queue storing workflow ID strings that require a lookup

 #. The K8s Informer detects a workflow timeout or failed liveness check during its periodic resync operation on the FlyteWorkflow.
 #. A FlytePropeller worker experiences an error during a processing loop
 #. The WorkflowExecutor observes a completed downstream node
-#. A NodeHandler observes state change and explicitly enqueues its owner (For example, K8s pod informer observes completion of a task)
+#. A NodeHandler observes state change and explicitly enqueues its owner. (For example, K8s pod informer observes completion of a task.)

 The WorkerPool is implemented as a collection of ``goroutines``, one for each worker. Using this lightweight construct, FlytePropeller can scale to 1000s of workers on a single CPU. Workers continually poll the WorkQueue for workflows. On success, the workflow is passed to the WorkflowExecutor.

@@ -154,7 +153,13 @@ WorkflowExecutor
----------------

 The WorkflowExecutor is responsible for handling high-level workflow operations.

 NodeExecutor
------------

-The NodeExecutor is executed on a single node, beginning with the workflow's start node. It traverses the workflow using a visitor pattern with a modified depth-first search (DFS), evaluating each node along the path. A few examples of node evaluation based on phase include: successful nodes are skipped, unevaluated nodes are queued for processing, and failed nodes may be reattempted up to a configurable threshold. There are many configurable parameters to tune evaluation criteria including max parallelism which restricts the number of nodes which may be scheduled concurrently. Additionally, nodes may be retried to ensure recoverability on failure.
+The NodeExecutor is executed on a single node, beginning with the workflow's start node. It traverses the workflow using a visitor pattern with a modified depth-first search (DFS), evaluating each node along the path. A few examples of node evaluation based on phase include:
+
+* Successful nodes are skipped
+* Unevaluated nodes are queued for processing
+* Failed nodes may be reattempted up to a configurable threshold
+
+There are many configurable parameters to tune evaluation criteria, including max parallelism, which restricts the number of nodes that may be scheduled concurrently. Additionally, nodes may be retried to ensure recoverability on failure.

 Go to the `Optimizing Performance `__ section for more information on how to tune Propeller parameters.
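The phase-based traversal described in this final patch can be illustrated with a rough Go sketch. The ``Phase`` values, retry threshold, and DAG layout below are simplified stand-ins (the node IDs mirror the example CR above) and do not correspond to Flyte's actual types:

.. code-block:: go

    package main

    import "fmt"

    // Phase is a simplified stand-in for a node's execution phase.
    type Phase int

    const (
        PhaseNotYetStarted Phase = iota
        PhaseQueued
        PhaseSucceeded
        PhaseFailed
    )

    // Node is a minimal DAG node: an ID, a phase, a retry counter, and downstream edges.
    type Node struct {
        ID       string
        Phase    Phase
        Attempts int
        Children []*Node
    }

    // maxAttempts stands in for the configurable retry threshold.
    const maxAttempts = 3

    // evaluate walks the DAG depth-first from the given node, deciding what to do
    // with each node based on its current phase, as one evaluation loop would.
    func evaluate(n *Node, visited map[string]bool) {
        if visited[n.ID] {
            return
        }
        visited[n.ID] = true

        switch n.Phase {
        case PhaseSucceeded:
            // Successful nodes are skipped.
        case PhaseNotYetStarted:
            // Unevaluated nodes are queued for processing.
            n.Phase = PhaseQueued
            fmt.Printf("queued %s\n", n.ID)
        case PhaseFailed:
            // Failed nodes may be reattempted up to the configurable threshold.
            if n.Attempts < maxAttempts {
                n.Attempts++
                n.Phase = PhaseQueued
                fmt.Printf("retrying %s (attempt %d)\n", n.ID, n.Attempts)
            }
        }

        for _, child := range n.Children {
            evaluate(child, visited)
        }
    }

    func main() {
        // DAG mirroring part of the example CR: start-node -> n0 -> end-node.
        end := &Node{ID: "end-node"} // zero value: PhaseNotYetStarted
        n0 := &Node{ID: "n0", Phase: PhaseSucceeded, Children: []*Node{end}}
        start := &Node{ID: "start-node", Phase: PhaseSucceeded, Children: []*Node{n0}}
        evaluate(start, map[string]bool{})
    }

In FlytePropeller itself, the same traversal is additionally bounded by the max parallelism setting, and each phase change is written back to the ``status`` section of the FlyteWorkflow CR shown earlier.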