diff --git a/pkg/util/metric/hdrhistogram.go b/pkg/util/metric/hdrhistogram.go index 9ac10b88ded1..fbb3e6895fe0 100644 --- a/pkg/util/metric/hdrhistogram.go +++ b/pkg/util/metric/hdrhistogram.go @@ -18,14 +18,9 @@ import ( prometheusgo "github.com/prometheus/client_model/go" ) -const ( - // HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher - // values will be recorded as this value instead. - HdrHistogramMaxLatency = 10 * time.Second - - // The number of histograms to keep in rolling window. - hdrHistogramHistWrapNum = 2 // TestSampleInterval is passed to histograms during tests which don't -) +// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher +// values will be recorded as this value instead. +const HdrHistogramMaxLatency = 10 * time.Second // A HdrHistogram collects observed values by keeping bucketed counts. For // convenience, internally two sets of buckets are kept: A cumulative set (i.e. @@ -64,12 +59,12 @@ func NewHdrHistogram( Metadata: metadata, maxVal: maxVal, } - wHist := hdrhistogram.NewWindowed(hdrHistogramHistWrapNum, 0, maxVal, sigFigs) + wHist := hdrhistogram.NewWindowed(WindowedHistogramWrapNum, 0, maxVal, sigFigs) h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs) h.mu.sliding = wHist h.mu.tickHelper = &tickHelper{ nextT: now(), - tickInterval: duration / hdrHistogramHistWrapNum, + tickInterval: duration / WindowedHistogramWrapNum, onTick: func() { wHist.Rotate() }, @@ -171,15 +166,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric { // TotalWindowed implements the WindowedHistogram interface. func (h *HdrHistogram) TotalWindowed() (int64, float64) { - pHist := h.ToPrometheusMetricWindowed().Histogram - return int64(pHist.GetSampleCount()), pHist.GetSampleSum() + h.mu.Lock() + defer h.mu.Unlock() + hist := h.mu.sliding.Merge() + totalSum := float64(hist.TotalCount()) * hist.Mean() + return hist.TotalCount(), totalSum } func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { hist := &prometheusgo.Histogram{} maybeTick(h.mu.tickHelper) - bars := h.mu.sliding.Current.Distribution() + mergedHist := h.mu.sliding.Merge() + bars := mergedHist.Distribution() hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars)) var cumCount uint64 @@ -202,7 +201,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { } hist.SampleCount = &cumCount hist.SampleSum = &sum // can do better here; we approximate in the loop - return &prometheusgo.Metric{ Histogram: hist, } @@ -236,13 +234,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 { func (h *HdrHistogram) Mean() float64 { h.mu.Lock() defer h.mu.Unlock() - return h.mu.cumulative.Mean() } func (h *HdrHistogram) MeanWindowed() float64 { h.mu.Lock() defer h.mu.Unlock() - - return h.mu.sliding.Current.Mean() + hist := h.mu.sliding.Merge() + return hist.Mean() } diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 682480cf7de6..ecdcf5ca6de9 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -27,9 +27,14 @@ import ( "github.com/rcrowley/go-metrics" ) -// TestSampleInterval is passed to histograms during tests which don't -// want to concern themselves with supplying a "correct" interval. -const TestSampleInterval = time.Duration(math.MaxInt64) +const ( + // TestSampleInterval is passed to histograms during tests which don't + // want to concern themselves with supplying a "correct" interval. + TestSampleInterval = time.Duration(math.MaxInt64) + // WindowedHistogramWrapNum is the number of histograms to keep in rolling + // window. + WindowedHistogramWrapNum = 2 +) // Iterable provides a method for synchronized access to interior objects. type Iterable interface { @@ -97,10 +102,12 @@ type WindowedHistogram interface { Total() (int64, float64) // MeanWindowed returns the average of the samples in the current window. MeanWindowed() float64 - // Mean returns the average of the sample in teh cumulative histogram. + // Mean returns the average of the sample in the cumulative histogram. Mean() float64 // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the windowed histogram. + // Methods implementing this interface should the merge buckets, sums, + // and counts of previous and current windows. ValueAtQuantileWindowed(q float64) float64 } @@ -228,7 +235,10 @@ const ( type HistogramOptions struct { // Metadata is the metric Metadata associated with the histogram. Metadata Metadata - // Duration is the histogram's window duration. + // Duration is the total duration of all windows in the histogram. + // The individual window duration is equal to the + // Duration/WindowedHistogramWrapNum (i.e., the number of windows + // in the histogram). Duration time.Duration // MaxVal is only relevant to the HdrHistogram, and represents the // highest trackable value in the resulting histogram buckets. @@ -261,7 +271,7 @@ func NewHistogram(opt HistogramOptions) IHistogram { // NewHistogram is a prometheus-backed histogram. Depending on the value of // opts.Buckets, this is suitable for recording any kind of quantity. Common // sensible choices are {IO,Network}LatencyBuckets. -func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64) *Histogram { +func newHistogram(meta Metadata, duration time.Duration, buckets []float64) *Histogram { // TODO(obs-inf): prometheus supports labeled histograms but they require more // plumbing and don't fit into the PrometheusObservable interface any more. opts := prometheus.HistogramOpts{ @@ -273,8 +283,11 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64 cum: cum, } h.windowed.tickHelper = &tickHelper{ - nextT: now(), - tickInterval: windowDuration, + nextT: now(), + // We want to divide the total window duration by the number of windows + // because we need to rotate the windows at uniformly distributed + // intervals within a histogram's total duration. + tickInterval: duration / WindowedHistogramWrapNum, onTick: func() { h.windowed.prev = h.windowed.cur h.windowed.cur = prometheus.NewHistogram(opts) @@ -298,16 +311,13 @@ type Histogram struct { Metadata cum prometheus.Histogram - // TODO(obs-inf): the way we implement windowed histograms is not great. If - // the windowed histogram is pulled right after a tick, it will be mostly - // empty. We could add a third bucket and represent the merged view of the two - // most recent buckets to avoid that. Or we could "just" double the rotation - // interval (so that the histogram really collects for 20s when we expect to - // persist the contents every 10s). Really it would make more sense to - // explicitly rotate the histogram atomically with collecting its contents, - // but that is now how we have set it up right now. It should be doable - // though, since there is only one consumer of windowed histograms - our - // internal timeseries system. + // TODO(obs-inf): the way we implement windowed histograms is not great. + // We could "just" double the rotation interval (so that the histogram really + // collects for 20s when we expect to persist the contents every 10s). + // Really it would make more sense to explicitly rotate the histogram + // atomically with collecting its contents, but that is now how we have set + // it up right now. It should be doable though, since there is only one + // consumer of windowed histograms - our internal timeseries system. windowed struct { // prometheus.Histogram is thread safe, so we only // need an RLock to record into it. But write lock @@ -375,15 +385,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric { return m } -// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type. +// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the +// right type. func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric { h.windowed.Lock() defer h.windowed.Unlock() - m := &prometheusgo.Metric{} - if err := h.windowed.cur.Write(m); err != nil { + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + if err := h.windowed.cur.Write(cur); err != nil { panic(err) } - return m + if h.windowed.prev != nil { + if err := h.windowed.prev.Write(prev); err != nil { + panic(err) + } + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + } + return cur } // GetMetadata returns the metric's metadata including the Prometheus @@ -435,7 +453,8 @@ func (h *Histogram) MeanWindowed() float64 { // 2. Since the prometheus client library ensures buckets are in a strictly // increasing order at creation, we do not sort them. func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 { - return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q) + return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, + q) } // A Counter holds a single mutable atomic value. @@ -633,6 +652,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata { return baseMetadata } +// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum +// from the previous windowed histogram to those of the current windowed +// histogram. +// NB: Buckets on each histogram must be the same +func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) { + for i, bucket := range cur.Bucket { + count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount + *bucket.CumulativeCount = count + } + sampleCount := *cur.SampleCount + *prev.SampleCount + *cur.SampleCount = sampleCount + sampleSum := *cur.SampleSum + *prev.SampleSum + *cur.SampleSum = sampleSum +} + // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the given histogram. func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 { diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index 83576f4c8e96..5527969406f0 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -15,6 +15,7 @@ import ( "encoding/json" "math" "reflect" + "sort" "sync" "testing" "time" @@ -184,11 +185,10 @@ func TestNewHistogramRotate(t *testing.T) { // But cumulative histogram has history (if i > 0). count, _ := h.Total() require.EqualValues(t, i, count) - // Add a measurement and verify it's there. { h.RecordValue(12345) - f := float64(12345) + f := float64(12345) + sum _, wSum := h.TotalWindowed() require.Equal(t, wSum, f) } @@ -197,3 +197,150 @@ func TestNewHistogramRotate(t *testing.T) { // Go to beginning. } } + +func TestHistogramWindowed(t *testing.T) { + defer TestingSetNow(nil)() + setNow(0) + + duration := 10 * time.Second + + h := NewHistogram(HistogramOptions{ + Mode: HistogramModePrometheus, + Metadata: Metadata{}, + Duration: duration, + Buckets: IOLatencyBuckets, + }) + + measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000, + 25000000, 30000000, 40000000, 90000000} + + // Sort the measurements so we can calculate the expected quantile values + // for the first windowed histogram after the measurements have been recorded. + sortedMeasurements := make([]int64, len(measurements)) + copy(sortedMeasurements, measurements) + sort.Slice(sortedMeasurements, func(i, j int) bool { + return sortedMeasurements[i] < sortedMeasurements[j] + }) + + // Calculate the expected quantile values as the lowest bucket values that are + // greater than each measurement. + count := 0 + j := 0 + var expQuantileValues []float64 + for i := range IOLatencyBuckets { + if j < len(sortedMeasurements) && IOLatencyBuckets[i] > float64( + sortedMeasurements[j]) { + count += 1 + j += 1 + expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i]) + } + } + + w := 2 + var expHist []prometheusgo.Histogram + var expSum float64 + var expCount uint64 + for i := 0; i < w; i++ { + h.Inspect(func(interface{}) {}) // trigger ticking + if i == 0 { + // If there is no previous window, we should be unable to calculate mean + // or quantile without any observations. + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(99.99)) + if !math.IsNaN(h.MeanWindowed()) { + t.Fatalf("mean should be undefined with no observations") + } + // Record all measurements on first iteration. + for _, m := range measurements { + h.RecordValue(m) + expCount += 1 + expSum += float64(m) + } + // Because we have 10 observations, we expect quantiles to correspond + // to observation indices (e.g., the 8th expected quantile value is equal + // to the value interpolated at the 80th percentile). + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0)) + require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10)) + require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) + } else { + // The SampleSum and SampleCount values in the current window before any + // observations should be equal to those of the previous window, after all + // observations (the quantile values will also be the same). + expSum = *expHist[i-1].SampleSum + expCount = *expHist[i-1].SampleCount + + // After recording a few higher-value observations in the second window, + // the quantile values will shift in the direction of the observations. + for _, m := range sortedMeasurements[len(sortedMeasurements)-3 : len( + sortedMeasurements)-1] { + h.RecordValue(m) + expCount += 1 + expSum += float64(m) + } + require.Less(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Less(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) + } + + // In all cases, the windowed mean should be equal to the expected sum/count + require.Equal(t, expSum/float64(expCount), h.MeanWindowed()) + + expHist = append(expHist, prometheusgo.Histogram{ + SampleCount: &expCount, + SampleSum: &expSum, + }) + + // Increment Now time to trigger tick on the following iteration. + setNow(time.Duration(i+1) * (duration / 2)) + } +} + +func TestMergeWindowedHistogram(t *testing.T) { + measurements := []int64{4000000, 90000000} + opts := prometheus.HistogramOpts{ + Buckets: IOLatencyBuckets, + } + + prevWindow := prometheus.NewHistogram(opts) + curWindow := prometheus.NewHistogram(opts) + + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + + prevWindow.Observe(float64(measurements[0])) + require.NoError(t, prevWindow.Write(prev)) + require.NoError(t, curWindow.Write(cur)) + + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + // Merging a non-empty previous histogram into an empty current histogram + // should result in the current histogram containing the same sample sum, + // sample count, and per-bucket cumulative count values as the previous + // histogram. + require.Equal(t, uint64(1), *cur.Histogram.SampleCount) + require.Equal(t, float64(measurements[0]), *cur.Histogram.SampleSum) + for _, bucket := range cur.Histogram.Bucket { + if *bucket.UpperBound > float64(measurements[0]) { + require.Equal(t, uint64(1), *bucket.CumulativeCount) + } + } + + curWindow.Observe(float64(measurements[1])) + require.NoError(t, curWindow.Write(cur)) + + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + // Merging a non-empty previous histogram with a non-empty current histogram + // should result in the current histogram containing its original sample sum, + // sample count, and per-bucket cumulative count values, + // plus those of the previous histogram. + require.Equal(t, uint64(2), *cur.Histogram.SampleCount) + require.Equal(t, float64(measurements[0]+measurements[1]), + *cur.Histogram.SampleSum) + for _, bucket := range cur.Histogram.Bucket { + if *bucket.UpperBound > float64(measurements[1]) { + require.Equal(t, uint64(2), *bucket.CumulativeCount) + } else if *bucket.UpperBound > float64(measurements[0]) { + require.Equal(t, uint64(1), *bucket.CumulativeCount) + } + } +}