From 8c669a0f437432c7b3ee7cbca8e1236f729b262e Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 27 Sep 2024 20:17:03 +0200 Subject: [PATCH] feat: Add option to run indefinitely; fix races (#90) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat add option to run indefinitely Signed-off-by: Juraj Michalek * Update metrics/write.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Juraj Michálek * ran go fmt Signed-off-by: Juraj Michalek * fix rename the variable everywhere Signed-off-by: Juraj Michalek * fix: Fixed race during update, improved infinite run logging and readability. Signed-off-by: bwplotka --------- Signed-off-by: Juraj Michalek Signed-off-by: Juraj Michálek Signed-off-by: bwplotka Co-authored-by: Juraj Michalek --- cmd/avalanche.go | 2 +- metrics/serve.go | 6 ++++++ metrics/write.go | 46 ++++++++++++++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 489a1e4..8ecdbb9 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -69,7 +69,7 @@ func main() { remotePprofURLs := kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() remotePprofInterval := kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles. When not provided it will download a profile once before the end of the test.").Duration() remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() - remoteRequestCount := kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() + remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int() remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() diff --git a/metrics/serve.go b/metrics/serve.go index 7bff8d8..835f9e0 100644 --- a/metrics/serve.go +++ b/metrics/serve.go @@ -517,6 +517,12 @@ func (c *Collector) Run() error { go c.handleSeriesTicks(&mutableState.seriesCycle, unsafeReadOnlyGetState) go c.handleMetricTicks(&mutableState.metricCycle, unsafeReadOnlyGetState) + // Mark best-effort update, so remote write knows (if enabled). + select { + case c.updateNotifyCh <- struct{}{}: + default: + } + <-c.stopCh return nil } diff --git a/metrics/write.go b/metrics/write.go index 8c66faa..9cecd02 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -110,6 +110,13 @@ func cloneRequest(r *http.Request) *http.Request { } func (c *Client) write(ctx context.Context) error { + select { + // Wait for update first as write and collector.Run runs simultaneously. + case <-c.config.UpdateNotify: + case <-ctx.Done(): + return ctx.Err() + } + tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) if err != nil { return err @@ -125,23 +132,39 @@ func (c *Client) write(ctx context.Context) error { merr = &errors.MultiError{} ) - log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + shouldRunForever := c.config.RequestCount == -1 + if shouldRunForever { + log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.BatchSize, c.config.RequestInterval) + } else { + log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + } + ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() - for ii := 0; ii < c.config.RequestCount; ii++ { + + for i := 0; ; { if ctx.Err() != nil { return ctx.Err() } - // Download the pprofs during half of the iteration to get avarege readings. - // Do that only when it is not set to take profiles at a given interval. - if len(c.config.PprofURLs) > 0 && ii == c.config.RequestCount/2 { - wgPprof.Add(1) - go func() { - download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05")) - wgPprof.Done() - }() + if !shouldRunForever { + if i < c.config.RequestCount { + break + } + i++ + // Download the pprofs during half of the iteration to get avarege readings. + // Do that only when it is not set to take profiles at a given interval. + if len(c.config.PprofURLs) > 0 && i == c.config.RequestCount/2 { + wgPprof.Add(1) + go func() { + download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05")) + wgPprof.Done() + }() + } } + <-ticker.C select { case <-c.config.UpdateNotify: @@ -166,8 +189,7 @@ func (c *Client) write(ctx context.Context) error { req := &prompb.WriteRequest{ Timeseries: tss[i:end], } - err := c.Store(context.TODO(), req) - if err != nil { + if err := c.Store(context.TODO(), req); err != nil { merr.Add(err) return }