Skip to content

Commit

Permalink
Take out logs. Fix the allocSegments() call.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandre Bourget committed Jul 5, 2023
1 parent b6b966d commit 10358cb
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 21 deletions.
17 changes: 7 additions & 10 deletions orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package scheduler

import (
"context"
"fmt"
"os"
"os/exec"

"go.uber.org/zap"

Expand Down Expand Up @@ -68,10 +65,10 @@ func (s *Scheduler) Init() loop.Cmd {
}

func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
fmt.Printf("Scheduler message: %T %v\n", msg, msg)
fmt.Print(s.Stages.StatesString())
cmd, _ := exec.Command("bash", "-c", "cd "+os.Getenv("TEST_TEMP_DIR")+"; find .").Output()
fmt.Print(string(cmd))
//fmt.Printf("Scheduler message: %T %v\n", msg, msg)
//fmt.Print(s.Stages.StatesString())
//cmd, _ := exec.Command("bash", "-c", "cd "+os.Getenv("TEST_TEMP_DIR")+"; find .").Output()
//fmt.Print(string(cmd))
var cmds []loop.Cmd

switch msg := msg.(type) {
Expand Down Expand Up @@ -153,9 +150,9 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

}

if len(cmds) != 0 {
fmt.Printf("Schedule: %T %+v\n", cmds, cmds)
}
//if len(cmds) != 0 {
// fmt.Printf("Schedule: %T %+v\n", cmds, cmds)
//}
return loop.Batch(cmds...)
}

Expand Down
14 changes: 3 additions & 11 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,32 +188,26 @@ func (s *Stages) CmdTryMerge(stageIdx int) loop.Cmd {

stage := s.stages[stageIdx]
if stage.kind != KindStore {
fmt.Println("TRYM: kindnot store")
return nil
}

mergeUnit := stage.nextUnit()

if mergeUnit.Segment > s.storeSegmenter.LastIndex() {
fmt.Println("TRYM: past last segment")

return nil // We're done here.
}

if s.getState(mergeUnit) != UnitPartialPresent {
fmt.Println("TRYM: wasn't in partial state")
return nil
}

if !s.previousUnitComplete(mergeUnit) {
fmt.Println("TRYM: prev unit not complete")
return nil
}

s.MarkSegmentMerging(mergeUnit)

return func() loop.Msg {
fmt.Println("TRYM: launching multiSquash", stage, mergeUnit)
if err := s.multiSquash(stage, mergeUnit); err != nil {
return MsgMergeFailed{Unit: mergeUnit, Error: err}
}
Expand Down Expand Up @@ -328,13 +322,11 @@ func (s *Stages) NextJob() (Unit, *block.Range) {
}

func (s *Stages) allocSegments(segmentIdx int) {
if len(s.segmentStates) > segmentIdx-s.segmentOffset {
segmentsNeeded := segmentIdx - s.segmentOffset
if len(s.segmentStates) > segmentsNeeded {
return
}
by := len(s.segmentStates)
if by == 0 {
by = 2
}
by := segmentsNeeded - len(s.segmentStates) + 1
for i := 0; i < by; i++ {
s.segmentStates = append(s.segmentStates, make([]UnitState, len(s.stages)))
}
Expand Down
24 changes: 24 additions & 0 deletions orchestrator/stage/stages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stage

import (
"context"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -231,3 +232,26 @@ func TestStages_previousUnitComplete(t *testing.T) {
s.setState(u00, UnitCompleted)
assert.True(t, s.previousUnitComplete(u01)) // u00 is now complete
}

func TestStages_allocSegments(t *testing.T) {
tests := []struct {
offset int
allocSegment int
expectLen int
}{
{10, 11, 2},
{0, 11, 12},
{5, 1, 0},
{5, 5, 1},
{1, 5, 5},
}
for idx, tt := range tests {
t.Run(strconv.Itoa(idx), func(t *testing.T) {
s := &Stages{
segmentOffset: tt.offset,
}
s.allocSegments(tt.allocSegment)
assert.Len(t, s.segmentStates, tt.expectLen)
})
}
}

0 comments on commit 10358cb

Please sign in to comment.