-
Notifications
You must be signed in to change notification settings - Fork 670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: Add execution concurrency #5659
base: master
Are you sure you want to change the base?
Changes from 4 commits
1d4d4b7
5f9a60f
9b66651
a108074
0b4ee1f
121066c
4a88d7c
550c571
61debe2
da704b4
cce3e69
dd58406
fea29eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
# [RFC Template] Title | ||
|
||
**Authors:** | ||
|
||
- @eapolinario | ||
- @katrogan | ||
|
||
## 1 Executive Summary | ||
|
||
This is a proposal to implement workflow execution concurrency, defined at the launch plan level. | ||
|
||
## 2 Motivation | ||
|
||
See the following issues | ||
1. https://github.com/flyteorg/flyte/issues/267 | ||
2. https://github.com/flyteorg/flyte/issues/420 | ||
3. https://github.com/flyteorg/flyte/discussions/3754 | ||
4. https://github.com/flyteorg/flyte/issues/5125 | ||
|
||
## 3 Proposed Implementation | ||
|
||
Introduce a new attribute in [LaunchPlan.get_or_create](https://github.com/flyteorg/flytekit/blob/bc2e000cc8d710ed3d135cdbf3cbf257c5da8100/flytekit/core/launch_plan.py#L195) to allow specifying execution concurrency | ||
|
||
e.g. | ||
```python | ||
my_lp = LaunchPlan.get_or_create( | ||
name="my_serial_lp", | ||
workflow=my_wf, | ||
... | ||
concurrency=Concurrency( | ||
max=1, # defines how many executions with this launch plan can run in parallel | ||
policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached | ||
) | ||
) | ||
``` | ||
|
||
### FlyteIDL | ||
We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime | ||
|
||
```protobuf | ||
message Concurrency { | ||
// Defines how many executions with this launch plan can run in parallel | ||
uint32 max = 1; | ||
|
||
// Defines how to handle the execution when the max concurrency is reached. | ||
ConcurrencyPolicy policy = 2; | ||
} | ||
|
||
enum ConcurrencyPolicy { | ||
UNSPECIFIED = 0; | ||
|
||
// wait for previous executions to terminate before starting a new one | ||
WAIT = 1; | ||
|
||
// fail the CreateExecution request and do not permit the execution to start | ||
ABORT = 2; | ||
} | ||
|
||
message LaunchPlanSpec { | ||
... | ||
|
||
Concurrency concurrency = X; | ||
} | ||
|
||
// embedded in the ExecutionClosure | ||
message ExecutionStateChangeDetails { | ||
... | ||
|
||
// Includes the reason for the `PENDING` phase | ||
string description = X; | ||
|
||
|
||
} | ||
|
||
// Can also add to ExecutionSpec to specify execution time overrides | ||
|
||
``` | ||
|
||
### FlyteAdmin | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. During last week's contributors meeting someone asked a question about having this concurrency control work across versions. Can we either have a discussion in this PR about it or list that use case as not being supported explicitly in the RFC? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can say that something that works across versions would be really useful for us. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For us too because we very often This could be made configurable here: concurrency=Concurrency(
max=1, # defines how many executions with this launch plan can run in parallel
policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached,
level=ConcurrencyLevel.Version, # or ConcurrencyLevel.LaunchPlan
) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks @eapolinario @corleyma @fg91 for the feedback, I don't think this will be too much of a lift but added a proposal for different levels of precision here too |
||
At a broad level | ||
1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy | ||
1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`. | ||
1. or fail the request when the concurrency policy is set to `ABORT` | ||
1. Do not create the workflow CRD | ||
|
||
Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have prior art for this kind of reconciliation loop in flyteadmin? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, the scheduler! |
||
1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?) | ||
1. as an optimization, could even parallelize this into goroutines, one per distinct launch plan ID that has any `PENDING` execution | ||
2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID | ||
3. If there are fewer than `MAX_CONCURRENCY` executions running, select the oldest pending execution for that launch plan | ||
1. create the workflow CRD | ||
1. open question: also update its phase in the database to `QUEUED`? | ||
1. let execution proceed | ||
|
||
We should consider adding an index to the executions table to include | ||
eapolinario marked this conversation as resolved.
Show resolved
Hide resolved
|
||
- launch_plan_id | ||
- phase | ||
- created_at | ||
|
||
#### Open Questions | ||
- Should we always attempt to schedule pending executions in ascending order of creation time? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe make it configurable? FIFO, FILO There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah I wasn't sure! any suggestions here? we could introduce an enum and choose fifo to begin with and expand support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have mixed thoughts on making the queue's order of execution configurable. If we support a limited number of parallel executions (more than 1), the order of these executions would naturally start as FIFO up until that limit is reached. To me, providing an option to begin executing FILO after that limit is reached feels confusing to me. However, that brings a different question to mind: If multiple workflows are queued up, should we provide an option to enable loud notifications? In other words, if backlogged executions have the possibility of impacting downstream operations, can we enable users to receive loud notifications, including the number of queued executions? I can imagine a use case where: holiday shopping -> increased purchase volume -> increased data size -> multiple, consecutive execution delays -> cascading backlog of executions. In this scenario, the owners of the workflow may be out on leave and not be aware of the growing backlog. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting, we have workflow notifications enabled for terminal state but we've talked more about richer, customizable notifications and I think this slates neatly into that I think for a v1 having the default behavior be fifo with an extended description/explanation for the pending state may provide some visibility here to start off with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add this suggestion of having an enum listing the policies to the Implementation details section? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can add a customers feedback here, where the desired behaviour is to actually replace (terminate) the current executions by subsequent executions. Sounds like too much for the initial scope but still interested if this would be possible to add later with the current approach? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @fiedlerNr9 added a section under Alternatives. I don't think this is precluded by this implementation but not in scope for this proposal atm |
||
- Should we propagate concurrency policies to child executions? | ||
|
||
## 4 Metrics & Dashboards | ||
|
||
*What are the main metrics we should be measuring? For example, when interacting with an external system, it might be the external system latency. When adding a new table, how fast would it fill up?* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this feature going to be rolled out? Should we have an explicit list of metrics used to help the health of the feature? (e.g. total number of attempts of a given launchplan ) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting question. I think scheduling attempts here is based on the polling interval right? But could be useful to understand time spent in PENDING |
||
|
||
## 5 Drawbacks | ||
|
||
*Are there any reasons why we should not do this? Here we aim to evaluate risk and check ourselves.* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have any reservations about more load on the DB (even with indexes, etc)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, we already have a ton of indices on executions - there is definitely a tradeoff to adding a new one |
||
|
||
## 6 Alternatives | ||
|
||
*What are other ways of achieving the same outcome?* | ||
|
||
## 7 Potential Impact and Dependencies | ||
|
||
*Here, we aim to be mindful of our environment and generate empathy towards others who may be impacted by our decisions.* | ||
|
||
- *What other systems or teams are affected by this proposal?* | ||
- *How could this be exploited by malicious attackers?* | ||
|
||
## 8 Unresolved questions | ||
|
||
*What parts of the proposal are still being defined or not covered by this proposal?* | ||
|
||
## 9 Conclusion | ||
|
||
*Here, we briefly outline why this is the right decision to make at this time and move forward!* | ||
|
||
## 10 RFC Process Guide, remove this section when done | ||
|
||
*By writing an RFC, you're giving insight to your team on the direction you're taking. There may not be a right or better decision in many cases, but we will likely learn from it. By authoring, you're making a decision on where you want us to go and are looking for feedback on this direction from your team members, but ultimately the decision is yours.* | ||
|
||
This document is a: | ||
|
||
- thinking exercise, prototype with words. | ||
- historical record, its value may decrease over time. | ||
- way to broadcast information. | ||
- mechanism to build trust. | ||
- tool to empower. | ||
- communication channel. | ||
|
||
This document is not: | ||
|
||
- a request for permission. | ||
- the most up to date representation of any process or system | ||
|
||
**Checklist:** | ||
|
||
- [ ] Copy template | ||
- [ ] Draft RFC (think of it as a wireframe) | ||
- [ ] Share as WIP with folks you trust to gut-check | ||
- [ ] Send pull request when comfortable | ||
- [ ] Label accordingly | ||
- [ ] Assign reviewers | ||
- [ ] Merge PR | ||
|
||
**Recommendations** | ||
|
||
- Tag RFC title with [WIP] if you're still ironing out details. | ||
- Tag RFC title with [Newbie] if you're trying out something experimental or you're not entirely convinced of what you're proposing. | ||
- Tag RFC title with [RR] if you'd like to schedule a review request to discuss the RFC. | ||
- If there are areas that you're not convinced on, tag people who you consider may know about this and ask for their input. | ||
- If you have doubts, ask on [#feature-discussions](https://slack.com/app_redirect?channel=CPQ3ZFQ84&team=TN89P6GGK) for help moving something forward. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please change the filename to include the PR number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks