Skip to content

Commit

Permalink
Implement executionstages -> stagelayers -> layermodules.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandre Bourget committed Jul 2, 2023
1 parent 1b9fb05 commit 703d7cc
Show file tree
Hide file tree
Showing 15 changed files with 302 additions and 136 deletions.
4 changes: 2 additions & 2 deletions orchestrator/parallelprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func BuildParallelProcessor(
// FIXME: Are all the progress messages properly sent? When we skip some stores and mark them complete,
// for whatever reason,

execOutSegmenter := reqPlan.WriteOutSegmenter()
if execOutSegmenter != nil {
if reqPlan.WriteExecOut != nil {
execOutSegmenter := reqPlan.WriteOutSegmenter()
// note: since we are *NOT* in a sub-request and are setting up output module is a map
requestedModule := outputGraph.OutputModule()
if requestedModule.GetKindStore() != nil {
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/stage/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ func stageKind(mods []*pbsubstreams.Module) Kind {
type Kind int

const (
KindMap = iota
KindMap = Kind(iota)
KindStore
)
19 changes: 11 additions & 8 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ func NewStages(
logger := reqctx.Logger(ctx)

stagedModules := outputGraph.StagedUsedModules()
lastIndex := len(stagedModules) - 1
out = &Stages{
ctx: ctx,
traceID: traceID,
segmenter: segmenter,
segmentOffset: segmenter.IndexForStartBlock(outputGraph.LowestInitBlock()),
}
for idx, mods := range stagedModules {
isLastStage := idx == lastIndex
kind := stageKind(mods)
if kind == KindMap && !isLastStage {
continue
for idx, stageLayer := range stagedModules {
mods := stageLayer.LastLayer()

kind := KindMap
if mods.IsStoreLayer() {
kind = KindStore
}

var moduleStates []*ModuleState
Expand Down Expand Up @@ -96,7 +96,7 @@ func (s *Stages) AllStagesFinished() bool {
}

for idx, stage := range s.stages {
if stage.kind == KindMap {
if stage.kind != KindStore {
continue
}
if s.getState(Unit{Segment: lastSegment, Stage: idx}) != UnitCompleted {
Expand Down Expand Up @@ -129,7 +129,10 @@ func (s *Stages) InitialProgressMessages() map[string]block.Ranges {

func (s *Stages) CmdStartMerge() loop.Cmd {
var cmds []loop.Cmd
for idx := range s.stages {
for idx, stage := range s.stages {
if stage.kind != KindStore {
continue
}
cmds = append(cmds, s.CmdTryMerge(idx))
}
return loop.Batch(cmds...)
Expand Down
22 changes: 11 additions & 11 deletions pipeline/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error {
return fmt.Errorf("sending bytes meter %w", err)
}

if p.stores.partialsWritten != nil {
p.respFunc(&pbssinternal.ProcessRangeResponse{
ModuleName: reqDetails.OutputModule,
Type: &pbssinternal.ProcessRangeResponse_Completed{
Completed: &pbssinternal.Completed{
AllProcessedRanges: toPBInternalBlockRanges(p.stores.partialsWritten),
TraceId: p.traceID,
},
},
})
}
//if p.stores.partialsWritten != nil {
// p.respFunc(&pbssinternal.ProcessRangeResponse{
// ModuleName: reqDetails.OutputModule,
// Type: &pbssinternal.ProcessRangeResponse_Completed{
// Completed: &pbssinternal.Completed{
// AllProcessedRanges: toPBInternalBlockRanges(p.stores.partialsWritten),
// TraceId: p.traceID,
// },
// },
// })
//}

return nil
}
Expand Down
70 changes: 55 additions & 15 deletions pipeline/outputmodules/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

type Graph struct {
requestModules *pbsubstreams.Modules
usedModules []*pbsubstreams.Module // all modules that need to be processed (requested directly or a required module ancestor)
stagedUsedModules [][]*pbsubstreams.Module // all modules that need to be processed (requested directly or a required module ancestor)
usedModules []*pbsubstreams.Module // all modules that need to be processed (requested directly or a required module ancestor)
stagedUsedModules ExecutionStages // all modules that need to be processed (requested directly or a required module ancestor)
moduleHashes *manifest.ModuleHashes
stores []*pbsubstreams.Module // subset of allModules: only the stores
lowestInitBlock uint64
Expand All @@ -21,13 +21,13 @@ type Graph struct {
schedulableAncestorsMap map[string][]string // modules that are ancestors (therefore dependencies) of a given module
}

func (g *Graph) OutputModule() *pbsubstreams.Module { return g.outputModule }
func (g *Graph) Stores() []*pbsubstreams.Module { return g.stores }
func (g *Graph) UsedModules() []*pbsubstreams.Module { return g.usedModules }
func (g *Graph) StagedUsedModules() [][]*pbsubstreams.Module { return g.stagedUsedModules }
func (g *Graph) IsOutputModule(name string) bool { return g.outputModule.Name == name }
func (g *Graph) ModuleHashes() *manifest.ModuleHashes { return g.moduleHashes }
func (g *Graph) LowestInitBlock() uint64 { return g.lowestInitBlock }
func (g *Graph) OutputModule() *pbsubstreams.Module { return g.outputModule }
func (g *Graph) Stores() []*pbsubstreams.Module { return g.stores }
func (g *Graph) UsedModules() []*pbsubstreams.Module { return g.usedModules }
func (g *Graph) StagedUsedModules() ExecutionStages { return g.stagedUsedModules }
func (g *Graph) IsOutputModule(name string) bool { return g.outputModule.Name == name }
func (g *Graph) ModuleHashes() *manifest.ModuleHashes { return g.moduleHashes }
func (g *Graph) LowestInitBlock() uint64 { return g.lowestInitBlock }

func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) (out *Graph, err error) {
out = &Graph{
Expand Down Expand Up @@ -88,14 +88,40 @@ func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) {
return lowest
}

func computeStages(mods []*pbsubstreams.Module) (stages [][]*pbsubstreams.Module) {
// A list of units that we can schedule, that might include some mappers and a store,
// or the last module could be an exeuction layer with only a map.
type ExecutionStages []stageLayers

// For a given execution stage, the layers of execution, for example:
// a layer of mappers, followed by a layer of stores.
type stageLayers []layerModules

func (l stageLayers) isStoreStage() bool {
return l.LastLayer().IsStoreLayer()
}

func (l stageLayers) LastLayer() layerModules {
return l[len(l)-1]
}

// The list of modules in a given layer of either maps or stores. A given layer
// will always be comprised of only the same kind of modules.
type layerModules []*pbsubstreams.Module

func (l layerModules) IsStoreLayer() bool {
return l[0].GetKindStore() != nil
}

func computeStages(mods []*pbsubstreams.Module) (stages ExecutionStages) {
seen := map[string]bool{}

var layers stageLayers

for i := 0; ; i++ {
if len(seen) == len(mods) {
break
}
var stage []*pbsubstreams.Module
var layer layerModules
modLoop:
for _, mod := range mods {
switch mod.Kind.(type) {
Expand Down Expand Up @@ -132,15 +158,29 @@ func computeStages(mods []*pbsubstreams.Module) (stages [][]*pbsubstreams.Module
}
}

stage = append(stage, mod)
layer = append(layer, mod)
}
if len(stage) != 0 {
stages = append(stages, stage)
for _, mod := range stage {
if len(layer) != 0 {
layers = append(layers, layer)
for _, mod := range layer {
seen[mod.Name] = true
}
}
}

lastLayerIndex := len(layers) - 1
var newStage stageLayers
for idx, layer := range layers {
isLastStage := idx == lastLayerIndex
isStoreLayer := layer.IsStoreLayer()

newStage = append(newStage, layer)
if isStoreLayer || isLastStage {
stages = append(stages, newStage)
newStage = nil
}
}

return stages
}

Expand Down
107 changes: 105 additions & 2 deletions pipeline/outputmodules/graph_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,117 @@
package outputmodules

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"

pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/stretchr/testify/assert"
)

func TestGraph_computeStages(t *testing.T) {
tests := []struct {
name string
input string
expect string
}{
{
name: "some graph",
input: "Sa Mb Mc Sd:Sa,Mb Me:Sd",
expect: "[[Sa]] [[Mb Mc] [Sd]] [[Me]]",
},
{
name: "other graph",
input: "Ma Mb:Ma Sc:Mb",
expect: "[[Ma] [Mb] [Sc]]",
},
{
name: "third graph",
input: "Ma Mb:Ma Sc:Mb Md:Sc Se:Md",
expect: "[[Ma] [Mb] [Sc]] [[Md] [Se]]",
},
{
name: "fourth graph",
input: "Ma Mb:Ma Sc:Mb Md:Sc Se:Md,Sg Mf:Ma Sg:Mf",
expect: "[[Ma] [Mb Mf] [Sc Sg]] [[Md] [Se]]",
},
{
name: "fifth graph",
input: "Ma Mb:Ma Sc:Mb Md:Sc Se:Md,Sg Mf:Ma Sg:Mf Mh:Se,Ma",
expect: "[[Ma] [Mb Mf] [Sc Sg]] [[Md] [Se]] [[Mh]]",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
out := computeStages(computeStagesInput(test.input))
assert.Equal(t, test.expect, computeStagesOutput(out))
})
}
}

func computeStagesInput(in string) (out []*pbsubstreams.Module) {
for _, mod := range strings.Split(in, " ") {
if mod == "" {
continue
}
params := strings.Split(mod, ":")
modName := params[0]
newMod := &pbsubstreams.Module{}
switch modName[0] {
case 'S':
newMod.Kind = &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}}
newMod.Name = modName[1:]
case 'M':
newMod.Kind = &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}}
newMod.Name = modName[1:]
default:
panic("invalid prefix in word: " + modName)
}
if len(params) > 1 {
for _, input := range strings.Split(params[1], ",") {
inputName := input[1:]
switch input[0] {
case 'S':
newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Store_{Store: &pbsubstreams.Module_Input_Store{ModuleName: inputName}}})
case 'M':
newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: inputName}}})
case 'P':
newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Params_{}})
case 'R':
newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Source_{}})
default:
panic("invalid input prefix: " + input)
}
}
}
out = append(out, newMod)
}
return
}

func computeStagesOutput(in ExecutionStages) string {
var level1 []string
for _, l1 := range in {
var level2 []string
for _, l2 := range l1 {
var level3 []string
for _, l3 := range l2 {
modKind := "S"
if l3.GetKindMap() != nil {
modKind = "M"
}
level3 = append(level3, modKind+l3.Name)
}
level2 = append(level2, fmt.Sprintf("%v", level3))
}
level1 = append(level1, fmt.Sprintf("%v", level2))
}
return strings.Join(level1, " ")
}

func TestGraph_computeSchedulableModules(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -52,5 +156,4 @@ func TestGraph_computeSchedulableModules(t *testing.T) {
assert.Equal(t, test.expect, out)
})
}

}
53 changes: 31 additions & 22 deletions pipeline/outputmodules/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,40 @@ func TestGraphStagedModules(initialBlock1, ib2, ib3, ib4, ib5 uint64) *Graph {
lowest = utils.MinOf(lowest, ib5)
return &Graph{
lowestInitBlock: lowest,
stagedUsedModules: [][]*pbsubstreams.Module{
stagedUsedModules: ExecutionStages{
{
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
InitialBlock: initialBlock1,
{
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
InitialBlock: initialBlock1,
},
}, {
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
InitialBlock: ib2,
},
},
}, {
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
InitialBlock: ib2,
},
}, {
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
InitialBlock: ib3,
},
}, {
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
InitialBlock: ib4,
},
{

{
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
InitialBlock: ib3,
},
}, {
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
InitialBlock: ib4,
},
},
}, {
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
InitialBlock: ib5,
},
{
{
&pbsubstreams.Module{
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
InitialBlock: ib5,
},
},
},
},
Expand Down
Loading

0 comments on commit 703d7cc

Please sign in to comment.