Skip to content

Commit

Permalink
run: Parse query files early
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanyzhang committed Jun 19, 2024
1 parent 2f4156f commit 9ab4888
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 21 deletions.
10 changes: 6 additions & 4 deletions cmd/run/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func Run(_ *cobra.Command, args []string) {
}
defaultRunNameBuilder.WriteString(st.Id)
}
} else {
os.Exit(-1)
}
}
if defaultRunNameBuilder != nil {
Expand All @@ -68,8 +70,7 @@ func Run(_ *cobra.Command, args []string) {
}
log.Info().Str("run_name", mainStage.States.RunName).Send()

_, _, err := stage.ParseStageGraph(mainStage)
if err != nil {
if _, _, err := stage.ParseStageGraph(mainStage); err != nil {
log.Fatal().Err(err).Msg("failed to parse benchmark stage graph")
}

Expand Down Expand Up @@ -103,9 +104,10 @@ func processStagePath(path string) (st *stage.Stage, returnErr error) {
continue
}
fullPath := filepath.Join(path, entry.Name())
newStage, err := processStagePath(fullPath)
if err == nil {
if newStage, err := processStagePath(fullPath); err == nil {
st.MergeWith(newStage)
} else {
return nil, err
}
}
return st, nil
Expand Down
17 changes: 9 additions & 8 deletions stage/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func ReadStageFromFile(filePath string) (*Stage, error) {
if err = json.Unmarshal(bytes, stage); err != nil {
return nil, fmt.Errorf("failed to parse json %s: %w", filePath, err)
}
for i, queryFile := range stage.QueryFiles {
if !filepath.IsAbs(queryFile) {
queryFile = filepath.Join(stage.BaseDir, queryFile)
stage.QueryFiles[i] = queryFile
}
if _, err = os.Stat(queryFile); err != nil {
return nil, fmt.Errorf("%s links to an invalid query file %s: %w", stage.Id, queryFile, err)
}
}
log.Debug().Str("id", stage.Id).Str("path", filePath).Msg("read stage file")
return stage, nil
}
Expand All @@ -80,14 +89,6 @@ func ParseStage(stage *Stage, stages Map) (*Stage, error) {
log.Debug().Msgf("%s already parsed, returned", stage.Id)
return stageFound, nil
}
for _, queryFile := range stage.QueryFiles {
if !filepath.IsAbs(queryFile) {
queryFile = filepath.Join(stage.BaseDir, queryFile)
}
if _, err := os.Stat(queryFile); err != nil {
return nil, fmt.Errorf("%s links to an invalid query file %s: %w", stage.Id, queryFile, err)
}
}
for i, nextStagePath := range stage.NextStagePaths {
if !filepath.IsAbs(nextStagePath) {
nextStagePath = filepath.Join(stage.BaseDir, nextStagePath)
Expand Down
5 changes: 4 additions & 1 deletion stage/no_influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"errors"
)

func NewInfluxRunRecorder(_ string) RunRecorder {
func NewInfluxRunRecorder(cfgPath string) RunRecorder {
if cfgPath == "" {
return nil
}
return &NotSupportedRecorder{}
}

Expand Down
22 changes: 14 additions & 8 deletions stage/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,15 @@ func (s *Stage) runSequentially(ctx context.Context) (returnErr error) {
}

func (s *Stage) runQueryFile(ctx context.Context, queryFile string, expectedRowCountStartIndex *int, fileAlias *string) error {
queryFileAbsPath := queryFile
if !filepath.IsAbs(queryFileAbsPath) {
queryFileAbsPath = filepath.Join(s.BaseDir, queryFileAbsPath)
file, err := os.Open(queryFile)
if fileAlias == nil {
if relPath, relErr := filepath.Rel(s.BaseDir, queryFile); relErr == nil {
fileAlias = &relPath
} else {
fileAlias = &queryFile
}
}
file, err := os.Open(queryFileAbsPath)

var queries []string
if err == nil {
queries, err = presto.SplitQueries(file)
Expand All @@ -292,9 +296,7 @@ func (s *Stage) runQueryFile(ctx context.Context, queryFile string, expectedRowC
}
return err
}
if fileAlias == nil {
fileAlias = &queryFile
}

if expectedRowCountStartIndex != nil {
err = s.runQueries(ctx, queries, fileAlias, *expectedRowCountStartIndex)
*expectedRowCountStartIndex += len(queries)
Expand Down Expand Up @@ -345,7 +347,11 @@ func (s *Stage) runRandomly(ctx context.Context) error {
}
} else {
queryFile := s.QueryFiles[idx-len(s.Queries)]
fileAlias := fmt.Sprintf("rand_%d/%s", i, queryFile)
fileAlias := queryFile
if relPath, relErr := filepath.Rel(s.BaseDir, queryFile); relErr == nil {
fileAlias = relPath
}
fileAlias = fmt.Sprintf("rand_%d_%s", i, fileAlias)
if err := s.runQueryFile(ctx, queryFile, nil, &fileAlias); err != nil {
return err
}
Expand Down

0 comments on commit 9ab4888

Please sign in to comment.