Skip to content

Commit

Permalink
feat: Add option to run indefinitely; fix races (#90)
Browse files Browse the repository at this point in the history
* feat add option to run indefinitely

Signed-off-by: Juraj Michalek <[email protected]>

* Update metrics/write.go

Co-authored-by: Bartlomiej Plotka <[email protected]>
Signed-off-by: Juraj Michálek <[email protected]>

* ran go fmt

Signed-off-by: Juraj Michalek <[email protected]>

* fix rename the variable everywhere

Signed-off-by: Juraj Michalek <[email protected]>

* fix: Fixed race during update, improved infinite run logging and readability.

Signed-off-by: bwplotka <[email protected]>

---------

Signed-off-by: Juraj Michalek <[email protected]>
Signed-off-by: Juraj Michálek <[email protected]>
Signed-off-by: bwplotka <[email protected]>
Co-authored-by: Juraj Michalek <[email protected]>
  • Loading branch information
bwplotka and jmichalek132 authored Sep 27, 2024
1 parent 5b08980 commit 8c669a0
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
remotePprofURLs := kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
remotePprofInterval := kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles. When not provided it will download a profile once before the end of the test.").Duration()
remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteRequestCount := kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int()
remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int()
remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
Expand Down
6 changes: 6 additions & 0 deletions metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,12 @@ func (c *Collector) Run() error {
go c.handleSeriesTicks(&mutableState.seriesCycle, unsafeReadOnlyGetState)
go c.handleMetricTicks(&mutableState.metricCycle, unsafeReadOnlyGetState)

// Mark best-effort update, so remote write knows (if enabled).
select {
case c.updateNotifyCh <- struct{}{}:
default:
}

<-c.stopCh
return nil
}
Expand Down
46 changes: 34 additions & 12 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func cloneRequest(r *http.Request) *http.Request {
}

func (c *Client) write(ctx context.Context) error {
select {
// Wait for update first as write and collector.Run runs simultaneously.
case <-c.config.UpdateNotify:
case <-ctx.Done():
return ctx.Err()
}

tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder)
if err != nil {
return err
Expand All @@ -125,23 +132,39 @@ func (c *Client) write(ctx context.Context) error {
merr = &errors.MultiError{}
)

log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
shouldRunForever := c.config.RequestCount == -1
if shouldRunForever {
log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n",
len(tss), c.config.BatchSize, c.config.RequestInterval)
} else {
log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n",
len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
}

ticker := time.NewTicker(c.config.RequestInterval)
defer ticker.Stop()
for ii := 0; ii < c.config.RequestCount; ii++ {

for i := 0; ; {
if ctx.Err() != nil {
return ctx.Err()
}

// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
if len(c.config.PprofURLs) > 0 && ii == c.config.RequestCount/2 {
wgPprof.Add(1)
go func() {
download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05"))
wgPprof.Done()
}()
if !shouldRunForever {
if i < c.config.RequestCount {
break
}
i++
// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
if len(c.config.PprofURLs) > 0 && i == c.config.RequestCount/2 {
wgPprof.Add(1)
go func() {
download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05"))
wgPprof.Done()
}()
}
}

<-ticker.C
select {
case <-c.config.UpdateNotify:
Expand All @@ -166,8 +189,7 @@ func (c *Client) write(ctx context.Context) error {
req := &prompb.WriteRequest{
Timeseries: tss[i:end],
}
err := c.Store(context.TODO(), req)
if err != nil {
if err := c.Store(context.TODO(), req); err != nil {
merr.Add(err)
return
}
Expand Down

0 comments on commit 8c669a0

Please sign in to comment.