Integration tests now compile and run; they stall somewhere.
Some clean-ups:
* Manage errors for FinalStoreMap
* Clean-up of some comments.
* Optimally allocate segmentStates.
* Complete -> FullKV.
Alexandre Bourget committed Jun 29, 2023
1 parent 30e4649 commit 4112332
Showing 17 changed files with 109 additions and 396 deletions.
6 changes: 2 additions & 4 deletions orchestrator/parallelprocessor.go
@@ -68,7 +68,7 @@ func BuildParallelProcessor(
// We don't need to plan work for ranges where we have ExecOut
// already.
// BUT we'll need to have stores to be able to schedule work after
// so there's a mix of Complete stores and ExecOut files we need
// so there's a mix of FullKV stores and ExecOut files we need
// to check. We can push the `segmentCompleted` based on the
// execout files.

@@ -141,7 +141,5 @@ func (b *ParallelProcessor) Run(ctx context.Context) (storeMap store.Map, err er
return nil, fmt.Errorf("scheduler run: %w", err)
}

storeMap = b.scheduler.FinalStoreMap(reqctx.Details(ctx).LinearHandoffBlockNum)

return storeMap, nil
return b.scheduler.FinalStoreMap(reqctx.Details(ctx).LinearHandoffBlockNum)
}
2 changes: 1 addition & 1 deletion orchestrator/scheduler/scheduler.go
@@ -155,6 +155,6 @@ func (s *Scheduler) cmdShutdownWhenComplete() loop.Cmd {

}

func (s *Scheduler) FinalStoreMap(exclusiveEndBlock uint64) store.Map {
func (s *Scheduler) FinalStoreMap(exclusiveEndBlock uint64) (store.Map, error) {
return s.Stages.FinalStoreMap(exclusiveEndBlock)
}
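
The "Manage errors for FinalStoreMap" change threads an error return through Stages.FinalStoreMap → Scheduler.FinalStoreMap → ParallelProcessor.Run instead of panicking deep inside the stages. Below is a minimal, self-contained sketch of that pattern; the types and the module name are simplified stand-ins, not the real store.Map or stage types.

package main

import "fmt"

// Simplified stand-ins for store.Map and the per-module state; the real
// types live in storage/store and orchestrator/stage.
type storeMap map[string]uint64

type moduleState struct {
	name             string
	lastBlockInStore uint64
}

// finalStoreMap mirrors the new FinalStoreMap shape: instead of panicking
// when a store is not aligned with the handoff block, it returns a wrapped
// error that Scheduler.FinalStoreMap and ParallelProcessor.Run pass up as-is.
func finalStoreMap(mods []moduleState, exclusiveEndBlock uint64) (storeMap, error) {
	out := storeMap{}
	for _, m := range mods {
		if m.lastBlockInStore != exclusiveEndBlock {
			return nil, fmt.Errorf("stores didn't sync up properly, expected store %q to be at block %d but was at %d",
				m.name, exclusiveEndBlock, m.lastBlockInStore)
		}
		out[m.name] = m.lastBlockInStore
	}
	return out, nil
}

func main() {
	mods := []moduleState{{name: "store_pools", lastBlockInStore: 1000}} // hypothetical module name
	if _, err := finalStoreMap(mods, 1100); err != nil {
		fmt.Println("handoff aborted:", err) // the caller decides; no panic inside the stages
	}
}
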
8 changes: 1 addition & 7 deletions orchestrator/scheduler/scheduler_test.go
@@ -10,9 +10,7 @@ import (
)

func TestSched2_JobFinished(t *testing.T) {
s := &Scheduler{
ExecOutWalker: newTestWalker(),
}
s := &Scheduler{}
cmd := s.Update(execout.MsgFileDownloaded{})
msg := cmd()
assert.Equal(t, 1, len(msg.(loop.BatchMsg)))
@@ -36,7 +34,3 @@ func TestSched2_JobFinished(t *testing.T) {
// * NextSegment()

}

func newTestWalker() Walker {

}
69 changes: 11 additions & 58 deletions orchestrator/stage/fetchstorage.go
@@ -19,29 +19,7 @@ func (s *Stages) FetchStoresState(

upToBlock := segmenter.ExclusiveEndBlock()

// TODO: this method has two goals:
// load the matrix with complete stores and partials present
// load the initial blocks, but why not just use the stores
// at the VERY END? instead of merging all the time?
// let the stages produce the complete snapshots
// and don't try to merge them..
// only when we're near the LinearHandoff should we
// start loading those stores in preparation for the
// linear handoff.

// TODO: make sure the Tier2 jobs write the FULL stores
// but NO, the tier2 produce partials jobs.. that need to be
// merged, but right now we have TWO needs for merging:
// 1) preparing the storeMap for the end
// 2) taking the previous full and fuse it with a partial
// in order to produce a new complete store, so that the
// next job can start.
// In this case, we mustn't start a job until the full store
// has been properly written to storage.. right now we
// ship that in a go routine..
// with that `writerErrGroup`.. what's the use of that?

// FIXME: why load stores if there could be ExecOut data present
// OPTIMIZATION: why load stores if there could be ExecOut data present
// on disk already, which avoids the need to do _any_ processing whatsoever?
state, err := state.FetchState(ctx, storeConfigMap, upToBlock)
if err != nil {
@@ -59,46 +37,19 @@ func (s *Stages) FetchStoresState(
modSegmenter := mod.segmenter

// TODO: what happens to the Unit's state if we don't have
// compelte sores for all modules within?
// complete stores for all modules within?
// We'll need to do the same alignment of Complete stores
complete := files.LastCompleteSnapshotBefore(upToBlock)
if complete != nil {
fullKV := files.LastFullKVSnapshotBefore(upToBlock)
if fullKV != nil {
// HERE WE should actually just load the CLOSEST to the start
// point
segmentIdx := modSegmenter.IndexForEndBlock(complete.Range.ExclusiveEndBlock)
segmentIdx := modSegmenter.IndexForEndBlock(fullKV.Range.ExclusiveEndBlock)
rng := segmenter.Range(segmentIdx)
if rng.ExclusiveEndBlock != complete.Range.ExclusiveEndBlock {
if rng.ExclusiveEndBlock != fullKV.Range.ExclusiveEndBlock {
continue
}
unit := Unit{Stage: stageIdx, Segment: segmentIdx}
if allDone := markFound(completes, unit, mod.name, moduleCount); allDone {
// TODO: we should push the `segmentComplete` and LOAD all the stores
// aligned at this block, but only for the _highest_ of the
// completed bundles.

// TODO: do we need another state, for when a CompleteStore is
// present? or FullKV is present, in which case we can load it
// altogether instead of merging it. a Full followed by a PartialPresent
// could do with a `Load()` of the previous `Full`, and then a merge
// of the partial.
// But if we have FullKV here and there, we don't need to schedule
// work to produce them, they are already there.

// TODO: that might mean that the `moduleState` needs to keep
// track itself of the state of the advancement of its `store`.
// Also it should produce a Message when a FullKV is being written
// and when it written, in which case we can lauch the next job that
// would consume it. And if we receive notice that a FullKV already
// exists, we don't schedule work to produce it, and we potentially
// load it to merge the following stuff.

// TODO: review the meaning of `UnitCompleted`, perhaps rename to
// `UnitFullPresent`. And that state should not mean that
// all stores for a Unit have been merged, or whatever the state
// of the merging process. That should be kept inside the `state`
// which are linear view, and always going forward.. to produce
// whatever is necessary to generate the ExecOu or the final
//`StoreMap` as a setup-phase for the LinearPipeline
s.markSegmentCompleted(unit)
}
}
@@ -114,10 +65,12 @@ func (s *Stages) FetchStoresState(
}
unit := Unit{Stage: stageIdx, Segment: segmentIdx}

if s.getState(unit) == UnitCompleted {
// FullKVs take precedence over partial stores' presence.
continue
}

if allDone := markFound(partials, unit, mod.name, moduleCount); allDone {
// TODO: if we discover both a Partial and a Complete, we need to make
// sure that the Complete is the thing that wins in the state, because
// we don't want to lose time merging a partial
s.MarkSegmentPartialPresent(unit)
}
}
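
The new getState check above makes FullKV discovery win over partial discovery: a unit already marked completed from a FullKV snapshot is skipped when a partial for the same unit turns up. A small self-contained sketch of that precedence rule, with Unit, UnitState and the marking helpers reduced to local stand-ins:

package main

import "fmt"

type UnitState int

const (
	UnitPending UnitState = iota
	UnitPartialPresent
	UnitCompleted
)

type Unit struct{ Stage, Segment int }

type matrix map[Unit]UnitState

// markFromStorage applies the same precedence as FetchStoresState:
// FullKV snapshots mark a unit Completed first; partials only mark a
// unit PartialPresent when it isn't already Completed.
func markFromStorage(m matrix, fullKV, partials []Unit) {
	for _, u := range fullKV {
		m[u] = UnitCompleted
	}
	for _, u := range partials {
		if m[u] == UnitCompleted {
			// FullKVs take precedence over partial stores' presence.
			continue
		}
		m[u] = UnitPartialPresent
	}
}

func main() {
	m := matrix{}
	u := Unit{Stage: 0, Segment: 3}
	markFromStorage(m, []Unit{u}, []Unit{u, {Stage: 0, Segment: 4}})
	fmt.Println(m[u] == UnitCompleted)                               // true: the partial was ignored
	fmt.Println(m[Unit{Stage: 0, Segment: 4}] == UnitPartialPresent) // true
}
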
8 changes: 4 additions & 4 deletions orchestrator/stage/squash.go
@@ -42,12 +42,12 @@ func (s *Stages) multiSquash(stage *Stage, mergeUnit Unit) error {
return stage.syncWork.Wait()
}

// The singleSquash oepration's goal is to take the up-most contiguous unit
// tha is compete, and take the very next partial, squash it and produce a complete
// The singleSquash operation's goal is to take the up-most contiguous unit
// that is complete, and take the very next partial, squash it and produce a FullKV
// store.
// If we happen to have some complete stores in the middle, then our goal is
// If we happen to have some FullKV stores in the middle, then our goal is
// to load that complete store, and squash the next partial segment.
// We keep the cache of the latest complete store, to speed up things
// We keep the cache of the latest FullKV store, to speed up things
// if they are linear
func (s *Stages) singleSquash(stage *Stage, modState *ModuleState, mergeUnit Unit) error {
metrics := mergeMetrics{}
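
As the reworded comment describes, each squash starts from the highest contiguous FullKV (or the cached one in the linear case), merges the next partial into it, and persists a new FullKV so the next job can start. A schematic, self-contained sketch of that step, with stores reduced to plain maps rather than the real store package (real stores merge according to each module's update policy; plain overwrite is a simplification):

package main

import "fmt"

// A FullKV store is modeled as a flat key/value map; a partial holds only
// the deltas produced over one segment.
type kv map[string]string

// squashNext loads the FullKV ending at `from` (or reuses `cache` when it is
// already the one ending there), merges the partial covering (from, to], and
// persists the result as the new FullKV for `to`.
func squashNext(fullKVs, partials map[uint64]kv, cache kv, cachedEnd, from, to uint64) (kv, error) {
	base := cache
	if cachedEnd != from {
		var ok bool
		if base, ok = fullKVs[from]; !ok {
			return nil, fmt.Errorf("no FullKV ending at block %d", from)
		}
	}
	partial, ok := partials[to]
	if !ok {
		return nil, fmt.Errorf("no partial ending at block %d", to)
	}
	next := kv{}
	for k, v := range base {
		next[k] = v
	}
	for k, v := range partial { // the partial's deltas win over older values
		next[k] = v
	}
	fullKVs[to] = next // persist so the next squash (or job) can start from it
	return next, nil
}

func main() {
	fullKVs := map[uint64]kv{1000: {"a": "1"}}
	partials := map[uint64]kv{1100: {"b": "2"}}
	out, err := squashNext(fullKVs, partials, fullKVs[1000], 1000, 1000, 1100)
	fmt.Println(out, err) // map[a:1 b:2] <nil>
}
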
31 changes: 17 additions & 14 deletions orchestrator/stage/stages.go
@@ -91,7 +91,7 @@ func NewStages(
func (s *Stages) AllStagesFinished() bool {
lastSegment := s.segmenter.LastIndex()
lastSegmentIndex := lastSegment - s.segmentOffset
if len(s.segmentStates) < lastSegmentIndex {
if lastSegmentIndex >= len(s.segmentStates) {
return false
}

@@ -154,13 +154,13 @@ func (s *Stages) CmdTryMerge(stageIdx int) loop.Cmd {
mergeUnit := stage.nextUnit()

if mergeUnit.Segment > s.segmenter.LastIndex() {
// TODO: we could affect a state for the whole Stage here
// and the AllStagesFinished() could be a quick
// check of that state instead of plucking through the
// Unit matrix.
return nil // We're done here.
}

if s.getState(mergeUnit) != UnitPartialPresent {
return nil
}

if !s.previousUnitComplete(mergeUnit) {
return nil
}
@@ -181,7 +181,11 @@ func (s *Stages) MergeCompleted(mergeUnit Unit) {
}

func (s *Stages) getState(u Unit) UnitState {
return s.segmentStates[u.Segment-s.segmentOffset][u.Stage]
index := u.Segment - s.segmentOffset
if index >= len(s.segmentStates) {
return UnitPending
}
return s.segmentStates[index][u.Stage]
}

func (s *Stages) setState(u Unit, state UnitState) {
@@ -211,9 +215,6 @@ func (s *Stages) NextJob() (Unit, *block.Range) {
// each time contiguous segments are completed for all stages.
segmentIdx := s.segmenter.FirstIndex()
for {
if len(s.segmentStates) <= segmentIdx-s.segmentOffset {
s.growSegments()
}
if segmentIdx > s.segmenter.LastIndex() {
break
}
@@ -239,7 +240,10 @@ func (s *Stages) NextJob() (Unit, *block.Range) {
return Unit{}, nil
}

func (s *Stages) growSegments() {
func (s *Stages) allocSegments(segmentIdx int) {
if len(s.segmentStates) > segmentIdx-s.segmentOffset {
return
}
by := len(s.segmentStates)
if by == 0 {
by = 2
@@ -271,17 +275,16 @@ func (s *Stages) previousUnitComplete(u Unit) bool {
return s.getState(Unit{Segment: u.Segment - 1, Stage: u.Stage}) == UnitCompleted
}

func (s *Stages) FinalStoreMap(exclusiveEndBlock uint64) store.Map {
func (s *Stages) FinalStoreMap(exclusiveEndBlock uint64) (store.Map, error) {
out := store.NewMap()
for _, stage := range s.stages {
for _, modState := range stage.moduleStates {
fullKV, err := modState.getStore(s.ctx, exclusiveEndBlock)
if err != nil {
// TODO: do proper error propagation
panic(fmt.Errorf("stores didn't sync up properly, expected store %q to be at block %d but was at %d: %w", modState.name, exclusiveEndBlock, modState.lastBlockInStore, err))
return nil, fmt.Errorf("stores didn't sync up properly, expected store %q to be at block %d but was at %d: %w", modState.name, exclusiveEndBlock, modState.lastBlockInStore, err)
}
out[modState.name] = fullKV
}
}
return out
return out, nil
}
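
Two of the changes above work together: getState now treats an unallocated segment as UnitPending instead of indexing past segmentStates, and growSegments becomes allocSegments, which only grows the matrix when a transition touches a segment outside it. The growth body is truncated in the diff, so the doubling loop below is an assumption; the rest mirrors the shape shown above in a self-contained sketch:

package main

import "fmt"

type UnitState int

const UnitPending UnitState = 0

type stages struct {
	segmentOffset int
	numStages     int
	segmentStates [][]UnitState
}

// getState is bounds-safe: a segment that was never allocated is simply Pending.
func (s *stages) getState(segment, stage int) UnitState {
	index := segment - s.segmentOffset
	if index >= len(s.segmentStates) {
		return UnitPending
	}
	return s.segmentStates[index][stage]
}

// allocSegments grows the state matrix only when `segment` falls outside it,
// roughly doubling the row count each round to amortize allocations.
func (s *stages) allocSegments(segment int) {
	if len(s.segmentStates) > segment-s.segmentOffset {
		return
	}
	for len(s.segmentStates) <= segment-s.segmentOffset {
		grow := len(s.segmentStates)
		if grow == 0 {
			grow = 2
		}
		for i := 0; i < grow; i++ {
			s.segmentStates = append(s.segmentStates, make([]UnitState, s.numStages))
		}
	}
}

func main() {
	s := &stages{numStages: 3}
	fmt.Println(s.getState(10, 1) == UnitPending) // true: nothing allocated yet
	s.allocSegments(10)
	fmt.Println(len(s.segmentStates) > 10) // true: segment 10 is now addressable
}
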
10 changes: 10 additions & 0 deletions orchestrator/stage/transitions.go
@@ -81,6 +81,15 @@ stateDiagram-v2
*/

// TODO: UnitCompleted might mean two things: the fact that a FullKV is on disk,
// or the fact that we've merged a partial and we're done. It might be
// that we sent a goroutine to write to storage, but the write hasn't reached
// the Bucket yet.
// We want to distinguish those, to make sure merging happens as _quickly_ as
// possible when there are many partials to merge, but also to know when the
// files are indeed present on disk (either detected there, or after the merge's
// _write_ operation successfully finished).

func (s *Stages) MarkSegmentMerging(u Unit) {
if !s.previousUnitComplete(u) {
panic("can only merge segments if previous is complete")
@@ -118,6 +127,7 @@ func (s *Stages) markSegmentCompleted(u Unit) {
}

func (s *Stages) transition(u Unit, to UnitState, allowedPreviousStates ...UnitState) {
s.allocSegments(u.Segment)
prev := s.getState(u)
for _, from := range allowedPreviousStates {
if prev == from {
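
The new TODO suggests splitting UnitCompleted into "partial merged, write still in flight" and "FullKV confirmed on the bucket". Purely as an illustration of that idea: the state names and helpers below are hypothetical and not part of the package.

package main

import "fmt"

type UnitState int

const (
	UnitPartialPresent UnitState = iota
	UnitMerging
	UnitMerged         // hypothetical: partial merged in memory, FullKV write still in flight
	UnitFullKVOnBucket // hypothetical: FullKV detected, or its write confirmed on storage
)

// Merging can chain off a unit as soon as it is Merged (keeping the merge
// pipeline fast when many partials queue up), while only FullKVOnBucket
// unblocks work that must read the FullKV back from storage.
func canStartNextMerge(prev UnitState) bool  { return prev >= UnitMerged }
func canScheduleReaders(prev UnitState) bool { return prev == UnitFullKVOnBucket }

func main() {
	fmt.Println(canStartNextMerge(UnitMerged), canScheduleReaders(UnitMerged))                 // true false
	fmt.Println(canStartNextMerge(UnitFullKVOnBucket), canScheduleReaders(UnitFullKVOnBucket)) // true true
}
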
2 changes: 0 additions & 2 deletions orchestrator/work/msgs.go
@@ -3,7 +3,6 @@ package work
import (
"github.com/streamingfast/substreams/orchestrator/loop"
"github.com/streamingfast/substreams/orchestrator/stage"
"github.com/streamingfast/substreams/storage/store"
)

// Messages
@@ -15,7 +14,6 @@ type MsgJobFailed struct {

type MsgJobSucceeded struct {
Unit stage.Unit
Files store.FileInfos
Worker Worker
}

7 changes: 4 additions & 3 deletions orchestrator/work/worker.go
@@ -82,7 +82,7 @@ func (w *RemoteWorker) ID() string {
return fmt.Sprintf("%d", w.id)
}

func newRequest(req *reqctx.RequestDetails, stageIndex int, workRange *block.Range) *pbssinternal.ProcessRangeRequest {
func NewRequest(req *reqctx.RequestDetails, stageIndex int, workRange *block.Range) *pbssinternal.ProcessRangeRequest {
return &pbssinternal.ProcessRangeRequest{
StartBlockNum: workRange.StartBlock,
StopBlockNum: workRange.ExclusiveEndBlock,
@@ -93,7 +93,7 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *block.Ran
}

func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *block.Range, upstream *response.Stream) loop.Cmd {
request := newRequest(reqctx.Details(ctx), unit.Stage, workRange)
request := NewRequest(reqctx.Details(ctx), unit.Stage, workRange)
logger := reqctx.Logger(ctx)

return func() loop.Msg {
@@ -131,7 +131,8 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo
return MsgJobSucceeded{
Unit: unit,
Worker: w,
Files: res.PartialFilesWritten,
// TODO: Remove PartialFilesWritten from the response, as it's not needed anymore.
//Files: res.PartialFilesWritten,
}
}
}
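
Exporting newRequest as NewRequest presumably lets code outside the work package (for instance the integration tests mentioned in the commit title) build tier2 requests directly; that reading is an assumption. The sketch below mirrors only the request fields visible in the diff, with local stand-ins for pbssinternal.ProcessRangeRequest and block.Range, and the Stage field is assumed:

package main

import "fmt"

// Local stand-ins; only the fields visible in the diff are kept.
type processRangeRequest struct {
	StartBlockNum uint64
	StopBlockNum  uint64
	Stage         uint32 // assumed field; the real request carries more, truncated in the diff
}

type blockRange struct{ StartBlock, ExclusiveEndBlock uint64 }

// newRequest mirrors the shape of the exported work.NewRequest: one tier2
// request per (stage, segment), bounded by the segment's block range.
func newRequest(stageIndex int, workRange blockRange) *processRangeRequest {
	return &processRangeRequest{
		StartBlockNum: workRange.StartBlock,
		StopBlockNum:  workRange.ExclusiveEndBlock,
		Stage:         uint32(stageIndex),
	}
}

func main() {
	req := newRequest(1, blockRange{StartBlock: 1000, ExclusiveEndBlock: 1100})
	fmt.Printf("stage %d: [%d, %d)\n", req.Stage, req.StartBlockNum, req.StopBlockNum)
}
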
1 change: 0 additions & 1 deletion pipeline/forkhandler.go
@@ -50,6 +50,5 @@ func (f *ForkHandler) removeReversibleOutput(blockID string) {
}

func (f *ForkHandler) addReversibleOutput(moduleOutput *pbssinternal.ModuleOutput, blockID string) {
// TODO: ADD MUTEX
f.reversibleOutputs[blockID] = append(f.reversibleOutputs[blockID], moduleOutput)
}