Stage and Source Scheduler and Grouped Execution
When Presto executes a query, it breaks the execution up into a hierarchy of stages. A stage models a particular part of a distributed query plan. Each stage reads from data sources and writes to an output buffer, and there are two types of sources:
- Table scan source (sometimes referred to as partitioned source in code)
- Remote source (sometimes referred to as unpartitioned source in code)
The following figure shows part of a query plan with two stages:
See more details about stages at https://prestodb.github.io/docs/current/overview/concepts.html#stage
StageScheduler is responsible for the following jobs:
- Create tasks for this stage
- Schedule table splits to tasks
This section discusses stage schedulers as they existed before grouped execution was introduced. ScaledWriterScheduler is not discussed here.
The StageScheduler interface is quite concise (https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/StageScheduler.java):
```java
public interface StageScheduler
        extends Closeable
{
    /**
     * Schedules as much work as possible without blocking.
     * The schedule results is a hint to the query scheduler if and
     * when the stage scheduler should be invoked again. It is
     * important to note that this is only a hint and the query
     * scheduler may call the schedule method at any time.
     */
    ScheduleResult schedule();

    @Override
    default void close() {}
}
```
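To make the "hint" semantics concrete, here is a minimal sketch of a caller that keeps invoking `schedule()` until the stage reports it is finished, waiting on the blocked future between calls. It is not Presto's `SqlQueryScheduler`: `SketchStageScheduler`, `SketchScheduleResult`, and `runToCompletion` are hypothetical names, and `java.util.concurrent.CompletableFuture` stands in for Guava's `ListenableFuture` so the example compiles with the JDK alone.

```java
import java.util.concurrent.CompletableFuture;

final class StageSchedulingLoop {
    // Hypothetical stand-in for ScheduleResult: only the fields this loop needs.
    static final class SketchScheduleResult {
        final boolean finished;
        final CompletableFuture<?> blocked; // an already-completed future means "call again right away"
        final int splitsScheduled;

        SketchScheduleResult(boolean finished, CompletableFuture<?> blocked, int splitsScheduled) {
            this.finished = finished;
            this.blocked = blocked;
            this.splitsScheduled = splitsScheduled;
        }
    }

    // Hypothetical stand-in for StageScheduler (AutoCloseable instead of Closeable).
    interface SketchStageScheduler extends AutoCloseable {
        SketchScheduleResult schedule();

        @Override
        default void close() {}
    }

    // Drives one stage scheduler to completion; returns how many times schedule() was called.
    static int runToCompletion(SketchStageScheduler scheduler) {
        int calls = 0;
        try (SketchStageScheduler s = scheduler) {
            while (true) {
                SketchScheduleResult result = s.schedule();
                calls++;
                if (result.finished) {
                    return calls;
                }
                // The result is only a hint: wait for the blocked future, then try again.
                result.blocked.join();
            }
        }
    }
}
```

Because the result is only a hint, a real query scheduler may also call `schedule()` before the future completes; the sketch simply takes the most conservative option.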
`ScheduleResult` contains the following information (see https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ScheduleResult.java#L36-L40):

```java
Set<RemoteTask> newTasks;
ListenableFuture<?> blocked;
Optional<BlockedReason> blockedReason;
boolean finished;
int splitsScheduled;
```
Prior to grouped execution (introduced in https://github.com/prestodb/presto/pull/8951), there are two blocked reasons:
- `WAITING_FOR_SOURCE`: Blocked on fetching splits from the connector (`SplitSource#getNextBatch`).
- `SPLIT_QUEUES_FULL`: Some splits are not assigned because the nodes already have enough work to do. This is enforced in `NodeSelector#computeAssignments` (e.g. https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SimpleNodeSelector.java#L130-L134).
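A minimal sketch of how a per-node cap on queued splits can surface as the two blocked reasons. The cap `maxSplitsPerNode`, the least-loaded placement rule, and the `splitSourceBlocked` flag (standing in for an incomplete `SplitSource#getNextBatch` future) are illustrative assumptions, not the actual logic of `NodeSelector#computeAssignments`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SplitAssignmentSketch {
    enum BlockedReason { WAITING_FOR_SOURCE, SPLIT_QUEUES_FULL }

    static final class Outcome {
        final Map<String, List<String>> assignments = new LinkedHashMap<>();
        final List<String> unassigned = new ArrayList<>();
        Optional<BlockedReason> blockedReason = Optional.empty();
    }

    /**
     * Hypothetical helper: assigns each pending split to the least-loaded node with room left.
     * nodeLoads maps node id to splits already queued there and is updated in place.
     * splitSourceBlocked is true when the connector's next batch is not available yet.
     */
    static Outcome assignSplits(List<String> pendingSplits, Map<String, Integer> nodeLoads,
                                int maxSplitsPerNode, boolean splitSourceBlocked) {
        Outcome outcome = new Outcome();
        for (String split : pendingSplits) {
            String target = null;
            for (Map.Entry<String, Integer> e : nodeLoads.entrySet()) {
                if (e.getValue() < maxSplitsPerNode
                        && (target == null || e.getValue() < nodeLoads.get(target))) {
                    target = e.getKey();
                }
            }
            if (target == null) {
                outcome.unassigned.add(split); // every node is at its cap
                continue;
            }
            nodeLoads.merge(target, 1, Integer::sum);
            outcome.assignments.computeIfAbsent(target, k -> new ArrayList<>()).add(split);
        }
        if (!outcome.unassigned.isEmpty()) {
            outcome.blockedReason = Optional.of(BlockedReason.SPLIT_QUEUES_FULL);
        } else if (splitSourceBlocked) {
            outcome.blockedReason = Optional.of(BlockedReason.WAITING_FOR_SOURCE);
        }
        return outcome;
    }
}
```

Splits left over because of the cap stay pending for the next `schedule()` call, which is why `SPLIT_QUEUES_FULL` takes precedence over waiting on the connector in this sketch.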
In this section, we are going to introduce the three main stage schedulers prior to grouped execution (https://github.com/prestodb/presto/pull/8951).
`FixedCountScheduler` is used when the stage doesn't have node partitioning and all the split sources are remote. A typical case is performing a join or aggregation over an unbucketed table. This is the simplest stage scheduler: all tasks are created at once, and there is no further split scheduling (since all splits are `RemoteSplit`s): https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedCountScheduler.java#L55-L65
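The behaviour described above can be sketched as follows, assuming a hypothetical `taskFactory` callback in place of the stage's real task creation: the first (and only meaningful) `schedule()` call creates one task per entry of a partition-to-node list and immediately reports the stage as finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class FixedCountSchedulerSketch {
    // Hypothetical result type: the tasks created by this call plus the "finished" flag.
    static final class Result {
        final List<String> newTasks;
        final boolean finished;

        Result(List<String> newTasks, boolean finished) {
            this.newTasks = newTasks;
            this.finished = finished;
        }
    }

    private final List<String> partitionToNode;
    // Hypothetical stand-in for task creation: (node, partition) -> task id.
    private final BiFunction<String, Integer, String> taskFactory;

    FixedCountSchedulerSketch(List<String> partitionToNode, BiFunction<String, Integer, String> taskFactory) {
        this.partitionToNode = List.copyOf(partitionToNode);
        this.taskFactory = taskFactory;
    }

    // Creates every task in one shot; all inputs are remote splits, so nothing is left to schedule.
    Result schedule() {
        List<String> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionToNode.size(); partition++) {
            tasks.add(taskFactory.apply(partitionToNode.get(partition), partition));
        }
        return new Result(tasks, true);
    }
}
```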
`FixedSourcePartitionedScheduler` is used when the stage has node partitioning and at least one split source is a table scan. This usually happens in stages reading from bucketed tables.
For each table scan split source, a `SourcePartitionedScheduler` is created to help schedule the splits. We denote these `SourcePartitionedScheduler`s as s_1, s_2, …, s_k and the `FixedSourcePartitionedScheduler` as f. Note that these s_i share the same `SqlStageExecution` with f, so each s_i is not an “independent” stage scheduler, but is rather used as a “source scheduler” for f.
`s_i.schedule()` is called in order (from the build side to the probe side). `s_{i+1}.schedule()` is called only after s_i has finished scheduling (but not execution!), as demonstrated by the following simplified code (the actual code can be found at https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L90-L105):
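```java
// Schedule source schedulers one at a time, from the build side to the probe side.
while (!sourceSchedulers.isEmpty()) {
    ScheduleResult schedule = sourceSchedulers.peek().schedule();
    if (schedule.isFinished()) {
        // This source has scheduled all its splits; move on to the next one.
        sourceSchedulers.remove();
    }
    else {
        // The current source still has splits to schedule, so later sources must wait.
        break;
    }
}
```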
All tasks are eagerly created the first time `schedule()` is called: https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L78-L85
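A self-contained sketch of the same ordering, with all tasks created eagerly on the first call. `SourceSchedulerSketch` and `FixedSourcePartitionedSketch` are hypothetical: each source schedules one batch of splits per call and reports done once its batches run out, ignoring the blocking, split placement, and task bookkeeping that the real schedulers handle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FixedSourcePartitionedSketch {
    // Hypothetical source scheduler: schedules one batch of splits per call.
    static final class SourceSchedulerSketch {
        final String name;
        private int remainingBatches;
        private final List<String> log;

        SourceSchedulerSketch(String name, int batches, List<String> log) {
            this.name = name;
            this.remainingBatches = batches;
            this.log = log;
        }

        // Returns true once every split of this source is scheduled (not executed).
        boolean schedule() {
            if (remainingBatches > 0) {
                remainingBatches--;
                log.add(name);
            }
            return remainingBatches == 0;
        }
    }

    private final Deque<SourceSchedulerSketch> sourceSchedulers; // build side first, probe side last
    private final int taskCount;
    private boolean tasksCreated;
    final List<String> createdTasks = new ArrayList<>();

    FixedSourcePartitionedSketch(List<SourceSchedulerSketch> buildToProbe, int taskCount) {
        this.sourceSchedulers = new ArrayDeque<>(buildToProbe);
        this.taskCount = taskCount;
    }

    // Returns true when every source scheduler has finished scheduling.
    boolean schedule() {
        if (!tasksCreated) { // all tasks are created eagerly on the first call
            for (int i = 0; i < taskCount; i++) {
                createdTasks.add("task-" + i);
            }
            tasksCreated = true;
        }
        // s_{i+1} is consulted only after s_i has finished scheduling.
        while (!sourceSchedulers.isEmpty()) {
            if (sourceSchedulers.peek().schedule()) {
                sourceSchedulers.remove();
            } else {
                break;
            }
        }
        return sourceSchedulers.isEmpty();
    }
}
```

With a build source needing two batches and a probe source needing one, the first call schedules only the first build batch; the second call finishes the build side and only then lets the probe side schedule.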
Interestingly, both of the following cases use `FixedSourcePartitionedScheduler`:
- Colocated join (i.e. a join of two bucketed tables).
- A bucketed table joined with an unbucketed table. In this case, a remote exchange is added on the unbucketed table side so that it adopts the same bucketing.
`SourcePartitionedScheduler` is used when the stage partitioning is `SOURCE_DISTRIBUTION`. In other words:
- It doesn’t have node partitioning.
- It doesn’t contain a partitioned remote source (a replicated remote source is fine). Usually, this is used for leaf stages that contain a table scan over an unbucketed table.
The main workflow of `SourcePartitionedScheduler#schedule` (https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SourcePartitionedScheduler.java#L84-L147) is as follows:
- Get the splits to be scheduled (stored in `pendingSplits`). `splitSource#getNextBatch` is called if necessary.
- Decide the node assignments of these splits (by calling `splitPlacementPolicy#computeAssignments`). There are two different split placement policies:
  - `DynamicSplitPlacementPolicy`: splits are from an unbucketed table. They are assigned based on worker load and locality.
  - `FixedSplitPlacementPolicy` (now renamed to `BucketedSplitPlacementPolicy`): splits are from a bucketed table. They are assigned based on `NodePartitioningMap`. Note that in this case, `SourcePartitionedScheduler` is actually used as a “source scheduler” of a `FixedSourcePartitionedScheduler`: when `SourcePartitionedScheduler` is used as a “stage scheduler”, the stage partitioning has to be `SOURCE_DISTRIBUTION`, and thus the input table cannot be bucketed.
- Create new tasks if necessary, etc.
  - Note this only makes a difference when `SourcePartitionedScheduler` is used as a stage scheduler rather than a source scheduler. When `SourcePartitionedScheduler` is used as a source scheduler, the `newTasks` returned in `ScheduleResult` are simply ignored.
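The two placement policies can be contrasted with a small sketch. Everything here is hypothetical: splits are reduced to an id, an optional preferred host, and a bucket number; the dynamic policy honours locality unless the preferred node already holds `maxPerNode` splits, and otherwise falls back to the least-loaded node (unlike the real policy, it never leaves a split unassigned); the bucketed policy is a plain bucket-to-node lookup standing in for the node partitioning map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SplitPlacementSketch {
    // Hypothetical split: preferredHost may be null; bucket is -1 for unbucketed tables.
    static final class Split {
        final String id;
        final String preferredHost;
        final int bucket;

        Split(String id, String preferredHost, int bucket) {
            this.id = id;
            this.preferredHost = preferredHost;
            this.bucket = bucket;
        }
    }

    interface PlacementPolicy {
        String nodeFor(Split split);
    }

    // Unbucketed splits: prefer the local node unless it is full, else the least-loaded node.
    static final class DynamicPlacement implements PlacementPolicy {
        private final Map<String, Integer> load = new LinkedHashMap<>();
        private final int maxPerNode;

        DynamicPlacement(List<String> nodes, int maxPerNode) {
            nodes.forEach(n -> load.put(n, 0));
            this.maxPerNode = maxPerNode;
        }

        @Override
        public String nodeFor(Split split) {
            String chosen = split.preferredHost;
            if (chosen == null || !load.containsKey(chosen) || load.get(chosen) >= maxPerNode) {
                chosen = load.entrySet().stream()
                        .min(Map.Entry.comparingByValue())
                        .orElseThrow()
                        .getKey();
            }
            load.merge(chosen, 1, Integer::sum);
            return chosen;
        }
    }

    // Bucketed splits: the bucket alone decides the node, via a fixed bucket-to-node list.
    static final class BucketedPlacement implements PlacementPolicy {
        private final List<String> bucketToNode;

        BucketedPlacement(List<String> bucketToNode) {
            this.bucketToNode = List.copyOf(bucketToNode);
        }

        @Override
        public String nodeFor(Split split) {
            return bucketToNode.get(split.bucket);
        }
    }
}
```

The bucketed policy ignores load entirely, which is what allows splits of the same bucket from different tables to land on the same node for a colocated join.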
Even before grouped execution was introduced, there were some hacks. In particular, `FixedSourcePartitionedScheduler` uses `SourcePartitionedScheduler` as a “source scheduler”, while some of the code in `SourcePartitionedScheduler` is only useful when it acts as a stage scheduler, which is confusing.
Some latent bugs date back to those days and were only fixed after grouped execution amplified them, such as https://github.com/prestodb/presto/issues/11253
The following figures show some concrete examples of which stage schedulers are used. The logic that decides the stage scheduler can be found in: https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java#L255-L292
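The selection rules described in this section can be summarised as a small function. This is a sketch under stated assumptions, not the code in `SqlQueryScheduler`: the enum value `NODE_PARTITIONED` is a hypothetical catch-all for any fixed or bucketed node partitioning, and `tableScanSplitSources` counts the stage's table scan split sources.

```java
final class StageSchedulerChoice {
    // NODE_PARTITIONED is a hypothetical label for fixed/bucketed node partitioning.
    enum Partitioning { SOURCE_DISTRIBUTION, SCALED_WRITER_DISTRIBUTION, NODE_PARTITIONED }

    // Returns the name of the stage scheduler the rules in this section would pick.
    static String choose(Partitioning partitioning, int tableScanSplitSources) {
        switch (partitioning) {
            case SOURCE_DISTRIBUTION:
                return "SourcePartitionedScheduler";
            case SCALED_WRITER_DISTRIBUTION:
                return "ScaledWriterScheduler"; // not discussed in this section
            default:
                // Node-partitioned stage: any table scan source means splits must be scheduled.
                return tableScanSplitSources > 0 ? "FixedSourcePartitionedScheduler" : "FixedCountScheduler";
        }
    }
}
```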
Grouped execution was introduced in https://github.com/prestodb/presto/pull/8951 to support the huge joins and aggregations that arise in ETL pipelines. After this initial pull request, several fixes were made to make it production ready.
After stabilization, grouped execution consists of 3 components on the coordinator:
- `LifespanScheduler`: Responsible for initiating new lifespans after the current lifespans finish execution (by calling `SourceScheduler#startLifespan`).
- `FixedSourcePartitionedScheduler` (we will use `StageScheduler` to refer to it): Responsible for coordinating the `SourceScheduler`s in this stage.
- `SourcePartitionedScheduler` (we will use `SourceScheduler` to refer to it): Responsible for scheduling the splits of a given table source.
The boundaries and responsibilities of these 3 components are not very clear in some cases, and lifespan scheduling will need a major refactor/redesign before it is easy to understand. But it works for now.
1. In the `StageScheduler` constructor, the initial lifespans are scheduled: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L144-L145 . The `SourceScheduler` of the first source starts scheduling the splits for these buckets (a more formal name would be partitions). As a concrete example, assume there are two SourceSchedulers in the stage, responsible for tableA and tableB, and lifespan 42 gets scheduled here. The first SourceScheduler in the stage will be asked to schedule splits for bucket 42 of tableA.
2. Once a `SourceScheduler` has finished scheduling some buckets (collected by calling `SourceScheduler#drainCompletelyScheduledLifespans`: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L225), `StageScheduler` asks the next `SourceScheduler` to schedule these buckets: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L210-L212 . In other words, `StageScheduler` “pushes” the scheduling of buckets for scheduled lifespans from one source to the next. In the above example, once the first `SourceScheduler` finishes scheduling splits for bucket 42 of tableA, the StageScheduler asks the second SourceScheduler to start scheduling splits for bucket 42 of tableB.
3. Once a lifespan finishes execution, LifespanScheduler#onLifespanExecutionFinished is invoked (registered in the StageScheduler constructor: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L146-L147).
   This onLifespanExecutionFinished method triggers the newDriverGroupReady future: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/group/FixedLifespanScheduler.java#L118 . This causes threads waiting on this future to continue. Usually this means StageScheduler#schedule will be called, since it now knows it has new work to do.
   As a result, LifespanScheduler#schedule is called in StageScheduler#schedule, which means new lifespans will be scheduled: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L195-L200. And again, the first SourceScheduler starts scheduling splits for these buckets.

Following the example, some time after all splits of bucket 42 of both tableA and tableB are scheduled, the worker reports back that bucket 42 has finished execution. onLifespanExecutionFinished is invoked, records that lifespan 42 finished execution, and sets the newDriverGroupReady future, which triggers a call to StageScheduler#schedule. This results in LifespanScheduler#schedule being called, and a new lifespan, say 46, is scheduled. Lifespan 46 then repeats the same journey, starting from step 1.
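The three steps can be simulated end to end in a single thread. This is a sketch, not Presto's implementation: the class names mirror the roles above but are hypothetical, scheduling all splits of a bucket is assumed to complete in one call, the limit on concurrently running lifespans is an assumed parameter, and the caller invokes `schedule()` directly where the real code relies on the newDriverGroupReady future.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class GroupedExecutionSketch {
    // Hypothetical source scheduler: scheduling all splits of one bucket finishes in a single call.
    static final class SourceScheduler {
        final String table;
        private final Deque<Integer> started = new ArrayDeque<>();
        private final List<Integer> completelyScheduled = new ArrayList<>();

        SourceScheduler(String table) { this.table = table; }

        void startLifespan(int bucket) { started.add(bucket); }

        void schedule(List<String> log) {
            while (!started.isEmpty()) {
                int bucket = started.poll();
                log.add(table + ":" + bucket); // every split of this bucket is now scheduled
                completelyScheduled.add(bucket);
            }
        }

        List<Integer> drainCompletelyScheduledLifespans() {
            List<Integer> drained = new ArrayList<>(completelyScheduled);
            completelyScheduled.clear();
            return drained;
        }
    }

    // Hypothetical lifespan scheduler: keeps at most `concurrency` lifespans running at once.
    static final class LifespanScheduler {
        private final Deque<Integer> notStarted;
        private final int concurrency;
        private int running;

        LifespanScheduler(List<Integer> buckets, int concurrency) {
            this.notStarted = new ArrayDeque<>(buckets);
            this.concurrency = concurrency;
        }

        void schedule(SourceScheduler first) {
            while (running < concurrency && !notStarted.isEmpty()) {
                first.startLifespan(notStarted.poll());
                running++;
            }
        }

        void onLifespanExecutionFinished(int bucket) { running--; } // frees a slot (step 3)
    }

    final List<SourceScheduler> sources; // build side first
    final LifespanScheduler lifespans;
    final List<String> log = new ArrayList<>();

    GroupedExecutionSketch(List<SourceScheduler> sources, LifespanScheduler lifespans) {
        this.sources = sources;
        this.lifespans = lifespans;
        lifespans.schedule(sources.get(0)); // step 1: initial lifespans start in the constructor
    }

    // Stage-level schedule(): start newly allowed lifespans, then push buckets down the sources.
    void schedule() {
        lifespans.schedule(sources.get(0));
        for (int i = 0; i < sources.size(); i++) {
            SourceScheduler source = sources.get(i);
            source.schedule(log);
            for (int bucket : source.drainCompletelyScheduledLifespans()) {
                if (i + 1 < sources.size()) {
                    sources.get(i + 1).startLifespan(bucket); // step 2: push to the next source
                }
            }
        }
    }
}
```

With two sources (tableA, tableB), lifespans 42 and 46, and a concurrency of 1, the log first shows tableA:42 then tableB:42; further `schedule()` calls add nothing until `onLifespanExecutionFinished(42)` frees the slot, after which tableA:46 and tableB:46 follow.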