diff --git a/pkg/blobstore/configuration/BUILD.bazel b/pkg/blobstore/configuration/BUILD.bazel
index ca4d7981..32f6a414 100644
--- a/pkg/blobstore/configuration/BUILD.bazel
+++ b/pkg/blobstore/configuration/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
         "//pkg/blobstore/grpcclients",
         "//pkg/blobstore/local",
         "//pkg/blobstore/mirrored",
+        "//pkg/blobstore/quorum",
         "//pkg/blobstore/readcaching",
         "//pkg/blobstore/readfallback",
         "//pkg/blobstore/replication",
diff --git a/pkg/blobstore/configuration/new_blob_access.go b/pkg/blobstore/configuration/new_blob_access.go
index 6f175005..ee212a78 100644
--- a/pkg/blobstore/configuration/new_blob_access.go
+++ b/pkg/blobstore/configuration/new_blob_access.go
@@ -10,6 +10,7 @@ import (
 	"github.com/buildbarn/bb-storage/pkg/blobstore"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/local"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/mirrored"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/quorum"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/readcaching"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/readfallback"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
@@ -143,6 +144,35 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
 			BlobAccess:      mirrored.NewMirroredBlobAccess(backendA.BlobAccess, backendB.BlobAccess, replicatorAToB, replicatorBToA),
 			DigestKeyFormat: backendA.DigestKeyFormat.Combine(backendB.DigestKeyFormat),
 		}, "mirrored", nil
+	case *pb.BlobAccessConfiguration_Quorum:
+		backends := make([]blobstore.BlobAccess, 0, len(backend.Quorum.Backends))
+		var combinedDigestKeyFormat *digest.KeyFormat
+
+		for _, b := range backend.Quorum.Backends {
+			backend, err := nc.NewNestedBlobAccess(b, creator)
+			if err != nil {
+				return BlobAccessInfo{}, "", err
+			}
+			backends = append(backends, backend.BlobAccess)
+			if combinedDigestKeyFormat == nil {
+				combinedDigestKeyFormat = &backend.DigestKeyFormat
+			} else {
+				newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
+				combinedDigestKeyFormat = &newDigestKeyFormat
+			}
+		}
+		if len(backends) == 0 {
+			return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create quorum blob access without any backends")
+		}
+		readQuorum := int(backend.Quorum.ReadQuorum)
+		writeQuorum := int(backend.Quorum.WriteQuorum)
+		if readQuorum+writeQuorum <= len(backends) {
+			return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Quorum blob access requires read_quorum + write_quorum > number of backends")
+		}
+		return BlobAccessInfo{
+			BlobAccess:      quorum.NewQuorumBlobAccess(backends, readQuorum, writeQuorum),
+			DigestKeyFormat: *combinedDigestKeyFormat,
+		}, "quorum", nil
 	case *pb.BlobAccessConfiguration_Local:
 		digestKeyFormat := digest.KeyWithInstance
 		if !backend.Local.HierarchicalInstanceNames {
diff --git a/pkg/blobstore/quorum/BUILD.bazel b/pkg/blobstore/quorum/BUILD.bazel
new file mode 100644
index 00000000..4cdcf2d9
--- /dev/null
+++ b/pkg/blobstore/quorum/BUILD.bazel
@@ -0,0 +1,39 @@
+load("@rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "quorum",
+    srcs = ["quorum_blob_access.go"],
+    importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/quorum",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/blobstore",
+        "//pkg/blobstore/buffer",
+        "//pkg/blobstore/slicing",
+        "//pkg/digest",
+        "//pkg/random",
+        "//pkg/util",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
+        "@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_x_sync//errgroup", + ], +) + +go_test( + name = "quorum_test", + srcs = ["quorum_blob_access_test.go"], + deps = [ + ":quorum", + "//internal/mock", + "//pkg/blobstore", + "//pkg/blobstore/buffer", + "//pkg/digest", + "//pkg/testutil", + "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_uber_go_mock//gomock", + ], +) diff --git a/pkg/blobstore/quorum/quorum_blob_access.go b/pkg/blobstore/quorum/quorum_blob_access.go new file mode 100644 index 00000000..df107a2b --- /dev/null +++ b/pkg/blobstore/quorum/quorum_blob_access.go @@ -0,0 +1,218 @@ +package quorum + +import ( + "context" + "sync" + + remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + "github.com/buildbarn/bb-storage/pkg/blobstore" + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" + "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/random" + "github.com/buildbarn/bb-storage/pkg/util" + "github.com/prometheus/client_golang/prometheus" + + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + quorumBlobAccessPrometheusMetrics sync.Once + + quorumBlobAccessFindMissingSynchronizations = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "quorum_blob_access_find_missing_synchronizations", + Help: "Number of blobs synchronized in FindMissing()", + Buckets: append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...), + }, + []string{"direction"}) +) + +type quorumBlobAccess struct { + backends []blobstore.BlobAccess + readQuorum int + writeQuorum int + generator random.ThreadSafeGenerator +} + +// NewQuorumBlobAccess creates a BlobAccess that applies operations to a subset +// of storage backends, retrying on infrastructure errors. Read and write quorum +// size should be chosen so that they overlap by at least one backend. +// Note: Data is not replicated again after the original write. 
+func NewQuorumBlobAccess(backends []blobstore.BlobAccess, readQuorum, writeQuorum int) blobstore.BlobAccess {
+	quorumBlobAccessPrometheusMetrics.Do(func() {
+		prometheus.MustRegister(quorumBlobAccessFindMissingSynchronizations)
+	})
+
+	return &quorumBlobAccess{
+		backends:    backends,
+		readQuorum:  readQuorum,
+		writeQuorum: writeQuorum,
+		generator:   random.FastThreadSafeGenerator,
+	}
+}
+
+func (ba *quorumBlobAccess) shuffledBackends() []blobstore.BlobAccess {
+	backends := make([]blobstore.BlobAccess, len(ba.backends))
+	copy(backends, ba.backends)
+
+	ba.generator.Shuffle(len(backends), func(i, j int) {
+		backends[i], backends[j] = backends[j], backends[i]
+	})
+
+	return backends
+}
+
+type getQuorumErrorHandler struct {
+	remainingBackends []blobstore.BlobAccess
+	remainingReads    int
+	retry             func(blobstore.BlobAccess) buffer.Buffer
+}
+
+func (eh *getQuorumErrorHandler) tryNextBackendOrError(err error) (buffer.Buffer, error) {
+	if len(eh.remainingBackends) > 0 {
+		nextBackend := eh.remainingBackends[0]
+		eh.remainingBackends = eh.remainingBackends[1:]
+		return eh.retry(nextBackend), nil
+	}
+	return nil, err
+}
+
+func (eh *getQuorumErrorHandler) OnError(err error) (buffer.Buffer, error) {
+	fallbackErr := status.Error(codes.Unavailable, "Too many backends unavailable")
+	if util.IsInfrastructureError(err) {
+		// I/O error. Try again on another backend.
+		return eh.tryNextBackendOrError(fallbackErr)
+	} else if status.Code(err) == codes.NotFound {
+		// Not found. Try again on another backend, unless we have
+		// already observed enough NotFounds.
+		if eh.remainingReads <= 1 {
+			// Observed sufficiently many NotFounds. Return a
+			// conclusive NotFound.
+			return nil, err
+		}
+		eh.remainingReads--
+
+		// Haven't been able to check enough backends yet, so we
+		// cannot conclude that the blob does not exist.
+		return eh.tryNextBackendOrError(fallbackErr)
+	}
+	return nil, err
+}
+
+func (eh *getQuorumErrorHandler) Done() {}
+
+func (ba *quorumBlobAccess) get(getter func(b blobstore.BlobAccess) buffer.Buffer) buffer.Buffer {
+	backends := ba.shuffledBackends()
+
+	backend := backends[0]
+	remainingBackends := backends[1:]
+
+	return buffer.WithErrorHandler(
+		getter(backend),
+		&getQuorumErrorHandler{
+			remainingBackends: remainingBackends,
+			remainingReads:    ba.readQuorum,
+			retry:             getter,
+		})
+}
+
+func (ba *quorumBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
+	return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
+		return b.Get(ctx, digest)
+	})
+}
+
+func (ba *quorumBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
+	return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
+		return b.GetFromComposite(ctx, parentDigest, childDigest, slicer)
+	})
+}
+
+func (ba *quorumBlobAccess) shuffledBackendQueue() <-chan blobstore.BlobAccess {
+	// Use a buffered channel that already contains all backends, so
+	// that no sending goroutine is needed that could leak when the
+	// consumers stop reading from the queue early.
+	backends := ba.shuffledBackends()
+	queue := make(chan blobstore.BlobAccess, len(backends))
+	for _, b := range backends {
+		queue <- b
+	}
+	close(queue)
+	return queue
+}
+
+func (ba *quorumBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+	// Store the object in writeQuorum storage backends.
+	group, ctx := errgroup.WithContext(ctx)
+	backendQueue := ba.shuffledBackendQueue()
+
+	// Spawn writeQuorum writers. Each of these goroutines needs to succeed once.
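+	// Each writer needs its own copy of the data: CloneStream() splits
+	// a buffer into two buffers that may be consumed independently, so
+	// the buffer is cloned once per additional writer and the last
+	// writer consumes the original.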
+ for i := 0; i < ba.writeQuorum; i++ { + var b1 buffer.Buffer + if i == ba.writeQuorum-1 { + // Last writer, no need to clone buffer + b1 = b + } else { + b, b1 = b.CloneStream() + } + + group.Go(func() error { + var err error + for backend := range backendQueue { + err = backend.Put(ctx, digest, b1) + if err == nil { + // Success + return nil + } + } + return err + }) + } + + return group.Wait() +} + +func (ba *quorumBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) { + // Call FindMissing() on readQuorum backends. + group, ctx := errgroup.WithContext(ctx) + backendQueue := ba.shuffledBackendQueue() + + results := make([]digest.Set, ba.readQuorum) + for i := 0; i < ba.readQuorum; i++ { + resultIdx := i + group.Go(func() error { + var err error + for backend := range backendQueue { + results[resultIdx], err = backend.FindMissing(ctx, digests) + if err == nil { + // Success + return nil + } + } + return err + }) + } + + if err := group.Wait(); err != nil { + return digest.EmptySet, err + } + + // Find intersection of all results + missingFromAll := results[0] + for _, result := range results[1:] { + _, missingFromAll, _ = digest.GetDifferenceAndIntersection(missingFromAll, result) + } + return missingFromAll, nil +} + +func (ba *quorumBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) { + backends := ba.shuffledBackends() + + return backends[0].GetCapabilities(ctx, instanceName) +} diff --git a/pkg/blobstore/quorum/quorum_blob_access_test.go b/pkg/blobstore/quorum/quorum_blob_access_test.go new file mode 100644 index 00000000..d2292c19 --- /dev/null +++ b/pkg/blobstore/quorum/quorum_blob_access_test.go @@ -0,0 +1,296 @@ +package quorum_test + +import ( + "context" + "sync/atomic" + "testing" + + remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + "github.com/buildbarn/bb-storage/internal/mock" + "github.com/buildbarn/bb-storage/pkg/blobstore" + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/blobstore/quorum" + "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/testutil" + "github.com/stretchr/testify/require" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.uber.org/mock/gomock" +) + +func setup(t *testing.T) (*gomock.Controller, context.Context, []*mock.MockBlobAccess, blobstore.BlobAccess) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + mockBackends := make([]*mock.MockBlobAccess, 3) + backends := make([]blobstore.BlobAccess, 3) + for i := range backends { + b := mock.NewMockBlobAccess(ctrl) + mockBackends[i] = b + backends[i] = b + } + + blobAccess := quorum.NewQuorumBlobAccess(backends, 2, 2) + + return ctrl, ctx, mockBackends, blobAccess +} + +var blobDigest = digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11) + +func TestQuorumBlobAccessGet_Success(t *testing.T) { + // Common case: Blob exists on a quorum of backends. 
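+	//
+	// A single successful response is sufficient for Get() to succeed;
+	// the read quorum only determines how many NotFound responses must
+	// be observed before a conclusive NotFound may be returned.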
+	ctrl, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MinTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MinTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MinTimes(1)
+
+	// Requests are sent to randomly selected backends, to spread
+	// the load across the backends evenly.
+	for i := 0; i < 100 && !ctrl.Satisfied(); i++ {
+		data, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+		require.NoError(t, err)
+		require.Equal(t, []byte("Hello world"), data)
+	}
+}
+
+func TestQuorumBlobAccessGet_NotFoundAll(t *testing.T) {
+	// Common case: Blob is not present on any backend.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.NotFound, "Blob not found"), err)
+}
+
+func TestQuorumBlobAccessGet_UnreachableSuccess(t *testing.T) {
+	// One backend unavailable. Should query remaining two, and find blob.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).Times(1)
+
+	data, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	require.NoError(t, err)
+	require.Equal(t, []byte("Hello world"), data)
+}
+
+func TestQuorumBlobAccessGet_UnreachableNotFound(t *testing.T) {
+	// One backend unavailable. Should query remaining two, and conclude not found.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).Times(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).Times(1)
+
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.NotFound, "Blob not found"), err)
+}
+
+func TestQuorumBlobAccessGet_Unreachable2Success(t *testing.T) {
+	// Two backends unavailable. Should query remaining one, and find blob.
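+	//
+	// Infrastructure errors do not count towards the read quorum; they
+	// only cause the request to be retried on the next backend.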
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).Times(1)
+
+	data, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	require.NoError(t, err)
+	require.Equal(t, []byte("Hello world"), data)
+}
+
+func TestQuorumBlobAccessGet_Unreachable2Failure(t *testing.T) {
+	// Two backends unavailable. Should query remaining one, but be unable to conclude not found.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch")))
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch")))
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).Times(1)
+
+	// Can't conclude the blob doesn't exist.
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.Unavailable, "Too many backends unavailable"), err)
+}
+
+func TestQuorumBlobAccessGet_Unreachable3Failure(t *testing.T) {
+	// All backends unavailable.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).Times(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).Times(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).Times(1)
+
+	// Can't conclude the blob doesn't exist.
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.Unavailable, "Too many backends unavailable"), err)
+}
+
+func TestQuorumBlobAccessGetFromComposite_Success(t *testing.T) {
+	// Common case: Blob exists on a quorum of backends.
+	//
+	// We assume that the tests for Get() provide coverage for other
+	// scenarios.
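+	//
+	// GetFromComposite() is implemented on top of the same quorum read
+	// helper as Get(), so it shares Get()'s retry behavior.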
+	ctrl, ctx, backends, blobAccess := setup(t)
+
+	parentDigest := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "834c514174f3a7d5952dfa68d4b657f3c4cf78b3973dcf2721731c3861559828", 100)
+	childDigest := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	slicer := mock.NewMockBlobSlicer(ctrl)
+
+	backends[0].EXPECT().GetFromComposite(ctx, parentDigest, childDigest, slicer).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MaxTimes(1)
+	backends[1].EXPECT().GetFromComposite(ctx, parentDigest, childDigest, slicer).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MaxTimes(1)
+	backends[2].EXPECT().GetFromComposite(ctx, parentDigest, childDigest, slicer).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+
+	data, err := blobAccess.GetFromComposite(ctx, parentDigest, childDigest, slicer).ToByteSlice(100)
+	require.NoError(t, err)
+	require.Equal(t, []byte("Hello world"), data)
+}
+
+func TestQuorumBlobAccessPut_Success(t *testing.T) {
+	_, ctx, backends, blobAccess := setup(t)
+
+	var numWrites atomic.Int32
+	doSuccess := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		data, err := b.ToByteSlice(100)
+		require.NoError(t, err)
+		require.Equal(t, []byte("Hello world"), data)
+		numWrites.Add(1)
+		return nil
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+
+	require.NoError(t, blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+	require.EqualValues(t, 2, numWrites.Load())
+}
+
+func TestQuorumBlobAccessPut_UnreachableSuccess(t *testing.T) {
+	// One backend unavailable. Should write to remaining two.
+	_, ctx, backends, blobAccess := setup(t)
+
+	var numWrites atomic.Int32
+	doSuccess := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		numWrites.Add(1)
+		return nil
+	}
+	doUnavail := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return status.Error(codes.Unavailable, "Server gone to lunch")
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).Times(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).Times(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doUnavail).MaxTimes(1)
+
+	require.NoError(t, blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+	require.EqualValues(t, 2, numWrites.Load())
+}
+
+func TestQuorumBlobAccessPut_Unreachable2Failure(t *testing.T) {
+	// Two backends unavailable. Should not report success.
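+	//
+	// With writeQuorum = 2 and only a single reachable backend, at most
+	// one write can succeed, so Put() must propagate the error.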
+	_, ctx, backends, blobAccess := setup(t)
+
+	doSuccess := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return nil
+	}
+	doUnavail := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return status.Error(codes.Unavailable, "Server gone to lunch")
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doUnavail).MaxTimes(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doUnavail).MaxTimes(1)
+
+	testutil.RequireEqualStatus(
+		t,
+		status.Error(codes.Unavailable, "Server gone to lunch"),
+		blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+}
+
+func TestQuorumBlobAccessPut_Failure(t *testing.T) {
+	// Non-infrastructure error. Should return error cause.
+	_, ctx, backends, blobAccess := setup(t)
+
+	doFail := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return status.Error(codes.InvalidArgument, "Computer says no")
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doFail).MaxTimes(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doFail).MaxTimes(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doFail).MaxTimes(1)
+
+	testutil.RequireEqualStatus(
+		t,
+		status.Error(codes.InvalidArgument, "Computer says no"),
+		blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+}
+
+func TestQuorumBlobAccessFindMissing_Success(t *testing.T) {
+	_, ctx, backends, blobAccess := setup(t)
+
+	digestNone := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	digestA := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
+	digestB := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "522b44d647b6989f60302ef755c277e508d5bcc38f05e139906ebdb03a5b19f2", 9)
+	allDigests := digest.NewSetBuilder().Add(digestNone).Add(digestA).Add(digestB).Build()
+	missingFrom0 := digest.NewSetBuilder().Add(digestNone).Add(digestA).Build() // Missing A
+	missingFrom1 := digest.NewSetBuilder().Add(digestNone).Add(digestB).Build() // Missing B
+	missingFrom2 := digest.NewSetBuilder().Add(digestNone).Build()              // Has A and B
+
+	backends[0].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom0, nil).MaxTimes(1)
+	backends[1].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom1, nil).MaxTimes(1)
+	backends[2].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom2, nil).MaxTimes(1)
+
+	missing, err := blobAccess.FindMissing(ctx, allDigests)
+	require.NoError(t, err)
+	require.Equal(t, digestNone.ToSingletonSet(), missing)
+}
+
+func TestQuorumBlobAccessFindMissing_UnavailableSuccess(t *testing.T) {
+	// One server unavailable. Doesn't change result.
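+	//
+	// FindMissing() only needs responses from readQuorum backends. Any
+	// two of the three suffice: a blob is reported missing only if it
+	// is missing from every backend that was queried.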
+	_, ctx, backends, blobAccess := setup(t)
+
+	digestNone := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	digestA := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
+	digestB := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "522b44d647b6989f60302ef755c277e508d5bcc38f05e139906ebdb03a5b19f2", 9)
+	allDigests := digest.NewSetBuilder().Add(digestNone).Add(digestA).Add(digestB).Build()
+	missingFrom0 := digest.NewSetBuilder().Add(digestNone).Add(digestA).Build() // Missing A
+	missingFrom1 := digest.NewSetBuilder().Add(digestNone).Add(digestB).Build() // Missing B
+
+	backends[0].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom0, nil).MaxTimes(1)
+	backends[1].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom1, nil).MaxTimes(1)
+	backends[2].EXPECT().FindMissing(gomock.Any(), allDigests).Return(digest.EmptySet, status.Error(codes.Unavailable, "Server gone to lunch")).MaxTimes(1)
+
+	missing, err := blobAccess.FindMissing(ctx, allDigests)
+	require.NoError(t, err)
+	require.Equal(t, digestNone.ToSingletonSet(), missing)
+}
+
+func TestQuorumBlobAccessFindMissing_Unavailable2Failure(t *testing.T) {
+	// Two servers unavailable. Return failure.
+	_, ctx, backends, blobAccess := setup(t)
+
+	digestNone := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	digestA := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
+	digestB := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "522b44d647b6989f60302ef755c277e508d5bcc38f05e139906ebdb03a5b19f2", 9)
+	allDigests := digest.NewSetBuilder().Add(digestNone).Add(digestA).Add(digestB).Build()
+	missingFrom0 := digest.NewSetBuilder().Add(digestNone).Add(digestA).Build() // Missing A
+
+	backends[0].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom0, nil).MaxTimes(1)
+	backends[1].EXPECT().FindMissing(gomock.Any(), allDigests).Return(digest.EmptySet, status.Error(codes.Unavailable, "Server gone to lunch")).MaxTimes(1)
+	backends[2].EXPECT().FindMissing(gomock.Any(), allDigests).Return(digest.EmptySet, status.Error(codes.Unavailable, "Server gone to lunch")).MaxTimes(1)
+
+	_, err := blobAccess.FindMissing(ctx, allDigests)
+	testutil.RequireEqualStatus(t, status.Error(codes.Unavailable, "Server gone to lunch"), err)
+}
diff --git a/pkg/proto/configuration/blobstore/blobstore.proto b/pkg/proto/configuration/blobstore/blobstore.proto
index f3641528..819cf21a 100644
--- a/pkg/proto/configuration/blobstore/blobstore.proto
+++ b/pkg/proto/configuration/blobstore/blobstore.proto
@@ -50,6 +50,14 @@ message BlobAccessConfiguration {
     // be refilled automatically.
     MirroredBlobAccessConfiguration mirrored = 14;
 
+    // Read and write blobs to a subset of multiple backends.
+    //
+    // This backend provides high availability, but not high durability.
+    // Operations on unavailable backends are retried on other backends.
+    // Data is not replicated again after the initial writes, assuming
+    // that the underlying storage is durable and unavailability is temporary.
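+    //
+    // A minimal example (Jsonnet configuration; the gRPC addresses are
+    // hypothetical placeholders):
+    //
+    //   quorum: {
+    //     backends: [
+    //       { grpc: { address: 'storage-0:8981' } },
+    //       { grpc: { address: 'storage-1:8981' } },
+    //       { grpc: { address: 'storage-2:8981' } },
+    //     ],
+    //     readQuorum: 2,
+    //     writeQuorum: 2,
+    //   },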
+    QuorumBlobAccessConfiguration quorum = 28;
+
     // Store blobs on the local system.
     LocalBlobAccessConfiguration local = 15;
 
@@ -312,6 +320,34 @@ message MirroredBlobAccessConfiguration {
   BlobReplicatorConfiguration replicator_b_to_a = 4;
 }
 
+// QuorumBlobAccess reads and writes data to a subset of backends.
+// The read and write quorum sizes must be chosen such that every read
+// quorum and every write quorum are guaranteed to overlap in at least
+// one backend, i.e.: read_quorum + write_quorum > len(backends).
+//
+// A typical highly available setup is 3 backends, with read_quorum ==
+// write_quorum == 2. Write operations must then succeed on 2 of the 3
+// backends, which means a single backend can be unavailable for writes.
+// Reads must successfully check 2 of the 3 backends before concluding
+// that a blob does not exist, which also means a single backend can be
+// unavailable for reads.
+//
+// Other less common configurations are 3-read + 3-write on 5 backends
+// (2 backends can be unavailable), and 1-read + 2-write on 2 backends,
+// which effectively recreates MirroredBlobAccess (without its
+// replication).
+message QuorumBlobAccessConfiguration {
+  // The set of backends. Backends are selected randomly for each
+  // operation; their order is not significant.
+  repeated BlobAccessConfiguration backends = 1;
+
+  // The number of backends that reads must successfully check before
+  // concluding that a blob does not exist.
+  uint32 read_quorum = 2;
+
+  // The number of backends on which writes must succeed before the
+  // overall write operation succeeds.
+  uint32 write_quorum = 3;
+}
+
 // LocalBlobAccess stores all data onto disk in block sizes. A blob
 // cannot span multiple blocks, meaning that blocks generally need to
 // be large in size (gigabytes). The number of blocks may be relatively