Implement sharding based on Rendezvous Hashing #238

Draft · wants to merge 1 commit into base: master
2 changes: 1 addition & 1 deletion internal/mock/BUILD.bazel
@@ -85,7 +85,7 @@ gomock(
gomock(
name = "blobstore_sharding",
out = "blobstore_sharding.go",
interfaces = ["ShardPermuter"],
interfaces = ["ShardSelector"],
library = "//pkg/blobstore/sharding",
mockgen_model_library = "@org_uber_go_mock//mockgen/model",
mockgen_tool = "@org_uber_go_mock//mockgen",
34 changes: 14 additions & 20 deletions pkg/blobstore/configuration/new_blob_access.go
@@ -85,40 +85,34 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
DigestKeyFormat: slow.DigestKeyFormat,
}, "read_caching", nil
case *pb.BlobAccessConfiguration_Sharding:
backends := make([]blobstore.BlobAccess, 0, len(backend.Sharding.Shards))
weights := make([]uint32, 0, len(backend.Sharding.Shards))
backends := make(map[string]blobstore.BlobAccess, len(backend.Sharding.ShardMap))
weights := make(map[string]uint32, len(backend.Sharding.ShardMap))
var combinedDigestKeyFormat *digest.KeyFormat
for _, shard := range backend.Sharding.Shards {
if shard.Backend == nil {
// Drained backend.
backends = append(backends, nil)
for key, shard := range backend.Sharding.ShardMap {
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends[key] = backend.BlobAccess
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
// Undrained backend.
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}

if shard.Weight == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shards must have positive weights")
}
weights = append(weights, shard.Weight)
weights[key] = shard.Weight
}
if combinedDigestKeyFormat == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any undrained backends")
}
return BlobAccessInfo{
BlobAccess: sharding.NewShardingBlobAccess(
backends,
sharding.NewWeightedShardPermuter(weights),
sharding.NewRendezvousShardSelector(weights),
backend.Sharding.HashInitialization),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
7 changes: 3 additions & 4 deletions pkg/blobstore/sharding/BUILD.bazel
@@ -3,9 +3,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sharding",
srcs = [
"shard_permuter.go",
"shard_selector.go",
"sharding_blob_access.go",
"weighted_shard_permuter.go",
"rendezvous_shard_selector.go",
],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/sharding",
visibility = ["//visibility:public"],
@@ -16,7 +16,6 @@ go_library(
"//pkg/digest",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_lazybeaver_xorshift//:xorshift",
"@org_golang_x_sync//errgroup",
],
)
@@ -25,7 +24,7 @@ go_test(
name = "sharding_test",
srcs = [
"sharding_blob_access_test.go",
"weighted_shard_permuter_test.go",
"rendezvous_shard_selector_test.go",
],
deps = [
":sharding",
64 changes: 64 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector.go
@@ -0,0 +1,64 @@
package sharding

import (
"hash/fnv"
"math"
)

type rendezvousShardSelector struct {
keyMap map[uint64]string
weightMap map[uint64]uint32
}

func hashServer(key string) uint64 {
h := fnv.New64a()
h.Write([]byte(key))
return h.Sum64()
}

func NewRendezvousShardSelector(weights map[string]uint32) *rendezvousShardSelector {
weightMap := make(map[uint64]uint32, len(weights))
keyMap := make(map[uint64]string, len(weights))

for key, weight := range weights {
keyHash := hashServer(key)
keyMap[keyHash] = key
weightMap[keyHash] = weight
}
return &rendezvousShardSelector{
keyMap: keyMap,
weightMap: weightMap,
}
}

func score(x uint64) float64 {
// Branchless clamp to [1, MaxUint64-1].
x = x - ((x|-x)>>63) + (((^x)|-(^x)) >> 63)
frac := float64(x)/float64(^uint64(0))
return 1.0/-math.Log(frac)
Comment on lines +36 to +38
Member:

I'm trying to understand the math behind this.

  • Why do we need the clamping? I understand math.Log(0) doesn't work as expected, but what's wrong with MaxUint64?
  • Why do we need the division by MaxUint64? Considering that we're applying a log() afterwards, wouldn't that be the same as computing the log() first, but then subtracting a constant?
  • Why do we need the natural logarithm? Would the algorithm work equally well with log2()?
  • If log2() works, would there be any way to simply approximate the result without relying on any floating point math?

Author (@meroton-benjamin, Feb 10, 2025):

1. MaxUint64 would give log(1) = 0, which would be a division by zero.

   However, this is IEEE floating-point math and not real math: weight/-log(1) = -Inf, which would make the shard infinitely undesired, while we would expect it to be infinitely desired. Thinking about it more, log(0) is defined as -Inf, which makes the score weight/-(-Inf) = +0 in IEEE math, which is the result we actually want. If my math is correct, the code handles the extreme values in a somewhat decent manner, with MaxUint64 essentially wrapping around.

   If we replace f(x) = 1/-log(x/MaxUint64) with g(x) = 1/(log(MaxUint64+1) - log(x)) we get the desired behaviour: g(MaxUint64) = +Inf and g(0) = 0.

2. The original proof takes the log of a uniformly distributed ]0,1[ random variable. Since we approximate that distribution with random_uint64/MaxUint64, we could equivalently compute 1.0/(log(MaxUint64) - log(x)) instead.

3. The original proof uses the natural logarithm, but log2 only differs from it by a constant factor, and we only care about the relative ordering of the scores rather than the scores themselves, so we can use log2 without any difference.

4. There are some possible log2 bit-trickery hacks, such as approximating the logarithm by its integer part (the position of the highest set bit) plus a polynomial that approximates its fractional part. Technically I think this is how the standard log implementation works, just with a polynomial of high enough order to reach the 15 or so significant digits a float64 can represent.

   Since we don't have to support the entire range of positive float64s and we don't need full precision, we could probably get something slightly faster, maybe by abusing the binary representation of the float64 mantissa. I'll have a look at whether I can produce sufficient precision faster.
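
To make the g(x) reformulation concrete, here is a minimal standalone sketch (illustrative only, not part of this commit; scoreAlt and logMax are names made up for the example):

```go
package main

import (
	"fmt"
	"math"
)

// logMax is log(2^64); note that float64(^uint64(0)) rounds up to exactly 2^64.
var logMax = math.Log(float64(^uint64(0)))

// scoreAlt is the g(x) = 1/(log(2^64) - log(x)) variant discussed above.
// It keeps the same ordering as 1/-log(x/2^64) but needs no clamping:
// math.Log(0) is -Inf, so x == 0 scores +0 (least preferred), and
// x == MaxUint64 scores +Inf (most preferred).
func scoreAlt(x uint64) float64 {
	return 1.0 / (logMax - math.Log(float64(x)))
}

func main() {
	fmt.Println(scoreAlt(0))          // 0
	fmt.Println(scoreAlt(1 << 32))    // a finite positive score
	fmt.Println(scoreAlt(^uint64(0))) // +Inf
}
```

Up to floating-point rounding this should pick the same winners as the clamped form, since only the relative ordering of the scores matters.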

}

// PRNG without branches or allocations
func splitmix64(x uint64) uint64 {
x ^= x >> 30
x *= 0xbf58476d1ce4e5b9
x ^= x >> 27
x *= 0x94d049bb133111eb
x ^= x >> 31
return x
}

func (s *rendezvousShardSelector) GetShard(hash uint64) string {
var best float64
var bestKey string

for keyHash, weight := range s.weightMap {
mixed := splitmix64(hash^keyHash)
current := float64(weight) * score(mixed)
if current > best {
best = current
bestKey = s.keyMap[keyHash]
}
}
Comment on lines +55 to +62
Member:

Because we are simply iterating over weightMap, I don't think we actually need to use maps here. Have you considered using a simple slice?

type rendezvousShard struct {
    keyHash uint64
    weight uint32
    key string
}

type rendezvousShardSelector struct {
    shards []rendezvousShard
}

Assuming this works, is it then still necessary to change ShardingBlobAccess to take a map? This function could return an index of the shard like it did before.

Contributor:

In the unlikely event of two backends getting the same current value, it is random which one wins when iterating over a map. Maybe the configuration should stay as a map to get the unique identifiers, but internally it can be a slice.

The requirement is really that the keys be distinct. I think a slice should work, but during setup it might be worth checking for duplicated key hashes, because losing one storage shard without knowing why is pretty annoying.

Author:

The implementation can iterate over a slice of shards; that would make the tiebreaker in case of a collision stable (biased towards shards earlier in the slice) and significantly reduce the changes in sharding_blob_access.go.

I still think it's better to replace ShardingBlobAccessConfiguration.Shards with ShardingBlobAccessConfiguration.ShardMap, since a map gives a natural interface for removing and inserting shards compared to an array.

We could also error out at startup when there is a hash collision between two shards. It should never happen, but erroring out is preferable to silently ignoring an entire shard (which would be the effect of two shards sharing a hash).
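
Roughly, that slice-based variant could look like the sketch below (illustrative only, not what this commit implements; it reuses hashServer, splitmix64 and score from the file above, and the rendezvousSliceSelector and NewRendezvousSliceSelector names are made up here):

```go
package sharding

import (
	"fmt"
	"sort"
)

type rendezvousShard struct {
	keyHash uint64
	weight  uint32
	key     string
}

type rendezvousSliceSelector struct {
	shards []rendezvousShard
}

// NewRendezvousSliceSelector keeps the map-shaped configuration (unique
// keys) but stores the shards in a sorted slice, so that score ties
// resolve deterministically. It fails at construction time if two keys
// hash to the same value instead of silently dropping a shard.
func NewRendezvousSliceSelector(weights map[string]uint32) (*rendezvousSliceSelector, error) {
	shards := make([]rendezvousShard, 0, len(weights))
	seen := make(map[uint64]string, len(weights))
	for key, weight := range weights {
		h := hashServer(key) // hashServer as defined in the file above.
		if other, ok := seen[h]; ok {
			return nil, fmt.Errorf("shards %q and %q have colliding key hashes", key, other)
		}
		seen[h] = key
		shards = append(shards, rendezvousShard{keyHash: h, weight: weight, key: key})
	}
	sort.Slice(shards, func(i, j int) bool { return shards[i].key < shards[j].key })
	return &rendezvousSliceSelector{shards: shards}, nil
}

func (s *rendezvousSliceSelector) GetShard(hash uint64) string {
	var best float64
	var bestKey string
	for _, shard := range s.shards {
		// splitmix64 and score as defined in the file above.
		current := float64(shard.weight) * score(splitmix64(hash^shard.keyHash))
		if current > best {
			best = current
			bestKey = shard.key
		}
	}
	return bestKey
}
```

Sorting by key makes the tiebreaker stable regardless of map iteration order, and the duplicate-hash check turns the silent loss of a shard into a startup error.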

return bestKey
}
26 changes: 26 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector_test.go
@@ -0,0 +1,26 @@
package sharding_test

import (
"testing"

"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
"github.com/stretchr/testify/require"
)

func TestRendezvousShardSelectorDistribution(t *testing.T) {
// Distribution across five backends with a total weight of 15.
weights := map[string]uint32{"a": 1, "b": 4, "c": 2, "d": 5, "e": 3}
s := sharding.NewRendezvousShardSelector(weights)

// Request the shard for a large number of blobs.
occurrences := map[string]uint32{}
for i := 0; i < 1000000; i++ {
hash := uint64(i)
occurrences[s.GetShard(hash)] += 1
}

// Requests should be fanned out with a small error margin.
for shard, weight := range weights {
require.InEpsilon(t, weight*1000000/15, occurrences[shard], 0.01)
}
}
19 changes: 0 additions & 19 deletions pkg/blobstore/sharding/shard_permuter.go

This file was deleted.

10 changes: 10 additions & 0 deletions pkg/blobstore/sharding/shard_selector.go
@@ -0,0 +1,10 @@
package sharding

// ShardSelector is an algorithm that maps a hash to the key of the
// backend shard that should hold the corresponding blob.
//
// The algorithm must be stable: removing an unavailable backend must
// not cause blobs stored on the remaining backends to be reshuffled.
type ShardSelector interface {
GetShard(hash uint64) string
}
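
The stability requirement can be illustrated with a hypothetical test sketch (not part of this PR) against the rendezvous selector from this change: dropping one backend should only remap the blobs that lived on it.

```go
package sharding_test

import (
	"testing"

	"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
	"github.com/stretchr/testify/require"
)

// Sketch: removing shard "c" must not move any blob that was not on "c".
func TestShardSelectorStabilitySketch(t *testing.T) {
	full := sharding.NewRendezvousShardSelector(map[string]uint32{"a": 1, "b": 2, "c": 3})
	reduced := sharding.NewRendezvousShardSelector(map[string]uint32{"a": 1, "b": 2})
	for i := uint64(0); i < 10000; i++ {
		if shard := full.GetShard(i); shard != "c" {
			require.Equal(t, shard, reduced.GetShard(i))
		}
	}
}
```

This holds because each shard's score depends only on the blob hash, the shard's key hash, and its weight, so removing a shard leaves the scores of the remaining shards unchanged.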
78 changes: 35 additions & 43 deletions pkg/blobstore/sharding/sharding_blob_access.go
@@ -15,90 +15,82 @@ import (
)

type shardingBlobAccess struct {
backends []blobstore.BlobAccess
shardPermuter ShardPermuter
backends map[string]blobstore.BlobAccess
shardSelector ShardSelector
hashInitialization uint64
getCapabilitiesRound atomic.Uint64
}

// NewShardingBlobAccess is an adapter for BlobAccess that partitions
// requests across backends by hashing the digest. A ShardPermuter is
// requests across backends by hashing the digest. A ShardSelector is
// used to map hashes to backends.
func NewShardingBlobAccess(backends []blobstore.BlobAccess, shardPermuter ShardPermuter, hashInitialization uint64) blobstore.BlobAccess {
func NewShardingBlobAccess(backends map[string]blobstore.BlobAccess, shardSelector ShardSelector, hashInitialization uint64) blobstore.BlobAccess {
return &shardingBlobAccess{
backends: backends,
shardPermuter: shardPermuter,
shardSelector: shardSelector,
hashInitialization: hashInitialization,
}
}

func (ba *shardingBlobAccess) getBackendIndexByDigest(blobDigest digest.Digest) int {
func (ba *shardingBlobAccess) getBackendKeyByDigest(blobDigest digest.Digest) string {
// Hash the key using FNV-1a.
h := ba.hashInitialization
for _, c := range blobDigest.GetKey(digest.KeyWithoutInstance) {
h ^= uint64(c)
h *= 1099511628211
}
return ba.getBackendIndexByHash(h)
return ba.getBackendKeyByHash(h)
}

func (ba *shardingBlobAccess) getBackendIndexByHash(h uint64) int {
// Keep requesting shards until matching one that is undrained.
var selectedIndex int
ba.shardPermuter.GetShard(h, func(index int) bool {
if ba.backends[index] == nil {
return true
}
selectedIndex = index
return false
})
return selectedIndex
func (ba *shardingBlobAccess) getBackendKeyByHash(h uint64) string {
return ba.shardSelector.GetShard(h)
}

func (ba *shardingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
index := ba.getBackendIndexByDigest(digest)
key := ba.getBackendKeyByDigest(digest)
return buffer.WithErrorHandler(
ba.backends[index].Get(ctx, digest),
shardIndexAddingErrorHandler{index: index})
ba.backends[key].Get(ctx, digest),
shardKeyAddingErrorHandler{key: key})
}

func (ba *shardingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
index := ba.getBackendIndexByDigest(parentDigest)
key := ba.getBackendKeyByDigest(parentDigest)
return buffer.WithErrorHandler(
ba.backends[index].GetFromComposite(ctx, parentDigest, childDigest, slicer),
shardIndexAddingErrorHandler{index: index})
ba.backends[key].GetFromComposite(ctx, parentDigest, childDigest, slicer),
shardKeyAddingErrorHandler{key: key})
}

func (ba *shardingBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
index := ba.getBackendIndexByDigest(digest)
if err := ba.backends[index].Put(ctx, digest, b); err != nil {
return util.StatusWrapf(err, "Shard %d", index)
key := ba.getBackendKeyByDigest(digest)
if err := ba.backends[key].Put(ctx, digest, b); err != nil {
return util.StatusWrapf(err, "Shard %s", key)
}
return nil
}

func (ba *shardingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
// Partition all digests by shard.
digestsPerBackend := make([]digest.SetBuilder, 0, len(ba.backends))
for range ba.backends {
digestsPerBackend = append(digestsPerBackend, digest.NewSetBuilder())
digestsPerBackend := make(map[string]digest.SetBuilder, len(ba.backends))
for key := range ba.backends {
digestsPerBackend[key] = digest.NewSetBuilder()
}
for _, blobDigest := range digests.Items() {
digestsPerBackend[ba.getBackendIndexByDigest(blobDigest)].Add(blobDigest)
digestsPerBackend[ba.getBackendKeyByDigest(blobDigest)].Add(blobDigest)
}

// Asynchronously call FindMissing() on backends.
missingPerBackend := make([]digest.Set, 0, len(ba.backends))
group, ctxWithCancel := errgroup.WithContext(ctx)
for indexIter, digestsIter := range digestsPerBackend {
index, digests := indexIter, digestsIter
for keyIter, digestsIter := range digestsPerBackend {
key, digests := keyIter, digestsIter
if digests.Length() > 0 {
missingPerBackend = append(missingPerBackend, digest.EmptySet)
missingOut := &missingPerBackend[len(missingPerBackend)-1]
group.Go(func() error {
missing, err := ba.backends[index].FindMissing(ctxWithCancel, digests.Build())
missing, err := ba.backends[key].FindMissing(ctxWithCancel, digests.Build())
if err != nil {
return util.StatusWrapf(err, "Shard %d", index)
return util.StatusWrapf(err, "Shard %s", key)
}
*missingOut = missing
return nil
@@ -115,20 +107,20 @@ func (ba *shardingBlobAccess) FindMissing(ctx context.Context, digests digest.Se

func (ba *shardingBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
// Spread requests across shards.
index := ba.getBackendIndexByHash(ba.getCapabilitiesRound.Add(1))
capabilities, err := ba.backends[index].GetCapabilities(ctx, instanceName)
key := ba.getBackendKeyByHash(ba.getCapabilitiesRound.Add(1))
capabilities, err := ba.backends[key].GetCapabilities(ctx, instanceName)
if err != nil {
return nil, util.StatusWrapf(err, "Shard %d", index)
return nil, util.StatusWrapf(err, "Shard %s", key)
}
return capabilities, nil
}

type shardIndexAddingErrorHandler struct {
index int
type shardKeyAddingErrorHandler struct {
key string
}

func (eh shardIndexAddingErrorHandler) OnError(err error) (buffer.Buffer, error) {
return nil, util.StatusWrapf(err, "Shard %d", eh.index)
func (eh shardKeyAddingErrorHandler) OnError(err error) (buffer.Buffer, error) {
return nil, util.StatusWrapf(err, "Shard %s", eh.key)
}

func (eh shardIndexAddingErrorHandler) Done() {}
func (eh shardKeyAddingErrorHandler) Done() {}