Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Store Gateway: Cache expanded postings for expensive matcher #8023

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed

- [#8023](https://github.com/thanos-io/thanos/pull/8023) Store Gateway: Cache expanded postings for matchers with more than 1 key.

### Removed

## [v0.37.2](https://github.com/thanos-io/thanos/tree/release-0.37) - 11.12.2024
Expand Down
143 changes: 111 additions & 32 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label
}

func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {}
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) {
return []byte{}, false
// FetchExpandedPostings is a no-op implementation: it reports a cache miss
// for every matcher set. Per the IndexCache contract, the returned slice has
// one entry per input matcher set, with nil marking a miss — returning a
// zero-length slice would break callers that index the result positionally
// (and is inconsistent with FilteredIndexCache's disabled path).
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
	return make([][]byte, len(matchers))
}

func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
Expand Down Expand Up @@ -2606,7 +2606,7 @@ func (r *bucketIndexReader) ExpandedPostings(
return nil, nil
}

hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant)
hit, postings, err := r.fetchAlignedExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2705,6 +2705,7 @@ type postingGroup struct {
cardinality int64
existentKeys int
lazy bool
postings index.Postings
}

func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup {
Expand Down Expand Up @@ -2955,48 +2956,81 @@ type postingPtr struct {
ptr index.Range
}

func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
if !hit {
return false, nil, nil
}
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}

r.stats.add(PostingsTouched, 1, len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
func (r *bucketIndexReader) fetchAlignedExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant, true)
defer func() {
for _, closeFn := range closeFns {
closeFn()
for _, closer := range closers {
closer()
}
}()
// If failed to decode or expand cached postings, return and expand postings again.
if err != nil {
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, err
}

// Cache miss.
if postings == nil || postings[0] == nil {
return false, nil, nil
}

ps, err := ExpandPostingsWithContext(ctx, p)
ps, err := ExpandPostingsWithContext(ctx, postings[0])
if err != nil {
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}
return true, ps, nil
}

if len(ps) > 0 {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return false, nil, errors.Wrap(err, "get index version")
func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string, seriesByteAligned bool) ([]index.Postings, []func(), error) {
if len(ms) == 0 {
return nil, nil, nil
}

var closeFns []func()
res := make([]index.Postings, len(ms))

dataFromCache := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
size := 0
hits := 0
for _, v := range dataFromCache {
if v != nil {
size += len(v)
hits++
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
}
if err := bytesLimiter.ReserveWithType(uint64(size), PostingsTouched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}

r.stats.add(PostingsTouched, hits, len(dataFromCache))
for i, v := range dataFromCache {
if v != nil {
p, closers, err := r.decodeCachedPostings(v)
closeFns = append(closeFns, closers...)

// If failed to decode cached postings, continue the loop so we try to fetch it again.
if err != nil {
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
continue
}

res[i] = p
if seriesByteAligned {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.IndexVersion()
if err != nil {
return nil, closeFns, errors.Wrap(err, "get index version")
}

if version >= 2 {
// Index version 2 series are padded by 16 bytes.
res[i] = newSeriesByteAlignedPostings(p)
}
}
}
}
return true, ps, nil

return res, closeFns, nil
}

func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int, tenant string) {
Expand All @@ -3020,15 +3054,54 @@ var bufioReaderPool = sync.Pool{
// fetchPostings fill postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
var closeFns []func()

var (
expandedPostingMatchers [][]*labels.Matcher
expandedPostingMatchersIdx []int
)
keysLength := 0
expandedPostingGroupSet := map[string]struct{}{}
// Find out posting groups which fetch more than 1 key to fetch expanded posting cache.
for i := 0; i < len(postingGroups); i++ {
pg := postingGroups[i]
if !pg.lazy {
// If posting group has more than 1 key to fetch, fetch expanded postings first.
// This helps for matcher such as !="", =~".+" to avoid fetching too many keys from cache.
if len(pg.addKeys) > 1 || len(pg.removeKeys) > 1 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can make the condition > a certain number instead of > 1

expandedPostingMatchers = append(expandedPostingMatchers, pg.matchers)
expandedPostingMatchersIdx = append(expandedPostingMatchersIdx, i)
expandedPostingGroupSet[pg.name] = struct{}{}
continue
}

// A posting group has either add key or remove key and cannot have both the same time.
keysLength += len(pg.addKeys) + len(pg.removeKeys)
}
}

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()

var ptrs []postingPtr
expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant, false)
closeFns = append(closeFns, closers...)
if err != nil {
return nil, closeFns, errors.Wrap(err, "fetch expanded postings from cache")
}
for i, ep := range expandedPostings {
pgIdx := expandedPostingMatchersIdx[i]
if ep != nil {
postingGroups[pgIdx].postings = ep
continue
}
// Cache miss, have additional keys to fetch.
keysLength += len(postingGroups[pgIdx].addKeys) + len(postingGroups[pgIdx].removeKeys)
Copy link
Contributor Author

@yeya24 yeya24 Dec 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case expanded posting cache miss, we still try to fetch each posting individually from index cache and then object store.

I wonder if we should just skip the cache and fetch from the object store directly. It makes no sense to fetch postings from the cache if we don't store them there, unless the postings are queried directly with an `=` matcher.

}

output := make([]index.Postings, len(keys))
var ptrs []postingPtr
keys := keysToFetchFromPostingGroups(postingGroups, keysLength)
output := make([]index.Postings, keysLength)

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
Expand Down Expand Up @@ -3131,6 +3204,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)

// If the corresponding posting group tries to fetch expanded postings, don't backfill each
// encoded postings in case a cache miss. Cache expanded postings for the whole group instead.
if _, ok := expandedPostingGroupSet[keys[keyID].Name]; ok {
continue
}

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*la
c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant)
}

func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
// FetchExpandedPostings delegates to the currently-installed cache
// implementation (c.ptr), which may be swapped at runtime by tests.
func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
	return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type IndexCache interface {
// StoreExpandedPostings stores expanded postings for a set of label matchers.
StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string)

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool)
// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. If cache miss, the corresponding index
// of the response slice will be nil.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte

// StoreSeries stores a single series.
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string)
Expand Down
8 changes: 5 additions & 3 deletions pkg/store/cache/filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
}
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. If cache miss, the corresponding index
// of the response slice will be nil.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
if c.expandedPostingsEnabled {
return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}
return nil, false

return make([][]byte, len(matchers))
}

// StoreSeries sets the series identified by the ulid and id to the value v,
Expand Down
28 changes: 15 additions & 13 deletions pkg/store/cache/filter_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, testExpandedPostingsData, ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
Expand All @@ -77,9 +77,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Assert(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, testExpandedPostingsData, ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
Expand All @@ -98,8 +98,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, []byte(nil), ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
Expand All @@ -118,9 +119,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, testExpandedPostingsData, ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
Expand All @@ -139,8 +140,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, []byte(nil), ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
Expand Down
35 changes: 25 additions & 10 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,20 +336,35 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v)
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. If cache miss, the corresponding index
// of the response slice will be nil.
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenant))
defer timer.ObserveDuration()

if ctx.Err() != nil {
return nil, false
}
c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc()
if b, ok := c.get(CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(matchers)), ""}); ok {
c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc()
return b, true
blockIDKey := blockID.String()
requests := 0
hits := 0
data := make([][]byte, len(matchers))
for i, matcher := range matchers {
if (i+1)%checkContextEveryNIterations == 0 {
if ctx.Err() != nil {
c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(requests))
return data
}
}

requests++
if b, ok := c.get(CacheKey{blockIDKey, CacheKeyExpandedPostings(LabelMatchersToString(matcher)), ""}); ok {
hits++
data[i] = b
}
}
return nil, false
c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(requests))

return data
}

// StoreSeries sets the series identified by the ulid and id to the value v,
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant)
},
get: func(id storage.SeriesRef) ([]byte, bool) {
return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant)
res := cache.FetchExpandedPostings(ctx, uid(id), [][]*labels.Matcher{{matcher}}, tenancy.DefaultTenant)
return res[0], res[0] != nil
},
},
} {
Expand Down
Loading
Loading