From 14be4e35bb5166aad6623be70f2210a0fef02212 Mon Sep 17 00:00:00 2001
From: gus
Date: Wed, 14 Aug 2024 11:02:57 +1000
Subject: [PATCH] Add new 'quorum' blob access

Add a new 'quorum' blob access that provides high availability on top
of already-durable storage. Quorum blob access only requires that a
subset of backends be available in order to function successfully.

The exact quorum size is configurable, but almost all cases should use
"half the number of backends, rounded up" for both the read and write
quorum size, i.e. 2 out of 3 backends for single-failure tolerance, or
3 out of 5 backends for double-failure tolerance.

Writes (Put) must succeed on at least write_quorum backends, and occur
in parallel. Reads (Get) must see at least read_quorum not-found
responses before concluding that the blob does not exist, and occur
sequentially. FindMissing reads are performed in parallel on
read_quorum backends, and the results are merged.

Note: blobs are not replicated again after the initial Put, so the
underlying storage should be durable.
---
 pkg/blobstore/configuration/BUILD.bazel     |   1 +
 .../configuration/new_blob_access.go        |  30 ++
 pkg/blobstore/quorum/BUILD.bazel            |  40 +++
 pkg/blobstore/quorum/quorum_blob_access.go  | 218 +++++++++++++
 .../quorum/quorum_blob_access_test.go       | 296 ++++++++++++++++++
 .../configuration/blobstore/blobstore.proto |  36 +++
 6 files changed, 621 insertions(+)
 create mode 100644 pkg/blobstore/quorum/BUILD.bazel
 create mode 100644 pkg/blobstore/quorum/quorum_blob_access.go
 create mode 100644 pkg/blobstore/quorum/quorum_blob_access_test.go

diff --git a/pkg/blobstore/configuration/BUILD.bazel b/pkg/blobstore/configuration/BUILD.bazel
index ca4d7981..32f6a414 100644
--- a/pkg/blobstore/configuration/BUILD.bazel
+++ b/pkg/blobstore/configuration/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
         "//pkg/blobstore/grpcclients",
         "//pkg/blobstore/local",
         "//pkg/blobstore/mirrored",
+        "//pkg/blobstore/quorum",
         "//pkg/blobstore/readcaching",
         "//pkg/blobstore/readfallback",
         "//pkg/blobstore/replication",
diff --git a/pkg/blobstore/configuration/new_blob_access.go b/pkg/blobstore/configuration/new_blob_access.go
index 6f175005..ee212a78 100644
--- a/pkg/blobstore/configuration/new_blob_access.go
+++ b/pkg/blobstore/configuration/new_blob_access.go
@@ -10,6 +10,7 @@ import (
 	"github.com/buildbarn/bb-storage/pkg/blobstore"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/local"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/mirrored"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/quorum"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/readcaching"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/readfallback"
 	"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
@@ -143,6 +144,35 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
 			BlobAccess:      mirrored.NewMirroredBlobAccess(backendA.BlobAccess, backendB.BlobAccess, replicatorAToB, replicatorBToA),
 			DigestKeyFormat: backendA.DigestKeyFormat.Combine(backendB.DigestKeyFormat),
 		}, "mirrored", nil
+	case *pb.BlobAccessConfiguration_Quorum:
+		backends := make([]blobstore.BlobAccess, 0, len(backend.Quorum.Backends))
+		var combinedDigestKeyFormat *digest.KeyFormat
+
+		for _, b := range backend.Quorum.Backends {
+			backend, err := nc.NewNestedBlobAccess(b, creator)
+			if err != nil {
+				return BlobAccessInfo{}, "", err
+			}
+			backends = append(backends, backend.BlobAccess)
+			if combinedDigestKeyFormat == nil {
+				combinedDigestKeyFormat = &backend.DigestKeyFormat
+			} else {
+				newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
+				combinedDigestKeyFormat = &newDigestKeyFormat
+			}
+		}
+		if len(backends) == 0 {
+			return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create quorum blob access without any backends")
+		}
+		readQuorum := int(backend.Quorum.ReadQuorum)
+		writeQuorum := int(backend.Quorum.WriteQuorum)
+		if readQuorum+writeQuorum <= len(backends) {
+			return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Quorum blob access requires read_quorum + write_quorum > number of backends")
+		}
+		return BlobAccessInfo{
+			BlobAccess:      quorum.NewQuorumBlobAccess(backends, readQuorum, writeQuorum),
+			DigestKeyFormat: *combinedDigestKeyFormat,
+		}, "quorum", nil
 	case *pb.BlobAccessConfiguration_Local:
 		digestKeyFormat := digest.KeyWithInstance
 		if !backend.Local.HierarchicalInstanceNames {
diff --git a/pkg/blobstore/quorum/BUILD.bazel b/pkg/blobstore/quorum/BUILD.bazel
new file mode 100644
index 00000000..4cdcf2d9
--- /dev/null
+++ b/pkg/blobstore/quorum/BUILD.bazel
@@ -0,0 +1,40 @@
+load("@rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "quorum",
+    srcs = ["quorum_blob_access.go"],
+    importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/quorum",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/blobstore",
+        "//pkg/blobstore/buffer",
+        "//pkg/blobstore/replication",
+        "//pkg/blobstore/slicing",
+        "//pkg/digest",
+        "//pkg/random",
+        "//pkg/util",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
+        "@com_github_prometheus_client_golang//prometheus",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//status",
+        "@org_golang_x_sync//errgroup",
+    ],
+)
+
+go_test(
+    name = "quorum_test",
+    srcs = ["quorum_blob_access_test.go"],
+    deps = [
+        ":quorum",
+        "//internal/mock",
+        "//pkg/blobstore",
+        "//pkg/blobstore/buffer",
+        "//pkg/digest",
+        "//pkg/testutil",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
+        "@com_github_stretchr_testify//require",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//status",
+        "@org_uber_go_mock//gomock",
+    ],
+)
diff --git a/pkg/blobstore/quorum/quorum_blob_access.go b/pkg/blobstore/quorum/quorum_blob_access.go
new file mode 100644
index 00000000..df107a2b
--- /dev/null
+++ b/pkg/blobstore/quorum/quorum_blob_access.go
@@ -0,0 +1,218 @@
+package quorum
+
+import (
+	"context"
+	"sync"
+
+	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
+	"github.com/buildbarn/bb-storage/pkg/blobstore"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
+	"github.com/buildbarn/bb-storage/pkg/digest"
+	"github.com/buildbarn/bb-storage/pkg/random"
+	"github.com/buildbarn/bb-storage/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	quorumBlobAccessPrometheusMetrics sync.Once
+
+	quorumBlobAccessFindMissingSynchronizations = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "buildbarn",
+			Subsystem: "blobstore",
+			Name:      "quorum_blob_access_find_missing_synchronizations",
+			Help:      "Number of blobs synchronized in FindMissing()",
+			Buckets:   append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...),
+		},
+		[]string{"direction"})
+)
+
+type quorumBlobAccess struct {
+	backends    []blobstore.BlobAccess
+	readQuorum  int
+	writeQuorum int
+	generator   random.ThreadSafeGenerator
+}
+
+// NewQuorumBlobAccess creates a BlobAccess that applies operations to a
+// subset of storage backends, retrying on infrastructure errors. The read
+// and write quorum sizes should be chosen so that any read quorum overlaps
+// with every write quorum in at least one backend.
+// Note: Data is not replicated again after the original write.
+func NewQuorumBlobAccess(backends []blobstore.BlobAccess, readQuorum, writeQuorum int) blobstore.BlobAccess {
+	quorumBlobAccessPrometheusMetrics.Do(func() {
+		prometheus.MustRegister(quorumBlobAccessFindMissingSynchronizations)
+	})
+
+	return &quorumBlobAccess{
+		backends:    backends,
+		readQuorum:  readQuorum,
+		writeQuorum: writeQuorum,
+		generator:   random.FastThreadSafeGenerator,
+	}
+}
+
+func (ba *quorumBlobAccess) shuffledBackends() []blobstore.BlobAccess {
+	backends := make([]blobstore.BlobAccess, len(ba.backends))
+	copy(backends, ba.backends)
+
+	ba.generator.Shuffle(len(backends), func(i, j int) {
+		backends[i], backends[j] = backends[j], backends[i]
+	})
+
+	return backends
+}
+
+type getQuorumErrorHandler struct {
+	remainingBackends []blobstore.BlobAccess
+	remainingReads    int
+	retry             func(blobstore.BlobAccess) buffer.Buffer
+}
+
+func (eh *getQuorumErrorHandler) tryNextBackendOrError(err error) (buffer.Buffer, error) {
+	if len(eh.remainingBackends) > 0 {
+		nextBackend := eh.remainingBackends[0]
+		eh.remainingBackends = eh.remainingBackends[1:]
+		return eh.retry(nextBackend), nil
+	}
+	return nil, err
+}
+
+func (eh *getQuorumErrorHandler) OnError(err error) (buffer.Buffer, error) {
+	fallbackErr := status.Error(codes.Unavailable, "Too many backends unavailable")
+	if util.IsInfrastructureError(err) {
+		// I/O error. Try again on another backend.
+		return eh.tryNextBackendOrError(fallbackErr)
+	} else if status.Code(err) == codes.NotFound {
+		// Not found. Try again on another backend - if we haven't seen
+		// enough NotFound responses yet.
+		if eh.remainingReads <= 1 {
+			// Observed sufficient NotFounds. Return a conclusive
+			// NotFound.
+			return nil, err
+		}
+		eh.remainingReads--
+
+		// Haven't been able to check enough backends. Can't conclude
+		// not found.
+		return eh.tryNextBackendOrError(fallbackErr)
+	}
+	return nil, err
+}
+
+func (eh *getQuorumErrorHandler) Done() {}
+
+func (ba *quorumBlobAccess) get(getter func(b blobstore.BlobAccess) buffer.Buffer) buffer.Buffer {
+	backends := ba.shuffledBackends()
+
+	backend := backends[0]
+	remainingBackends := backends[1:]
+
+	return buffer.WithErrorHandler(
+		getter(backend),
+		&getQuorumErrorHandler{
+			remainingBackends: remainingBackends,
+			remainingReads:    ba.readQuorum,
+			retry:             getter,
+		})
+}
+
+func (ba *quorumBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
+	return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
+		return b.Get(ctx, digest)
+	})
+}
+
+func (ba *quorumBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
+	return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
+		return b.GetFromComposite(ctx, parentDigest, childDigest, slicer)
+	})
+}
+
+func (ba *quorumBlobAccess) shuffledBackendQueue() <-chan blobstore.BlobAccess {
+	queue := make(chan blobstore.BlobAccess)
+	go func() {
+		for _, b := range ba.shuffledBackends() {
+			queue <- b
+		}
+		close(queue)
+	}()
+	return queue
+}
+
+func (ba *quorumBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+	// Store object in at least writeQuorum storage backends.
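+	//
+	// Each additional writer receives its own clone of the buffer's
+	// stream; the final writer consumes the original buffer. All writers
+	// pull from a single shuffled queue, so every backend is attempted at
+	// most once per call.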
+	group, ctx := errgroup.WithContext(ctx)
+	backendQueue := ba.shuffledBackendQueue()
+
+	// Spawn writeQuorum writers. Each of these goroutines needs to
+	// succeed once.
+	for i := 0; i < ba.writeQuorum; i++ {
+		var b1 buffer.Buffer
+		if i == ba.writeQuorum-1 {
+			// Last writer, no need to clone the buffer.
+			b1 = b
+		} else {
+			b, b1 = b.CloneStream()
+		}
+
+		group.Go(func() error {
+			// If the queue is drained before this writer succeeds,
+			// report the last error, or unavailability if this
+			// writer never got to attempt a backend.
+			err := status.Error(codes.Unavailable, "Too many backends unavailable")
+			for backend := range backendQueue {
+				err = backend.Put(ctx, digest, b1)
+				if err == nil {
+					// Success.
+					return nil
+				}
+			}
+			return err
+		})
+	}
+
+	return group.Wait()
+}
+
+func (ba *quorumBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
+	// Call FindMissing() on readQuorum backends.
+	group, ctx := errgroup.WithContext(ctx)
+	backendQueue := ba.shuffledBackendQueue()
+
+	results := make([]digest.Set, ba.readQuorum)
+	for i := 0; i < ba.readQuorum; i++ {
+		resultIdx := i
+		group.Go(func() error {
+			// As in Put(), failing to obtain any result must be
+			// reported as an error; an empty result would corrupt
+			// the intersection below.
+			err := status.Error(codes.Unavailable, "Too many backends unavailable")
+			for backend := range backendQueue {
+				results[resultIdx], err = backend.FindMissing(ctx, digests)
+				if err == nil {
+					// Success.
+					return nil
+				}
+			}
+			return err
+		})
+	}
+
+	if err := group.Wait(); err != nil {
+		return digest.EmptySet, err
+	}
+
+	// Find the intersection of all results.
+	missingFromAll := results[0]
+	for _, result := range results[1:] {
+		_, missingFromAll, _ = digest.GetDifferenceAndIntersection(missingFromAll, result)
+	}
+	return missingFromAll, nil
+}
+
+func (ba *quorumBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
+	backends := ba.shuffledBackends()
+
+	return backends[0].GetCapabilities(ctx, instanceName)
+}
diff --git a/pkg/blobstore/quorum/quorum_blob_access_test.go b/pkg/blobstore/quorum/quorum_blob_access_test.go
new file mode 100644
index 00000000..d2292c19
--- /dev/null
+++ b/pkg/blobstore/quorum/quorum_blob_access_test.go
@@ -0,0 +1,296 @@
+package quorum_test
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+
+	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
+	"github.com/buildbarn/bb-storage/internal/mock"
+	"github.com/buildbarn/bb-storage/pkg/blobstore"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/quorum"
+	"github.com/buildbarn/bb-storage/pkg/digest"
+	"github.com/buildbarn/bb-storage/pkg/testutil"
+	"github.com/stretchr/testify/require"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"go.uber.org/mock/gomock"
+)
+
+func setup(t *testing.T) (*gomock.Controller, context.Context, []*mock.MockBlobAccess, blobstore.BlobAccess) {
+	ctrl, ctx := gomock.WithContext(context.Background(), t)
+
+	mockBackends := make([]*mock.MockBlobAccess, 3)
+	backends := make([]blobstore.BlobAccess, 3)
+	for i := range backends {
+		b := mock.NewMockBlobAccess(ctrl)
+		mockBackends[i] = b
+		backends[i] = b
+	}
+
+	blobAccess := quorum.NewQuorumBlobAccess(backends, 2, 2)
+
+	return ctrl, ctx, mockBackends, blobAccess
+}
+
+var blobDigest = digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+
+func TestQuorumBlobAccessGet_Success(t *testing.T) {
+	// Common case: Blob exists on a quorum of backends.
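+	//
+	// With a read quorum of 2, a single NotFound response is not
+	// conclusive, so the read continues to a backend that still holds
+	// the blob.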
+	ctrl, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MinTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MinTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MinTimes(1)
+
+	// Requests should alternate between backends to spread the load
+	// equally.
+	for i := 0; i < 100 && !ctrl.Satisfied(); i++ {
+		data, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+		require.NoError(t, err)
+		require.Equal(t, []byte("Hello world"), data)
+	}
+}
+
+func TestQuorumBlobAccessGet_NotFoundAll(t *testing.T) {
+	// Common case: Blob is not present on any backend.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.NotFound, "Blob not found"), err)
+}
+
+func TestQuorumBlobAccessGet_UnreachableSuccess(t *testing.T) {
+	// One backend unavailable. Should query remaining two, and find blob.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).Times(1)
+
+	data, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	require.NoError(t, err)
+	require.Equal(t, []byte("Hello world"), data)
+}
+
+func TestQuorumBlobAccessGet_UnreachableNotFound(t *testing.T) {
+	// One backend unavailable. Should query remaining two, and conclude
+	// not found.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).Times(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).Times(1)
+
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.NotFound, "Blob not found"), err)
+}
+
+func TestQuorumBlobAccessGet_Unreachable2Success(t *testing.T) {
+	// Two backends unavailable. Should query remaining one, and find blob.
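+	//
+	// Infrastructure errors never count towards the NotFound quorum;
+	// they only cause the read to be retried on the next backend.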
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).MaxTimes(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).Times(1)
+
+	data, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	require.NoError(t, err)
+	require.Equal(t, []byte("Hello world"), data)
+}
+
+func TestQuorumBlobAccessGet_Unreachable2Failure(t *testing.T) {
+	// Two backends unavailable. Should query remaining one, but be unable
+	// to conclude not found.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch")))
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch")))
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).Times(1)
+
+	// Can't conclude the blob doesn't exist.
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.Unavailable, "Too many backends unavailable"), err)
+}
+
+func TestQuorumBlobAccessGet_Unreachable3Failure(t *testing.T) {
+	// All backends unavailable.
+	_, ctx, backends, blobAccess := setup(t)
+
+	backends[0].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).Times(1)
+	backends[1].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).Times(1)
+	backends[2].EXPECT().Get(ctx, blobDigest).Return(buffer.NewBufferFromError(status.Error(codes.Unavailable, "Server gone to lunch"))).Times(1)
+
+	// Can't conclude the blob doesn't exist.
+	_, err := blobAccess.Get(ctx, blobDigest).ToByteSlice(100)
+	testutil.RequireEqualStatus(t, status.Error(codes.Unavailable, "Too many backends unavailable"), err)
+}
+
+func TestQuorumBlobAccessGetFromComposite_Success(t *testing.T) {
+	// Common case: Blob exists on a quorum of backends.
+	//
+	// We assume that the tests for Get() provide coverage for other
+	// scenarios.
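+	//
+	// GetFromComposite is dispatched through the same quorum get()
+	// helper as Get(); only the per-backend call differs.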
+	ctrl, ctx, backends, blobAccess := setup(t)
+
+	parentDigest := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "834c514174f3a7d5952dfa68d4b657f3c4cf78b3973dcf2721731c3861559828", 100)
+	childDigest := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	slicer := mock.NewMockBlobSlicer(ctrl)
+
+	backends[0].EXPECT().GetFromComposite(ctx, parentDigest, childDigest, slicer).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MaxTimes(1)
+	backends[1].EXPECT().GetFromComposite(ctx, parentDigest, childDigest, slicer).Return(buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))).MaxTimes(1)
+	backends[2].EXPECT().GetFromComposite(ctx, parentDigest, childDigest, slicer).Return(buffer.NewBufferFromError(status.Error(codes.NotFound, "Blob not found"))).MaxTimes(1)
+
+	data, err := blobAccess.GetFromComposite(ctx, parentDigest, childDigest, slicer).ToByteSlice(100)
+	require.NoError(t, err)
+	require.Equal(t, []byte("Hello world"), data)
+}
+
+func TestQuorumBlobAccessPut_Success(t *testing.T) {
+	_, ctx, backends, blobAccess := setup(t)
+
+	var numWrites atomic.Int32
+	doSuccess := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		data, err := b.ToByteSlice(100)
+		require.NoError(t, err)
+		require.Equal(t, []byte("Hello world"), data)
+		numWrites.Add(1)
+		return nil
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+
+	require.NoError(t, blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+	require.EqualValues(t, 2, numWrites.Load())
+}
+
+func TestQuorumBlobAccessPut_UnreachableSuccess(t *testing.T) {
+	// One backend unavailable. Should write to remaining two.
+	_, ctx, backends, blobAccess := setup(t)
+
+	var numWrites atomic.Int32
+	doSuccess := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		numWrites.Add(1)
+		return nil
+	}
+	doUnavail := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return status.Error(codes.Unavailable, "Server gone to lunch")
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).Times(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).Times(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doUnavail).MaxTimes(1)
+
+	require.NoError(t, blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+	require.EqualValues(t, 2, numWrites.Load())
+}
+
+func TestQuorumBlobAccessPut_Unreachable2Failure(t *testing.T) {
+	// Two backends unavailable. Should not report success.
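+	//
+	// Only one backend can accept the write, so the two writer
+	// goroutines cannot both succeed, and Put must propagate the
+	// failure.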
+	_, ctx, backends, blobAccess := setup(t)
+
+	doSuccess := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return nil
+	}
+	doUnavail := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return status.Error(codes.Unavailable, "Server gone to lunch")
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doSuccess).MaxTimes(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doUnavail).MaxTimes(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doUnavail).MaxTimes(1)
+
+	testutil.RequireEqualStatus(
+		t,
+		status.Error(codes.Unavailable, "Server gone to lunch"),
+		blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+}
+
+func TestQuorumBlobAccessPut_Failure(t *testing.T) {
+	// Non-infrastructure error. Should return error cause.
+	_, ctx, backends, blobAccess := setup(t)
+
+	doFail := func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
+		b.Discard()
+		return status.Error(codes.InvalidArgument, "Computer says no")
+	}
+
+	backends[0].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doFail).MaxTimes(1)
+	backends[1].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doFail).MaxTimes(1)
+	backends[2].EXPECT().Put(gomock.Any(), blobDigest, gomock.Any()).DoAndReturn(doFail).MaxTimes(1)
+
+	testutil.RequireEqualStatus(
+		t,
+		status.Error(codes.InvalidArgument, "Computer says no"),
+		blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice([]byte("Hello world"))))
+}
+
+func TestQuorumBlobAccessFindMissing_Success(t *testing.T) {
+	_, ctx, backends, blobAccess := setup(t)
+
+	digestNone := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	digestA := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
+	digestB := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "522b44d647b6989f60302ef755c277e508d5bcc38f05e139906ebdb03a5b19f2", 9)
+	allDigests := digest.NewSetBuilder().Add(digestNone).Add(digestA).Add(digestB).Build()
+	missingFrom0 := digest.NewSetBuilder().Add(digestNone).Add(digestA).Build() // Missing A
+	missingFrom1 := digest.NewSetBuilder().Add(digestNone).Add(digestB).Build() // Missing B
+	missingFrom2 := digest.NewSetBuilder().Add(digestNone).Build()              // Has A and B
+
+	backends[0].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom0, nil).MaxTimes(1)
+	backends[1].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom1, nil).MaxTimes(1)
+	backends[2].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom2, nil).MaxTimes(1)
+
+	missing, err := blobAccess.FindMissing(ctx, allDigests)
+	require.NoError(t, err)
+	require.Equal(t, digestNone.ToSingletonSet(), missing)
+}
+
+func TestQuorumBlobAccessFindMissing_UnavailableSuccess(t *testing.T) {
+	// One server unavailable. Doesn't change result.
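+	//
+	// FindMissing only needs results from read_quorum (2) backends, so
+	// the unavailable backend is simply skipped by whichever goroutine
+	// draws it from the queue.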
+	_, ctx, backends, blobAccess := setup(t)
+
+	digestNone := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	digestA := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
+	digestB := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "522b44d647b6989f60302ef755c277e508d5bcc38f05e139906ebdb03a5b19f2", 9)
+	allDigests := digest.NewSetBuilder().Add(digestNone).Add(digestA).Add(digestB).Build()
+	missingFrom0 := digest.NewSetBuilder().Add(digestNone).Add(digestA).Build() // Missing A
+	missingFrom1 := digest.NewSetBuilder().Add(digestNone).Add(digestB).Build() // Missing B
+	// Backend 2 would report only digestNone as missing (it has both A
+	// and B), but it is unavailable in this test.
+
+	backends[0].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom0, nil).MaxTimes(1)
+	backends[1].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom1, nil).MaxTimes(1)
+	backends[2].EXPECT().FindMissing(gomock.Any(), allDigests).Return(digest.EmptySet, status.Error(codes.Unavailable, "Server gone to lunch")).MaxTimes(1)
+
+	missing, err := blobAccess.FindMissing(ctx, allDigests)
+	require.NoError(t, err)
+	require.Equal(t, digestNone.ToSingletonSet(), missing)
+}
+
+func TestQuorumBlobAccessFindMissing_Unavailable2Failure(t *testing.T) {
+	// Two servers unavailable. Return failure.
+	_, ctx, backends, blobAccess := setup(t)
+
+	digestNone := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c", 11)
+	digestA := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
+	digestB := digest.MustNewDigest("default", remoteexecution.DigestFunction_SHA256, "522b44d647b6989f60302ef755c277e508d5bcc38f05e139906ebdb03a5b19f2", 9)
+	allDigests := digest.NewSetBuilder().Add(digestNone).Add(digestA).Add(digestB).Build()
+	missingFrom0 := digest.NewSetBuilder().Add(digestNone).Add(digestA).Build() // Missing A
+
+	backends[0].EXPECT().FindMissing(gomock.Any(), allDigests).Return(missingFrom0, nil).MaxTimes(1)
+	backends[1].EXPECT().FindMissing(gomock.Any(), allDigests).Return(digest.EmptySet, status.Error(codes.Unavailable, "Server gone to lunch")).MaxTimes(1)
+	backends[2].EXPECT().FindMissing(gomock.Any(), allDigests).Return(digest.EmptySet, status.Error(codes.Unavailable, "Server gone to lunch")).MaxTimes(1)
+
+	_, err := blobAccess.FindMissing(ctx, allDigests)
+	testutil.RequireEqualStatus(t, status.Error(codes.Unavailable, "Server gone to lunch"), err)
+}
diff --git a/pkg/proto/configuration/blobstore/blobstore.proto b/pkg/proto/configuration/blobstore/blobstore.proto
index f3641528..819cf21a 100644
--- a/pkg/proto/configuration/blobstore/blobstore.proto
+++ b/pkg/proto/configuration/blobstore/blobstore.proto
@@ -50,6 +50,14 @@ message BlobAccessConfiguration {
     // be refilled automatically.
     MirroredBlobAccessConfiguration mirrored = 14;
 
+    // Read and write blobs to a subset of multiple backends.
+    //
+    // This backend provides high availability, but no additional
+    // durability. Operations on unavailable backends are retried on other
+    // backends. Data is not replicated again after the initial writes, on
+    // the assumption that the underlying storage is durable and
+    // unavailability is temporary.
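+    //
+    // For example, with three backends and read_quorum == write_quorum ==
+    // 2, any single backend may be unavailable without affecting reads or
+    // writes.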
+    QuorumBlobAccessConfiguration quorum = 28;
+
     // Store blobs on the local system.
     LocalBlobAccessConfiguration local = 15;
@@ -312,6 +320,34 @@ message MirroredBlobAccessConfiguration {
   BlobReplicatorConfiguration replicator_b_to_a = 4;
 }
 
+// QuorumBlobAccess reads and writes data to a subset of backends.
+// The read and write quorum sizes must be chosen such that reads and
+// writes are guaranteed to overlap on at least one backend, i.e.:
+// read_quorum + write_quorum > len(backends).
+//
+// A typical highly available setup is 3 backends, with read_quorum ==
+// write_quorum == 2. This requires that write operations succeed on 2 of
+// the 3 backends, which means a single backend can be unavailable for
+// writes. Reads must successfully check at least 2 of the 3 backends
+// before concluding a blob does not exist, which also means a single
+// backend can be unavailable for reads.
+//
+// Other, less common configurations are 3-read + 3-write out of 5
+// backends (2 backends can be unavailable), and 1-read + 2-write out of
+// 2 backends, which effectively recreates MirroredBlobAccess (ignoring
+// replication).
+message QuorumBlobAccessConfiguration {
+  // The set of backends. Backends are selected randomly for each
+  // operation; their order is not significant.
+  repeated BlobAccessConfiguration backends = 1;
+
+  // Reads must successfully check this many backends before concluding
+  // that a blob does not exist.
+  uint32 read_quorum = 2;
+
+  // Writes must succeed on this many backends before the overall write
+  // operation succeeds.
+  uint32 write_quorum = 3;
+}
+
 // LocalBlobAccess stores all data onto disk in block sizes. A blob
 // cannot span multiple blocks, meaning that blocks generally need to
 // be large in size (gigabytes). The number of blocks may be relatively