diff --git a/rfc/system/RFC-5659-execution-concurrency.md b/rfc/system/RFC-5659-execution-concurrency.md index e2c73b3efe..88de2da75b 100644 --- a/rfc/system/RFC-5659-execution-concurrency.md +++ b/rfc/system/RFC-5659-execution-concurrency.md @@ -38,7 +38,7 @@ my_lp = LaunchPlan.get_or_create( We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message ```protobuf -message Concurrency { +message SchedulerPolicy { // Defines how many executions with this launch plan can run in parallel uint32 max = 1; @@ -62,7 +62,7 @@ enum ConcurrencyPolicy { message Schedule { ... - Concurrency concurrency = X; + SchedulerPolicy scheduler_policy = X; } // embedded in the ExecutionClosure @@ -78,19 +78,22 @@ message ExecutionStateChangeDetails { // Can also add to ExecutionSpec to specify execution time overrides ``` +### Control Plane -### Concurrency Controller Singleton At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans. 1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy 1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. 1. or fail the request when the concurrency policy is set to `ABORT` + 2. let the concurrency controller manage scheduling 1. Do not create the workflow CRD +### Concurrency Controller Singleton + Introduce the Concurrency Controller to poll for all pending executions: 1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads. - 1. 
The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.Schedule` -1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;` + 1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` +1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT');` 1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority. 1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library) 1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently. 
@@ -115,9 +118,9 @@ Creating an execution #### Launch Plan informer This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans. -Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.Schedule +Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy` -Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. +Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If an execution has terminated since the last time the query ran, it won't be in the result set and we'll want to update the in memory map to remove the execution. ### Flyte Admin changes @@ -125,7 +128,7 @@ Periodically, the informer will re-issue the query, optionally filtering by [Upd Because we fetch the launch plan to reconcile execution inputs at CreateExecution time, we'll have the concurrency policy available to us at the time of execution creation. If there is no concurrency policy defined, we'll proceed as [normal](https://github.com/flyteorg/flyte/blob/f14348165ccdfb26f8509c0f1ef380a360e59c4d/flyteadmin/pkg/manager/impl/execution_manager.go#L1169-L1173) and create the workflow execution CRD and then create a database entry for the execution with phase `UNKNOWN`. 
This way, we don't incur any penalty for executions -If there is a concurrency policy defined, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_ +If there is a concurrency policy defined and it's set to `ABORT`, immediately fail the execution. Otherwise, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_ #### Database @@ -145,9 +148,9 @@ We should consider adding an index to the executions table to include ##### Concurrency by specified launch plan versions Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)) -Therefore, this proposal only applies concurrency at the launch plan Named Entity level, that is across (project, domain, version). +However, this proposal only applies concurrency at the launch plan Named Entity level, that is by (project, domain, name) and across all versions. The currently active launch plan version will determine the concurrency policy that gets applied for all executions created with the launch plan NamedEntity. -If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and update the keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier. +If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates of the in-memory maps, keyed by versioned launch plan rather than NamedEntityIdentifier. 
We could update usage like so @@ -159,16 +162,16 @@ my_lp = LaunchPlan.get_or_create( concurrency=Concurrency( max=1, # defines how many executions with this launch plan can run in parallel policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached - precision=ConcurrencyPrecision.LAUNCH_PLAN + precision=ConcurrencyPrecision.LAUNCH_PLAN_VERSION ) ) ``` -and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN_VERSION` +and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN` We could update the concurrency protobuf definition like so: ```protobuf -message Concurrency { +message SchedulerPolicy { // Defines how many executions with this launch plan can run in parallel uint32 max = 1;