Bloom Gateway: Implement chunk filtering using workers that multiplex requests #11181

Merged · 44 commits · Nov 24, 2023
Commits (44)
eb07d82
Extract existing worker code into separate service struct
chaudum Oct 20, 2023
4a10400
Remove duplicate import
chaudum Oct 20, 2023
9177822
Multiplex bloom query tasks
chaudum Oct 25, 2023
8af5f39
Add sync pool
chaudum Nov 6, 2023
e66f1d3
Functional test for FilterChunkRefs() gRPC method
chaudum Nov 8, 2023
8ade59c
Add basic worker metrics
chaudum Nov 9, 2023
4cd9961
Fix filterRequestForDay function
chaudum Nov 9, 2023
16899e6
Add tests for taskMergeIterator
chaudum Nov 9, 2023
8c5a945
Fix linter warnings
chaudum Nov 9, 2023
c3f2368
Fix import order
chaudum Nov 9, 2023
266479b
Implement Err() for taskMergeIterator
chaudum Nov 9, 2023
f443fe0
Fix test and linter
chaudum Nov 9, 2023
afec782
Add comment about optimized processing block
chaudum Nov 9, 2023
6205140
Fix filter test
chaudum Nov 10, 2023
e330977
Add helper function for parsing time in tests
chaudum Nov 10, 2023
218ec25
Remove unused utility function
chaudum Nov 14, 2023
7cb3a6a
Improve comments
chaudum Nov 14, 2023
bae443d
Make function name more explicit
chaudum Nov 14, 2023
a68af7d
Unify bloomgateway metrics creation
chaudum Nov 14, 2023
b4e623f
Better variable naming
chaudum Nov 14, 2023
cd4011a
Split worker code into separate go file
chaudum Nov 14, 2023
e15094c
Make separate interface for NesterIterator
chaudum Nov 14, 2023
d9fa50b
Test iterator Reset() more thoroughly
chaudum Nov 14, 2023
388d818
Fix import order
chaudum Nov 14, 2023
01a6166
Remove unnecessary tenant ID assertion
chaudum Nov 15, 2023
f4eeb5a
Remove unnecessary cast
chaudum Nov 15, 2023
6aee5e7
Use `for select` instead of `for ctx.Err()==nil`
chaudum Nov 15, 2023
29c2d14
Change task merge iterator to also filter chunkrefs by day
chaudum Nov 16, 2023
f01220c
Add TODO for using a pool when dequeueing many
chaudum Nov 16, 2023
81c4982
Use a buffer pool when dequeuing many
chaudum Nov 16, 2023
ed09492
Improve best case for chunk filtering by day
chaudum Nov 17, 2023
c2ac04a
Implement inverted logic for checking chunks against blooms
chaudum Nov 21, 2023
66a4ac5
Towards single-pass iteration for querying of block
chaudum Nov 21, 2023
6112b7f
Implement single-pass iterator when querying bloom filters
chaudum Nov 22, 2023
7340e5d
Avoid unnecessary chunk removal function call
chaudum Nov 23, 2023
7dd6f56
Add TODO for search string conversion
chaudum Nov 23, 2023
14079c9
Fix clearing of slice buffer in the worker
chaudum Nov 23, 2023
d010e99
Avoid collecting items from interator into slice
chaudum Nov 23, 2023
4900e24
Revert accidental renaming of variable
chaudum Nov 23, 2023
65bfcc9
Remove duplicate BoundsCheck declaration
chaudum Nov 23, 2023
579250c
Rename BuffePool to SlicePool
chaudum Nov 23, 2023
733c42c
Avoid mutability bug
chaudum Nov 23, 2023
4f3d7df
Only send error to the affected tasks if fuse operation fails
chaudum Nov 23, 2023
8893eef
Fix broken import from rebase
chaudum Nov 24, 2023
217 changes: 103 additions & 114 deletions pkg/bloomgateway/bloomgateway.go
@@ -23,6 +23,10 @@ of line filter expressions.
|
bloomgateway.Gateway
|
queue.RequestQueue
|
bloomgateway.Worker
|
bloomshipper.Store
|
bloomshipper.Shipper
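
The new queue.RequestQueue and bloomgateway.Worker components in the diagram above carry the core idea of this PR: workers dequeue several pending tasks at once and multiplex them into a single pass over the bloom blocks. The worker implementation itself was split into a separate file (commit cd4011a) and is not part of this file's diff, so the following is only a minimal, self-contained sketch of the batching pattern; the names task, result and dequeueBatch are illustrative and not Loki's actual API.

package main

import (
    "fmt"
    "time"
)

// result and task are stand-ins for the gateway's partial response and
// queued request types.
type result struct {
    fingerprint uint64
}

type task struct {
    tenant string
    resCh  chan result
}

// dequeueBatch drains up to maxItems tasks from the shared queue, waiting
// at most maxWait in total, so a worker can serve many requests in one pass.
func dequeueBatch(queue <-chan task, maxItems int, maxWait time.Duration) []task {
    batch := make([]task, 0, maxItems)
    deadline := time.After(maxWait)
    for len(batch) < maxItems {
        select {
        case t, ok := <-queue:
            if !ok {
                return batch
            }
            batch = append(batch, t)
        case <-deadline:
            return batch
        }
    }
    return batch
}

func main() {
    queue := make(chan task, 16)
    for i := 0; i < 5; i++ {
        queue <- task{tenant: "tenant-a", resCh: make(chan result, 1)}
    }

    // One worker iteration: dequeue many tasks, query the bloom blocks once
    // for the whole batch (elided here), then fan the partial results back
    // out on each task's own channel.
    batch := dequeueBatch(queue, 100, 200*time.Millisecond)
    for _, t := range batch {
        t.resCh <- result{fingerprint: 42} // placeholder for a real bloom lookup
    }
    fmt.Println("processed", len(batch), "tasks in a single pass")
}

Judging by the commit list, the real worker additionally merges tasks, filters chunk refs by day and reports the worker metrics introduced in this PR before fanning results back to the callers.
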
@@ -56,14 +60,14 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
var errInvalidTenant = errors.New("invalid tenant in chunk refs")

// TODO(chaudum): Make these configurable
const (
@@ -72,22 +76,26 @@
pendingTasksInitialCap = 1024
)

const (
metricsSubsystem = "bloom_gateway"
)

type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
}

func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics {
func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
return &metrics{
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_duration_seconds",
Help: "Time spent by tasks in queue before getting picked up by a worker.",
Buckets: prometheus.DefBuckets,
}),
inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{
Namespace: constants.Loki,
Namespace: namespace,
Subsystem: subsystem,
Name: "inflight_tasks",
Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
@@ -98,40 +106,6 @@ func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics {
}
}
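
For context on the refactored newMetrics signature above: the Prometheus client joins Namespace, Subsystem and Name into the fully qualified metric name, so with metricsSubsystem = "bloom_gateway" and assuming constants.Loki resolves to "loki" (an assumption, not shown in this diff), the queue histogram is exported as loki_bloom_gateway_queue_duration_seconds. A minimal standalone illustration:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
    reg := prometheus.NewRegistry()
    // The client composes the metric name as namespace_subsystem_name:
    // loki_bloom_gateway_queue_duration_seconds
    h := promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
        Namespace: "loki",          // assumed value of constants.Loki
        Subsystem: "bloom_gateway", // metricsSubsystem
        Name:      "queue_duration_seconds",
        Buckets:   prometheus.DefBuckets,
    })
    h.Observe(0.25)
    fmt.Println("registered histogram loki_bloom_gateway_queue_duration_seconds")
}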

// Task is the data structure that is enqueued to the internal queue and queued by query workers
type Task struct {
// ID is a lexicographically sortable unique identifier of the task
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// Request is the original request
Request *logproto.FilterChunkRefRequest
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResCh is a send-only channel to write partial responses to
ResCh chan<- *logproto.GroupedChunkRefs
}

// newTask returns a new Task that can be enqueued to the task queue.
// As additional arguments, it returns a result and an error channel, as well
// as an error if the instantiation fails.
func newTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan *logproto.GroupedChunkRefs, chan error, error) {
key, err := ulid.New(ulid.Now(), nil)
if err != nil {
return Task{}, nil, nil, err
}
errCh := make(chan error, 1)
resCh := make(chan *logproto.GroupedChunkRefs, 1)
task := Task{
ID: key,
Tenant: tenantID,
Request: req,
ErrCh: errCh,
ResCh: resCh,
}
return task, resCh, errCh, nil
}

// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
@@ -169,34 +143,43 @@ func makePendingTasks(n int) *pendingTasks {
type Gateway struct {
services.Service

cfg Config
logger log.Logger
metrics *metrics
cfg Config
logger log.Logger

queue *queue.RequestQueue
queueMetrics *queue.Metrics
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
metrics *metrics
workerMetrics *workerMetrics
queueMetrics *queue.Metrics

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store

sharding ShardingStrategy

pendingTasks *pendingTasks

serviceMngr *services.Manager
serviceWatcher *services.FailureWatcher

workerConfig workerConfig
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
metrics: newMetrics("bloom_gateway", reg),
metrics: newMetrics(reg, constants.Loki, metricsSubsystem),
sharding: shardingStrategy,
pendingTasks: makePendingTasks(pendingTasksInitialCap),
workerConfig: workerConfig{
maxWaitTime: 200 * time.Millisecond,
maxItems: 100,
},
workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem),
queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
}

g.queueMetrics = queue.NewMetrics(reg, constants.Loki, "bloom_gateway")
g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

@@ -215,19 +198,32 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
return nil, err
}

// We need to keep a reference to be able to call Stop() on shutdown of the gateway.
g.bloomStore = bloomStore

if err := g.initServices(); err != nil {
return nil, err
}
g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")

return g, nil
}

func (g *Gateway) initServices() error {
var err error
svcs := []services.Service{g.queue, g.activeUsers}
for i := 0; i < numWorkers; i++ {
id := fmt.Sprintf("bloom-query-worker-%d", i)
w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics)
svcs = append(svcs, w)
}
g.serviceMngr, err = services.NewManager(svcs...)
if err != nil {
return nil, err
return err
}
g.serviceWatcher = services.NewFailureWatcher()
g.serviceWatcher.WatchManager(g.serviceMngr)

g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")

return g, nil
return nil
}

func (g *Gateway) starting(ctx context.Context) error {
@@ -245,10 +241,6 @@ func (g *Gateway) starting(ctx context.Context) error {
return errors.Wrap(err, "unable to start bloom gateway subservices")
}

for i := 0; i < numWorkers; i++ {
go g.startWorker(ctx, fmt.Sprintf("worker-%d", i))
}

return nil
}

@@ -278,71 +270,26 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

// This is just a dummy implementation of the worker!
// TODO(chaudum): Implement worker that dequeues multiple pending tasks and
// multiplexes them prior to execution.
func (g *Gateway) startWorker(_ context.Context, id string) error {
level.Info(g.logger).Log("msg", "starting worker", "worker", id)

g.queue.RegisterConsumerConnection(id)
defer g.queue.UnregisterConsumerConnection(id)

idx := queue.StartIndexWithLocalQueue

for {
ctx := context.Background()
item, newIdx, err := g.queue.Dequeue(ctx, idx, id)
if err != nil {
if err != queue.ErrStopped {
level.Error(g.logger).Log("msg", "failed to dequeue task", "worker", id, "err", err)
continue
}
level.Info(g.logger).Log("msg", "stopping worker", "worker", id)
return err
}
task, ok := item.(Task)
if !ok {
level.Error(g.logger).Log("msg", "failed to cast to Task", "item", item)
continue
}

idx = newIdx
level.Info(g.logger).Log("msg", "dequeued task", "worker", id, "task", task.ID)
g.pendingTasks.Delete(task.ID)

r := task.Request
if len(r.Filters) > 0 {
r.Refs, err = g.bloomStore.FilterChunkRefs(ctx, task.Tenant, r.From.Time(), r.Through.Time(), r.Refs, r.Filters...)
}
if err != nil {
task.ErrCh <- err
} else {
for _, ref := range r.Refs {
task.ResCh <- ref
}
}
}
}

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

for _, ref := range req.Refs {
if ref.Tenant != tenantID {
return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.Tenant)
}
// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
}

Review comment (Member): I don't think we should loop over req.Refs to ensure the tenant matches our expectation here. This is costly in terms of CPU cycles and we should ensure it beforehand.

Reply (Contributor, Author): 👍 Removed the assertion.

// Sort ChunkRefs by fingerprint in ascending order
sort.Slice(req.Refs, func(i, j int) bool {
return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
})

task, resCh, errCh, err := newTask(tenantID, req)
task, resCh, errCh, err := NewTask(tenantID, req)
if err != nil {
return nil, err
}
@@ -354,19 +301,61 @@ func (g *Gateway) stopping(_ error) error {
g.pendingTasks.Add(task.ID, task)
})

response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
requestCount := len(req.Refs)
// TODO(chaudum): Use pool
responses := make([]v1.Output, 0, requestCount)

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "waiting for results")
case err := <-errCh:
return nil, err
return nil, errors.Wrap(err, "waiting for results")
case res := <-resCh:
level.Info(g.logger).Log("msg", "got result", "task", task.ID, "tenant", tenantID, "res", res)
responses = append(responses, res)
// log line is helpful for debugging tests
// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
response = append(response, res)
if len(response) == len(req.Refs) {
return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
if len(responses) == requestCount {
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs while the worker may still iterate over them,
// hence the chunks are only removed after all partial results have been collected
g.removeNotMatchingChunks(req, o)
Review comment (@vlad-diachenko, Contributor, Nov 22, 2023): nit: we could skip the call to this method if Removals.Len() is 0.

}
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}
}
}
}
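
The "Use pool" TODO above refers to the responses slice that is currently allocated per request. Purely as a sketch of one way such a pool could look, and not the implementation chosen in Loki, the snippet below uses a plain sync.Pool of reusable slices; the output type is a stand-in for v1.Output.

package main

import (
    "fmt"
    "sync"
)

// output is a stand-in for the v1.Output values collected per request.
type output struct {
    fp uint64
}

// outputPool hands out reusable slices so the per-request buffer does not
// have to be allocated (and garbage collected) on every call.
var outputPool = sync.Pool{
    New: func() any {
        s := make([]output, 0, 1024)
        return &s
    },
}

func main() {
    // borrow a buffer, reset its length, use it for one request, return it
    buf := outputPool.Get().(*[]output)
    *buf = (*buf)[:0]

    *buf = append(*buf, output{fp: 7})
    fmt.Println("collected", len(*buf), "partial results")

    outputPool.Put(buf)
}
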

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
// binary search index of fingerprint
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})

// fingerprint not found
if idx >= len(req.Refs) {
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
}

// if all chunks of a fingerprint are removed
// then remove the whole group from the response
if len(req.Refs[idx].Refs) == res.Removals.Len() {
req.Refs[idx] = nil // avoid leaking pointer
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
}

for i := range res.Removals {
toRemove := res.Removals[i]
for j := range req.Refs[idx].Refs {
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
break // the slice just got shorter; stop before the range indexes past its new length
}
}
}
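
For illustration only, here is a compact, self-contained model of the pruning behaviour implemented in removeNotMatchingChunks above, using simplified stand-in types (group, chunkRef) instead of the logproto and v1 structs, and a keep-list instead of in-place deletion: if every chunk of a fingerprint was filtered out, the whole group is dropped; otherwise only the matching checksums are removed.

package main

import (
    "fmt"
    "sort"
)

type chunkRef struct{ Checksum uint32 }

type group struct {
    Fingerprint uint64
    Refs        []chunkRef
}

// prune removes the chunks listed in removals from the group with the given
// fingerprint, dropping the whole group if nothing remains.
func prune(groups []group, fp uint64, removals []chunkRef) []group {
    // binary search for the fingerprint group (groups are sorted by fingerprint)
    idx := sort.Search(len(groups), func(i int) bool { return groups[i].Fingerprint >= fp })
    if idx >= len(groups) || groups[idx].Fingerprint != fp {
        return groups // fingerprint not present
    }
    if len(removals) == len(groups[idx].Refs) {
        // all chunks filtered out: drop the whole group
        return append(groups[:idx], groups[idx+1:]...)
    }
    kept := groups[idx].Refs[:0]
    for _, ref := range groups[idx].Refs {
        removed := false
        for _, r := range removals {
            if r.Checksum == ref.Checksum {
                removed = true
                break
            }
        }
        if !removed {
            kept = append(kept, ref)
        }
    }
    groups[idx].Refs = kept
    return groups
}

func main() {
    groups := []group{
        {Fingerprint: 1, Refs: []chunkRef{{10}, {11}}},
        {Fingerprint: 2, Refs: []chunkRef{{20}, {21}, {22}}},
    }
    groups = prune(groups, 2, []chunkRef{{21}})
    fmt.Println(groups) // fingerprint 2 keeps checksums 20 and 22
}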