From ff747fa6ed95236d8c262b1b424aafef19143425 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 26 Dec 2024 11:41:00 -0800 Subject: [PATCH] Store Gateway: Fetch and cache expanded postings for individual posting group if it has more than 1 key Signed-off-by: Ben Ye --- CHANGELOG.md | 2 + pkg/store/bucket.go | 143 +++++++++++---- pkg/store/bucket_e2e_test.go | 2 +- pkg/store/cache/cache.go | 5 +- pkg/store/cache/filter_cache.go | 7 +- pkg/store/cache/filter_cache_test.go | 26 +-- pkg/store/cache/inmemory.go | 35 ++-- pkg/store/cache/inmemory_test.go | 3 +- pkg/store/cache/memcached.go | 35 ++-- pkg/store/cache/memcached_test.go | 37 ++-- pkg/store/cache/tracing_index_cache.go | 17 +- pkg/store/lazy_postings.go | 109 ++++++++--- pkg/store/lazy_postings_test.go | 238 +++++++++++++++++++++++-- pkg/store/postings.go | 32 ++++ pkg/store/postings_test.go | 59 ++++++ 15 files changed, 608 insertions(+), 142 deletions(-) create mode 100644 pkg/store/postings_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cb965c19fd..181c1f9347 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed +- [#8023](https://github.com/thanos-io/thanos/pull/8023) Store Gateway: Cache expanded postings for matchers with more than 1 key. + ### Removed ## [v0.37.2](https://github.com/thanos-io/thanos/tree/release-0.37) - 11.12.2024 diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 33fb4732dc..cf3ed6c2cd 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -453,8 +453,8 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label } func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {} -func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) { - return []byte{}, false +func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ [][]*labels.Matcher, tenant string) [][]byte { + return [][]byte{} } func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {} @@ -2606,7 +2606,7 @@ func (r *bucketIndexReader) ExpandedPostings( return nil, nil } - hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant) + hit, postings, err := r.fetchAlignedExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant) if err != nil { return nil, err } @@ -2705,6 +2705,7 @@ type postingGroup struct { cardinality int64 existentKeys int lazy bool + postings index.Postings } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { @@ -2955,48 +2956,81 @@ type postingPtr struct { ptr index.Range } -func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { - dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant) - if !hit { - return false, nil, nil - } - if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil { - return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err) - } - - r.stats.add(PostingsTouched, 1, len(dataFromCache)) - p, closeFns, err := r.decodeCachedPostings(dataFromCache) +func (r *bucketIndexReader) fetchAlignedExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, 
bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
+	postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant, true)
 	defer func() {
-		for _, closeFn := range closeFns {
-			closeFn()
+		for _, closer := range closers {
+			closer()
 		}
 	}()
-	// If failed to decode or expand cached postings, return and expand postings again.
 	if err != nil {
-		level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+		return false, nil, err
+	}
+
+	// Cache miss.
+	if postings == nil || postings[0] == nil {
 		return false, nil, nil
 	}
 
-	ps, err := ExpandPostingsWithContext(ctx, p)
+	ps, err := ExpandPostingsWithContext(ctx, postings[0])
 	if err != nil {
 		level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
 		return false, nil, nil
 	}
+	return true, ps, nil
+}
 
-	if len(ps) > 0 {
-		// As of version two all series entries are 16 byte padded. All references
-		// we get have to account for that to get the correct offset.
-		version, err := r.block.indexHeaderReader.IndexVersion()
-		if err != nil {
-			return false, nil, errors.Wrap(err, "get index version")
+func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string, seriesByteAligned bool) ([]index.Postings, []func(), error) {
+	if len(ms) == 0 {
+		return nil, nil, nil
+	}
+
+	var closeFns []func()
+	res := make([]index.Postings, len(ms))
+
+	dataFromCache := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
+	size := 0
+	hits := 0
+	for _, v := range dataFromCache {
+		if v != nil {
+			size += len(v)
+			hits++
 		}
-		if version >= 2 {
-			for i, id := range ps {
-				ps[i] = id * 16
+	}
+	if err := bytesLimiter.ReserveWithType(uint64(size), PostingsTouched); err != nil {
+		return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
+	}
+
+	r.stats.add(PostingsTouched, hits, size)
+	for i, v := range dataFromCache {
+		if v != nil {
+			p, closers, err := r.decodeCachedPostings(v)
+			closeFns = append(closeFns, closers...)
+
+			// If we failed to decode the cached postings, continue the loop so they get fetched again.
+			if err != nil {
+				level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+				continue
+			}
+
+			res[i] = p
+			if seriesByteAligned {
+				// As of version two all series entries are 16 byte padded. All references
+				// we get have to account for that to get the correct offset.
+				version, err := r.IndexVersion()
+				if err != nil {
+					return nil, closeFns, errors.Wrap(err, "get index version")
+				}
+
+				if version >= 2 {
+					// Index version 2 series are padded by 16 bytes.
+					res[i] = newSeriesByteAlignedPostings(p)
+				}
 			}
 		}
 	}
-	return true, ps, nil
+
+	return res, closeFns, nil
 }
 
 func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int, tenant string) {
@@ -3020,15 +3054,54 @@ var bufioReaderPool = sync.Pool{
 // fetchPostings fill postings requested by posting groups.
 // It returns one posting for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
-func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
+func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
 	var closeFns []func()
+	var (
+		expandedPostingMatchers    [][]*labels.Matcher
+		expandedPostingMatchersIdx []int
+	)
+	keysLength := 0
+	expandedPostingGroupSet := map[string]struct{}{}
+	// Find posting groups that fetch more than 1 key; for those, consult the expanded postings cache first.
+	for i := 0; i < len(postingGroups); i++ {
+		pg := postingGroups[i]
+		if !pg.lazy {
+			// If a posting group has more than 1 key to fetch, fetch its expanded postings first.
+			// This helps matchers such as !="", =~".+" avoid fetching too many keys from the cache.
+			if len(pg.addKeys) > 1 || len(pg.removeKeys) > 1 {
+				expandedPostingMatchers = append(expandedPostingMatchers, pg.matchers)
+				expandedPostingMatchersIdx = append(expandedPostingMatchersIdx, i)
+				expandedPostingGroupSet[pg.name] = struct{}{}
+				continue
+			}
+
+			// A posting group has either add keys or remove keys, never both at the same time.
+			keysLength += len(pg.addKeys) + len(pg.removeKeys)
+		}
+	}
+
 	timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
 	defer timer.ObserveDuration()
 
-	var ptrs []postingPtr
+	expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant, false)
+	closeFns = append(closeFns, closers...)
+	if err != nil {
+		return nil, closeFns, errors.Wrap(err, "fetch expanded postings from cache")
+	}
+	for i, ep := range expandedPostings {
+		pgIdx := expandedPostingMatchersIdx[i]
+		if ep != nil {
+			postingGroups[pgIdx].postings = ep
+			continue
+		}
+		// Cache miss; we have additional keys to fetch.
+		keysLength += len(postingGroups[pgIdx].addKeys) + len(postingGroups[pgIdx].removeKeys)
+	}
 
-	output := make([]index.Postings, len(keys))
+	var ptrs []postingPtr
+	keys := keysToFetchFromPostingGroups(postingGroups, keysLength)
+	output := make([]index.Postings, keysLength)
 
 	// Fetch postings from the cache with a single call.
 	fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
@@ -3131,6 +3204,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 
 		output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)
 
+		// If the corresponding posting group fetches expanded postings, don't backfill each
+		// encoded posting on a cache miss. Cache the expanded postings for the whole group instead.
+		if _, ok := expandedPostingGroupSet[keys[keyID].Name]; ok {
+			continue
+		}
+
 		startCompression := time.Now()
 		dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
 		if err != nil {
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index bee87898c6..eda3f632c0 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -71,7 +71,7 @@ func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*la
 	c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant)
 }
 
-func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
+func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
 	return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant)
 }
 
diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go
index c20a1f2459..3645441974 100644
--- a/pkg/store/cache/cache.go
+++ b/pkg/store/cache/cache.go
@@ -51,8 +51,9 @@ type IndexCache interface {
 	// StoreExpandedPostings stores expanded postings for a set of label matchers.
 	StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string)
 
-	// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
-	FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool)
+	// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. On a cache miss, the corresponding
+	// entry of the response slice is nil.
+	FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte
 
 	// StoreSeries stores a single series.
 	StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string)
diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go
index ade9da4c81..940b3b9fe9 100644
--- a/pkg/store/cache/filter_cache.go
+++ b/pkg/store/cache/filter_cache.go
@@ -53,12 +53,13 @@ func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
 	}
 }
 
-// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
+// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. On a cache miss, the corresponding
+// entry of the response slice is nil.
+func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte { if c.expandedPostingsEnabled { return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant) } - return nil, false + return nil } // StoreSeries sets the series identified by the ulid and id to the value v, diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go index ea3144fb8d..d09061170a 100644 --- a/pkg/store/cache/filter_cache_test.go +++ b/pkg/store/cache/filter_cache_test.go @@ -56,9 +56,9 @@ func TestFilterCache(t *testing.T) { testutil.Equals(t, 0, len(missed)) testutil.Equals(t, testPostingData, hits[postingKeys[0]]) - ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) - testutil.Equals(t, true, hit) - testutil.Equals(t, testExpandedPostingsData, ep) + ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(ep)) + testutil.Equals(t, testExpandedPostingsData, ep[0]) seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 0, len(misses)) @@ -77,9 +77,9 @@ func TestFilterCache(t *testing.T) { testutil.Equals(t, 0, len(missed)) testutil.Equals(t, testPostingData, hits[postingKeys[0]]) - ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) - testutil.Assert(t, true, hit) - testutil.Equals(t, testExpandedPostingsData, ep) + ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(ep)) + testutil.Equals(t, testExpandedPostingsData, ep[0]) seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 0, len(misses)) @@ -98,8 +98,8 @@ func TestFilterCache(t *testing.T) { testutil.Equals(t, 0, len(missed)) testutil.Equals(t, testPostingData, hits[postingKeys[0]]) - _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) - testutil.Equals(t, false, hit) + ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(ep)) seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 1, len(misses)) @@ -118,9 +118,9 @@ func TestFilterCache(t *testing.T) { testutil.Equals(t, 1, len(missed)) testutil.Equals(t, 0, len(hits)) - ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) - testutil.Equals(t, true, hit) - testutil.Equals(t, testExpandedPostingsData, ep) + ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(ep)) + testutil.Equals(t, testExpandedPostingsData, ep[0]) seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 1, len(misses)) @@ -139,8 +139,8 @@ func TestFilterCache(t *testing.T) { testutil.Equals(t, 1, len(missed)) testutil.Equals(t, 0, len(hits)) - _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) - testutil.Equals(t, false, hit) + ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(ep)) seriesHit, misses := 
c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
 	testutil.Equals(t, 0, len(misses))
 
diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go
index 3a8ddbb86d..cfd4baedc1 100644
--- a/pkg/store/cache/inmemory.go
+++ b/pkg/store/cache/inmemory.go
@@ -336,20 +336,35 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
 	c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v)
 }
 
-// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
-func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
+// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. On a cache miss, the corresponding
+// entry of the response slice is nil.
+func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
 	timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenant))
 	defer timer.ObserveDuration()
 
-	if ctx.Err() != nil {
-		return nil, false
-	}
-	c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc()
-	if b, ok := c.get(CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(matchers)), ""}); ok {
-		c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc()
-		return b, true
+	blockIDKey := blockID.String()
+	requests := 0
+	hits := 0
+	data := make([][]byte, len(matchers))
+	for i, matcher := range matchers {
+		if (i+1)%checkContextEveryNIterations == 0 {
+			if ctx.Err() != nil {
+				c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
+				c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(requests))
+				return data
+			}
+		}
+
+		requests++
+		if b, ok := c.get(CacheKey{blockIDKey, CacheKeyExpandedPostings(LabelMatchersToString(matcher)), ""}); ok {
+			hits++
+			data[i] = b
+		}
 	}
-	return nil, false
+	c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
+	c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(requests))
+
+	return data
 }
 
 // StoreSeries sets the series identified by the ulid and id to the value v,
diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go
index 688ec51cbc..92df6b972b 100644
--- a/pkg/store/cache/inmemory_test.go
+++ b/pkg/store/cache/inmemory_test.go
@@ -158,7 +158,8 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
 			cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant)
 		},
 		get: func(id storage.SeriesRef) ([]byte, bool) {
-			return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant)
+			res := cache.FetchExpandedPostings(ctx, uid(id), [][]*labels.Matcher{{matcher}}, tenancy.DefaultTenant)
+			return res[0], res[0] != nil
 		},
 	},
 } {
diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go
index b99babfe03..913f566a3f 100644
--- a/pkg/store/cache/memcached.go
+++ b/pkg/store/cache/memcached.go
@@ -142,26 +142,37 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe
 	}
 }
 
-// FetchExpandedPostings fetches multiple postings - each identified by 
a label -
-// and returns a map containing cache hits, along with a list of missing keys.
-// In case of error, it logs and return an empty cache hits map.
-func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher, tenant string) ([]byte, bool) {
+// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. On a cache miss, the corresponding
+// entry of the response slice is nil.
+func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
 	timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenant))
 	defer timer.ObserveDuration()
 
-	key := CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(lbls)), c.compressionScheme}.String()
+	output := make([][]byte, len(matchers))
+	keys := make([]string, 0, len(matchers))
+	blockIDStr := blockID.String()
+	for _, matcher := range matchers {
+		key := CacheKey{blockIDStr, CacheKeyExpandedPostings(LabelMatchersToString(matcher)), c.compressionScheme}.String()
+		keys = append(keys, key)
+	}
 
 	// Fetch the keys from memcached in a single request.
-	c.requestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(1)
-	results := c.memcached.GetMulti(ctx, []string{key})
+	c.requestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(len(matchers)))
+	results := c.memcached.GetMulti(ctx, keys)
 	if len(results) == 0 {
-		return nil, false
+		return output
 	}
 
-	if res, ok := results[key]; ok {
-		c.hitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(1)
-		return res, true
+	hits := 0
+	for i := 0; i < len(matchers); i++ {
+		key := keys[i]
+		if res, ok := results[key]; ok {
+			hits++
+			output[i] = res
+		}
 	}
-	return nil, false
+	c.hitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
+
+	return output
 }
 
 // StoreSeries sets the series identified by the ulid and id to the value v.
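The remote cache above follows the same positional contract as the in-memory implementation: one result entry per matcher set, with nil marking a miss, so several posting groups can share a single memcached round trip. A minimal sketch of that shape, using a hypothetical map-backed cache in place of the real memcached client (the `EP:...` key format below is made up for illustration):

	package main

	import "fmt"

	// fetchAligned mimics the FetchExpandedPostings contract used above: build one
	// key per matcher set, look them all up in one batch, and return a slice
	// aligned with the input where nil marks a cache miss.
	func fetchAligned(cache map[string][]byte, keys []string) [][]byte {
		output := make([][]byte, len(keys))
		for i, key := range keys { // the real code issues a single GetMulti here
			if v, ok := cache[key]; ok {
				output[i] = v
			}
		}
		return output
	}

	func main() {
		cache := map[string][]byte{`EP:{job="prometheus"}`: []byte("encoded postings")}
		res := fetchAligned(cache, []string{`EP:{job="prometheus"}`, `EP:{env=~"prod|dev"}`})
		fmt.Println(res[0] != nil, res[1] != nil) // true false
	}
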
diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go index e0ae72ae3d..a313da6d21 100644 --- a/pkg/store/cache/memcached_test.go +++ b/pkg/store/cache/memcached_test.go @@ -127,24 +127,22 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) { setup []mockedExpandedPostings mockedErr error fetchBlockID ulid.ULID - fetchMatchers []*labels.Matcher - expectedHit bool - expectedValue []byte + fetchMatchers [][]*labels.Matcher + expectedValue [][]byte }{ "should return no hits on empty cache": { setup: []mockedExpandedPostings{}, fetchBlockID: block1, - fetchMatchers: []*labels.Matcher{matcher1, matcher2}, - expectedHit: false, + fetchMatchers: [][]*labels.Matcher{{matcher1, matcher2}}, + expectedValue: [][]byte{nil}, }, "should return no misses on 100% hit ratio": { setup: []mockedExpandedPostings{ {block: block1, matchers: []*labels.Matcher{matcher1}, value: value1}, }, fetchBlockID: block1, - fetchMatchers: []*labels.Matcher{matcher1}, - expectedHit: true, - expectedValue: value1, + fetchMatchers: [][]*labels.Matcher{{matcher1}}, + expectedValue: [][]byte{value1}, }, "Cache miss when matchers key doesn't match": { setup: []mockedExpandedPostings{ @@ -152,8 +150,8 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) { {block: block2, matchers: []*labels.Matcher{matcher2}, value: value2}, }, fetchBlockID: block1, - fetchMatchers: []*labels.Matcher{matcher1, matcher2}, - expectedHit: false, + fetchMatchers: [][]*labels.Matcher{{matcher1, matcher2}}, + expectedValue: [][]byte{nil}, }, "should return no hits on memcached error": { setup: []mockedExpandedPostings{ @@ -161,8 +159,8 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) { }, mockedErr: errors.New("mocked error"), fetchBlockID: block1, - fetchMatchers: []*labels.Matcher{matcher3}, - expectedHit: false, + fetchMatchers: [][]*labels.Matcher{{matcher3}}, + expectedValue: [][]byte{nil}, }, } @@ -179,17 +177,18 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) { } // Fetch postings from cached and assert on it. - val, hit := c.FetchExpandedPostings(ctx, testData.fetchBlockID, testData.fetchMatchers, tenancy.DefaultTenant) - testutil.Equals(t, testData.expectedHit, hit) - if hit { - testutil.Equals(t, testData.expectedValue, val) - } + val := c.FetchExpandedPostings(ctx, testData.fetchBlockID, testData.fetchMatchers, tenancy.DefaultTenant) + testutil.Equals(t, testData.expectedValue, val) // Assert on metrics. 
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant)))
-			if testData.expectedHit {
-				testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant)))
+			hits := 0
+			for _, v := range testData.expectedValue {
+				if len(v) > 0 {
+					hits++
+				}
 			}
+			testutil.Equals(t, float64(hits), prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant)))
 			testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant)))
 			testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hitsTotal.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant)))
 			testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requestTotal.WithLabelValues(CacheTypeSeries, tenancy.DefaultTenant)))
diff --git a/pkg/store/cache/tracing_index_cache.go b/pkg/store/cache/tracing_index_cache.go
index 38a0f61822..2b88183211 100644
--- a/pkg/store/cache/tracing_index_cache.go
+++ b/pkg/store/cache/tracing_index_cache.go
@@ -53,17 +53,25 @@ func (c *TracingIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []
 }
 
-// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
-func (c *TracingIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) (data []byte, exists bool) {
+// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. On a cache miss, the corresponding
+// entry of the response slice is nil.
+func (c *TracingIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
 	span, newCtx := tracing.StartSpan(ctx, "fetch_expanded_postings", tracing.Tags{
 		"name":     c.name,
 		"block.id": blockID.String(),
 	})
 	defer span.Finish()
-	data, exists = c.cache.FetchExpandedPostings(newCtx, blockID, matchers, tenant)
-	if exists {
-		span.SetTag("bytes", len(data))
+	data := c.cache.FetchExpandedPostings(newCtx, blockID, matchers, tenant)
+	dataBytes := 0
+	hits := 0
+	for _, v := range data {
+		if v != nil {
+			hits++
+			dataBytes += len(v)
+		}
 	}
-	return data, exists
+	span.SetTag("bytes", dataBytes)
+	span.SetTag("hits", hits)
+	return data
 }
 
 // StoreSeries stores a single series. Skip instrumenting this method
diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index 783e04b31b..514daaf8c4 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -250,7 +250,7 @@ func fetchLazyExpandedPostings(
 		}
 	}
 
-	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant)
+	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant, lazyExpandedPostingEnabled)
 	if err != nil {
 		return nil, err
 	}
@@ -260,22 +260,12 @@ func fetchLazyExpandedPostings(
 	return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil
 }
 
-// keysToFetchFromPostingGroups returns label pairs (postings) to fetch
-// and matchers we need to use for lazy posting expansion.
-// Input `postingGroups` needs to be ordered by cardinality in case lazy
-// expansion is enabled. When we find the first lazy posting group we can exit.
-func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) {
-	var lazyMatchers []*labels.Matcher
-	keys := make([]labels.Label, 0)
-	i := 0
-	for i < len(postingGroups) {
+// keysToFetchFromPostingGroups returns label pairs (postings) to fetch.
+func keysToFetchFromPostingGroups(postingGroups []*postingGroup, length int) []labels.Label { + keys := make([]labels.Label, 0, length) + for i := 0; i < len(postingGroups); i++ { pg := postingGroups[i] - if pg.lazy { - if len(lazyMatchers) == 0 { - lazyMatchers = make([]*labels.Matcher, 0) - } - lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) - } else { + if !pg.lazy && pg.postings == nil { // Postings returned by fetchPostings will be in the same order as keys // so it's important that we iterate them in the same order later. // We don't have any other way of pairing keys and fetched postings. @@ -286,16 +276,37 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label keys = append(keys, labels.Label{Name: pg.name, Value: key}) } } - - i++ } - return keys, lazyMatchers + return keys +} + +// lazyMatchersFromPostingGroups returns matchers for lazy posting expansion. +func lazyMatchersFromPostingGroups(postingGroups []*postingGroup) []*labels.Matcher { + var lazyMatchers []*labels.Matcher + for i := 0; i < len(postingGroups); i++ { + pg := postingGroups[i] + if pg.lazy { + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + } + } + return lazyMatchers } -func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) { - keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant) +func fetchAndExpandPostingGroups( + ctx context.Context, + r *bucketIndexReader, + postingGroups []*postingGroup, + bytesLimiter BytesLimiter, + tenant string, + lazyExpandedPostingEnabled bool, +) ([]storage.SeriesRef, []*labels.Matcher, error) { + var lazyMatchers []*labels.Matcher + if lazyExpandedPostingEnabled { + lazyMatchers = lazyMatchersFromPostingGroups(postingGroups) + } + + fetchedPostings, closeFns, err := r.fetchPostings(ctx, postingGroups, bytesLimiter, tenant) defer func() { for _, closeFn := range closeFns { closeFn() @@ -305,7 +316,10 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, errors.Wrap(err, "get postings") } - result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups) + result, err := mergeFetchedPostings(ctx, r, fetchedPostings, postingGroups, tenant) + if err != nil { + return nil, nil, errors.Wrap(err, "merge postings") + } if err := ctx.Err(); err != nil { return nil, nil, err } @@ -316,7 +330,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return ps, lazyMatchers, nil } -func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings { +func mergeFetchedPostings(ctx context.Context, r *bucketIndexReader, fetchedPostings []index.Postings, postingGroups []*postingGroup, tenant string) (index.Postings, error) { // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. @@ -327,6 +341,16 @@ func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, if g.lazy { continue } + // Already fetched expanded postings from cache. 
+		if g.postings != nil {
+			if len(g.addKeys) > 0 {
+				groupAdds = append(groupAdds, g.postings)
+			} else if len(g.removeKeys) > 0 {
+				groupRemovals = append(groupRemovals, g.postings)
+			}
+			continue
+		}
+
 		// We cannot add empty set to groupAdds, since they are intersected.
 		if len(g.addKeys) > 0 {
 			toMerge := make([]index.Postings, 0, len(g.addKeys))
@@ -334,16 +358,43 @@ func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings,
 				toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex]))
 				postingIndex++
 			}
+			p := index.Merge(ctx, toMerge...)
+			// Cache expanded postings.
+			if len(g.addKeys) > 1 {
+				refs, err := ExpandPostingsWithContext(ctx, p)
+				if err != nil {
+					return nil, err
+				}
+				r.storeExpandedPostingsToCache(g.matchers, index.NewListPostings(refs), len(refs), tenant)
+				groupAdds = append(groupAdds, index.NewListPostings(refs))
+			} else {
+				groupAdds = append(groupAdds, p)
+			}
 
-			groupAdds = append(groupAdds, index.Merge(ctx, toMerge...))
+			continue
 		}
 
-		for _, l := range g.removeKeys {
-			groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex]))
-			postingIndex++
+		if len(g.removeKeys) > 0 {
+			toMerge := make([]index.Postings, 0, len(g.removeKeys))
+			for _, l := range g.removeKeys {
+				toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex]))
+				postingIndex++
+			}
+			p := index.Merge(ctx, toMerge...)
+			// Cache expanded postings.
+			if len(g.removeKeys) > 1 {
+				refs, err := ExpandPostingsWithContext(ctx, p)
+				if err != nil {
+					return nil, err
+				}
+				r.storeExpandedPostingsToCache(g.matchers, index.NewListPostings(refs), len(refs), tenant)
+				groupRemovals = append(groupRemovals, index.NewListPostings(refs))
+			} else {
+				groupRemovals = append(groupRemovals, p)
+			}
 		}
 	}
 
 	result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
 
-	return result
+	return result, nil
 }
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
index cb52dac412..862732b95e 100644
--- a/pkg/store/lazy_postings_test.go
+++ b/pkg/store/lazy_postings_test.go
@@ -19,21 +19,21 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
-	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/objstore/providers/filesystem"
 
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
+	storecache "github.com/thanos-io/thanos/pkg/store/cache"
 )
 
 func TestKeysToFetchFromPostingGroups(t *testing.T) {
 	t.Parallel()
 
 	for _, tc := range []struct {
-		name             string
-		pgs              []*postingGroup
-		expectedLabels   []labels.Label
-		expectedMatchers []*labels.Matcher
+		name           string
+		pgs            []*postingGroup
+		length         int
+		expectedLabels []labels.Label
 	}{
 		{
 			name: "empty group",
@@ -133,6 +133,145 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) {
 				{Name: "foo", Value: "bar"},
 			},
 		},
+		{
+			name: "skip groups with postings fetched",
+			pgs: []*postingGroup{
+				{
+					name:     "test",
+					addKeys:  []string{"foo", "bar"},
+					postings: index.EmptyPostings(),
+				},
+				{
+					name:       "foo",
+					removeKeys: []string{"bar"},
+					postings:   index.EmptyPostings(),
+				},
+			},
+			expectedLabels: []labels.Label{},
+		},
+		{
+			name: "skip lazy posting group",
+			pgs: []*postingGroup{
+				{
+					name:     "test",
+					addKeys:  []string{"foo", "bar"},
+					matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
+					lazy:     true,
+				},
+			},
+			expectedLabels: []labels.Label{},
+		},
+		{
+			name: "skip 
multiple lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "multiple non lazy and lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "multiple non lazy and lazy posting groups with lazy posting groups in the middle", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "cluster", + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "cluster", "bar")}, + lazy: true, + }, + { + name: "env", + addKeys: []string{"beta", "gamma", "prod"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "env", "beta|gamma|prod")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, {Name: "job", Value: "prometheus"}}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + keys := keysToFetchFromPostingGroups(tc.pgs, tc.length) + testutil.Equals(t, tc.expectedLabels, keys) + }) + } +} + +func TestLazyMatchersToFetchFromPostingGroups(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + pgs []*postingGroup + expectedMatchers []*labels.Matcher + }{ + { + name: "empty group", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + }, + { + name: "empty groups", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + }, { name: "lazy posting group with empty matchers", pgs: []*postingGroup{ @@ -143,7 +282,6 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) { lazy: true, }, }, - expectedLabels: []labels.Label{}, expectedMatchers: []*labels.Matcher{}, }, { @@ -156,7 +294,6 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) { lazy: true, }, }, - expectedLabels: []labels.Label{}, expectedMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, { @@ -175,7 +312,6 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) { lazy: true, }, }, - expectedLabels: []labels.Label{}, expectedMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), @@ -202,7 +338,6 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) { lazy: true, 
},
 		},
-		expectedLabels:   []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}},
 		expectedMatchers: []*labels.Matcher{
 			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
 			labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"),
@@ -234,7 +369,6 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) {
 				matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")},
 			},
 		},
-		expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, {Name: "job", Value: "prometheus"}},
 		expectedMatchers: []*labels.Matcher{
 			labels.MustNewMatcher(labels.MatchEqual, "cluster", "bar"),
 			labels.MustNewMatcher(labels.MatchRegexp, "env", "beta|gamma|prod"),
@@ -242,8 +376,7 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			keys, matchers := keysToFetchFromPostingGroups(tc.pgs)
-			testutil.Equals(t, tc.expectedLabels, keys)
+			matchers := lazyMatchersFromPostingGroups(tc.pgs)
 			testutil.Assert(t, len(tc.expectedMatchers) == len(matchers))
 			for i := 0; i < len(tc.expectedMatchers); i++ {
 				testutil.Equals(t, tc.expectedMatchers[i].String(), matchers[i].String())
@@ -749,7 +882,27 @@ }
 
 func TestMergeFetchedPostings(t *testing.T) {
+	t.Parallel()
+
 	ctx := context.Background()
+	dir := t.TempDir()
+	bkt, err := filesystem.NewBucket(dir)
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, bkt.Close()) }()
+
+	logger := log.NewNopLogger()
+	blockID := ulid.MustNew(1, nil)
+	meta := &metadata.Meta{
+		BlockMeta: tsdb.BlockMeta{ULID: blockID},
+		Thanos: metadata.Thanos{
+			Labels: map[string]string{
+				"a": "b",
+				"c": "d",
+			},
+		},
+	}
+	headerReader := &mockIndexHeaderReader{}
+
 	for _, tc := range []struct {
 		name               string
 		fetchedPostings    []index.Postings
@@ -802,6 +955,19 @@ func TestMergeFetchedPostings(t *testing.T) {
 			},
 			expectedSeriesRefs: []storage.SeriesRef{3, 5},
 		},
+		{
+			name: "posting group with multiple remove keys",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7}),
+				index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
+				index.NewListPostings([]storage.SeriesRef{6, 7}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}},
+				{name: "bar", removeKeys: []string{"bar", "foo"}, addAll: true},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{3, 5},
+		},
 		{
 			name: "multiple posting groups with add key and ignore lazy posting groups",
 			fetchedPostings: []index.Postings{
@@ -829,12 +995,44 @@ func TestMergeFetchedPostings(t *testing.T) {
 			},
 			expectedSeriesRefs: []storage.SeriesRef{1, 2, 4},
 		},
+		{
+			name:            "multiple posting groups with pre-filled postings",
+			fetchedPostings: []index.Postings{},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}, postings: index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
+				{name: "bar", addKeys: []string{"foo"}, postings: index.NewListPostings([]storage.SeriesRef{5, 6, 7})},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{5},
+		},
+		{
+			name: "mixed posting groups with pre-filled 
postings and multiple keys",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{3, 4, 5, 8}),
+				index.NewListPostings([]storage.SeriesRef{3, 4, 5, 9}),
+				index.NewListPostings([]storage.SeriesRef{3}),
+				index.NewListPostings([]storage.SeriesRef{4}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}, postings: index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
+				{name: "bar", addKeys: []string{"foo", "bar"}, postings: index.NewListPostings([]storage.SeriesRef{3, 4, 5, 6, 7})},
+				{name: "baz", addKeys: []string{"foo", "bar"}},
+				{name: "bbb", removeKeys: []string{"foo", "bar"}, addAll: true},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{5},
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			p := mergeFetchedPostings(ctx, tc.fetchedPostings, tc.postingGroups)
+			registry := prometheus.NewRegistry()
+			indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registry, storecache.DefaultInMemoryIndexCacheConfig)
+			testutil.Ok(t, err)
+			block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), indexCache, nil, headerReader, nil, nil, nil)
+			testutil.Ok(t, err)
+			ir := newBucketIndexReader(block, logger)
+			p, err := mergeFetchedPostings(ctx, ir, tc.fetchedPostings, tc.postingGroups, "")
+			testutil.Ok(t, err)
 			res, err := index.ExpandPostings(p)
-			require.NoError(t, err)
-			require.Equal(t, tc.expectedSeriesRefs, res)
+			testutil.Ok(t, err)
+			testutil.Equals(t, tc.expectedSeriesRefs, res)
 		})
 	}
 }
diff --git a/pkg/store/postings.go b/pkg/store/postings.go
index e44dfbe66c..45d424c40b 100644
--- a/pkg/store/postings.go
+++ b/pkg/store/postings.go
@@ -11,6 +11,12 @@ import (
 	"io"
 
 	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/index"
+)
+
+const (
+	seriesByteAlign = 16
 )
 
 type postingsReaderBuilder struct {
@@ -141,3 +147,29 @@ func (r *postingsReaderBuilder) Error() error {
 func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
 	return r.cur, r.numberOfPostingsInCur, r.keyID
 }
+
+type seriesByteAlignedPostings struct {
+	// The wrapped postings hold unpadded series refs; At multiplies each by seriesByteAlign.
+	postings index.Postings
+}
+
+func newSeriesByteAlignedPostings(postings index.Postings) index.Postings {
+	return &seriesByteAlignedPostings{postings: postings}
+}
+
+func (p *seriesByteAlignedPostings) Next() bool {
+	return p.postings.Next()
+}
+
+func (p *seriesByteAlignedPostings) Seek(v storage.SeriesRef) bool {
+	// Apply the reverse transformation when seeking.
+	// If v is not a multiple of seriesByteAlign, the result may be off because integer division truncates.
+	// Make sure to only use this when all postings are multiples of seriesByteAlign.
+	return p.postings.Seek(v / seriesByteAlign)
+}
+
+func (p *seriesByteAlignedPostings) At() storage.SeriesRef {
+	return p.postings.At() * seriesByteAlign
+}
+
+func (p *seriesByteAlignedPostings) Err() error { return p.postings.Err() }
diff --git a/pkg/store/postings_test.go b/pkg/store/postings_test.go
new file mode 100644
index 0000000000..f6c1d6dcb6
--- /dev/null
+++ b/pkg/store/postings_test.go
@@ -0,0 +1,59 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"testing"
+
+	"github.com/efficientgo/core/testutil"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/index"
+)
+
+func TestSeriesByteAlignedPostings_NextAndAt(t *testing.T) {
+	for _, tc := range []struct {
+		name     string
+		align    bool
+		input    index.Postings
+		expected []storage.SeriesRef
+	}{
+		{
+			name:  "empty",
+			input: index.EmptyPostings(),
+		},
+		{
+			name:     "align",
+			align:    true,
+			input:    index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
+			expected: []storage.SeriesRef{16, 32, 48, 64, 80},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			output := newSeriesByteAlignedPostings(tc.input)
+			var refs []storage.SeriesRef
+			for output.Next() {
+				refs = append(refs, output.At())
+			}
+			testutil.Equals(t, tc.expected, refs)
+		})
+	}
+}
+
+func TestSeriesByteAlignedPostings_Seek(t *testing.T) {
+	alignedPosting := newSeriesByteAlignedPostings(index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}))
+	testutil.Equals(t, true, alignedPosting.Seek(40))
+	// 40 / 16 truncates to 2, so the aligned posting lands at 32 instead of 48.
+	// This behavior is different from ListPostings but we don't expect
+	// Seek to be called with a number that is not a multiple of 16,
+	// so it should be ok.
+	testutil.Equals(t, storage.SeriesRef(32), alignedPosting.At())
+	testutil.Equals(t, true, alignedPosting.Seek(48))
+	testutil.Equals(t, storage.SeriesRef(48), alignedPosting.At())
+	testutil.Equals(t, true, alignedPosting.Seek(80))
+	testutil.Equals(t, storage.SeriesRef(80), alignedPosting.At())
+	// 90 / 16 truncates to 5.
+	testutil.Equals(t, true, alignedPosting.Seek(90))
+	// 100 / 16 truncates to 6.
+	testutil.Equals(t, false, alignedPosting.Seek(100))
+}
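A closing note on the 16-byte alignment that seriesByteAlignedPostings handles: index version 2 pads every series entry to 16 bytes, so the cached expanded postings store unpadded IDs and the wrapper rescales them on the fly. A self-contained sketch of the same arithmetic (toy helpers, not the Thanos types), which also shows why Seek(40) in the test above lands on 32:

	package main

	import "fmt"

	const seriesByteAlign = 16

	// alignedAt mirrors seriesByteAlignedPostings.At: an unpadded series ID becomes
	// a byte-aligned reference by multiplying by seriesByteAlign.
	func alignedAt(id uint64) uint64 { return id * seriesByteAlign }

	// alignedSeek mirrors the Seek direction: integer division truncates, so it is
	// only safe when the target is itself a multiple of seriesByteAlign.
	func alignedSeek(target uint64) uint64 { return target / seriesByteAlign }

	func main() {
		fmt.Println(alignedAt(3))    // 48
		fmt.Println(alignedSeek(48)) // 3
		fmt.Println(alignedSeek(40)) // 2: At() would then report 32, matching the test
	}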