Skip to content

Commit

Permalink
Tune sampling algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Oct 12, 2023
1 parent 411c785 commit 8202824
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 19 deletions.
14 changes: 6 additions & 8 deletions internal/aggregator/aggregator_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,12 @@ func (a *Aggregator) RowDataMarshalAppendPositions(b *aggregatorBucket, rnd *ran
key1 = format.TagValueIDConveyorHistoric
}
resPos := len(res)
for _, s := range samplerStat.Steps {
for _, item := range s.Groups {
k := item.MetricID
sf := item.SF
key := data_model.AggKey(b.time, format.BuiltinMetricIDAggSamplingFactor, [16]int32{0, 0, 0, 0, k, format.TagValueIDAggSamplingFactorReasonInsertSize}, a.aggregatorHost, a.shardKey, a.replicaKey)
res = appendBadge(res, key, data_model.ItemValue{Counter: 1, ValueSum: sf}, metricCache, usedTimestamps)
res = appendSimpleValueStat(res, key, sf, 1, a.aggregatorHost, metricCache, usedTimestamps)
}
for _, s := range samplerStat.GetSampleFactors(nil) {
k := s.Metric
sf := float64(s.Value)
key := data_model.AggKey(b.time, format.BuiltinMetricIDAggSamplingFactor, [16]int32{0, 0, 0, 0, k, format.TagValueIDAggSamplingFactorReasonInsertSize}, a.aggregatorHost, a.shardKey, a.replicaKey)
res = appendBadge(res, key, data_model.ItemValue{Counter: 1, ValueSum: sf}, metricCache, usedTimestamps)
res = appendSimpleValueStat(res, key, sf, 1, a.aggregatorHost, metricCache, usedTimestamps)
}
res = appendSimpleValueStat(res, data_model.AggKey(b.time, format.BuiltinMetricIDAggInsertSize, [16]int32{0, 0, 0, 0, key1, format.TagValueIDSizeCounter},
a.aggregatorHost, a.shardKey, a.replicaKey), float64(sizeCounters), 1, a.aggregatorHost, metricCache, usedTimestamps)
Expand Down
26 changes: 23 additions & 3 deletions internal/data_model/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewSampler(capacity int, config SamplerConfig) Sampler {
config.RoundF = RoundSampleFactor
}
if config.SelectF == nil {
config.SelectF = selectRandom
config.SelectF = SelectRandom
}
h := Sampler{
items: make([]SamplingMultiItemPair, 0, capacity),
Expand Down Expand Up @@ -250,7 +250,7 @@ func (h *Sampler) sampleGroup(g *SamplerGroup, budgetNum, budgetDenom, sumWeight
h.config.KeepF(items[i].Key, items[i].Item)
}
}
sf *= float64(len(items)) / float64(pos) // space has been taken by whales
sf *= 2 // space has been taken by whales
items = items[pos:]
}
// Sample tail
Expand Down Expand Up @@ -409,6 +409,12 @@ func (h *Sampler) getGroupMeta(groupID int32) *format.MetricsGroup {
}

func (s SamplerStatistics) GetSampleFactors(sf []tlstatshouse.SampleFactor) []tlstatshouse.SampleFactor {
if s.Count == 0 {
return sf
}
if sf == nil {
sf = make([]tlstatshouse.SampleFactor, 0, s.Count)
}
for _, step := range s.Steps {
sf = step.GetSampleFactors(sf)
}
Expand All @@ -427,7 +433,21 @@ func (s SamplerStep) GetSampleFactors(sf []tlstatshouse.SampleFactor) []tlstatsh
return sf
}

func selectRandom(s []SamplingMultiItemPair, sf float64, r *rand.Rand) int {
// SelectRandom partitions s in place, moving a randomly selected subset to
// the front of the slice. Each item is kept with probability 1/sf (sf is the
// sample factor), and the number of kept items is returned, so callers use
// s[:n]. A sample factor of 1 or less keeps everything.
func SelectRandom(s []SamplingMultiItemPair, sf float64, r *rand.Rand) int {
	if sf <= 1 {
		return len(s)
	}
	kept := 0
	for i := 0; i < len(s); i++ {
		// keep with probability 1/sf; written as r.Float64()*sf < 1 so we
		// multiply instead of dividing on every iteration
		if r.Float64()*sf < 1 {
			s[kept], s[i] = s[i], s[kept]
			kept++
		}
	}
	return kept
}

func SelectRandom2(s []SamplingMultiItemPair, sf float64, r *rand.Rand) int { // experimental
if sf <= 1 {
return len(s)
}
Expand Down
60 changes: 52 additions & 8 deletions internal/data_model/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func TestSampling(t *testing.T) {
maxMetricSize: rapid.IntRange(512, 1024).Draw(t, "max metric size"),
})
var keepN, discardN int
var sumSize int64
var keepSumSize int64
m := make(map[int32]*metricInfo)
s := NewSampler(len(b.series), SamplerConfig{
KeepF: func(k Key, item *MultiItem) {
keepN++
sumSize += int64(samplingTestSizeOf(k, item))
keepSumSize += int64(samplingTestSizeOf(k, item))
stat := m[k.Metric]
require.LessOrEqual(t, 1., item.SF)
require.LessOrEqual(t, stat.maxSF, float32(item.SF))
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestSampling(t *testing.T) {
budget := rapid.Int64Range(0, b.sumSize*2).Draw(t, "budget")
metricCount := len(b.series)
samplerStat := b.run(&s, budget, 1)
require.LessOrEqual(t, sumSize, budget)
require.LessOrEqual(t, keepSumSize, budget)
require.Equal(t, samplerStat.Count, len(samplerStat.GetSampleFactors(nil)))
require.Equal(t, metricCount, keepN+discardN, "some series were neither keeped nor discarded")
if b.sumSize <= budget {
Expand Down Expand Up @@ -164,12 +164,48 @@ func TestNormalDistributionPreserved(t *testing.T) {
})
}

// TestCompareSampleFactors checks that the new sampler and the legacy
// sampling algorithm assign identical sample factors to every metric of a
// randomly generated bucket.
func TestCompareSampleFactors(t *testing.T) {
	rapid.Check(t, func(t *rapid.T) {
		b := newSamplingTestBucket()
		b.generateSeriesSize(t, samplingTestSpec{
			maxMetricCount: rapid.IntRange(1, 1024).Draw(t, "max metric count"),
			minMetricSize:  rapid.IntRange(28, 256).Draw(t, "min metric size"),
			maxMetricSize:  rapid.IntRange(512, 1024).Draw(t, "max metric size"),
		})
		mb := MetricsBucket{MultiItems: b.series}
		cfg := samplerConfigEx{
			SamplerConfig:      SamplerConfig{Rand: rand.New()},
			stringTopCountSend: 20,
			numShards:          1,
			sampleBudget:       10,
		}
		got := sampleBucket(&mb, cfg)
		legacy := sampleBucketLegacy(&mb, cfg)
		require.Equal(t, len(got), len(legacy))
		for metric, gotSF := range got {
			legacySF, found := legacy[metric]
			require.True(t, found)
			require.Equal(t, legacySF, gotSF)
		}
	})
}

// TestSelectRandom property-tests SelectRandom. It is opt-in: it only runs
// when STATSHOUSE_TEST_SELECT_RANDOM=1 is set in the environment.
//
// The original gated body simply did nothing when the variable was unset, so
// the test reported a meaningless PASS; using t.Skip makes the skip visible
// in test output.
func TestSelectRandom(t *testing.T) {
	if os.Getenv("STATSHOUSE_TEST_SELECT_RANDOM") != "1" {
		t.Skip("set STATSHOUSE_TEST_SELECT_RANDOM=1 to run")
	}
	testSelectRandom(t, SelectRandom)
}

// TestSelectRandom2 property-tests the experimental SelectRandom2 variant.
// Unlike TestSelectRandom, it is not gated behind an environment variable
// and always runs.
func TestSelectRandom2(t *testing.T) {
	testSelectRandom(t, SelectRandom2)
}

func testSelectRandom(t *testing.T, fn func([]SamplingMultiItemPair, float64, *rand.Rand) int) {
rapid.Check(t, func(t *rapid.T) {
var (
n = rapid.IntRange(1, 1024).Draw(t, "number of items")
f = rapid.Float64().Draw(t, "sample factor")
k = selectRandom(make([]SamplingMultiItemPair, n), f, rand.New())
k = fn(make([]SamplingMultiItemPair, n), f, rand.New())
)
if 1 < f {
require.LessOrEqual(t, 0, k)
Expand Down Expand Up @@ -340,7 +376,7 @@ func BenchmarkSampleBucket(b *testing.B) {
}
}

func benchmarkSampleBucket(b *testing.B, f func(*MetricsBucket, samplerConfigEx)) {
func benchmarkSampleBucket(b *testing.B, f func(*MetricsBucket, samplerConfigEx) map[int32]float32) {
var (
metricCount = 2000
seriesCount = 2000
Expand Down Expand Up @@ -369,7 +405,7 @@ func benchmarkSampleBucket(b *testing.B, f func(*MetricsBucket, samplerConfigEx)
}
}

func sampleBucket(bucket *MetricsBucket, config samplerConfigEx) {
func sampleBucket(bucket *MetricsBucket, config samplerConfigEx) map[int32]float32 {
sampler := NewSampler(len(bucket.MultiItems), config.SamplerConfig)
for k, item := range bucket.MultiItems {
whaleWeight := item.FinishStringTop(config.stringTopCountSend) // all excess items are baked into Tail
Expand Down Expand Up @@ -397,10 +433,15 @@ func sampleBucket(bucket *MetricsBucket, config samplerConfigEx) {
}
numShards := config.numShards
remainingBudget := int64((config.sampleBudget + numShards - 1) / numShards)
sampler.Run(remainingBudget, 1)
samplerStat := sampler.Run(remainingBudget, 1)
sampleFactors := map[int32]float32{}
for _, v := range samplerStat.GetSampleFactors(nil) {
sampleFactors[v.Metric] = v.Value
}
return sampleFactors
}

func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) {
func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) map[int32]float32 {
// Same algorithm as in aggregator, but instead of inserting selected, we remove items which were not selected by sampling algorithm
metricsMap := map[int32]*samplingMetric{}
var metricsList []*samplingMetric
Expand Down Expand Up @@ -461,6 +502,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) {
if remainingBudget > MaxUncompressedBucketSize/2 { // Algorithm is not exact
remainingBudget = MaxUncompressedBucketSize / 2
}
sampleFactors := map[int32]float32{}
pos := 0
for ; pos < len(metricsList) && metricsList[pos].sumSize*remainingWeight <= remainingBudget*metricsList[pos].metricWeight; pos++ { // statIdCount <= totalBudget/remainedStats
samplingMetric := metricsList[pos]
Expand All @@ -481,6 +523,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) {
continue
}
}
sampleFactors[samplingMetric.metricID] = float32(sf)
whalesAllowed := int64(0)
if samplingMetric.sumSize*remainingWeight > 0 { // should be never but check is cheap
whalesAllowed = int64(len(samplingMetric.items)) * (samplingMetric.metricWeight * remainingBudget) / (samplingMetric.sumSize * remainingWeight) / 2 // len(items) / sf / 2
Expand Down Expand Up @@ -508,4 +551,5 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) {
// delete(bucket.MultiItems, v.Key)
}
}
return sampleFactors
}

0 comments on commit 8202824

Please sign in to comment.