Skip to content

Commit

Permalink
1. Added comment section of absolute and current block.
Browse files Browse the repository at this point in the history
2. Removed Err and absoluteTime from the configState object.
3. Added flag package for debugging purpose and removed hard coded file path.

Signed-off-by: Kushal Shukla <[email protected]>
  • Loading branch information
kushalShukla-web committed Nov 30, 2024
1 parent d1560eb commit 8b752ef
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions tools/load-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"flag"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -98,10 +99,8 @@ type BucketConfig struct {
MaxTime int64 `yaml:"maxTime"`
}

type configState struct {
type bucketState struct {
bucketConfig *BucketConfig
Err error
absoluteTime int64
}

func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier {
Expand Down Expand Up @@ -133,14 +132,16 @@ func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier {
}

// Function to load `minTime` and `maxTime` from bucket-config.yml
func loadKeyConfig() (*BucketConfig, error) {
filePath := "/config/bucket-config.yml"
_, err := os.Stat(filePath)
func loadBucketConfig() (*BucketConfig, error) {
filePath := flag.String("bucketconfig-file", "/config/bucket-config.yml", "Path to the bucket configuration file")
flag.Parse()

_, err := os.Stat(*filePath)
if os.IsNotExist(err) {
return nil, fmt.Errorf("file not found: %s", filePath)
return nil, fmt.Errorf("file not found: %s", *filePath)
}

data, err := os.ReadFile(filePath)
data, err := os.ReadFile(*filePath)
if err != nil {
return nil, fmt.Errorf("error reading file: %w", err)
}
Expand All @@ -154,35 +155,37 @@ func loadKeyConfig() (*BucketConfig, error) {
return &bucketConfig, nil
}

func configstate(v *BucketConfig, err error) *configState {
var absolutetime int64
if v != nil {
absolutetime = v.MaxTime
func setconfig(v *BucketConfig, err error) *bucketState {
// If there is an error in reading bucket-config.yml file then just return nil.
if err != nil {
return nil
}
return &configState{
return &bucketState{
bucketConfig: v,
Err: err,
absoluteTime: absolutetime,
}
}

func (q *Querier) run(wg *sync.WaitGroup, timeBound *configState) {
func (q *Querier) run(wg *sync.WaitGroup, timeBound *bucketState) {
defer wg.Done()
fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url)
time.Sleep(20 * time.Second)

for {
start := time.Now()
// If timeBound is not nil, both the "absolute" and "current" blocks will run;
// otherwise, only the "current" block will execute. This execution pattern is used
// because if Downloaded block data is present, both the head block and downloaded block
// need to be processed.
runBlockMode := "current"
for _, query := range q.queries {
if runBlockMode == "current" {
q.query(query.Expr, "current", nil)
} else if timeBound.Err == nil {
} else if runBlockMode == "absolute" {
q.query(query.Expr, "absolute", timeBound)
}
if runBlockMode == "current" && timeBound.Err == nil {
if runBlockMode == "current" && timeBound != nil {
runBlockMode = "absolute"
} else if timeBound.Err == nil {
} else if timeBound != nil {
runBlockMode = "current"
}
}
Expand All @@ -194,7 +197,7 @@ func (q *Querier) run(wg *sync.WaitGroup, timeBound *configState) {
}
}

func (q *Querier) query(expr string, timeMode string, timeBound *configState) {
func (q *Querier) query(expr string, timeMode string, timeBound *bucketState) {
queryCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc()
start := time.Now()

Expand All @@ -219,7 +222,7 @@ func (q *Querier) query(expr string, timeMode string, timeBound *configState) {
qParams.Set("step", q.step)
}
} else if timeMode == "absolute" {
blockinstime := time.Unix(0, timeBound.absoluteTime*int64(time.Millisecond))
blockinstime := time.Unix(0, timeBound.bucketConfig.MaxTime*int64(time.Millisecond))
qParams.Set("time", fmt.Sprintf("%d", int64(blockinstime.Unix())))
}
req.URL.RawQuery = qParams.Encode()
Expand Down Expand Up @@ -268,9 +271,13 @@ func main() {
}
prNumber := os.Args[2]

configFile, err := os.ReadFile("/etc/loadgen/config.yaml")
configPath := flag.String("config-file", "/etc/loadgen/config.yaml", "Path to the configuration file")
flag.Parse()

configFile, err := os.ReadFile(*configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
fmt.Printf("Error reading config file: %v\n", err)
return
}

var config struct {
Expand All @@ -286,11 +293,8 @@ func main() {

var wg sync.WaitGroup

bucketConfig, err := loadKeyConfig()
if err != nil {
fmt.Printf("bucket-config.yml file is not present: %v\n", err)
}
timeBound := configstate(bucketConfig, err)
bucketConfig, err := loadBucketConfig()
timeBound := setconfig(bucketConfig, err)

for i, group := range config.Querier.Groups {
wg.Add(1)
Expand Down

0 comments on commit 8b752ef

Please sign in to comment.