Bloom Gateway: Implement chunk filtering using workers that multiplex requests (#11181)

This change adds an internal request queue to the bloom gateway. Instead of executing every single request individually, which involves resolving bloom blocks, downloading them if needed, and running the chunk filtering, requests are now enqueued to an internal, per-tenant queue. The queue implements the same shuffle sharding mechanism as the queue in the query scheduler component.
Workers then dequeue a batch of requests for a single tenant and multiplex them into a single processing task for each day. This has the big advantage that the chunks of multiple requests can be processed in a single sequential scan through a set of bloom blocks, without needing to skip back and forth within the binary stream of the block.
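The multiplexing idea can be sketched roughly as follows. This is an illustrative Go snippet under assumptions: the names filterRequest, dayTask and multiplexByDay are made up for the example and are not the actual Loki types or the gateway's exact day-splitting logic; it only shows how a batch of dequeued requests for one tenant could be grouped into one task per day so that each day's bloom blocks are scanned once.

// Illustrative sketch only: filterRequest, dayTask and multiplexByDay are
// assumed names for this example, not part of the Loki code base.
package main

import (
	"fmt"
	"time"
)

// filterRequest is a stand-in for a dequeued chunk-filter request.
type filterRequest struct {
	Tenant string
	From   time.Time
	Chunks []string
}

// dayTask bundles the chunks of all requests that fall on the same day, so a
// worker can resolve them against that day's bloom blocks in one pass.
type dayTask struct {
	Day    time.Time
	Chunks []string
}

// multiplexByDay groups a batch of requests for a single tenant into one task
// per day. A real implementation would also split requests spanning multiple
// days and remember which chunks belong to which originating request.
func multiplexByDay(reqs []filterRequest) []dayTask {
	byDay := make(map[time.Time]*dayTask)
	for _, r := range reqs {
		day := r.From.UTC().Truncate(24 * time.Hour)
		t, ok := byDay[day]
		if !ok {
			t = &dayTask{Day: day}
			byDay[day] = t
		}
		t.Chunks = append(t.Chunks, r.Chunks...)
	}
	tasks := make([]dayTask, 0, len(byDay))
	for _, t := range byDay {
		tasks = append(tasks, *t)
	}
	return tasks
}

func main() {
	now := time.Now()
	tasks := multiplexByDay([]filterRequest{
		{Tenant: "tenant-a", From: now, Chunks: []string{"chunk-1", "chunk-2"}},
		{Tenant: "tenant-a", From: now, Chunks: []string{"chunk-3"}},
	})
	for _, t := range tasks {
		fmt.Printf("day=%s chunks=%d\n", t.Day.Format("2006-01-02"), len(t.Chunks))
	}
}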

---------

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Nov 24, 2023
1 parent 09cb9ae commit d62d4e3
Showing 17 changed files with 1,487 additions and 447 deletions.
217 changes: 103 additions & 114 deletions pkg/bloomgateway/bloomgateway.go
@@ -23,6 +23,10 @@ of line filter expressions.
|
bloomgateway.Gateway
|
queue.RequestQueue
|
bloomgateway.Worker
|
bloomshipper.Store
|
bloomshipper.Shipper
@@ -56,14 +60,14 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
var errInvalidTenant = errors.New("invalid tenant in chunk refs")

// TODO(chaudum): Make these configurable
const (
@@ -72,22 +76,26 @@ const (
pendingTasksInitialCap = 1024
)

const (
metricsSubsystem = "bloom_gateway"
)

type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
}

func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics {
func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
return &metrics{
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_duration_seconds",
Help: "Time spent by tasks in queue before getting picked up by a worker.",
Buckets: prometheus.DefBuckets,
}),
inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{
Namespace: constants.Loki,
Namespace: namespace,
Subsystem: subsystem,
Name: "inflight_tasks",
Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
@@ -98,40 +106,6 @@ func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics {
}
}

// Task is the data structure that is enqueued to the internal queue and dequeued by query workers
type Task struct {
// ID is a lexicographically sortable unique identifier of the task
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// Request is the original request
Request *logproto.FilterChunkRefRequest
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResCh is a send-only channel to write partial responses to
ResCh chan<- *logproto.GroupedChunkRefs
}

// newTask returns a new Task that can be enqueued to the task queue.
// As additional arguments, it returns a result and an error channel, as well
// as an error if the instantiation fails.
func newTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan *logproto.GroupedChunkRefs, chan error, error) {
key, err := ulid.New(ulid.Now(), nil)
if err != nil {
return Task{}, nil, nil, err
}
errCh := make(chan error, 1)
resCh := make(chan *logproto.GroupedChunkRefs, 1)
task := Task{
ID: key,
Tenant: tenantID,
Request: req,
ErrCh: errCh,
ResCh: resCh,
}
return task, resCh, errCh, nil
}

// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
@@ -169,34 +143,43 @@ func makePendingTasks(n int) *pendingTasks {
type Gateway struct {
services.Service

cfg Config
logger log.Logger
metrics *metrics
cfg Config
logger log.Logger

queue *queue.RequestQueue
queueMetrics *queue.Metrics
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
metrics *metrics
workerMetrics *workerMetrics
queueMetrics *queue.Metrics

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store

sharding ShardingStrategy

pendingTasks *pendingTasks

serviceMngr *services.Manager
serviceWatcher *services.FailureWatcher

workerConfig workerConfig
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
metrics: newMetrics("bloom_gateway", reg),
metrics: newMetrics(reg, constants.Loki, metricsSubsystem),
sharding: shardingStrategy,
pendingTasks: makePendingTasks(pendingTasksInitialCap),
workerConfig: workerConfig{
maxWaitTime: 200 * time.Millisecond,
maxItems: 100,
},
workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem),
queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
}

g.queueMetrics = queue.NewMetrics(reg, constants.Loki, "bloom_gateway")
g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

@@ -215,19 +198,32 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
return nil, err
}

// We need to keep a reference to be able to call Stop() on shutdown of the gateway.
g.bloomStore = bloomStore

if err := g.initServices(); err != nil {
return nil, err
}
g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")

return g, nil
}

func (g *Gateway) initServices() error {
var err error
svcs := []services.Service{g.queue, g.activeUsers}
for i := 0; i < numWorkers; i++ {
id := fmt.Sprintf("bloom-query-worker-%d", i)
w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics)
svcs = append(svcs, w)
}
g.serviceMngr, err = services.NewManager(svcs...)
if err != nil {
return nil, err
return err
}
g.serviceWatcher = services.NewFailureWatcher()
g.serviceWatcher.WatchManager(g.serviceMngr)

g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")

return g, nil
return nil
}

func (g *Gateway) starting(ctx context.Context) error {
@@ -245,10 +241,6 @@ func (g *Gateway) starting(ctx context.Context) error {
return errors.Wrap(err, "unable to start bloom gateway subservices")
}

for i := 0; i < numWorkers; i++ {
go g.startWorker(ctx, fmt.Sprintf("worker-%d", i))
}

return nil
}

@@ -278,71 +270,26 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

// This is just a dummy implementation of the worker!
// TODO(chaudum): Implement worker that dequeues multiple pending tasks and
// multiplexes them prior to execution.
func (g *Gateway) startWorker(_ context.Context, id string) error {
level.Info(g.logger).Log("msg", "starting worker", "worker", id)

g.queue.RegisterConsumerConnection(id)
defer g.queue.UnregisterConsumerConnection(id)

idx := queue.StartIndexWithLocalQueue

for {
ctx := context.Background()
item, newIdx, err := g.queue.Dequeue(ctx, idx, id)
if err != nil {
if err != queue.ErrStopped {
level.Error(g.logger).Log("msg", "failed to dequeue task", "worker", id, "err", err)
continue
}
level.Info(g.logger).Log("msg", "stopping worker", "worker", id)
return err
}
task, ok := item.(Task)
if !ok {
level.Error(g.logger).Log("msg", "failed to cast to Task", "item", item)
continue
}

idx = newIdx
level.Info(g.logger).Log("msg", "dequeued task", "worker", id, "task", task.ID)
g.pendingTasks.Delete(task.ID)

r := task.Request
if len(r.Filters) > 0 {
r.Refs, err = g.bloomStore.FilterChunkRefs(ctx, task.Tenant, r.From.Time(), r.Through.Time(), r.Refs, r.Filters...)
}
if err != nil {
task.ErrCh <- err
} else {
for _, ref := range r.Refs {
task.ResCh <- ref
}
}
}
}

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

for _, ref := range req.Refs {
if ref.Tenant != tenantID {
return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.Tenant)
}
// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
}

// Sort ChunkRefs by fingerprint in ascending order
sort.Slice(req.Refs, func(i, j int) bool {
return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
})

task, resCh, errCh, err := newTask(tenantID, req)
task, resCh, errCh, err := NewTask(tenantID, req)
if err != nil {
return nil, err
}
@@ -354,19 +301,61 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
g.pendingTasks.Add(task.ID, task)
})

response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
requestCount := len(req.Refs)
// TODO(chaudum): Use pool
responses := make([]v1.Output, 0, requestCount)

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "waiting for results")
case err := <-errCh:
return nil, err
return nil, errors.Wrap(err, "waiting for results")
case res := <-resCh:
level.Info(g.logger).Log("msg", "got result", "task", task.ID, "tenant", tenantID, "res", res)
responses = append(responses, res)
// log line is helpful for debugging tests
// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
response = append(response, res)
if len(response) == len(req.Refs) {
return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
if len(responses) == requestCount {
for _, o := range responses {
// skip results that did not remove any chunks
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as a worker may still iterate over them
g.removeNotMatchingChunks(req, o)
}
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}
}
}
}

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
// binary search index of fingerprint
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})

// fingerprint not found
if idx >= len(req.Refs) {
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
}

// if all chunks of a fingerprint are removed,
// then remove the whole group from the response
if len(req.Refs[idx].Refs) == res.Removals.Len() {
req.Refs[idx] = nil // avoid leaking pointer
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
}

for i := range res.Removals {
toRemove := res.Removals[i]
for j := range req.Refs[idx].Refs {
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
// stop after the first matching chunk to avoid indexing past the end of the shortened slice
break
}
}
}