Store Gateway: Cache expanded postings for expensive matcher #8023
@@ -453,8 +453,8 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label
 }

 func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {}
-func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) {
-	return []byte{}, false
+func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ [][]*labels.Matcher, tenant string) [][]byte {
+	return [][]byte{}
 }

 func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
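The noop implementation above reflects the new batched cache contract: callers pass several matcher sets at once and get back one byte slice per set, with nil marking a miss, instead of the old single ([]byte, bool) pair. Below is a minimal in-memory sketch of that contract; the cache type and the key encoding are illustrative assumptions, not code from this PR.

package store

import (
	"context"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/model/labels"
)

// inMemoryExpandedPostingsCache is a hypothetical cache used only to
// illustrate the batched FetchExpandedPostings contract.
type inMemoryExpandedPostingsCache struct {
	entries map[string][]byte
}

// FetchExpandedPostings returns one entry per matcher set, index-aligned
// with the request; a nil entry means a miss for that set.
func (c *inMemoryExpandedPostingsCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, ms [][]*labels.Matcher, tenant string) [][]byte {
	res := make([][]byte, len(ms))
	for i, matchers := range ms {
		res[i] = c.entries[cacheKey(blockID, matchers, tenant)]
	}
	return res
}

// cacheKey is a stand-in for whatever canonical key encoding the real
// cache implementations use.
func cacheKey(blockID ulid.ULID, ms []*labels.Matcher, tenant string) string {
	key := blockID.String() + "/" + tenant
	for _, m := range ms {
		key += "/" + m.String()
	}
	return key
}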
@@ -2606,7 +2606,7 @@ func (r *bucketIndexReader) ExpandedPostings(
 		return nil, nil
 	}

-	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant)
+	hit, postings, err := r.fetchAlignedExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant)
 	if err != nil {
 		return nil, err
 	}
@@ -2705,6 +2705,7 @@ type postingGroup struct {
 	cardinality  int64
 	existentKeys int
 	lazy         bool
+	postings     index.Postings
 }

 func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup {
@@ -2955,48 +2956,81 @@ type postingPtr struct {
 	ptr index.Range
 }

-func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
-	dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
-	if !hit {
-		return false, nil, nil
-	}
-	if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
-		return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
-	}
-
-	r.stats.add(PostingsTouched, 1, len(dataFromCache))
-	p, closeFns, err := r.decodeCachedPostings(dataFromCache)
+func (r *bucketIndexReader) fetchAlignedExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
+	postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant, true)
 	defer func() {
-		for _, closeFn := range closeFns {
-			closeFn()
+		for _, closer := range closers {
+			closer()
 		}
 	}()
-	// If failed to decode or expand cached postings, return and expand postings again.
 	if err != nil {
-		level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
-		return false, nil, nil
+		return false, nil, err
 	}

+	// Cache miss.
+	if postings == nil || postings[0] == nil {
+		return false, nil, nil
+	}
+
-	ps, err := ExpandPostingsWithContext(ctx, p)
+	ps, err := ExpandPostingsWithContext(ctx, postings[0])
 	if err != nil {
 		level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
 		return false, nil, nil
 	}
+	return true, ps, nil
+}

-	if len(ps) > 0 {
-		// As of version two all series entries are 16 byte padded. All references
-		// we get have to account for that to get the correct offset.
-		version, err := r.block.indexHeaderReader.IndexVersion()
-		if err != nil {
-			return false, nil, errors.Wrap(err, "get index version")
-		}
-		if version >= 2 {
-			for i, id := range ps {
-				ps[i] = id * 16
-			}
-		}
-	}
-	return true, ps, nil
+func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string, seriesByteAligned bool) ([]index.Postings, []func(), error) {
+	if len(ms) == 0 {
+		return nil, nil, nil
+	}
+
+	var closeFns []func()
+	res := make([]index.Postings, len(ms))
+
+	dataFromCache := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
+	size := 0
+	hits := 0
+	for _, v := range dataFromCache {
+		if v != nil {
+			size += len(v)
+			hits++
+		}
+	}
+	if err := bytesLimiter.ReserveWithType(uint64(size), PostingsTouched); err != nil {
+		return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
+	}
+
+	r.stats.add(PostingsTouched, hits, len(dataFromCache))
+	for i, v := range dataFromCache {
+		if v != nil {
+			p, closers, err := r.decodeCachedPostings(v)
+			closeFns = append(closeFns, closers...)
+
+			// If failed to decode cached postings, continue the loop so we try to fetch it again.
+			if err != nil {
+				level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+				continue
+			}
+
+			res[i] = p
+			if seriesByteAligned {
+				// As of version two all series entries are 16 byte padded. All references
+				// we get have to account for that to get the correct offset.
+				version, err := r.IndexVersion()
+				if err != nil {
+					return nil, closeFns, errors.Wrap(err, "get index version")
+				}
+
+				if version >= 2 {
+					// Index version 2 series are padded by 16 bytes.
+					res[i] = newSeriesByteAlignedPostings(p)
+				}
+			}
+		}
+	}
+
+	return res, closeFns, nil
 }

 func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int, tenant string) {
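The new code replaces the eager `ps[i] = id * 16` loop with newSeriesByteAlignedPostings, whose definition is not part of this hunk. The following is a plausible sketch of such a wrapper, assuming it applies the index-version-2 16-byte padding lazily while iterating; the Seek arithmetic in particular is an assumption, not the PR's code.

package store

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

// seriesByteAlignedPostings is a sketch of the wrapper referenced above.
type seriesByteAlignedPostings struct {
	p index.Postings
}

func newSeriesByteAlignedPostings(p index.Postings) index.Postings {
	return &seriesByteAlignedPostings{p: p}
}

func (s *seriesByteAlignedPostings) Next() bool { return s.p.Next() }

// At applies the 16-byte padding of index version 2 series entries,
// mirroring the old eager `ps[i] = id * 16` loop.
func (s *seriesByteAlignedPostings) At() storage.SeriesRef { return s.p.At() * 16 }

// Seek translates the aligned target back into the unaligned domain:
// the smallest unaligned id with id*16 >= v is (v+15)/16.
func (s *seriesByteAlignedPostings) Seek(v storage.SeriesRef) bool {
	return s.p.Seek((v + 15) / 16)
}

func (s *seriesByteAlignedPostings) Err() error { return s.p.Err() }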
@@ -3020,15 +3054,54 @@ var bufioReaderPool = sync.Pool{
 // fetchPostings fill postings requested by posting groups.
 // It returns one posting for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
-func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
+func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
 	var closeFns []func()

+	var (
+		expandedPostingMatchers    [][]*labels.Matcher
+		expandedPostingMatchersIdx []int
+	)
+	keysLength := 0
+	expandedPostingGroupSet := map[string]struct{}{}
+	// Find out posting groups which fetch more than 1 key to fetch expanded posting cache.
+	for i := 0; i < len(postingGroups); i++ {
+		pg := postingGroups[i]
+		if !pg.lazy {
+			// If posting group has more than 1 key to fetch, fetch expanded postings first.
+			// This helps for matcher such as !="", =~".+" to avoid fetching too many keys from cache.
+			if len(pg.addKeys) > 1 || len(pg.removeKeys) > 1 {
+				expandedPostingMatchers = append(expandedPostingMatchers, pg.matchers)
+				expandedPostingMatchersIdx = append(expandedPostingMatchersIdx, i)
+				expandedPostingGroupSet[pg.name] = struct{}{}
+				continue
+			}
+
+			// A posting group has either add keys or remove keys and cannot have both at the same time.
+			keysLength += len(pg.addKeys) + len(pg.removeKeys)
+		}
+	}
+
 	timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
 	defer timer.ObserveDuration()

-	var ptrs []postingPtr
+	expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant, false)
+	closeFns = append(closeFns, closers...)
+	if err != nil {
+		return nil, closeFns, errors.Wrap(err, "fetch expanded postings from cache")
+	}
+	for i, ep := range expandedPostings {
+		pgIdx := expandedPostingMatchersIdx[i]
+		if ep != nil {
+			postingGroups[pgIdx].postings = ep
+			continue
+		}
+		// Cache miss, have additional keys to fetch.
+		keysLength += len(postingGroups[pgIdx].addKeys) + len(postingGroups[pgIdx].removeKeys)
Review comment on the cache-miss line above: In case of an expanded posting cache miss, we still try to fetch each posting individually from the index cache and then the object store. I wonder if we should just skip the cache and fetch from the object store directly. It makes no sense to fetch postings from the cache if we don't store them there, only if the postings are queried directly with
+	}

-	output := make([]index.Postings, len(keys))
+	var ptrs []postingPtr
+	keys := keysToFetchFromPostingGroups(postingGroups, keysLength)
+	output := make([]index.Postings, keysLength)

 	// Fetch postings from the cache with a single call.
 	fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
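keysToFetchFromPostingGroups is called above with the precomputed capacity, but its body lies outside this diff. The sketch below is a guess at its behavior, consistent with the key accounting in the hunk; it is a hypothetical reconstruction, not the PR's implementation.

// A guess at keysToFetchFromPostingGroups, which is outside this diff:
// collect label keys only for non-lazy groups that were not already
// satisfied by a cached expanded posting.
func keysToFetchFromPostingGroups(postingGroups []*postingGroup, keysLength int) []labels.Label {
	keys := make([]labels.Label, 0, keysLength)
	for _, pg := range postingGroups {
		if pg.lazy || pg.postings != nil {
			continue
		}
		// A group has either add keys or remove keys, never both.
		for _, k := range pg.addKeys {
			keys = append(keys, labels.Label{Name: pg.name, Value: k})
		}
		for _, k := range pg.removeKeys {
			keys = append(keys, labels.Label{Name: pg.name, Value: k})
		}
	}
	return keys
}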
@@ -3131,6 +3204,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 			output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)

+			// If the corresponding posting group tries to fetch expanded postings, don't backfill each
+			// encoded postings in case a cache miss. Cache expanded postings for the whole group instead.
+			if _, ok := expandedPostingGroupSet[keys[keyID].Name]; ok {
+				continue
+			}
+
 			startCompression := time.Now()
 			dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
 			if err != nil {
Review comment on the `> 1` check: Maybe we can make the condition `> a certain number` instead of `> 1`.
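For illustration, that suggestion might look like the following sketch, where the constant name and its value are hypothetical and the postingGroup type comes from the store package shown in the diff.

// Hypothetical tunable for the reviewer's suggestion; the PR as written
// hard-codes the threshold as 1.
const expandedPostingsKeyThreshold = 10

// shouldFetchExpandedPostings reports whether a posting group has enough
// keys that one cached expanded posting is cheaper than per-key fetches.
func shouldFetchExpandedPostings(pg *postingGroup) bool {
	return !pg.lazy &&
		(len(pg.addKeys) > expandedPostingsKeyThreshold || len(pg.removeKeys) > expandedPostingsKeyThreshold)
}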