Skip to content

Commit

Permalink
make lazy posting series match ratio configurable (#8049)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jan 15, 2025
1 parent 4ba0ba4 commit caffc11
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 7 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func runStore(
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio),
store.WithSeriesMatchRatio(0.5), // TODO: expose series match ratio as config.
store.WithIndexHeaderLazyDownloadStrategy(
indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(),
),
Expand Down
21 changes: 19 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ type BucketStore struct {
enableChunkHashCalculation bool

enabledLazyExpandedPostings bool
seriesMatchRatio float64
postingGroupMaxKeySeriesRatio float64

sortingStrategy sortingStrategy
Expand Down Expand Up @@ -591,6 +592,15 @@ func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) Bu
}
}

// WithSeriesMatchRatio configures the estimated ratio of series expected to match
// when intersecting posting groups. It is used by the lazy expanded postings
// optimization strategy. The ratio should be within (0, 1); the closer it is to 1,
// the worse the matchers' selectivity.
// NOTE(review): the (0, 1) bound is documented but not enforced here — callers must
// supply a valid value.
func WithSeriesMatchRatio(seriesMatchRatio float64) BucketStoreOption {
return func(s *BucketStore) {
s.seriesMatchRatio = seriesMatchRatio
}
}

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -1065,6 +1075,7 @@ type blockSeriesClient struct {
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
seriesMatchRatio float64
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeySeriesRatio float64
lazyExpandedPostingsCount prometheus.Counter
Expand Down Expand Up @@ -1111,6 +1122,7 @@ func newBlockSeriesClient(
chunkFetchDurationSum *prometheus.HistogramVec,
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
seriesMatchRatio float64,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
Expand Down Expand Up @@ -1148,6 +1160,7 @@ func newBlockSeriesClient(
chunkFetchDurationSum: chunkFetchDurationSum,

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
seriesMatchRatio: seriesMatchRatio,
postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
Expand Down Expand Up @@ -1202,7 +1215,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.seriesMatchRatio, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
Expand Down Expand Up @@ -1635,6 +1648,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.chunkFetchDurationSum,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.seriesMatchRatio,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
Expand Down Expand Up @@ -1951,6 +1965,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.seriesMatchRatio,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
Expand Down Expand Up @@ -2179,6 +2194,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.enabledLazyExpandedPostings,
s.seriesMatchRatio,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
Expand Down Expand Up @@ -2647,6 +2663,7 @@ func (r *bucketIndexReader) ExpandedPostings(
ms sortedMatchers,
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
seriesMatchRatio float64,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
Expand Down Expand Up @@ -2703,7 +2720,7 @@ func (r *bucketIndexReader) ExpandedPostings(
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, seriesMatchRatio, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
Expand Down Expand Up @@ -1344,7 +1344,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
Expand Down Expand Up @@ -1384,7 +1384,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
Expand Down Expand Up @@ -2931,6 +2931,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
false,
0.5,
0,
dummyCounter,
dummyCounterVec,
Expand Down Expand Up @@ -3591,7 +3592,7 @@ func TestExpandedPostingsRace(t *testing.T) {
wg.Add(1)

go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()

Expand Down
3 changes: 2 additions & 1 deletion pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func fetchLazyExpandedPostings(
bytesLimiter BytesLimiter,
addAllPostings bool,
lazyExpandedPostingEnabled bool,
seriesMatchRatio float64,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
Expand All @@ -237,7 +238,7 @@ func fetchLazyExpandedPostings(
r,
postingGroups,
int64(r.block.estimatedMaxSeriesSize),
0.5, // TODO(yeya24): Expose this as a flag.
seriesMatchRatio,
postingGroupMaxKeySeriesRatio,
lazyExpandedPostingSizeBytes,
lazyExpandedPostingGroupsByReason,
Expand Down
17 changes: 17 additions & 0 deletions pkg/store/lazy_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,23 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
{name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true},
},
},
{
name: "two posting groups with add keys, group not marked as lazy because of lower series match ratio",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 44}},
"bar": {"foo": index.Range{Start: 44, End: 5052}},
},
seriesMaxSize: 1000,
seriesMatchRatio: 0.1,
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", addKeys: []string{"foo"}},
},
expectedPostingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}, cardinality: 10, existentKeys: 1},
{name: "bar", addKeys: []string{"foo"}, cardinality: 1251, existentKeys: 1, lazy: false},
},
},
{
name: "three posting groups with add keys, two small posting group and a very large posting group, large one become lazy",
inputPostings: map[string]map[string]index.Range{
Expand Down

0 comments on commit caffc11

Please sign in to comment.