Skip to content

Commit

Permalink
Fix latest bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandre Bourget committed Jul 5, 2023
1 parent 2cac5e4 commit 26f8e2c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 57 deletions.
94 changes: 40 additions & 54 deletions orchestrator/stage/stages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,23 @@ func TestNewStages(t *testing.T) {
"trace",
)

assert.Equal(t, 8, stages.storeSegmenter.Count()) // from 5 to 75
assert.Equal(t, false, stages.storeSegmenter.EndsOnInterval(7))
assert.Equal(t, 8, stages.globalSegmenter.Count()) // from 5 to 75
assert.Equal(t, true, stages.storeSegmenter.EndsOnInterval(6))
assert.Equal(t, false, stages.globalSegmenter.EndsOnInterval(7))
assert.Equal(t, 6, stages.storeSegmenter.IndexForStartBlock(60), "index in range")
assert.Equal(t, 8, stages.storeSegmenter.IndexForStartBlock(80), "index out of range still returned here")
assert.Nil(t, stages.storeSegmenter.Range(8), "out of range")

assert.Equal(t, block.ParseRange("5-10"), stages.storeSegmenter.Range(0))
assert.Equal(t, block.ParseRange("10-20"), stages.storeSegmenter.Range(1))
assert.Equal(t, block.ParseRange("70-75"), stages.storeSegmenter.Range(7))
}

func TestNewStagesMapNotAlignedWithStoreEndBlock(t *testing.T) {
reqPlan := plan.BuildTier1RequestPlan(true, 10, 5, 5, 75, 75)
assert.Equal(t, "", reqPlan.String())

stages := NewStages(
context.Background(),
outputmodules.TestGraphStagedModules(5, 7, 12, 22, 25),
reqPlan,
nil,
"trace",
)

assert.Equal(t, 8, stages.storeSegmenter.Count()) // from 5 to 75
assert.Equal(t, false, stages.storeSegmenter.EndsOnInterval(7))
assert.Equal(t, 6, stages.storeSegmenter.IndexForStartBlock(60), "index in range")
assert.Equal(t, 8, stages.storeSegmenter.IndexForStartBlock(80), "index out of range still returned here")
assert.Nil(t, stages.storeSegmenter.Range(8), "out of range")

assert.Equal(t, block.ParseRange("5-10"), stages.storeSegmenter.Range(0))
assert.Equal(t, block.ParseRange("10-20"), stages.storeSegmenter.Range(1))
assert.Equal(t, block.ParseRange("70-75"), stages.storeSegmenter.Range(7))
assert.Nil(t, stages.storeSegmenter.Range(7))
assert.Equal(t, block.ParseRange("70-75"), stages.globalSegmenter.Range(7))
}

func TestNewStagesNextJobs(t *testing.T) {
//seg := block.NewSegmenter(10, 5, 50)
reqPlan := plan.BuildTier1RequestPlan(true, 10, 5, 5, 50, 50)
assert.Equal(t, "interval=10, stores=[5, 40), map_write=[5, 50), map_read=[5, 50), linear=[nil)", reqPlan.String())
stages := NewStages(
context.Background(),
outputmodules.TestGraphStagedModules(5, 5, 5, 5, 5),
Expand Down Expand Up @@ -159,61 +139,67 @@ S:CCSS
S:CSS.
M:NC..`)

stages.NextJob()

segmentStateEquals(t, stages, `
S:CCSSS...
S:CSS.....
M:NC......`)

stages.NextJob()
_, r := stages.NextJob()
assert.Nil(t, r)

segmentStateEquals(t, stages, `
S:CCSSSS..
S:CSS.....
M:NC......`)
S:CCSS
S:CSS.
M:NC..`)

_, r := stages.NextJob()
assert.Nil(t, r)
// segmentStateEquals(t, stages, `
//S:CCSSS...
//S:CSS.....
//M:NC......`)
//
// stages.NextJob()
//
// segmentStateEquals(t, stages, `
//S:CCSSSS..
//S:CSS.....
//M:NC......`)
//
// _, r := stages.NextJob()
// assert.Nil(t, r)
stages.MarkSegmentPartialPresent(id(2, 0))

segmentStateEquals(t, stages, `
S:CCPSSS..
S:CSS.....
M:NC......`)
S:CCPS
S:CSS.
M:NC..`)

_, r = stages.NextJob()
assert.Nil(t, r)
stages.MarkSegmentMerging(id(2, 0))

segmentStateEquals(t, stages, `
S:CCMSSS..
S:CSS.....
M:NC......`)
S:CCMS
S:CSS.
M:NC..`)

_, r = stages.NextJob()
assert.Nil(t, r)
stages.markSegmentCompleted(id(2, 0))

segmentStateEquals(t, stages, `
S:CCCSSS..
S:CSS.....
M:NC......`)
S:CCCS
S:CSS.
M:NC..`)

stages.NextJob()

segmentStateEquals(t, stages, `
S:CCCSSS..
S:CSSS....
M:NC......`)
S:CCCS
S:CSSS
M:NC..`)

stages.forceTransition(1, 1, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
S:CCCSSS..
S:CCSS....
M:NCS.....`)
S:CCCS
S:CCSS
M:NCS.`)

}

Expand Down
2 changes: 0 additions & 2 deletions storage/execout/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ type Writer struct {

fileWalker *FileWalker
currentFile *File
files map[string]*File // moduleName => file
outputModule string
}

func NewWriter(initialBlockBoundary, exclusiveEndBlock uint64, outputModule string, configs *Configs) *Writer {
w := &Writer{
wg: &sync.WaitGroup{},
files: make(map[string]*File),
outputModule: outputModule,
}

Expand Down
2 changes: 1 addition & 1 deletion storage/execout/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ var testConfigs = &Configs{
func TestNewExecOutputWriterIsSubRequest(t *testing.T) {
res := NewWriter(11, 15, "A", testConfigs)
require.NotNil(t, res)
assert.Equal(t, 15, int(res.files["A"].ExclusiveEndBlock))
assert.Equal(t, 15, int(res.fileWalker.segmenter.ExclusiveEndBlock()))
}

0 comments on commit 26f8e2c

Please sign in to comment.