From f0b6649fef81275ada6498868c8f965050fe1e81 Mon Sep 17 00:00:00 2001 From: Tom Coldrick Date: Fri, 26 Jan 2024 11:19:47 +0000 Subject: [PATCH] AC: Treat Directories as blobs We currently do some cleverness to pop Directory objects into the OutputDirectories field of the ActionResult for semantic purposes. Unfortunately doing this requires jumping over many hurdles, partially of our own making and partially from REAPI. To detect whether the object is a Directory, we fetch it from CAS and attempt to unmarshal it into a Directory message. We then must convert it to a Tree to store in the ActionResult, which also requires the raw proto. The catch is that things that are not Directory messages may successfully unmarshal into one -- as a motivating example, BuildStream's Source proto is such a message. When this happens, we're very likely to fail the request as we attempt to use the corrupted Directory we've created. To solve this, we could pass through data on whether the asset is supposed to be a directory or not, however doing so is inelegant and has another subtle problem -- if the client decides to use PushDirectory to push something that isn't a Directory (which isn't explicitly banned by the spec), then we will error. As the Action and ActionResults we generate are intended only to be used by us, we can instead break the semantics somewhat and treat the Directory as an opaque file, as we do for Blobs. Note: this intentionally breaks the sharing of Actions should the RemoteExecutionFetcher be used. This feels more correct to me, as we were overwriting the one we actually ran anyway. Under this usage we instead get multiple Actions pointing to the same ActionResult -- one for the actually executed Action, and another for the asset reference. --- pkg/storage/BUILD.bazel | 2 - pkg/storage/action_cache_asset_store.go | 162 ++----------------- pkg/storage/action_cache_asset_store_test.go | 56 +------ 3 files changed, 21 insertions(+), 199 deletions(-) diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 5143b21..915588d 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -22,8 +22,6 @@ go_library( "@com_github_buildbarn_bb_storage//pkg/blobstore", "@com_github_buildbarn_bb_storage//pkg/blobstore/buffer", "@com_github_buildbarn_bb_storage//pkg/digest", - "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//status", "@org_golang_google_protobuf//proto", "@org_golang_google_protobuf//types/known/timestamppb", ], diff --git a/pkg/storage/action_cache_asset_store.go b/pkg/storage/action_cache_asset_store.go index 7ab6322..900c952 100644 --- a/pkg/storage/action_cache_asset_store.go +++ b/pkg/storage/action_cache_asset_store.go @@ -10,8 +10,6 @@ import ( "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" "github.com/buildbarn/bb-storage/pkg/digest" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -32,59 +30,13 @@ func NewActionCacheAssetStore(actionCache, contentAddressableStorage blobstore.B } } -func (rs *actionCacheAssetStore) assetToDirectory(ctx context.Context, asset *asset.Asset, instance digest.InstanceName) (*remoteexecution.Directory, error) { - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(asset.Digest.GetHash())) - if err != nil { - return nil, err - } - digest, err := digestFunction.NewDigestFromProto(asset.Digest) - if err != nil { - return nil, err - } - directory, err := rs.contentAddressableStorage.Get(ctx, digest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes) - if err != nil { - return nil, err - } - return directory.(*remoteexecution.Directory), nil -} - func (rs *actionCacheAssetStore) actionResultToAsset(ctx context.Context, a *remoteexecution.ActionResult, instance digest.InstanceName) (*asset.Asset, error) { digest := &remoteexecution.Digest{} - // Check if there is an output directory in the action result - for _, dir := range a.OutputDirectories { - if dir.Path == "out" { - digest = dir.TreeDigest - } - } - // If the required output directory is present - if digest.Hash != "" { - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(digest.Hash)) - if err != nil { - return nil, err - } - treeDigest, err := digestFunction.NewDigestFromProto(digest) - if err != nil { - return nil, err - } - // The action result contains a tree digest, but a directory digest - // is needed, so retrieve the tree message and get the digest of the - // root - tree, err := rs.contentAddressableStorage.Get(ctx, treeDigest).ToProto(&remoteexecution.Tree{}, rs.maximumMessageSizeBytes) - if err != nil { - return nil, err - } - root := tree.(*remoteexecution.Tree).Root - digest, err = ProtoToDigest(root) - if err != nil { - return nil, err - } - } else { - // Required output directory is not present, look for required - // output file - for _, file := range a.OutputFiles { - if file.Path == "out" { - digest = file.Digest - } + // Required output directory is not present, look for required + // output file + for _, file := range a.OutputFiles { + if file.Path == "out" { + digest = file.Digest } } return &asset.Asset{ @@ -204,22 +156,10 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere } var action *remoteexecution.Action var command *remoteexecution.Command - if commandGenerator, err := qualifier.QualifiersToCommand(ref.Qualifiers); err != nil || len(ref.Uris) > 1 { - // Create the action with the qualifier directory as the input root - action, command, err = assetReferenceToAction(ref, directoryDigest) - if err != nil { - return err - } - } else { - command = commandGenerator(ref.Uris[0]) - commandDigest, err := ProtoToDigest(command) - if err != nil { - return err - } - action = &remoteexecution.Action{ - CommandDigest: commandDigest, - InputRootDigest: EmptyDigest, - } + // Create the action with the qualifier directory as the input root + action, command, err = assetReferenceToAction(ref, directoryDigest) + if err != nil { + return err } actionPb, err := proto.Marshal(action) if err != nil { @@ -256,93 +196,17 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere } actionResult := &remoteexecution.ActionResult{ + OutputFiles: []*remoteexecution.OutputFile{{ + Path: "out", + Digest: data.Digest, + }}, ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{ QueuedTimestamp: data.LastUpdated, }, } - - // Check if the input asset is a directory or blob - d, err := rs.assetToDirectory(ctx, data, instance) - if err == nil { - // If it is a directory, construct a tree from it as tree digest is - // required for action result - tree, err := rs.directoryToTree(ctx, d, instance) - if err != nil { - return err - } - treePb, err := proto.Marshal(tree) - if err != nil { - return err - } - treeDigest, err := ProtoToDigest(tree) - if err != nil { - return err - } - bbTreeDigest, err := digestFunction.NewDigestFromProto(treeDigest) - if err != nil { - return err - } - err = rs.contentAddressableStorage.Put(ctx, bbTreeDigest, buffer.NewCASBufferFromByteSlice(bbTreeDigest, treePb, buffer.UserProvided)) - if err != nil { - return err - } - actionResult.OutputDirectories = []*remoteexecution.OutputDirectory{{ - Path: "out", - TreeDigest: treeDigest, - }} - } else { - // If it isn't a directory, use the digest as an output file digest - if status.Code(err) != codes.InvalidArgument { - return err - } - actionResult.OutputFiles = []*remoteexecution.OutputFile{{ - Path: "out", - Digest: data.Digest, - }} - } return rs.actionCache.Put(ctx, bbActionDigest, buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided)) } -func (rs *actionCacheAssetStore) directoryToTree(ctx context.Context, directory *remoteexecution.Directory, instance digest.InstanceName) (*remoteexecution.Tree, error) { - children := []*remoteexecution.Directory{} - for _, node := range directory.Directories { - nodeChildren, err := rs.directoryNodeToDirectories(ctx, instance, node) - if err != nil { - return nil, err - } - children = append(children, nodeChildren...) - } - - return &remoteexecution.Tree{ - Root: directory, - Children: children, - }, nil -} - -func (rs *actionCacheAssetStore) directoryNodeToDirectories(ctx context.Context, instance digest.InstanceName, node *remoteexecution.DirectoryNode) ([]*remoteexecution.Directory, error) { - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(node.GetDigest().GetHash())) - if err != nil { - return nil, err - } - digest, err := digestFunction.NewDigestFromProto(node.Digest) - if err != nil { - return nil, err - } - directory, err := rs.contentAddressableStorage.Get(ctx, digest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes) - if err != nil { - return nil, err - } - directories := []*remoteexecution.Directory{directory.(*remoteexecution.Directory)} - for _, node := range directory.(*remoteexecution.Directory).Directories { - children, err := rs.directoryNodeToDirectories(ctx, instance, node) - if err != nil { - return nil, err - } - directories = append(directories, children...) - } - return directories, nil -} - func getDefaultTimestamp() *timestamppb.Timestamp { return timestamppb.New(time.Unix(0, 0)) } diff --git a/pkg/storage/action_cache_asset_store_test.go b/pkg/storage/action_cache_asset_store_test.go index 0970286..35c6993 100644 --- a/pkg/storage/action_cache_asset_store_test.go +++ b/pkg/storage/action_cache_asset_store_test.go @@ -29,12 +29,6 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) { Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", SizeBytes: 111, } - bbBlobDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - blobDigest.Hash, - blobDigest.SizeBytes, - ) uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) @@ -70,8 +64,6 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) { cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil) cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil) cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil) - cas.EXPECT().Get(ctx, bbBlobDigest).Return( - buffer.NewValidatedBufferFromByteSlice([]byte("Hello"))) ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn( func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) @@ -101,12 +93,6 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", SizeBytes: 111, } - bbRootDirectoryDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - rootDirectoryDigest.Hash, - rootDirectoryDigest.SizeBytes, - ) uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) @@ -136,16 +122,6 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { "e6842def39984b212641b9796c162b9e3085da84257bae614418f2255b0addc5", 38, ) - bbTreeDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - "102b51b9765a56a3e899f7cf0ee38e5251f9c503b357b330a49183eb7b155604", - 2, - ) - treeDigest := &remoteexecution.Digest{ - Hash: "102b51b9765a56a3e899f7cf0ee38e5251f9c503b357b330a49183eb7b155604", - SizeBytes: 2, - } ac := mock.NewMockBlobAccess(ctrl) cas := mock.NewMockBlobAccess(ctrl) @@ -153,18 +129,14 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil) cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil) cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil) - cas.EXPECT().Get(ctx, bbRootDirectoryDigest).Return( - buffer.NewProtoBufferFromProto(&remoteexecution.Directory{}, - buffer.UserProvided)) - cas.EXPECT().Put(ctx, bbTreeDigest, gomock.Any()).Return(nil) ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn( func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) require.NoError(t, err) a := m.(*remoteexecution.ActionResult) - for _, d := range a.OutputDirectories { + for _, d := range a.OutputFiles { if d.Path == "out" { - require.True(t, proto.Equal(d.TreeDigest, treeDigest)) + require.True(t, proto.Equal(d.Digest, rootDirectoryDigest)) return nil } } @@ -222,16 +194,6 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) { instanceName := digest.MustNewInstanceName("") - treeDigest := &remoteexecution.Digest{ - Hash: "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", - SizeBytes: 222, - } - bbTreeDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", - 222, - ) uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}) @@ -244,10 +206,13 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) { ts := timestamppb.New(time.Unix(0, 0)) buf := buffer.NewProtoBufferFromProto(&remoteexecution.ActionResult{ - OutputDirectories: []*remoteexecution.OutputDirectory{ + OutputFiles: []*remoteexecution.OutputFile{ { - Path: "out", - TreeDigest: treeDigest, + Path: "out", + Digest: &remoteexecution.Digest{ + Hash: "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", + SizeBytes: 222, + }, }, }, ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{ @@ -255,14 +220,9 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) { }, }, buffer.UserProvided) - treeBuf := buffer.NewProtoBufferFromProto(&remoteexecution.Tree{ - Root: &remoteexecution.Directory{}, - }, buffer.UserProvided) - ac := mock.NewMockBlobAccess(ctrl) cas := mock.NewMockBlobAccess(ctrl) ac.EXPECT().Get(ctx, actionDigest).Return(buf) - cas.EXPECT().Get(ctx, bbTreeDigest).Return(treeBuf) assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024) _, err := assetStore.Get(ctx, assetRef, instanceName)