From af48d8778287f48c7a58ac2c77e322be04153112 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Mon, 13 May 2024 14:39:07 +0200 Subject: [PATCH] nomad-load: support increasing updates frequency --- tools/nomad-load/cmd/nomad-load/main.go | 31 +++++++++++++------------ tools/nomad-load/internal/job.go | 19 +++++++++++---- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/tools/nomad-load/cmd/nomad-load/main.go b/tools/nomad-load/cmd/nomad-load/main.go index 2fe2a09..70e4e9a 100644 --- a/tools/nomad-load/cmd/nomad-load/main.go +++ b/tools/nomad-load/cmd/nomad-load/main.go @@ -27,20 +27,21 @@ var ( httpAddr = flag.String("http-addr", "0.0.0.0", "The address to bind the HTTP server to") httpPort = flag.String("http-port", "8080", "The port to bind the HTTP server to") - jobType = flag.String("type", internal.JobTypeBatch, "The type of job to create (batch or service)") - jobDriver = flag.String("driver", internal.JobDriverMock, "The driver to use for the job (mock or docker)") - updatesDelay = flag.Duration("updates-delay", 500*time.Millisecond, "The delay between updates of the service job") - numOfUpdates = flag.Int("num-of-updates", 0, "The number of updates to perform on the service job. If 0, the job will be continuosly updated") - numOfDispatches = flag.Int("num-of-dispatches", 0, "The number of dispatches to perform on the batch job. If 0, the job will be continuosly dispatched") - count = flag.Int("count", 1, "The count number per job (number of allocations is count * groups)") - groups = flag.Int("groups", 1, "The number of groups to create per job") - spread = flag.Bool("spread", false, "Should the jobs be spread across the datacenters?") - reqRate = flag.Float64("rate", 10, "The rate of constant job dispatches per second") - burstRate = flag.Int("burst", 1, "The burst rate of constant job dispatches") - randomize = flag.Bool("random", false, "Should the rate at which the jobs are dispatched be randomized?") - seed1 = flag.Uint64("seed1", rand.Uint64(), "First uint64 of the PCG seed used by the random number generator") - seed2 = flag.Uint64("seed2", rand.Uint64(), "Second uint64 of the PCG seed used by the random number generator") - workers = flag.Int("workers", 10*runtime.NumCPU(), "The number of workers to use") + jobType = flag.String("type", internal.JobTypeBatch, "The type of job to create (batch or service)") + jobDriver = flag.String("driver", internal.JobDriverMock, "The driver to use for the job (mock or docker)") + updatesDelay = flag.Duration("updates-delay", 3*time.Second, "The delay between updates of the service job") + updatesDelayTarget = flag.Duration("updates-delay-target", 0, "The target delay between updates. For finite num-of-updates values, this will be delay reached before the last update. Ignored if unset") + numOfUpdates = flag.Int("num-of-updates", 0, "The number of updates to perform on the service job. If 0, the job will be continuosly updated") + numOfDispatches = flag.Int("num-of-dispatches", 0, "The number of dispatches to perform on the batch job. If 0, the job will be continuosly dispatched") + count = flag.Int("count", 1, "The count number per job (number of allocations is count * groups)") + groups = flag.Int("groups", 1, "The number of groups to create per job") + spread = flag.Bool("spread", false, "Should the jobs be spread across the datacenters?") + reqRate = flag.Float64("rate", 10, "The rate of constant job dispatches per second") + burstRate = flag.Int("burst", 1, "The burst rate of constant job dispatches") + randomize = flag.Bool("random", false, "Should the rate at which the jobs are dispatched be randomized?") + seed1 = flag.Uint64("seed1", rand.Uint64(), "First uint64 of the PCG seed used by the random number generator") + seed2 = flag.Uint64("seed2", rand.Uint64(), "Second uint64 of the PCG seed used by the random number generator") + workers = flag.Int("workers", 10*runtime.NumCPU(), "The number of workers to use") logLevel = flag.String("log-level", "DEBUG", "The log level to use") ver = flag.Bool("version", false, "Prints out the version") @@ -118,7 +119,7 @@ func main() { case internal.JobTypeBatch: go job.DispatchBatch(&wg, *numOfDispatches, lim, rng) case internal.JobTypeService: - go job.RunService(&wg, i, *numOfUpdates, *updatesDelay) + go job.RunService(&wg, i, *numOfUpdates, *updatesDelay, *updatesDelayTarget) } } diff --git a/tools/nomad-load/internal/job.go b/tools/nomad-load/internal/job.go index ce57690..278f045 100644 --- a/tools/nomad-load/internal/job.go +++ b/tools/nomad-load/internal/job.go @@ -119,7 +119,7 @@ func (j *TestJob) DispatchBatch(wg *sync.WaitGroup, numOfDispatches int, lim *ra j.logger.Info("sccessfully dispatched jobs", "num_of_dispatches", numOfDispatches) } -func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, updatesDelay time.Duration) { +func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, updatesDelay, updatesDelayTarget time.Duration) { defer wg.Done() parsed, err := j.render(worker) @@ -136,8 +136,8 @@ func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, u metrics.IncrCounter([]string{"registrations"}, 1) j.logger.Info("successfully registered job", "job_id", *parsed.ID) - update := func(i int) { - time.Sleep(updatesDelay) + update := func(i int, delay time.Duration) { + time.Sleep(delay) // re-parse the jobspec so that the echo string gets updated parsed, err := j.render(worker) @@ -154,14 +154,23 @@ func (j *TestJob) RunService(wg *sync.WaitGroup, worker int, numOfUpdates int, u } if numOfUpdates > 0 { + // check if we're supposed to gradually increase the frequency of updates + var decrementStep time.Duration + if updatesDelayTarget != 0 { + decrementStep = (updatesDelay - updatesDelayTarget) / time.Duration(numOfUpdates-1) + } + for i := 0; i < numOfUpdates; i++ { - update(i) + update(i, updatesDelay) + if decrementStep != 0 { + updatesDelay -= decrementStep + } } } else { // 0 is "infinity" i := 0 for { - update(i) + update(i, updatesDelay) i++ } }