Add new 'quorum' blob access #213

Open: wants to merge 1 commit into master.
1 change: 1 addition & 0 deletions pkg/blobstore/configuration/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
"//pkg/blobstore/grpcclients",
"//pkg/blobstore/local",
"//pkg/blobstore/mirrored",
"//pkg/blobstore/quorum",
"//pkg/blobstore/readcaching",
"//pkg/blobstore/readfallback",
"//pkg/blobstore/replication",
30 changes: 30 additions & 0 deletions pkg/blobstore/configuration/new_blob_access.go
@@ -10,6 +10,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/local"
"github.com/buildbarn/bb-storage/pkg/blobstore/mirrored"
"github.com/buildbarn/bb-storage/pkg/blobstore/quorum"
"github.com/buildbarn/bb-storage/pkg/blobstore/readcaching"
"github.com/buildbarn/bb-storage/pkg/blobstore/readfallback"
"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
@@ -143,6 +144,35 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
            BlobAccess:      mirrored.NewMirroredBlobAccess(backendA.BlobAccess, backendB.BlobAccess, replicatorAToB, replicatorBToA),
            DigestKeyFormat: backendA.DigestKeyFormat.Combine(backendB.DigestKeyFormat),
        }, "mirrored", nil
    case *pb.BlobAccessConfiguration_Quorum:
        backends := make([]blobstore.BlobAccess, 0, len(backend.Quorum.Backends))
        var combinedDigestKeyFormat *digest.KeyFormat

        for _, b := range backend.Quorum.Backends {
            backend, err := nc.NewNestedBlobAccess(b, creator)
            if err != nil {
                return BlobAccessInfo{}, "", err
            }
            backends = append(backends, backend.BlobAccess)
            if combinedDigestKeyFormat == nil {
                combinedDigestKeyFormat = &backend.DigestKeyFormat
            } else {
                newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
                combinedDigestKeyFormat = &newDigestKeyFormat
            }
Review comment on lines +157 to +162 (Member):

This was probably copy-pasted from the ShardingBlobAccess code. But that code only needs to use a pointer to keep track of whether the value is actually set, because not all backends may be undrained.

My recommendation would be to not let combinedDigestKeyFormat be a pointer here. Then do this:

for i, b := range ... {
    if i == 0 {
        combinedDigestKeyFormat = backend.DigestKeyFormat
    } else {
        combinedDigestKeyFormat = combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
    }
}
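Spelled out against the surrounding diff, the suggestion would read roughly like this (a sketch, not code from this PR; the loop variable that shadows backend is renamed to nested for clarity):

var combinedDigestKeyFormat digest.KeyFormat
for i, b := range backend.Quorum.Backends {
    nested, err := nc.NewNestedBlobAccess(b, creator)
    if err != nil {
        return BlobAccessInfo{}, "", err
    }
    backends = append(backends, nested.BlobAccess)
    if i == 0 {
        combinedDigestKeyFormat = nested.DigestKeyFormat
    } else {
        combinedDigestKeyFormat = combinedDigestKeyFormat.Combine(nested.DigestKeyFormat)
    }
}

The later composite literal would then use DigestKeyFormat: combinedDigestKeyFormat, without the pointer dereference.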

        }
        if len(backends) == 0 {
            return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create quorum blob access without any backends")
        }
        readQuorum := int(backend.Quorum.ReadQuorum)
        writeQuorum := int(backend.Quorum.WriteQuorum)
        if readQuorum+writeQuorum <= len(backends) {
            return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Quorum blob access requires read_quorum + write_quorum > number of backends")
        }
Review comment on lines +167 to +171 (Member):

Note that int may not always be large enough to represent all values of uint32. Also note that there are more constraints that need to be applied. I would write the following instead:

if l := uint32(len(backends)); readQuorum < 1 || readQuorum > l || writeQuorum < 1 || writeQuorum > l || readQuorum+writeQuorum <= l {
    ...
}

Then only convert the values to int later, as part of the call to NewQuorumBlobAccess.
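Applied to this diff, the recommended validation might look like this (a sketch; it keeps the proto's uint32 fields and converts to int only at the constructor call, as suggested):

readQuorum := backend.Quorum.ReadQuorum   // uint32, straight from the proto
writeQuorum := backend.Quorum.WriteQuorum // uint32
if l := uint32(len(backends)); readQuorum < 1 || readQuorum > l ||
    writeQuorum < 1 || writeQuorum > l || readQuorum+writeQuorum <= l {
    return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument,
        "Quorum blob access requires 1 <= read_quorum <= %d, 1 <= write_quorum <= %d and read_quorum + write_quorum > %d", l, l, l)
}
// ...and at the construction site:
// quorum.NewQuorumBlobAccess(backends, int(readQuorum), int(writeQuorum))

Because || short-circuits, the sum readQuorum+writeQuorum is only evaluated once both values are known to be at most l, so the uint32 addition cannot overflow.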

Review reply (Author):

Yeah, I felt it was far more likely for someone to mistakenly put in a small negative number here than a super large positive one, so I went with a uint in the proto. I agree I don't like casting int types, and I'm happy to change this to an int throughout (or whatever other solution you would prefer).

        return BlobAccessInfo{
            BlobAccess:      quorum.NewQuorumBlobAccess(backends, readQuorum, writeQuorum),
            DigestKeyFormat: *combinedDigestKeyFormat,
        }, "quorum", nil
    case *pb.BlobAccessConfiguration_Local:
        digestKeyFormat := digest.KeyWithInstance
        if !backend.Local.HierarchicalInstanceNames {
40 changes: 40 additions & 0 deletions pkg/blobstore/quorum/BUILD.bazel
@@ -0,0 +1,40 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "quorum",
srcs = ["quorum_blob_access.go"],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/quorum",
visibility = ["//visibility:public"],
deps = [
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/blobstore/replication",
"//pkg/blobstore/slicing",
"//pkg/digest",
"//pkg/random",
"//pkg/util",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
],
)

go_test(
name = "quorum_test",
srcs = ["quorum_blob_access_test.go"],
deps = [
":quorum",
"//internal/mock",
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/digest",
"//pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_mock//gomock",
],
)
218 changes: 218 additions & 0 deletions pkg/blobstore/quorum/quorum_blob_access.go
@@ -0,0 +1,218 @@
package quorum

import (
    "context"
    "sync"

    remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
    "github.com/buildbarn/bb-storage/pkg/blobstore"
    "github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
    "github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
    "github.com/buildbarn/bb-storage/pkg/digest"
    "github.com/buildbarn/bb-storage/pkg/random"
    "github.com/buildbarn/bb-storage/pkg/util"
    "github.com/prometheus/client_golang/prometheus"

    "golang.org/x/sync/errgroup"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

var (
    quorumBlobAccessPrometheusMetrics sync.Once

    quorumBlobAccessFindMissingSynchronizations = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Namespace: "buildbarn",
            Subsystem: "blobstore",
            Name:      "quorum_blob_access_find_missing_synchronizations",
            Help:      "Number of blobs synchronized in FindMissing()",
            Buckets:   append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...),
        },
        []string{"direction"})
)

type quorumBlobAccess struct {
    backends    []blobstore.BlobAccess
    readQuorum  int
    writeQuorum int
    generator   random.ThreadSafeGenerator
}

// NewQuorumBlobAccess creates a BlobAccess that applies operations to a subset
// of storage backends, retrying on infrastructure errors. Read and write quorum
// sizes should be chosen so that they overlap by at least one backend.
// Note: Data is not replicated again after the original write.
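// For example, with 5 backends, readQuorum = 3 and writeQuorum = 3 satisfy
// 3 + 3 > 5: by the pigeonhole principle, any 3 backends chosen for a read
// share at least one backend with any 3 that completed an earlier write.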
func NewQuorumBlobAccess(backends []blobstore.BlobAccess, readQuorum, writeQuorum int) blobstore.BlobAccess {
    quorumBlobAccessPrometheusMetrics.Do(func() {
        prometheus.MustRegister(quorumBlobAccessFindMissingSynchronizations)
    })

    return &quorumBlobAccess{
        backends:    backends,
        readQuorum:  readQuorum,
        writeQuorum: writeQuorum,
        generator:   random.FastThreadSafeGenerator,
    }
}

func (ba *quorumBlobAccess) shuffledBackends() []blobstore.BlobAccess {
    backends := make([]blobstore.BlobAccess, len(ba.backends))
    copy(backends, ba.backends)
Review comment (Member): this copy could be written as:

backends := append([]blobstore.BlobAccess(nil), ba.backends...)


    ba.generator.Shuffle(len(backends), func(i, j int) {
        backends[i], backends[j] = backends[j], backends[i]
    })

    return backends
}

type getQuorumErrorHandler struct {
    remainingBackends []blobstore.BlobAccess
    remainingReads    int
    retry             func(blobstore.BlobAccess) buffer.Buffer
}

func (eh *getQuorumErrorHandler) tryNextBackendOrError(err error) (buffer.Buffer, error) {
    if len(eh.remainingBackends) > 0 {
        nextBackend := eh.remainingBackends[0]
        eh.remainingBackends = eh.remainingBackends[1:]
        return eh.retry(nextBackend), nil
    }
    return nil, err
}

func (eh *getQuorumErrorHandler) OnError(err error) (buffer.Buffer, error) {
    fallbackErr := status.Error(codes.Unavailable, "Too many backends unavailable")
    if util.IsInfrastructureError(err) {
        // I/O error. Try again on another backend.
        return eh.tryNextBackendOrError(fallbackErr)
    } else if status.Code(err) == codes.NotFound {
        // Not found. Try again on another backend - if we haven't seen enough yet.
        if eh.remainingReads <= 1 {
            // Observed sufficient NotFounds. Return conclusive NotFound.
            return nil, err
        }
        eh.remainingReads--

        // Haven't been able to check enough backends. Can't conclude not found.
        return eh.tryNextBackendOrError(fallbackErr)
    }

    return nil, err
}

func (eh getQuorumErrorHandler) Done() {}

func (ba *quorumBlobAccess) get(getter func(b blobstore.BlobAccess) buffer.Buffer) buffer.Buffer {
    backends := ba.shuffledBackends()

    backend := backends[0]
    remainingBackends := backends[1:]

    return buffer.WithErrorHandler(
        getter(backend),
        &getQuorumErrorHandler{
            remainingBackends: remainingBackends,
            remainingReads:    ba.readQuorum,
            retry:             getter,
        })
}

func (ba *quorumBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
    return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
        return b.Get(ctx, digest)
Review comment (Member):

How do you deal with servers being down in such a way that calls to Get() spend a lot of time hanging, e.g. waiting for a TCP connection to establish?

    })
}
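One possible direction for the question above (a sketch only, not part of this PR; it assumes a per-call deadline is acceptable for the deployment, and the helper name getWithTimeout is hypothetical):

// getWithTimeout bounds a single backend Get() so that a hung call, e.g. one
// stuck establishing a TCP connection, fails fast and lets the error handler
// move on to the next backend.
func getWithTimeout(ctx context.Context, b blobstore.BlobAccess, d digest.Digest, timeout time.Duration) buffer.Buffer {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    // Caveat: cancel must not run until the returned buffer has been fully
    // consumed; a real implementation would tie cancellation to the buffer's
    // lifetime rather than dropping it as done here.
    _ = cancel
    return b.Get(ctx, d)
}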

func (ba *quorumBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
    return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
        return b.GetFromComposite(ctx, parentDigest, childDigest, slicer)
    })
}

func (ba *quorumBlobAccess) shuffledBackendQueue() <-chan blobstore.BlobAccess {
    queue := make(chan blobstore.BlobAccess)
    go func() error {
        backends := ba.shuffledBackends()

        for _, b := range backends {
            queue <- b
        }

        close(queue)
        return nil
    }()
    return queue
}

func (ba *quorumBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
    // Store object in at least writeQuorum storage backends.
    group, ctx := errgroup.WithContext(ctx)
    backendQueue := ba.shuffledBackendQueue()

    // Spawn writeQuorum writers. Each of these goroutines needs to succeed once.
    for i := 0; i < ba.writeQuorum; i++ {
        var b1 buffer.Buffer
        if i == ba.writeQuorum-1 {
            // Last writer, no need to clone buffer.
            b1 = b
        } else {
            b, b1 = b.CloneStream()
        }

        group.Go(func() error {
            var err error
            for backend := range backendQueue {
Review comment (Member):

Huh? So we launch n coroutines, and each of those attempts to send n requests?

                err = backend.Put(ctx, digest, b1)
Review comment (@EdSchouten, Member, Aug 15, 2024):

BlobAccess.Put() takes ownership of b1. You can't call it again with the same buffer. The reason being that it could have converted the buffer to a reader and already consumed some of the data inside. We can't rewind, as we may be streaming data coming from the client/server directly.

Think of what would happen if a user tries to upload a 10 GB blob. There is no way we can hold all of that data in memory in a single buffer and repeatedly attempt to store it in a bunch of backends.

                if err == nil {
                    // Success.
                    return nil
                }
            }
            return err
        })
    }

    return group.Wait()
}

func (ba *quorumBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
    // Call FindMissing() on readQuorum backends.
    group, ctx := errgroup.WithContext(ctx)
    backendQueue := ba.shuffledBackendQueue()

    results := make([]digest.Set, ba.readQuorum)
    for i := 0; i < ba.readQuorum; i++ {
        resultIdx := i
        group.Go(func() error {
            var err error
            for backend := range backendQueue {
                results[resultIdx], err = backend.FindMissing(ctx, digests)
                if err == nil {
                    // Success.
                    return nil
                }
            }
            return err
        })
    }

    if err := group.Wait(); err != nil {
        return digest.EmptySet, err
    }

    // Find the intersection of all results.
    missingFromAll := results[0]
    for _, result := range results[1:] {
        _, missingFromAll, _ = digest.GetDifferenceAndIntersection(missingFromAll, result)
    }
    return missingFromAll, nil
Review comment on lines +206 to +211 (@EdSchouten, Member, Aug 15, 2024):

Imagine I have 5 storage backends, named 0 to 4. Read quorum is set to 4.

• I first issue a Put(). Due to network flakiness, the blob only ends up getting written to backend 0.
• Then I issue FindMissingBlobs(), which gets run against backends 0, 1, 2, and 3. Backend 0 reports it as present, while backends 1, 2 and 3 report it as absent. As we compute the intersection, we report it as being present.
• Then I issue Get(), which gets run against backends 1, 2, 3 and 4. All of those return NOT_FOUND.

This is in violation of the protocol.
}

func (ba *quorumBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
    backends := ba.shuffledBackends()

    return backends[0].GetCapabilities(ctx, instanceName)
}
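For illustration, constructing the new backend directly might look like this (a sketch; backendA, backendB and backendC stand in for existing BlobAccess instances and are not part of this PR):

// With three backends, readQuorum = 2 and writeQuorum = 2 satisfy 2 + 2 > 3,
// so every read quorum overlaps every write quorum in at least one backend.
ba := quorum.NewQuorumBlobAccess(
    []blobstore.BlobAccess{backendA, backendB, backendC},
    2, // readQuorum
    2) // writeQuorum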