Skip to content

Commit

Permalink
Debugging the Scheduler.
Browse files Browse the repository at this point in the history
Much better staged execution, now wired in the Pipeline and everywhere.

Some nice refactor of Tier1 and Tier2 distinctions.
  • Loading branch information
Alexandre Bourget committed Jul 2, 2023
1 parent 703d7cc commit 72ad2e5
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 173 deletions.
5 changes: 4 additions & 1 deletion orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (s *Scheduler) Init() loop.Cmd {
}

func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
fmt.Printf("UPDATE: %T %v\n", msg, msg)
fmt.Printf("Scheduler message: %T %v\n", msg, msg)
fmt.Print(s.Stages.StatesString())
var cmds []loop.Cmd

switch msg := msg.(type) {
Expand All @@ -75,6 +76,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
)

case work.MsgScheduleNextJob:

workUnit, workRange := s.Stages.NextJob()
if workRange == nil {
return nil
Expand All @@ -85,6 +87,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
}
worker := s.WorkerPool.Borrow()

s.logger.Info("scheduling work", zap.Object("unit", workUnit))
return loop.Batch(
worker.Work(s.ctx, workUnit, workRange, s.stream),
work.CmdScheduleNextJob(),
Expand Down
34 changes: 34 additions & 0 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stage
import (
"context"
"fmt"
"strings"

"go.uber.org/zap"

Expand Down Expand Up @@ -62,6 +63,7 @@ func NewStages(
traceID: traceID,
segmenter: segmenter,
segmentOffset: segmenter.IndexForStartBlock(outputGraph.LowestInitBlock()),
logger: reqctx.Logger(ctx),
}
for idx, stageLayer := range stagedModules {
mods := stageLayer.LastLayer()
Expand Down Expand Up @@ -154,23 +156,33 @@ func (s *Stages) CmdTryMerge(stageIdx int) loop.Cmd {
}

stage := s.stages[stageIdx]
if stage.kind != KindStore {
fmt.Println("TRYM: kindnot store")
return nil
}

mergeUnit := stage.nextUnit()

if mergeUnit.Segment > s.segmenter.LastIndex() {
fmt.Println("TRYM: past last segment")

return nil // We're done here.
}

if s.getState(mergeUnit) != UnitPartialPresent {
fmt.Println("TRYM: wasn't in partial state")
return nil
}

if !s.previousUnitComplete(mergeUnit) {
fmt.Println("TRYM: prev unit not complete")
return nil
}

s.MarkSegmentMerging(mergeUnit)

return func() loop.Msg {
fmt.Println("TRYM: launching multiSquash", stage, mergeUnit)
if err := s.multiSquash(stage, mergeUnit); err != nil {
return MsgMergeFailed{Unit: mergeUnit, Error: err}
}
Expand Down Expand Up @@ -291,3 +303,25 @@ func (s *Stages) FinalStoreMap(exclusiveEndBlock uint64) (store.Map, error) {
}
return out, nil
}

func (s *Stages) StatesString() string {
out := strings.Builder{}
for i := 0; i < len(s.stages); i++ {
if s.stages[i].kind == KindMap {
out.WriteString("M:")
} else {
out.WriteString("S:")
}
for _, segment := range s.segmentStates {
out.WriteString(map[UnitState]string{
UnitPending: ".",
UnitPartialPresent: "P",
UnitScheduled: "S",
UnitMerging: "M",
UnitCompleted: "C",
}[segment[i]])
}
out.WriteString("\n")
}
return out.String()
}
130 changes: 59 additions & 71 deletions orchestrator/stage/stages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,146 +47,146 @@ func TestNewStagesNextJobs(t *testing.T) {
assert.Equal(t, 0, j1.Segment)

segmentStateEquals(t, stages, `
..
..
S.`)
S:..
S:..
M:S.`)

stages.forceTransition(0, 2, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
..
S.
C.`)
S:..
S:S.
M:C.`)

stages.forceTransition(0, 1, UnitCompleted)

segmentStateEquals(t, stages, `
..
C.
C.`)
S:..
S:C.
M:C.`)

stages.NextJob()

segmentStateEquals(t, stages, `
S.
C.
C.`)
S:S.
S:C.
M:C.`)

stages.NextJob()

segmentStateEquals(t, stages, `
SS
C.
C.`)
S:SS
S:C.
M:C.`)

stages.forceTransition(0, 0, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
CS
C.
CS`)
S:CS
S:C.
M:CS`)

stages.forceTransition(1, 0, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
CC
CS
CS`)
S:CC
S:CS
M:CS`)

stages.NextJob()

segmentStateEquals(t, stages, `
CC..
CSS.
CS..`)
S:CC..
S:CSS.
M:CS..`)

stages.MarkSegmentPartialPresent(id(1, 2))

segmentStateEquals(t, stages, `
CC..
CSS.
CP..`)
S:CC..
S:CSS.
M:CP..`)

stages.MarkSegmentMerging(id(1, 2))

segmentStateEquals(t, stages, `
CC..
CSS.
CM..`)
S:CC..
S:CSS.
M:CM..`)

stages.markSegmentCompleted(id(1, 2))
stages.NextJob()

segmentStateEquals(t, stages, `
CCS.
CSS.
CC..`)
S:CCS.
S:CSS.
M:CC..`)

stages.NextJob()

segmentStateEquals(t, stages, `
CCSS
CSS.
CC..`)
S:CCSS
S:CSS.
M:CC..`)

stages.NextJob()

segmentStateEquals(t, stages, `
CCSSS...
CSS.....
CC......`)
S:CCSSS...
S:CSS.....
M:CC......`)

stages.NextJob()

segmentStateEquals(t, stages, `
CCSSSS..
CSS.....
CC......`)
S:CCSSSS..
S:CSS.....
M:CC......`)

_, r := stages.NextJob()
assert.Nil(t, r)
stages.MarkSegmentPartialPresent(id(2, 0))

segmentStateEquals(t, stages, `
CCPSSS..
CSS.....
CC......`)
S:CCPSSS..
S:CSS.....
M:CC......`)

_, r = stages.NextJob()
assert.Nil(t, r)
stages.MarkSegmentMerging(id(2, 0))

segmentStateEquals(t, stages, `
CCMSSS..
CSS.....
CC......`)
S:CCMSSS..
S:CSS.....
M:CC......`)

_, r = stages.NextJob()
assert.Nil(t, r)
stages.markSegmentCompleted(id(2, 0))

segmentStateEquals(t, stages, `
CCCSSS..
CSS.....
CC......`)
S:CCCSSS..
S:CSS.....
M:CC......`)

stages.NextJob()

segmentStateEquals(t, stages, `
CCCSSS..
CSSS....
CC......`)
S:CCCSSS..
S:CSSS....
M:CC......`)

stages.forceTransition(1, 1, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
CCCSSS..
CCSS....
CCS.....`)
S:CCCSSS..
S:CCSS....
M:CCS.....`)

}

Expand All @@ -197,21 +197,9 @@ func id(segment, stage int) Unit {
func segmentStateEquals(t *testing.T, s *Stages, segments string) {
t.Helper()

out := strings.Builder{}
for i := 0; i < len(s.stages); i++ {
for _, segment := range s.segmentStates {
out.WriteString(map[UnitState]string{
UnitPending: ".",
UnitPartialPresent: "P",
UnitScheduled: "S",
UnitMerging: "M",
UnitCompleted: "C",
}[segment[i]])
}
out.WriteString("\n")
}
out := s.StatesString()

assert.Equal(t, strings.TrimSpace(segments), strings.TrimSpace(out.String()))
assert.Equal(t, strings.TrimSpace(segments), strings.TrimSpace(out))
}

func TestStages_previousUnitComplete(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pipeline/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error {
// block to flush stores supporting holes in chains.
// And it will write multiple stores with the same content
// when presented with multiple boundaries / ranges.
if err := p.stores.flushStores(ctx, reqDetails.StopBlockNum); err != nil {
if err := p.stores.flushStores(ctx, p.executionStages, reqDetails.StopBlockNum); err != nil {
return fmt.Errorf("step new irr: stores end of stream: %w", err)
}

Expand Down
22 changes: 13 additions & 9 deletions pipeline/outputmodules/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,42 @@ func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) {

// A list of units that we can schedule, that might include some mappers and a store,
// or the last module could be an exeuction layer with only a map.
type ExecutionStages []stageLayers
type ExecutionStages []StageLayers

func (e ExecutionStages) LastStage() StageLayers {
return e[len(e)-1]
}

// For a given execution stage, the layers of execution, for example:
// a layer of mappers, followed by a layer of stores.
type stageLayers []layerModules
type StageLayers []LayerModules

func (l stageLayers) isStoreStage() bool {
func (l StageLayers) isStoreStage() bool {
return l.LastLayer().IsStoreLayer()
}

func (l stageLayers) LastLayer() layerModules {
func (l StageLayers) LastLayer() LayerModules {
return l[len(l)-1]
}

// The list of modules in a given layer of either maps or stores. A given layer
// will always be comprised of only the same kind of modules.
type layerModules []*pbsubstreams.Module
type LayerModules []*pbsubstreams.Module

func (l layerModules) IsStoreLayer() bool {
func (l LayerModules) IsStoreLayer() bool {
return l[0].GetKindStore() != nil
}

func computeStages(mods []*pbsubstreams.Module) (stages ExecutionStages) {
seen := map[string]bool{}

var layers stageLayers
var layers StageLayers

for i := 0; ; i++ {
if len(seen) == len(mods) {
break
}
var layer layerModules
var layer LayerModules
modLoop:
for _, mod := range mods {
switch mod.Kind.(type) {
Expand Down Expand Up @@ -169,7 +173,7 @@ func computeStages(mods []*pbsubstreams.Module) (stages ExecutionStages) {
}

lastLayerIndex := len(layers) - 1
var newStage stageLayers
var newStage StageLayers
for idx, layer := range layers {
isLastStage := idx == lastLayerIndex
isStoreLayer := layer.IsStoreLayer()
Expand Down
Loading

0 comments on commit 72ad2e5

Please sign in to comment.