Skip to content

Commit

Permalink
Switch sharding struct from array to map
Browse files Browse the repository at this point in the history
Improves the ability to perform resharding by switching the sharding
struct from array to map. Each map entry has a key which is used in
rendezvous hashing to deterministically select which shard to use from
the collection of keys. When a shard is removed it is guaranteed that
only blobs which belonged to the removed shard will resolve to a new
shard.

In combination with ReadFallbackConfigurations this allows adding and
removing shards with minimal need to rebalance the blobs between the
shards.

See https://github.com/buildbarn/bb-adrs #11 for more details.
  • Loading branch information
meroton-benjamin committed Feb 20, 2025
1 parent 214cfae commit 86f3bfb
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 355 deletions.
2 changes: 1 addition & 1 deletion internal/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ gomock(
gomock(
name = "blobstore_sharding",
out = "blobstore_sharding.go",
interfaces = ["ShardPermuter"],
interfaces = ["ShardSelector"],
library = "//pkg/blobstore/sharding",
mockgen_model_library = "@org_uber_go_mock//mockgen/model",
mockgen_tool = "@org_uber_go_mock//mockgen",
Expand Down
47 changes: 25 additions & 22 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,41 +85,44 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
DigestKeyFormat: slow.DigestKeyFormat,
}, "read_caching", nil
case *pb.BlobAccessConfiguration_Sharding:
backends := make([]blobstore.BlobAccess, 0, len(backend.Sharding.Shards))
weights := make([]uint32, 0, len(backend.Sharding.Shards))
backends := make([]sharding.ShardBackend, 0, len(backend.Sharding.ShardMap))
shards := make([]sharding.Shard, 0, len(backend.Sharding.ShardMap))
keys := make([]string, 0, len(backend.Sharding.ShardMap))
var combinedDigestKeyFormat *digest.KeyFormat
for _, shard := range backend.Sharding.Shards {
if shard.Backend == nil {
// Drained backend.
backends = append(backends, nil)
for key, shard := range backend.Sharding.ShardMap {
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, sharding.ShardBackend{Backend: backend.BlobAccess, Key: key })
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
// Undrained backend.
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}

if shard.Weight == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shards must have positive weights")
}
weights = append(weights, shard.Weight)
shards = append(shards, sharding.Shard{
Key: key,
Weight: shard.Weight,
})
keys = append(keys, key)
}
if combinedDigestKeyFormat == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any undrained backends")
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any backends")
}
shardSelector, err := sharding.NewRendezvousShardSelector(shards)
if err != nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Could not create rendezvous shard selector")
}
return BlobAccessInfo{
BlobAccess: sharding.NewShardingBlobAccess(
backends,
sharding.NewWeightedShardPermuter(weights),
backend.Sharding.HashInitialization),
shardSelector,
),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
case *pb.BlobAccessConfiguration_Mirrored:
Expand Down
7 changes: 3 additions & 4 deletions pkg/blobstore/sharding/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sharding",
srcs = [
"shard_permuter.go",
"shard_selector.go",
"sharding_blob_access.go",
"weighted_shard_permuter.go",
"rendezvous_shard_selector.go",
],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/sharding",
visibility = ["//visibility:public"],
Expand All @@ -16,7 +16,6 @@ go_library(
"//pkg/digest",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_lazybeaver_xorshift//:xorshift",
"@org_golang_x_sync//errgroup",
],
)
Expand All @@ -25,7 +24,7 @@ go_test(
name = "sharding_test",
srcs = [
"sharding_blob_access_test.go",
"weighted_shard_permuter_test.go",
"rendezvous_shard_selector_test.go",
],
deps = [
":sharding",
Expand Down
146 changes: 146 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package sharding

import (
"encoding/binary"
"fmt"
"crypto/sha256"
"math/bits"
"sort"
)

type rendezvousShard struct {
weight uint32
index int
hash uint64
}

type rendezvousShardSelector struct {
shards []rendezvousShard
}

func hashServer(key string) uint64 {
h := sha256.Sum256([]byte(key))
return binary.BigEndian.Uint64(h[:8])
}

func NewRendezvousShardSelector(shards []Shard) (*rendezvousShardSelector, error) {
if len(shards) == 0 {
return nil, fmt.Errorf("RendezvousShardSelector must have shards to be defined")
}
internalShards := make([]rendezvousShard, 0, len(shards))
keyMap := make(map[uint64]string, len(shards))
for index, shard := range shards {
hash := hashServer(shard.Key)
if collision, exists := keyMap[hash]; exists {
return nil, fmt.Errorf("Hash collision between shards: %s and %s", shard.Key, collision)
}
keyMap[hash] = shard.Key
internalShards = append(internalShards, rendezvousShard{
index: index,
weight: shard.Weight,
hash: hash,
})
}
sort.Slice(internalShards, func(i, j int) bool {
return internalShards[i].hash < internalShards[j].hash
})
return &rendezvousShardSelector{ shards: internalShards }, nil
}

func score(x uint64, weight uint32) uint64 {
// The mathematical formula we are approximating is -weight/log(X) where X
// is a uniform random number between ]0,1[. For stability and performance
// reasons we are foregoing any floating point operations and approximating
// the logarithm.
//
// Since we are interested in the relative ordering rather than the absolute
// value of the score we can pick log2 as our desired implementation. Log2
// is simple to approximate numerically.
//
// x is already random and uniform, we can turn it into a number between 0
// (inclusive) and 1 (exclusive) by simply dividing by MaxUint64+1. By the
// properties of the logarithm we can simplify -log2(x/(MaxUint64+1)) to
// log2(MaxUint64+1)-log2(x), which will be 64-log2(x)
logFixed := uint64(64)<<16 - Log2Fixed(x)
// Replace weight with fixed point representation of weight. We're not using
// floating point math so we relative size of the weight to be as big as
// possible compared to the log. Since weight is 32 bit it is safe to shift
// it by an additional 32 bits.
weightFixed := uint64(weight)<<32
return weightFixed/logFixed
}

const (
LUT_ENTRY_BITS = 6
)
// Lookup table used for the log2 fraction, it is a fixed point representation
// of log2(x) for x between [1,2] which is a a value between 0 and 1. It uses 16
// bits of precision containing 1<<LUT_ENTRY_BITS+1 entries. The entry is picked
// by truncating to the remaining LUT_ENTRY_BITS of precision. We add the last
// value to simplify interpolation logic.
var lut = [(1<<LUT_ENTRY_BITS)+1]uint16 {
0x0000, 0x05ba, 0x0b5d, 0x10eb, 0x1664, 0x1bc8, 0x2119, 0x2656,
0x2b80, 0x3098, 0x359f, 0x3a94, 0x3f78, 0x444c, 0x4910, 0x4dc5,
0x526a, 0x5700, 0x5b89, 0x6003, 0x646f, 0x68ce, 0x6d20, 0x7165,
0x759d, 0x79ca, 0x7dea, 0x81ff, 0x8608, 0x8a06, 0x8dfa, 0x91e2,
0x95c0, 0x9994, 0x9d5e, 0xa11e, 0xa4d4, 0xa881, 0xac24, 0xafbe,
0xb350, 0xb6d9, 0xba59, 0xbdd1, 0xc140, 0xc4a8, 0xc807, 0xcb5f,
0xceaf, 0xd1f7, 0xd538, 0xd872, 0xdba5, 0xded0, 0xe1f5, 0xe513,
0xe82a, 0xeb3b, 0xee45, 0xf149, 0xf446, 0xf73e, 0xfa2f, 0xfd1a,
0x0000, // the overflow of 0x10000, cancels out when interpolating
}

func Log2Fixed(x uint64) uint64 {
// Fixed point approximation of log2 with a lookup table for deterministic
// math. 16 bits of precision represents the fractional value. Calculates
// the logarithm as the sum of three pieces:
//
// 1. The integer value, which is calculated by counting number of bits.
//
// 2. A value calculated by a lookup table of LUT_ENTRY_BITS
//
// 3. The linearly interpolated value between the lookup table and the next
// value.
//
// Since log2(x) = N+log2(x/2^N) we can easily remove the integer part of
// the logaritm. We calculate that exactly by counting the number of bits in
// the number. log(x/2^N) will then be a number between 0 and 1 for which we
// can use a lookup table to get precomputed values.
//
// In contrast with mathematical logarithm, this function is defined for x=0
// removing the need for conditionals the maximum value this function
// produces is 64 << 16 - 1.
msb := bits.Len64(x >> 1)
var bitfield = x << (64 - msb)
index := bitfield >> (64 - LUT_ENTRY_BITS)
interp := bitfield << LUT_ENTRY_BITS >> 16
base := lut[index]
next := lut[index + 1]
delta := uint64(next - base)
frac := uint64(base) << 48 + (delta * interp)
return (uint64(msb) << 16) | uint64(frac) >> 48
}

// A very fast PRNG with strong mixing properties
func splitmix64(x uint64) uint64 {
x ^= x >> 30
x *= 0xbf58476d1ce4e5b9
x ^= x >> 27
x *= 0x94d049bb133111eb
x ^= x >> 31
return x
}

func (s *rendezvousShardSelector) GetShard(hash uint64) int {
var best uint64
var bestIndex int
for _, shard := range s.shards {
mixed := splitmix64(shard.hash^hash)
current := score(mixed, shard.weight)
if current > best {
best = current
bestIndex = shard.index
}
}
return bestIndex
}
92 changes: 92 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package sharding

import (
"fmt"
"math"
"testing"

"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
"github.com/stretchr/testify/require"
)

func TestLog2Fixed(t *testing.T) {
bits := 16
// test all powers of 2 (answer should be exact)
for i := 0; i < 64; i++ {
expected := uint64(i)<<bits
actual := sharding.Log2Fixed(uint64(1)<<i)
require.Equal(t, expected, actual, "Power of two should give exact result")
}
// test numbers < 100_000, expect less than 0.01% relative error from true result
for i := 2; i < 100_000; i++ {
expected := math.Log2(float64(i))
actual := float64(sharding.Log2Fixed(uint64(i))) / math.Pow(2,float64(bits))
require.InEpsilon(t, expected, actual, 1e-5, fmt.Sprintf("Error is too high for %d", i))
}
}

const COUNT = 10_000_000
var precomputedResults = [20]int{3, 2, 0, 3, 3, 3, 0, 0, 1, 3, 0, 3, 1, 2, 2, 2, 3, 3, 1, 3}
var precomputedOccurrences = [5]int{668687, 1332248, 2666353, 4666342, 666370}

func TestRendezvousShardSelectorDistribution(t *testing.T) {
// Distribution across multiple backends
weights := []sharding.Shard{
{Key: "a", Weight: 1},
{Key: "b", Weight: 2},
{Key: "c", Weight: 4},
{Key: "d", Weight: 7},
{Key: "e", Weight: 1},
}
s, err := sharding.NewRendezvousShardSelector(weights)
require.NoError(t, err, "Selector construction should succeed")
results := make([]int, len(precomputedResults))
occurrences := make([]int, len(weights))

// Request the shard for a very large amount of blobs
for i := 0; i < COUNT; i++ {
result := s.GetShard(uint64(i))
if i < len(results) {
results[i] = result
}
occurrences[result] += 1
}

t.Run("Distribution Error", func(t *testing.T) {
// Requests should be fanned out with a small error margin.
weightSum := uint32(0)
for _, shard := range weights {
weightSum += shard.Weight
}
for index, shard := range weights {
require.InEpsilon(t, shard.Weight*COUNT/weightSum, occurrences[index], 1e-2)
}
})

t.Run("Distribution Shape", func(t *testing.T) {
shapeError := "The sharding algorithm has produced unexpected results, changing this distribution is a breaking change to buildbarn"
require.Equal(t, precomputedResults[:], results, shapeError)
require.Equal(t, precomputedOccurrences[:], occurrences, shapeError)
})

t.Run("Stability Test", func(t *testing.T) {
// Removing a shard should only affect the shard that is removed
results = make([]int, 10000)
for i := 0; i < len(results); i++ {
results[i] = s.GetShard(uint64(i))
}
// drop the last shard in the slice
weightsSubset := weights[:len(weights)-1]
sharder, err := sharding.NewRendezvousShardSelector(weightsSubset)
require.NoError(t, err, "Selector construction should succeed")
for i := 0; i < len(results); i++ {
result := sharder.GetShard(uint64(i))
if results[i] == len(weights)-1 {
continue
}
// result should be unchanged for all slices which did not resolve
// to the dropped one
require.Equal(t, results[i], result, "Dropping a shard should not effect other shards")
}
})
}
19 changes: 0 additions & 19 deletions pkg/blobstore/sharding/shard_permuter.go

This file was deleted.

20 changes: 20 additions & 0 deletions pkg/blobstore/sharding/shard_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sharding

// ShardSelector is an algorithm that for a hash resolves into an index which
// corresponds to the specific backend for that shard.
//
// The algorithm must be stable, the removal of an unavailable backend should
// not result in the reshuffling of any other blobs. It must also be
// numerically stable so that it produces the same result no matter the
// architecture.
type ShardSelector interface {
GetShard(hash uint64) int
}

// A description of shard. The shard selector will resolve to the same shard
// independent of the order of shards, but the returned index will correspond
// to the index sent to the ShardSelectors constructor.
type Shard struct {
Key string
Weight uint32
}
Loading

0 comments on commit 86f3bfb

Please sign in to comment.