diff --git a/orchestrator/scheduler/scheduler.go b/orchestrator/scheduler/scheduler.go index 6de072042..dc218f744 100644 --- a/orchestrator/scheduler/scheduler.go +++ b/orchestrator/scheduler/scheduler.go @@ -2,9 +2,6 @@ package scheduler import ( "context" - "fmt" - "os" - "os/exec" "go.uber.org/zap" @@ -68,10 +65,10 @@ func (s *Scheduler) Init() loop.Cmd { } func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { - fmt.Printf("Scheduler message: %T %v\n", msg, msg) - fmt.Print(s.Stages.StatesString()) - cmd, _ := exec.Command("bash", "-c", "cd "+os.Getenv("TEST_TEMP_DIR")+"; find .").Output() - fmt.Print(string(cmd)) + //fmt.Printf("Scheduler message: %T %v\n", msg, msg) + //fmt.Print(s.Stages.StatesString()) + //cmd, _ := exec.Command("bash", "-c", "cd "+os.Getenv("TEST_TEMP_DIR")+"; find .").Output() + //fmt.Print(string(cmd)) var cmds []loop.Cmd switch msg := msg.(type) { @@ -153,9 +150,9 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { } - if len(cmds) != 0 { - fmt.Printf("Schedule: %T %+v\n", cmds, cmds) - } + //if len(cmds) != 0 { + // fmt.Printf("Schedule: %T %+v\n", cmds, cmds) + //} return loop.Batch(cmds...) } diff --git a/orchestrator/stage/stages.go b/orchestrator/stage/stages.go index 53a2a5650..756e2a1fd 100644 --- a/orchestrator/stage/stages.go +++ b/orchestrator/stage/stages.go @@ -188,32 +188,26 @@ func (s *Stages) CmdTryMerge(stageIdx int) loop.Cmd { stage := s.stages[stageIdx] if stage.kind != KindStore { - fmt.Println("TRYM: kindnot store") return nil } mergeUnit := stage.nextUnit() if mergeUnit.Segment > s.storeSegmenter.LastIndex() { - fmt.Println("TRYM: past last segment") - return nil // We're done here. } if s.getState(mergeUnit) != UnitPartialPresent { - fmt.Println("TRYM: wasn't in partial state") return nil } if !s.previousUnitComplete(mergeUnit) { - fmt.Println("TRYM: prev unit not complete") return nil } s.MarkSegmentMerging(mergeUnit) return func() loop.Msg { - fmt.Println("TRYM: launching multiSquash", stage, mergeUnit) if err := s.multiSquash(stage, mergeUnit); err != nil { return MsgMergeFailed{Unit: mergeUnit, Error: err} } @@ -328,13 +322,11 @@ func (s *Stages) NextJob() (Unit, *block.Range) { } func (s *Stages) allocSegments(segmentIdx int) { - if len(s.segmentStates) > segmentIdx-s.segmentOffset { + segmentsNeeded := segmentIdx - s.segmentOffset + if len(s.segmentStates) > segmentsNeeded { return } - by := len(s.segmentStates) - if by == 0 { - by = 2 - } + by := segmentsNeeded - len(s.segmentStates) + 1 for i := 0; i < by; i++ { s.segmentStates = append(s.segmentStates, make([]UnitState, len(s.stages))) } diff --git a/orchestrator/stage/stages_test.go b/orchestrator/stage/stages_test.go index b0a32265f..24cbf89ad 100644 --- a/orchestrator/stage/stages_test.go +++ b/orchestrator/stage/stages_test.go @@ -2,6 +2,7 @@ package stage import ( "context" + "strconv" "strings" "testing" @@ -231,3 +232,26 @@ func TestStages_previousUnitComplete(t *testing.T) { s.setState(u00, UnitCompleted) assert.True(t, s.previousUnitComplete(u01)) // u00 is now complete } + +func TestStages_allocSegments(t *testing.T) { + tests := []struct { + offset int + allocSegment int + expectLen int + }{ + {10, 11, 2}, + {0, 11, 12}, + {5, 1, 0}, + {5, 5, 1}, + {1, 5, 5}, + } + for idx, tt := range tests { + t.Run(strconv.Itoa(idx), func(t *testing.T) { + s := &Stages{ + segmentOffset: tt.offset, + } + s.allocSegments(tt.allocSegment) + assert.Len(t, s.segmentStates, tt.expectLen) + }) + } +}