Implement inverted logic for checking chunks against blooms
Signed-off-by: Christian Haudum <[email protected]>
chaudum committed Nov 21, 2023
1 parent 476dd14 commit 6270a27
Showing 7 changed files with 139 additions and 217 deletions.
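The inversion named in the commit title: previously each worker result carried the chunks that matched the blooms (res.Chks), and the gateway assembled a fresh response from them via convertToShortRefs; now a result (v1.Output) carries the chunks to discard (res.Removals), and the gateway subtracts those from the original req.Refs in place, which presumably avoids re-grouping the refs and converting every matching chunk back and forth.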
57 changes: 44 additions & 13 deletions pkg/bloomgateway/bloomgateway.go
@@ -60,6 +60,7 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/queue"
 	"github.com/grafana/loki/pkg/storage"
+	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/pkg/util"
@@ -300,28 +301,58 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		g.pendingTasks.Add(task.ID, task)
 	})

-	response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
-	responseCount := 0
+	requestCount := len(req.Refs)
+	// TODO(chaudum): Use pool
+	responses := make([]v1.Output, 0, requestCount)

 	for {
 		select {
 		case <-ctx.Done():
 			return nil, errors.Wrap(ctx.Err(), "waiting for results")
 		case err := <-errCh:
 			return nil, errors.Wrap(err, "waiting for results")
 		case res := <-resCh:
-			responseCount++
+			responses = append(responses, res)
 			// log line is helpful for debugging tests
-			// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", res.Fp, "chunks", res.Chks.Len(), "progress", fmt.Sprintf("%d/%d", responseCount, len(req.Refs)))
-			if res.Chks.Len() > 0 {
-				response = append(response, &logproto.GroupedChunkRefs{
-					Tenant:      tenantID,
-					Fingerprint: uint64(res.Fp),
-					Refs:        convertToShortRefs(res.Chks),
-				})
-			}
+			// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
 			// wait for all parts of the full response
-			if responseCount == len(req.Refs) {
-				return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
+			if len(responses) == requestCount {
+				for _, o := range responses {
+					// we must not remove items from req.Refs as long as the worker may iterate over them
+					g.removeNotMatchingChunks(req, o)
+				}
+				return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
 			}
 		}
 	}
 }

+func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
+	// binary search index of fingerprint
+	idx := sort.Search(len(req.Refs), func(i int) bool {
+		return req.Refs[i].Fingerprint >= uint64(res.Fp)
+	})
+
+	// fingerprint not found
+	if idx >= len(req.Refs) {
+		level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
+		return
+	}
+
+	// if all chunks of a fingerprint are removed
+	// then remove the whole group from the response
+	if len(req.Refs[idx].Refs) == res.Removals.Len() {
+		req.Refs[idx] = nil // avoid leaking pointer
+		req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
+		return
+	}
+
+	for i := range res.Removals {
+		toRemove := res.Removals[i]
+		for j := range req.Refs[idx].Refs {
+			if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
+				req.Refs[idx].Refs[j] = nil // avoid leaking pointer
+				req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
+				// stop scanning once the chunk is removed: the slice just shrank,
+				// so the stale range length would otherwise index past the end
+				break
+			}
+		}
+	}
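To see the pruning logic in isolation, here is a self-contained sketch of the same algorithm with simplified stand-in types (group is a hypothetical stand-in for logproto.GroupedChunkRefs, and plain checksum slices replace v1.Output; the real method logs on a missing fingerprint instead of checking it):

```go
package main

import (
	"fmt"
	"sort"
)

// group is a simplified stand-in for logproto.GroupedChunkRefs.
type group struct {
	fingerprint uint64
	checksums   []uint32
}

// removeNotMatching prunes the checksums listed in removals from the group
// with the given fingerprint, dropping the group entirely if nothing is left.
func removeNotMatching(groups []*group, fp uint64, removals []uint32) []*group {
	// groups are sorted by fingerprint, so we can binary search
	idx := sort.Search(len(groups), func(i int) bool {
		return groups[i].fingerprint >= fp
	})
	if idx >= len(groups) || groups[idx].fingerprint != fp {
		return groups // fingerprint not found
	}
	if len(groups[idx].checksums) == len(removals) {
		// all chunks removed: drop the whole group
		return append(groups[:idx], groups[idx+1:]...)
	}
	for _, rm := range removals {
		for j, cs := range groups[idx].checksums {
			if cs == rm {
				groups[idx].checksums = append(groups[idx].checksums[:j], groups[idx].checksums[j+1:]...)
				break // slice shrank; move on to the next removal
			}
		}
	}
	return groups
}

func main() {
	groups := []*group{
		{fingerprint: 100, checksums: []uint32{1, 2, 3}},
		{fingerprint: 200, checksums: []uint32{4, 5}},
	}
	groups = removeNotMatching(groups, 100, []uint32{2})    // partial removal
	groups = removeNotMatching(groups, 200, []uint32{4, 5}) // full removal drops the group
	for _, g := range groups {
		fmt.Println(g.fingerprint, g.checksums) // 100 [1 3]
	}
}
```

Because the request stays sorted by fingerprint throughout, each partial worker result can be applied with one binary search rather than a rebuild of the whole response.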
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
@@ -257,9 +257,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 	})

 	chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)
-	inputChunkRefs := groupRefs(t, chunkRefs)

 	t.Run("no match - return empty response", func(t *testing.T) {
+		inputChunkRefs := groupRefs(t, chunkRefs)
 		req := &logproto.FilterChunkRefRequest{
 			From:    now.Add(-8 * time.Hour),
 			Through: now,
@@ -279,6 +279,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 	})

 	t.Run("match - return filtered", func(t *testing.T) {
+		inputChunkRefs := groupRefs(t, chunkRefs)
 		// hack to get indexed key for a specific series
 		// the indexed key range for a series is defined as
 		// i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1
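Moving groupRefs into each subtest follows from the new in-place pruning: FilterChunkRefs now mutates req.Refs directly, so a grouped slice shared across subtests would presumably leak the mutations of one subtest into the next.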
38 changes: 14 additions & 24 deletions pkg/bloomgateway/multiplexing.go
@@ -54,14 +54,12 @@ func (t Task) Bounds() (time.Time, time.Time) {
 	return getDayTime(t.Request.From), getDayTime(t.Request.Through)
 }

-// CopyWithRequest returns a copy of the original task, but with newly provided request
-func (t Task) CopyWithRequest(req *logproto.FilterChunkRefRequest) Task {
-	return Task{
-		ID:      t.ID,
-		Tenant:  t.Tenant,
-		Request: req,
-		ErrCh:   t.ErrCh,
-		ResCh:   t.ResCh,
+func (t Task) ChunkIterForDay(day time.Time) v1.Iterator[*logproto.GroupedChunkRefs] {
+	cf := filterGroupedChunkRefsByDay{day: day}
+	return &FilterIter[*logproto.GroupedChunkRefs]{
+		iter:      v1.NewSliceIter(t.Request.Refs),
+		matches:   cf.contains,
+		transform: cf.filter,
 	}
 }

@@ -81,19 +79,21 @@ func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logp
 	minTs, maxTs := getFromThrough(a.Refs)

 	// in most cases, all chunks are within day range
-	if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(24*time.Hour)) {
+	if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(Day)) {
 		return a
 	}

 	// case where certain chunks are outside of day range
 	// using binary search to get min and max index of chunks that fall into the day range
 	min := sort.Search(len(a.Refs), func(i int) bool {
 		start := a.Refs[i].From.Time()
-		return start.Compare(cf.day) >= 0 && start.Compare(cf.day.Add(Day)) < 0
+		end := a.Refs[i].Through.Time()
+		return start.Compare(cf.day) >= 0 || end.Compare(cf.day) >= 0
 	})

 	max := sort.Search(len(a.Refs), func(i int) bool {
 		start := a.Refs[i].From.Time()
-		return start.Compare(cf.day.Add(Day)) >= 0
+		return start.Compare(cf.day.Add(Day)) > 0
 	})

 	return &logproto.GroupedChunkRefs{
@@ -103,22 +103,12 @@ func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logp
 	}
 }

-func (t Task) ChunkIterForDay(day time.Time) v1.PeekingIterator[*logproto.GroupedChunkRefs] {
-	cf := filterGroupedChunkRefsByDay{day: day}
-	iter := &FilterIter[*logproto.GroupedChunkRefs]{
-		iter:      v1.NewSliceIter(t.Request.Refs),
-		predicate: cf.contains,
-		transform: cf.filter,
-	}
-	return v1.NewPeekingIter[*logproto.GroupedChunkRefs](iter)
-}
-
 type Predicate[T any] func(a T) bool
 type Transform[T any] func(a T) T

 type FilterIter[T any] struct {
 	iter      v1.Iterator[T]
-	predicate Predicate[T]
+	matches   Predicate[T]
 	transform Transform[T]
 	cache     T
 	zero      T // zero value of the return type of Next()
@@ -130,7 +120,7 @@ func (it *FilterIter[T]) Next() bool {
 		it.cache = it.zero
 		return false
 	}
-	for next && !it.predicate(it.iter.At()) {
+	for next && !it.matches(it.iter.At()) {
 		next = it.iter.Next()
 		if !next {
 			it.cache = it.zero
@@ -178,7 +168,7 @@ func (it *taskMergeIterator) init() {
 	sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
 	for i := range it.tasks {
 		iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
-		sequences = append(sequences, iter)
+		sequences = append(sequences, v1.NewPeekingIter(iter))
 	}
 	it.heap = v1.NewHeapIterator(
 		func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool {
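The matches/transform pair threaded through ChunkIterForDay is a lazy filter-map. Here is a minimal, self-contained sketch of the pattern; the Iterator interface and sliceIter are local stand-ins, not the real v1 package:

```go
package main

import "fmt"

// Iterator is a minimal stand-in for v1.Iterator.
type Iterator[T any] interface {
	Next() bool
	At() T
}

type sliceIter[T any] struct {
	xs []T
	i  int
}

func (s *sliceIter[T]) Next() bool { s.i++; return s.i < len(s.xs) }
func (s *sliceIter[T]) At() T      { return s.xs[s.i] }

// filterIter skips items that don't match and transforms the ones that do,
// mirroring the matches/transform pair used by ChunkIterForDay.
type filterIter[T any] struct {
	iter      Iterator[T]
	matches   func(T) bool
	transform func(T) T
	cache     T
}

func (it *filterIter[T]) Next() bool {
	for it.iter.Next() {
		if it.matches(it.iter.At()) {
			it.cache = it.transform(it.iter.At())
			return true
		}
	}
	var zero T
	it.cache = zero // reset so At() never returns a stale value
	return false
}

func (it *filterIter[T]) At() T { return it.cache }

func main() {
	it := &filterIter[int]{
		iter:      &sliceIter[int]{xs: []int{1, 2, 3, 4, 5}, i: -1},
		matches:   func(x int) bool { return x%2 == 0 }, // keep even values
		transform: func(x int) int { return x * 10 },    // then scale them
	}
	for it.Next() {
		fmt.Println(it.At()) // prints 20, then 40
	}
}
```

In the real code, cf.contains (not shown in this diff) presumably decides whether a group has any chunks overlapping the day, and cf.filter narrows the group to exactly those chunks.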
70 changes: 70 additions & 0 deletions pkg/bloomgateway/multiplexing_test.go
@@ -184,3 +184,73 @@ func TestTaskMergeIterator(t *testing.T) {
 		}
 	})
 }

+func TestChunkIterForDay(t *testing.T) {
+	tenant := "fake"
+
+	// Thu Nov 09 2023 10:56:50 UTC
+	ts := model.TimeFromUnix(1699523810)
+
+	t.Run("filter chunk refs that fall into the day range", func(t *testing.T) {
+		input := &logproto.FilterChunkRefRequest{
+			From:    ts.Add(-168 * time.Hour), // 1w ago
+			Through: ts,
+			Refs: []*logproto.GroupedChunkRefs{
+				{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100},
+					{From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101},
+				}},
+				{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200},
+					{From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201},
+				}},
+				{Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-120 * time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300},
+					{From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301},
+				}},
+				{Fingerprint: 400, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400},
+					{From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401},
+				}},
+				{Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), Checksum: 500},
+					{From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501},
+				}},
+				{Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600},
+					{From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601},
+				}},
+				{Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{
+					{From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700},
+					{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701},
+				}},
+			},
+			Filters: []*logproto.LineFilterExpression{
+				{Operator: 1, Match: "foo"},
+				{Operator: 1, Match: "bar"},
+			},
+		}
+
+		// day ranges from ts-48h to ts-24h
+		day := getDayTime(ts.Add(-36 * time.Hour))
+
+		expected := []*logproto.GroupedChunkRefs{
+			{Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{
+				{From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501},
+			}},
+			{Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{
+				{From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600},
+			}},
+		}
+
+		task, _, _, _ := NewTask(tenant, input)
+		it := task.ChunkIterForDay(day)
+
+		output := make([]*logproto.GroupedChunkRefs, 0, len(input.Refs))
+		for it.Next() {
+			output = append(output, it.At())
+		}
+
+		require.Equal(t, expected, output)
+	})
+}
73 changes: 5 additions & 68 deletions pkg/bloomgateway/util.go
@@ -15,30 +15,20 @@ type IndexedValue[T any] struct {
 }

 type IterWithIndex[T any] struct {
-	v1.PeekingIterator[T]
+	v1.Iterator[T]
 	zero  T // zero value of T
 	cache IndexedValue[T]
 }

 func (it *IterWithIndex[T]) At() IndexedValue[T] {
-	it.cache.val = it.PeekingIterator.At()
+	it.cache.val = it.Iterator.At()
 	return it.cache
 }

-func (it *IterWithIndex[T]) Peek() (IndexedValue[T], bool) {
-	peek, ok := it.PeekingIterator.Peek()
-	if !ok {
-		it.cache.val = it.zero
-		return it.cache, false
-	}
-	it.cache.val = peek
-	return it.cache, true
-}
-
-func NewIterWithIndex[T any](iter v1.PeekingIterator[T], idx int) v1.PeekingIterator[IndexedValue[T]] {
+func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] {
 	return &IterWithIndex[T]{
-		PeekingIterator: iter,
-		cache:           IndexedValue[T]{idx: idx},
+		Iterator: iter,
+		cache:    IndexedValue[T]{idx: idx},
 	}
 }

@@ -84,59 +74,6 @@ func getDayTime(ts model.Time) time.Time {
 	return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC)
 }

-func filterRequestForDay(r *logproto.FilterChunkRefRequest, day time.Time) *logproto.FilterChunkRefRequest {
-	through := model.TimeFromUnix(day.Unix())
-	from := model.TimeFromUnix(day.Add(24 * time.Hour).Unix())
-
-	refs := make([]*logproto.GroupedChunkRefs, 0, len(r.Refs))
-	for i := range r.Refs {
-		groupedChunkRefs := &logproto.GroupedChunkRefs{
-			Fingerprint: r.Refs[i].Fingerprint,
-			Tenant:      r.Refs[i].Tenant,
-			Refs:        make([]*logproto.ShortRef, 0, len(r.Refs[i].Refs)),
-		}
-		for j := range r.Refs[i].Refs {
-			shortRef := r.Refs[i].Refs[j]
-			fromDay := getDayTime(shortRef.From)
-			if fromDay.After(day) {
-				break
-			}
-			throughDay := getDayTime(shortRef.Through)
-			if fromDay.Equal(day) || throughDay.Equal(day) {
-				groupedChunkRefs.Refs = append(groupedChunkRefs.Refs, shortRef)
-			}
-		}
-
-		// do not add empty groups to request
-		if len(groupedChunkRefs.Refs) == 0 {
-			continue
-		}
-
-		groupFrom, groupThrough := getFromThrough(groupedChunkRefs.Refs)
-		if groupFrom.Before(from) {
-			from = groupFrom
-		}
-		if groupThrough.After(through) {
-			through = groupThrough
-		}
-		refs = append(refs, groupedChunkRefs)
-	}
-
-	// The initial value of `from` is the through time and vice versa.
-	// This is, in order to determine min From and max Through.
-	// In case no chunk refs match, we need to swap the initial value again.
-	if len(refs) == 0 {
-		from, through = through, from
-	}
-
-	return &logproto.FilterChunkRefRequest{
-		From:    from,
-		Through: through,
-		Refs:    refs,
-		Filters: r.Filters,
-	}
-}
-
 // TODO(chaudum): Fix Through time calculation
 // getFromThrough assumes a list of ShortRefs sorted by From time
 // However, it does also assume that the last item has the highest
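With Peek gone from IterWithIndex, look-ahead is layered on at the call site via v1.NewPeekingIter (see the init change in multiplexing.go above), so the wrapper only has to tag values with their task index. A self-contained sketch of that slimmed-down shape, with local stand-ins for the v1 iterator types:

```go
package main

import "fmt"

// Iterator is a minimal stand-in for v1.Iterator.
type Iterator[T any] interface {
	Next() bool
	At() T
}

type sliceIter[T any] struct {
	xs []T
	i  int
}

func (s *sliceIter[T]) Next() bool { s.i++; return s.i < len(s.xs) }
func (s *sliceIter[T]) At() T      { return s.xs[s.i] }

type indexedValue[T any] struct {
	idx int
	val T
}

// iterWithIndex tags every yielded value with the index of its source
// iterator, so a downstream heap merge can route results back to their task.
type iterWithIndex[T any] struct {
	Iterator[T] // Next() is promoted from the embedded iterator
	cache       indexedValue[T]
}

func (it *iterWithIndex[T]) At() indexedValue[T] {
	it.cache.val = it.Iterator.At() // reuse the cache to avoid per-element allocation
	return it.cache
}

func main() {
	fps := &sliceIter[uint64]{xs: []uint64{100, 200, 300}, i: -1}
	it := &iterWithIndex[uint64]{Iterator: fps, cache: indexedValue[uint64]{idx: 7}}
	for it.Next() {
		fmt.Printf("task=%d fp=%d\n", it.At().idx, it.At().val) // task=7 fp=100 ...
	}
}
```

Dropping Peek removes the duplicated cache/zero bookkeeping from this wrapper; the generic v1.NewPeekingIter adds look-ahead back only where the heap merge actually needs it.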