Skip to content

Commit

Permalink
nomad-load: support increasing updates frequency (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkazmierczak committed May 16, 2024
1 parent c2870f1 commit 2888b82
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
31 changes: 16 additions & 15 deletions tools/nomad-load/cmd/nomad-load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ var (
httpAddr = flag.String("http-addr", "0.0.0.0", "The address to bind the HTTP server to")
httpPort = flag.String("http-port", "8080", "The port to bind the HTTP server to")

jobType = flag.String("type", internal.JobTypeBatch, "The type of job to create (batch or service)")
jobDriver = flag.String("driver", internal.JobDriverMock, "The driver to use for the job (mock or docker)")
updatesDelay = flag.Duration("updates-delay", 500*time.Millisecond, "The delay between updates of the service job")
numOfUpdates = flag.Int("num-of-updates", 0, "The number of updates to perform on the service job. If 0, the job will be continuosly updated")
numOfDispatches = flag.Int("num-of-dispatches", 0, "The number of dispatches to perform on the batch job. If 0, the job will be continuosly dispatched")
count = flag.Int("count", 1, "The count number per job (number of allocations is count * groups)")
groups = flag.Int("groups", 1, "The number of groups to create per job")
spread = flag.Bool("spread", false, "Should the jobs be spread across the datacenters?")
reqRate = flag.Float64("rate", 10, "The rate of constant job dispatches per second")
burstRate = flag.Int("burst", 1, "The burst rate of constant job dispatches")
randomize = flag.Bool("random", false, "Should the rate at which the jobs are dispatched be randomized?")
seed1 = flag.Uint64("seed1", rand.Uint64(), "First uint64 of the PCG seed used by the random number generator")
seed2 = flag.Uint64("seed2", rand.Uint64(), "Second uint64 of the PCG seed used by the random number generator")
workers = flag.Int("workers", 10*runtime.NumCPU(), "The number of workers to use")
jobType = flag.String("type", internal.JobTypeBatch, "The type of job to create (batch or service)")
jobDriver = flag.String("driver", internal.JobDriverMock, "The driver to use for the job (mock or docker)")
updatesDelay = flag.Duration("updates-delay", 3*time.Second, "The delay between updates of the service job")
updatesDelayTarget = flag.Duration("updates-delay-target", 0, "The target delay between updates. For finite num-of-updates values, this will be delay reached before the last update. Ignored if unset")
numOfUpdates = flag.Int("num-of-updates", 0, "The number of updates to perform on the service job. If 0, the job will be continuosly updated")
numOfDispatches = flag.Int("num-of-dispatches", 0, "The number of dispatches to perform on the batch job. If 0, the job will be continuosly dispatched")
count = flag.Int("count", 1, "The count number per job (number of allocations is count * groups)")
groups = flag.Int("groups", 1, "The number of groups to create per job")
spread = flag.Bool("spread", false, "Should the jobs be spread across the datacenters?")
reqRate = flag.Float64("rate", 10, "The rate of constant job dispatches per second")
burstRate = flag.Int("burst", 1, "The burst rate of constant job dispatches")
randomize = flag.Bool("random", false, "Should the rate at which the jobs are dispatched be randomized?")
seed1 = flag.Uint64("seed1", rand.Uint64(), "First uint64 of the PCG seed used by the random number generator")
seed2 = flag.Uint64("seed2", rand.Uint64(), "Second uint64 of the PCG seed used by the random number generator")
workers = flag.Int("workers", 10*runtime.NumCPU(), "The number of workers to use")

logLevel = flag.String("log-level", "DEBUG", "The log level to use")
ver = flag.Bool("version", false, "Prints out the version")
Expand Down Expand Up @@ -118,7 +119,7 @@ func main() {
case internal.JobTypeBatch:
go job.DispatchBatch(&wg, *numOfDispatches, lim, rng)
case internal.JobTypeService:
go job.RunService(&wg, i, *numOfUpdates, *updatesDelay)
go job.RunService(&wg, i, *numOfUpdates, *updatesDelay, *updatesDelayTarget)
}
}

Expand Down
19 changes: 14 additions & 5 deletions tools/nomad-load/internal/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (j *TestJob) DispatchBatch(wg *sync.WaitGroup, numOfDispatches int, lim *ra
j.logger.Info("sccessfully dispatched jobs", "num_of_dispatches", numOfDispatches)
}

func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, updatesDelay time.Duration) {
func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, updatesDelay, updatesDelayTarget time.Duration) {
defer wg.Done()

parsed, err := j.render(worker)
Expand All @@ -136,8 +136,8 @@ func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, u
metrics.IncrCounter([]string{"registrations"}, 1)
j.logger.Info("successfully registered job", "job_id", *parsed.ID)

update := func(i int) {
time.Sleep(updatesDelay)
update := func(i int, delay time.Duration) {
time.Sleep(delay)

// re-parse the jobspec so that the echo string gets updated
parsed, err := j.render(worker)
Expand All @@ -154,14 +154,23 @@ func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, u
}

if numOfUpdates > 0 {
// check if we're supposed to gradually increase the frequency of updates
var decrementStep time.Duration
if updatesDelayTarget != 0 {
decrementStep = (updatesDelay - updatesDelayTarget) / time.Duration(numOfUpdates-1)
}

for i := 0; i < numOfUpdates; i++ {
update(i)
update(i, updatesDelay)
if decrementStep != 0 {
updatesDelay -= decrementStep
}
}
} else {
// 0 is "infinity"
i := 0
for {
update(i)
update(i, updatesDelay)
i++
}
}
Expand Down

0 comments on commit 2888b82

Please sign in to comment.