Skip to content

Commit

Permalink
APPS-1312-after-digest-fix
Browse files Browse the repository at this point in the history
- fix setting digest to partition filters
  • Loading branch information
filkeith committed Oct 21, 2024
1 parent 8ce777b commit 0165661
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions config_partition_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ func NewPartitionFilterByDigest(namespace, digest string) (*a.PartitionFilter, e
return nil, err
}

return a.NewPartitionFilterByKey(key), nil
pf := a.NewPartitionFilterByKey(key)
// We must nullify digest, to start partition from the beginning.
pf.Digest = nil

return pf, nil
}

// NewPartitionFilterAfterDigest returns partition filter to scan call records after digest.
Expand Down Expand Up @@ -86,7 +90,7 @@ func splitPartitions(partitionFilters []*a.PartitionFilter, numWorkers int) ([]*
}

// If we have one partition filter with range.
if len(partitionFilters) == 1 && partitionFilters[0].Count != 1 && partitionFilters[0].Digest == nil {
if len(partitionFilters) == 1 && partitionFilters[0].Count != 1 {
return splitPartitionRange(partitionFilters[0], numWorkers), nil
}

Expand Down Expand Up @@ -158,6 +162,10 @@ func splitPartitionRange(partitionFilters *a.PartitionFilter, numWorkers int) []
result[j].Begin = (j * partitionFilters.Count) / numWorkers
result[j].Count = (((j + 1) * partitionFilters.Count) / numWorkers) - result[j].Begin
result[j].Begin += partitionFilters.Begin
// Set digest property for the first group.
if partitionFilters.Digest != nil && j == 0 {
result[j].Digest = partitionFilters.Digest
}
}

return result
Expand Down

0 comments on commit 0165661

Please sign in to comment.