-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bloom Gateway: Implement chunk filtering using workers that multiplex requests #11181
Conversation
f223187
to
80f3a88
Compare
80f3a88
to
2c1586c
Compare
Trivy scan found the following vulnerabilities:
|
} | ||
|
||
// convertToShortRefs converts a v1.ChunkRefs into []*logproto.ShortRef | ||
// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you planning to do this on a followup PR or you forgot about it?
Should be simple to do by using gogoproto's nullable=false
and customtype=v1.ChunkRefs
Here's a similar example:
loki/pkg/logproto/logproto.proto
Lines 329 to 332 in b49b3ce
repeated LabelPair labels = 1 [ | |
(gogoproto.nullable) = false, | |
(gogoproto.customtype) = "LabelAdapter" | |
]; |
And the generated code:
loki/pkg/logproto/logproto.pb.go
Line 1879 in b49b3ce
Labels []LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=LabelAdapter" json:"labels"` |
Alternatively, as long as you use nullable=false
to avoid having a slice of pointers, and v1.ChunkRefs
has the same mem layout as logproto.ShortRef
, you can do a cast like:
Lines 49 to 51 in aedf6df
func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels { | |
return *(*labels.Labels)(unsafe.Pointer(&ls)) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you planning to do this on a followup PR or you forgot about it?
I planned to do this once the datastructures are fully settled. The request format may still change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good to me 👍 . We can also just specify protos in the v1
package directly and import them elsewhere so we don't have to do special casting -- just use them directly
pkg/bloomgateway/bloomgateway.go
Outdated
hasNext := it.Next() | ||
for _, bq := range bqs { | ||
requests = requests[:0] | ||
for hasNext && it.At().Fp <= bq.MaxFp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this would better read as:
for it.Next() && it.At().Fp <= bq.MaxFp {
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to read, but it would not work as you think:
When you call it.Next()
the iterator will proceed to the next item. However, if the condition it.At().Fp <= bq.MaxFp
does not match, the loop is exited.
Then the loop is started for the next bq
again, and first, it.Next()
will be called, proceeding to the next item, and therefore skipping one item.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see. Thanks for the clarification. I'd probably add a comment explaining that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking really good.
I need to refactor the bloom querying to return the list of chunks which can be removed rather than the ones which need to be queried in storage. The reasoning for this is we can merge a removal list of chunks across bloom blocks by unioning them, but we can't do the opposite.
pkg/bloomgateway/bloomgateway.go
Outdated
for _, ref := range req.Refs { | ||
if ref.Tenant != tenantID { | ||
return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.Tenant) | ||
} | ||
// Sort ShortRefs by From time in ascending order |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT about reusing logproto.ChunkRef
instead? It's more verbose, but also more consistent. Ultimately, I'd like to refactor everything to use a more idiomatic & efficient repr like you've included in GroupedChunkRefs
, but I think consistency makes more sense right now.
We should also assume these chunkrefs are already sorted -- no need to sort them again here. The index gateway should take care of that (I think it does already since the index is laid out in this order as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also assume these chunkrefs are already sorted -- no need to sort them again here. The index gateway should take care of that (I think it does already since the index is laid out in this order as well)
You're right, they are sorted already.
ChunkRefs: req.Refs, | ||
}, nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should loop over req.Refs to ensure the tenant matches our expectation here. This is costly in terms of CPU cycles and we should ensure it beforehand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Removed the assertion
pkg/bloomgateway/multiplexing.go
Outdated
return false | ||
} | ||
|
||
currIter, ok := it.heap.Iter().(*SliceIterWithIndex[*logproto.GroupedChunkRefs]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if you include the index
in the type you're iterator iterates over, you won't need to cast. Something like
type IndexedVal[T any] struct {
idx int
val T
}
func NewIterWithIndex[T any](iter Iterator[T], idx int) Iterator[IndexedVal[T]] {...etc}
pkg/bloomgateway/util.go
Outdated
|
||
refs := make([]*logproto.GroupedChunkRefs, 0, len(r.Refs)) | ||
for i := range r.Refs { | ||
groupedChunkRefs := &logproto.GroupedChunkRefs{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: avoid all the O(n) casting. Can be done later 👍
On second thought, as it stands, you'd have to do this O(n) work m
times where m
equals the number of day-buckets you're looking for. Instead, you could build an iterator over the underlying chunks which filters out chunks which aren't in the day in question. Basically, build multiple "views" over the same list of chunks depending on which day-bucket you care about. WDYT? It'd avoid all the O(n) casting you're doing below and feels conceptually simple.
} | ||
|
||
// convertToShortRefs converts a v1.ChunkRefs into []*logproto.ShortRef | ||
// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good to me 👍 . We can also just specify protos in the v1
package directly and import them elsewhere so we don't have to do special casting -- just use them directly
pkg/bloomgateway/worker.go
Outdated
requests := make([]v1.Request, 0, 128) | ||
fingerprints := make([]uint64, 0, 1024) | ||
|
||
for ctx.Err() == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be for select ctx.Done()
because otherwise it'll tight-loop your CPU. We want to halt execution waiting on a channel so the go scheduler can hand that cpu back to wherever more work is waiting to be done in process.
pkg/bloomgateway/worker.go
Outdated
|
||
it := newTaskMergeIterator(tasks...) | ||
|
||
fingerprints = fingerprints[:0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't think we need another list of all the fingerprints here. We could instead build a function which found the intersecting blocks for a list of fingerprints from the original requests.
Simplified below,
func OverlappingBlocksForRequests(reqs [][]model.Fingerprint, blocks []Block) []Block
We could binary search over the fp lists, comparing them to blocks rather than iterate over every fp directly (can easily be n=millions
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've gone ahead and put together something like this here: #11237
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd still need the list of fingerprints to calculating the overlapping blocks. Or do I miss something?
pkg/bloomgateway/worker.go
Outdated
// fingerprints are already sorted. we can skip duplicates by checking | ||
// if the next is greater than the previous | ||
fp := uint64(it.At().Fp) | ||
if len(fingerprints) > 0 && fp <= fingerprints[len(fingerprints)-1] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you seen NewDedupingIter
? Might be a bit heavyweight here, but I've used it to wrap a heap-iter before to handle items with the same keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I saw it. I thought it was too much of an overhead.
pkg/bloomgateway/worker.go
Outdated
continue | ||
} | ||
|
||
hasNext := it.Next() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use the view
iter idea described earlier for this
pkg/queue/queue.go
Outdated
defer cancel() | ||
|
||
var idx QueueIndex | ||
items := make([]Request, 0, maxItems) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: pool
3595653
to
6270a27
Compare
b730276
to
4b394d8
Compare
if len(responses) == requestCount { | ||
for _, o := range responses { | ||
// we must not remove items from req.Refs as long as the worker may iterater over them | ||
g.removeNotMatchingChunks(req, o) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we could skip the call to this method if Removals.Len()
is 0
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good
return false | ||
} | ||
} | ||
it.cache = it.transform(it.iter.At()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a cache here at all? It seems like At()
can just call it.transform(it.iter.At())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to avoid the it.transform(it.iter.At())
function call every time it.At()
is called.
Depending on the transform function, it could be expensive to do so.
func convertToSearches(filters []*logproto.LineFilterExpression) [][]byte { | ||
searches := make([][]byte, 0, len(filters)) | ||
for _, f := range filters { | ||
searches = append(searches, []byte(f.Match)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should only work when the match type is =
. It's good to have a conversion function here like you've done because it allows us to add future optimizations like |~"ab(c|d)"
-> |="abc" or |= "abd"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I haven't thought about other operators than =
.
Gonna add a TODO comment.
pkg/bloomgateway/worker.go
Outdated
} | ||
|
||
boundedRefs := partitionFingerprintRange(tasks, blockRefs) | ||
blockRefs = blockRefs[0:] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
blockRefs = blockRefs[0:] | |
blockRefs = blockRefs[:0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! 🙈
pkg/bloomgateway/worker.go
Outdated
it := newTaskMergeIterator(day, boundedRefs[i].tasks...) | ||
requests = requests[:0] | ||
for it.Next() { | ||
requests = append(requests, it.At().Request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the advantage of collecting these into a slice rather than building an iterator over the underlying iterator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is none.
pkg/queue/queue.go
Outdated
|
||
items := q.pool.Get(maxItems) | ||
defer func() { | ||
q.pool.Put(items) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it puts the items back into the pool when it returns them to the caller as well, creating a mutability bug waiting to happen.
pkg/queue/util.go
Outdated
|
||
// BufferPool uses a bucket pool and wraps the Get() and Put() functions for | ||
// simpler access. | ||
type BufferPool[T any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super-nit: Maybe just SlicePool[T]
would be a better name?
pkg/storage/bloom/v1/merge.go
Outdated
cur := mbq.itrs[0] | ||
if ok := cur.Next(); !ok { | ||
curr := mbq.itrs[0] | ||
if ok := curr.Next(); !ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't like my naming? 😭
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This happened accidentally when I reverted a temporary change.
@@ -29,6 +28,14 @@ const ( | |||
fileNamePartDelimiter = "-" | |||
) | |||
|
|||
type BoundsCheck uint8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd love to keep this in the v1
lib so I can use it there as well (can't import this package there)
035ecfc
to
9235e83
Compare
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
and sorting of the inputs Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Putting the returned slice of requests back to the pool but also returning them to the caller could lead to a mutability bug. Now the caller of DequeueMany() is responsible for returning the request slice back to the pool of the queue by calling ReleaseRequests(). Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
Signed-off-by: Christian Haudum <[email protected]>
3bd8f7a
to
8893eef
Compare
… requests (grafana#11181) This change adds an internal request queue to the bloom gateway. Instead of executing every single request individually, which involves resolving bloom blocks, downloading them if needed and executing the chunk filtering, requests are now enqueued to the internal, per-tenant queue. The queue implements the same shuffle sharding mechanism as the queue in the query scheduler component. Workers then dequeue a batch of requests for a single tenant and multiplex them into a single processing task for each day. This has the big advantage that the chunks of multiple requests can be processed in a single sequential scan through a set a bloom blocks, without needing to skip back and forth within the binary stream of the block. --------- Signed-off-by: Christian Haudum <[email protected]>
What this PR does / why we need it:
This PR adds the worker implementation in the bloom gateways. The workers pull multiple items from the queue, multiplex them, and execute the chunk matching on the resolved bloom blocks.
The multiplexing is used to minimise the overhead of seeking and skipping through bloom blocks when matching chunks.
Todo
Checklist
CONTRIBUTING.md
guide (required)CHANGELOG.md
updatedadd-to-release-notes
labeldocs/sources/setup/upgrade/_index.md
production/helm/loki/Chart.yaml
and updateproduction/helm/loki/CHANGELOG.md
andproduction/helm/loki/README.md
. Example PRdeprecated-config.yaml
anddeleted-config.yaml
files respectively in thetools/deprecated-config-checker
directory. Example PR