From 4926e8ed7706446dc5fc74ba040a2cc6febaa279 Mon Sep 17 00:00:00 2001 From: Tom Coldrick Date: Thu, 24 Oct 2024 11:48:32 +0100 Subject: [PATCH] Tidy up action cache asset store The Action Cache Asset Store is a bit hard to work with at the moment. Lots of logic is included in the `Get` and `Put` methods that can be extracted and shared, while also clarifying what's really going on in these methods. I've also modified some of the utility functions to be more generic which has removed a load of boilerplate. --- pkg/fetch/caching_fetcher_test.go | 8 +- pkg/fetch/remote_execution_fetcher.go | 8 +- pkg/push/push_server_test.go | 4 +- pkg/storage/action_cache_asset_store.go | 364 ++++++++++--------- pkg/storage/action_cache_asset_store_test.go | 4 +- pkg/storage/asset_reference.go | 57 --- pkg/storage/blob_access_asset_store.go | 4 +- pkg/storage/blob_access_asset_store_test.go | 4 +- pkg/storage/digest.go | 47 ++- 9 files changed, 246 insertions(+), 254 deletions(-) diff --git a/pkg/fetch/caching_fetcher_test.go b/pkg/fetch/caching_fetcher_test.go index 3323981..aca1474 100644 --- a/pkg/fetch/caching_fetcher_test.go +++ b/pkg/fetch/caching_fetcher_test.go @@ -35,7 +35,7 @@ func TestFetchBlobCaching(t *testing.T) { Uris: []string{uri}, } blobDigest := &remoteexecution.Digest{Hash: "d0d829c4c0ce64787cb1c998a9c29a109f8ed005633132fda4f29982487b04db", SizeBytes: 123} - refDigest, err := storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) + refDigest, err := storage.ProtoToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) require.NoError(t, err) t.Logf("Ref digest was %v", refDigest) @@ -93,7 +93,7 @@ func TestFetchDirectoryCaching(t *testing.T) { Uris: []string{uri}, } dirDigest := &remoteexecution.Digest{Hash: "d0d829c4c0ce64787cb1c998a9c29a109f8ed005633132fda4f29982487b04db", SizeBytes: 123} - refDigest, err := 
storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) + refDigest, err := storage.ProtoToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) require.NoError(t, err) backend := mock.NewMockBlobAccess(ctrl) @@ -148,7 +148,7 @@ func TestCachingFetcherExpiry(t *testing.T) { InstanceName: "foo", Uris: []string{uri}, } - refDigest, err := storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) + refDigest, err := storage.ProtoToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) require.NoError(t, err) backend := mock.NewMockBlobAccess(ctrl) @@ -188,7 +188,7 @@ func TestCachingFetcherOldestContentAccepted(t *testing.T) { Uris: []string{uri}, OldestContentAccepted: timestamppb.Now(), } - refDigest, err := storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) + refDigest, err := storage.ProtoToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName) require.NoError(t, err) backend := mock.NewMockBlobAccess(ctrl) diff --git a/pkg/fetch/remote_execution_fetcher.go b/pkg/fetch/remote_execution_fetcher.go index 54074fe..1bea45c 100644 --- a/pkg/fetch/remote_execution_fetcher.go +++ b/pkg/fetch/remote_execution_fetcher.go @@ -46,7 +46,7 @@ func (rf *remoteExecutionFetcher) fetchCommon(ctx context.Context, req *remoteas } for _, uri := range req.Uris { command := commandGenerator(uri) - commandDigest, err := storage.ProtoToDigest(command) + _, commandDigest, err := storage.ProtoSerialise(command) if err != nil { return nil, "", "", err } @@ -59,7 +59,7 @@ func (rf *remoteExecutionFetcher) fetchCommon(ctx context.Context, req *remoteas CommandDigest: commandDigest, InputRootDigest: storage.EmptyDigest, } - actionDigest, err := storage.ProtoToDigest(action) + _, actionDigest, err := 
storage.ProtoSerialise(action) if err != nil { return nil, "", "", err } @@ -208,7 +208,7 @@ func (rf *remoteExecutionFetcher) FetchDirectory(ctx context.Context, req *remot return nil, err } root := tree.(*remoteexecution.Tree).Root - rootDigest, err := storage.ProtoToDigest(root) + _, rootDigest, err := storage.ProtoSerialise(root) if err != nil { return nil, err } @@ -221,7 +221,7 @@ func (rf *remoteExecutionFetcher) FetchDirectory(ctx context.Context, req *remot return nil, err } for _, child := range tree.(*remoteexecution.Tree).Children { - childDigest, err := storage.ProtoToDigest(child) + _, childDigest, err := storage.ProtoSerialise(child) if err != nil { return nil, err } diff --git a/pkg/push/push_server_test.go b/pkg/push/push_server_test.go index 884ba19..7c724aa 100644 --- a/pkg/push/push_server_test.go +++ b/pkg/push/push_server_test.go @@ -42,7 +42,7 @@ func TestPushServerPushBlobSuccess(t *testing.T) { BlobDigest: blobDigest, Qualifiers: qualifiers, } - refDigest, err := storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, qualifiers), instanceName) + refDigest, err := storage.ProtoToDigest(storage.NewAssetReference([]string{uri}, qualifiers), instanceName) require.NoError(t, err) backend := mock.NewMockBlobAccess(ctrl) @@ -82,7 +82,7 @@ func TestPushServerPushDirectorySuccess(t *testing.T) { RootDirectoryDigest: rootDirectoryDigest, Qualifiers: qualifiers, } - refDigest, err := storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, qualifiers), instanceName) + refDigest, err := storage.ProtoToDigest(storage.NewAssetReference([]string{uri}, qualifiers), instanceName) require.NoError(t, err) backend := mock.NewMockBlobAccess(ctrl) diff --git a/pkg/storage/action_cache_asset_store.go b/pkg/storage/action_cache_asset_store.go index f15da81..23d873a 100644 --- a/pkg/storage/action_cache_asset_store.go +++ b/pkg/storage/action_cache_asset_store.go @@ -17,6 +17,21 @@ import ( 
"google.golang.org/protobuf/types/known/timestamppb" ) +// An actionCacheAssetStore uses an Action Cache to store the relation between +// asset references and assets. The Remote Asset API associates an identifier +// (that is, URIs and Qualifiers) with an object in the CAS. The Action Cache +// acts similarly, albeit with more metadata, associating an Action with an +// ActionResult. +// +// We can take advantage of this similarity by converting our URIs and Qualifiers +// to an Action, and our Asset to an ActionResult, and simply forwarding the +// request to an Action Cache. Under this mode of operation, bb-remote-asset acts +// as a lightweight translation between the Remote Asset API and the Action Cache. +// +// The primary reason for this is to eliminate the requirement for bb-remote-asset +// to maintain state in its deployment, allowing for simpler operation. It also +// means we can use the existing Action Cache implementation to e.g. ensure +// referential integrity between the references and the CAS. type actionCacheAssetStore struct { actionCache blobstore.BlobAccess contentAddressableStorage blobstore.BlobAccess @@ -33,22 +48,146 @@ func NewActionCacheAssetStore(actionCache, contentAddressableStorage blobstore.B } } -func (rs *actionCacheAssetStore) assetToDirectory(ctx context.Context, asset *asset.Asset, instance digest.InstanceName) (*remoteexecution.Directory, error) { - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(asset.Digest.GetHash())) +// Convert an AssetReference to an Action, Command and Input Root Directory +// +// This does not interact with the CAS in any way. If using this to upload +// an Action and ActionResult, then the Command and all objects returned must be +// uploaded to the CAS to ensure referential integrity. 
+func (rs *actionCacheAssetStore) assetReferenceToAction(ref *asset.AssetReference) (*remoteexecution.Action, []proto.Message, error) { + objects := []proto.Message{} + + // 1. Create a reference excluding the URIs. + // This is used to associate the Qualifiers with the Action, which + // must all match, but not the URIs, which can match individually. + qr := NewAssetReference(nil, ref.Qualifiers) + _, qrDigest, err := ProtoSerialise(qr) if err != nil { - return nil, err + return nil, nil, err } - digest, err := digestFunction.NewDigestFromProto(asset.Digest) - if err != nil { - return nil, err + objects = append(objects, qr) + + // 2. Construct a directory that contains the qualifiers as a file + directory := &remoteexecution.Directory{ + Files: []*remoteexecution.FileNode{{ + Name: "AssetReference", + Digest: qrDigest, + }}, } - directory, err := rs.contentAddressableStorage.Get(ctx, digest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes) + _, directoryDigest, err := ProtoSerialise(directory) if err != nil { - return nil, err + return nil, nil, err + } + objects = append(objects, directory) + + // 3. Create a Command and Action based on the URIs and Qualifiers + var command *remoteexecution.Command + var action *remoteexecution.Action + if commandGenerator, err := qualifier.QualifiersToCommand(ref.Qualifiers); err != nil || len(ref.Uris) > 1 { + // Can't generate a Command. 
Use the URIs as arguments + command = &remoteexecution.Command{ + Arguments: ref.Uris, + OutputPaths: []string{"out"}, + OutputDirectoryFormat: remoteexecution.Command_TREE_AND_DIRECTORY, + } + _, commandDigest, err := ProtoSerialise(command) + if err != nil { + return nil, nil, err + } + objects = append(objects, command) + action = &remoteexecution.Action{ + CommandDigest: commandDigest, + InputRootDigest: directoryDigest, + } + } else { + // Generate a command based on the qualifiers + command := commandGenerator(ref.Uris[0]) + _, commandDigest, err := ProtoSerialise(command) + if err != nil { + return nil, nil, err + } + objects = append(objects, command) + action = &remoteexecution.Action{ + CommandDigest: commandDigest, + InputRootDigest: EmptyDigest, + } } - return directory.(*remoteexecution.Directory), nil + + return action, objects, nil } +// Convert an Asset to an ActionResult proto +// +// Any items that the ActionResult proto references are uploaded to the CAS +// as part of this method. For example, directory assets must be converted to a +// Tree proto in order to be referenced by the ActionResult. The Tree protos are +// uploaded in this method, so that the ActionResult returned has referential +// integrity with the CAS. +func (rs *actionCacheAssetStore) assetToActionResult(ctx context.Context, data *asset.Asset, instance digest.InstanceName) (*remoteexecution.ActionResult, error) { + result := &remoteexecution.ActionResult{ + ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{ + QueuedTimestamp: data.LastUpdated, + }, + } + + if data.Type == asset.Asset_DIRECTORY { + // If the asset is a Directory, then we need to convert it into + // a Tree proto for the ActionResult. Alas, the only way to + // do this is to recursively follow the Directory protos from the + // CAS. + + // 1. 
Get the asset from the CAS and parse it as a Directory + digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(data.Digest.GetHash())) + if err != nil { + return nil, err + } + digest, err := digestFunction.NewDigestFromProto(data.Digest) + if err != nil { + return nil, err + } + d, err := rs.contentAddressableStorage.Get(ctx, digest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes) + if err != nil { + // Users will hit this if they upload a digest referencing an + // arbitrary Blob in `PushDirectory` or a digest that does not + // reference any blob at all. + return nil, util.StatusWrapf( + err, + "digest in directory asset does not reference a Directory", + ) + } + directory := d.(*remoteexecution.Directory) + + // 2. Construct a Tree from the Directory and upload it to the CAS + tree, err := rs.directoryToTree(ctx, directory, instance) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Failed to convert directory to tree (one of the subdirs is not in the CAS?): %v", err) + } + + treePb, treeDigest, err := ProtoSerialise(tree) + if err != nil { + return nil, err + } + rs.uploadToCAS(ctx, treePb, treeDigest, instance) + + // 3. Use the directory from the asset as the directory "out" in the ActionResult.
+ result.OutputDirectories = []*remoteexecution.OutputDirectory{{ + Path: "out", + RootDirectoryDigest: data.Digest, + TreeDigest: treeDigest, + }} + } else if data.Type == asset.Asset_BLOB { + // Simply use the digest as an Output File, called "out" + result.OutputFiles = []*remoteexecution.OutputFile{{ + Path: "out", + Digest: data.Digest, + }} + } else { + return nil, status.Errorf(codes.InvalidArgument, "unknown asset Type %v", data.Type) + } + + return result, nil +} + +// Convert an ActionResult proto to an Asset func (rs *actionCacheAssetStore) actionResultToAsset(a *remoteexecution.ActionResult) (*asset.Asset, error) { digest := &remoteexecution.Digest{} assetType := asset.Asset_DIRECTORY @@ -84,53 +223,11 @@ func (rs *actionCacheAssetStore) actionResultToAsset(a *remoteexecution.ActionRe } func (rs *actionCacheAssetStore) Get(ctx context.Context, ref *asset.AssetReference, instance digest.InstanceName) (*asset.Asset, error) { - // Create asset reference using only the qualifiers of the request - qualifierReference := NewAssetReference(nil, ref.Qualifiers) - refDigest, err := ProtoToDigest(qualifierReference) - if err != nil { - return nil, err - } - // Construct a directory using the reference of only qualifiers - directory := &remoteexecution.Directory{ - Files: []*remoteexecution.FileNode{{ - Name: "AssetReference", - Digest: refDigest, - }}, - } - directoryDigest, err := ProtoToDigest(directory) - if err != nil { - return nil, err - } - var action *remoteexecution.Action - if commandGenerator, err := qualifier.QualifiersToCommand(ref.Qualifiers); err != nil || len(ref.Uris) > 1 { - // Create the action with the qualifier directory as the input root - action, _, err = assetReferenceToAction(ref, directoryDigest) - if err != nil { - return nil, err - } - } else { - command := commandGenerator(ref.Uris[0]) - commandDigest, err := ProtoToDigest(command) - if err != nil { - return nil, err - } - action = &remoteexecution.Action{ - CommandDigest: 
commandDigest, - InputRootDigest: EmptyDigest, - } - } - actionDigest, err := ProtoToDigest(action) - if err != nil { - return nil, err - } - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(actionDigest.GetHash())) - if err != nil { - return nil, err - } - digest, err := digestFunction.NewDigestFromProto(actionDigest) + action, _, err := rs.assetReferenceToAction(ref) if err != nil { return nil, err } + digest, err := ProtoToDigest(action, instance) data, err := rs.actionCache.Get(ctx, digest).ToProto( &remoteexecution.ActionResult{}, @@ -142,79 +239,38 @@ func (rs *actionCacheAssetStore) Get(ctx context.Context, ref *asset.AssetRefere } func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetReference, data *asset.Asset, instance digest.InstanceName) error { - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(data.GetDigest().GetHash())) - if err != nil { - return err - } - // Create asset reference using only the qualifiers of the request - qualifierReference := NewAssetReference(nil, ref.Qualifiers) - refDigest, err := ProtoToDigest(qualifierReference) - if err != nil { - return err - } - refPb, err := proto.Marshal(qualifierReference) + // Convert the AssetReference to an Action + action, extraObjs, err := rs.assetReferenceToAction(ref) if err != nil { return err } - bbRefDigest, err := AssetReferenceToDigest(qualifierReference, instance) - if err != nil { - return err - } - // Put the qualifier reference in the CAS to ensure completeness of - // the action result - err = rs.contentAddressableStorage.Put(ctx, bbRefDigest, buffer.NewCASBufferFromByteSlice(bbRefDigest, refPb, buffer.UserProvided)) - if err != nil { - return err - } - // Construct a directory using the reference of only qualifiers - // This is how qualifiers are linked to the assets when stored as - // action results - directory := &remoteexecution.Directory{ - Files: 
[]*remoteexecution.FileNode{{ - Name: "AssetReference", - Digest: refDigest, - }}, - } - directoryPb, err := proto.Marshal(directory) - if err != nil { - return err - } - directoryDigest, err := ProtoToDigest(directory) + + // Upload the Action + actionPb, actionDigest, err := ProtoSerialise(action) if err != nil { return err } - bbDirectoryDigest, err := digestFunction.NewDigestFromProto(directoryDigest) - if err != nil { - return nil - } - err = rs.contentAddressableStorage.Put(ctx, bbDirectoryDigest, buffer.NewCASBufferFromByteSlice(bbDirectoryDigest, directoryPb, buffer.UserProvided)) + err = rs.uploadToCAS(ctx, actionPb, actionDigest, instance) if err != nil { return err } - var action *remoteexecution.Action - var command *remoteexecution.Command - if commandGenerator, err := qualifier.QualifiersToCommand(ref.Qualifiers); err != nil || len(ref.Uris) > 1 { - // Create the action with the qualifier directory as the input root - action, command, err = assetReferenceToAction(ref, directoryDigest) - if err != nil { - return err - } - } else { - command = commandGenerator(ref.Uris[0]) - commandDigest, err := ProtoToDigest(command) + + // Upload extra things required for referential integrity + for _, obj := range extraObjs { + err = rs.uploadProtoToCAS(ctx, obj, instance) if err != nil { return err } - action = &remoteexecution.Action{ - CommandDigest: commandDigest, - InputRootDigest: EmptyDigest, - } } - actionPb, err := proto.Marshal(action) + + // Convert the Asset to an ActionResult + actionResult, err := rs.assetToActionResult(ctx, data, instance) if err != nil { return err } - actionDigest, err := ProtoToDigest(action) + + // Upload to the ActionCache + digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(data.GetDigest().GetHash())) if err != nil { return err } @@ -222,88 +278,34 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere if err != nil { return err } - err = 
rs.contentAddressableStorage.Put(ctx, bbActionDigest, buffer.NewCASBufferFromByteSlice(bbActionDigest, actionPb, buffer.UserProvided)) - if err != nil { - return err - } + return rs.actionCache.Put(ctx, bbActionDigest, buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided)) +} - commandPb, err := proto.Marshal(command) - if err != nil { - return err - } - commandDigest, err := ProtoToDigest(command) +func (rs *actionCacheAssetStore) uploadProtoToCAS(ctx context.Context, pb proto.Message, instance digest.InstanceName) error { + buf := buffer.NewProtoBufferFromProto(pb, buffer.UserProvided) + bbDigest, err := ProtoToDigest(pb, instance) if err != nil { return err } - bbCommandDigest, err := digestFunction.NewDigestFromProto(commandDigest) + rs.contentAddressableStorage.Put(ctx, bbDigest, buf) + return nil +} + +// Utility to push an arbitrary object and its digest into the CAS +func (rs *actionCacheAssetStore) uploadToCAS(ctx context.Context, obj []byte, digest *remoteexecution.Digest, instance digest.InstanceName) error { + digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(digest.GetHash())) if err != nil { return err } - err = rs.contentAddressableStorage.Put(ctx, bbCommandDigest, buffer.NewCASBufferFromByteSlice(bbCommandDigest, commandPb, buffer.UserProvided)) + bbDigest, err := digestFunction.NewDigestFromProto(digest) if err != nil { return err } - - actionResult := &remoteexecution.ActionResult{ - ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{ - QueuedTimestamp: data.LastUpdated, - }, - } - - if data.Type == asset.Asset_DIRECTORY { - d, err := rs.assetToDirectory(ctx, data, instance) - if err != nil { - // Users will hit this if they upload an digest referencing an - // arbitary Blob in `PushDirectory` or a digest that does not - // reference any blob at all. 
- return util.StatusWrapf( - err, - "digest in directory asset does not reference a Directory", - ) - } - - // If it is a directory, construct a tree from it as tree digest is - // required for action result - tree, err := rs.directoryToTree(ctx, d, instance) - if err != nil { - return status.Errorf(codes.InvalidArgument, "Failed to convert directory to tree (one of the subdirs is not in the CAS?): %v", err) - } - treePb, err := proto.Marshal(tree) - if err != nil { - return err - } - treeDigest, err := ProtoToDigest(tree) - if err != nil { - return err - } - bbTreeDigest, err := digestFunction.NewDigestFromProto(treeDigest) - if err != nil { - return err - } - err = rs.contentAddressableStorage.Put(ctx, bbTreeDigest, buffer.NewCASBufferFromByteSlice(bbTreeDigest, treePb, buffer.UserProvided)) - if err != nil { - return err - } - - // Use digest as a root directory digest - actionResult.OutputDirectories = []*remoteexecution.OutputDirectory{{ - Path: "out", - RootDirectoryDigest: data.Digest, - TreeDigest: treeDigest, - }} - } else if data.Type == asset.Asset_BLOB { - // Use the digest as an output file digest - actionResult.OutputFiles = []*remoteexecution.OutputFile{{ - Path: "out", - Digest: data.Digest, - }} - } else { - return status.Errorf(codes.InvalidArgument, "unknown asset type %v", data.Type) - } - - return rs.actionCache.Put(ctx, bbActionDigest, buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided)) + err = rs.contentAddressableStorage.Put(ctx, bbDigest, buffer.NewCASBufferFromByteSlice(bbDigest, obj, buffer.UserProvided)) + return err } +// Utility method to convert a Directory Proto to a Tree Proto func (rs *actionCacheAssetStore) directoryToTree(ctx context.Context, directory *remoteexecution.Directory, instance digest.InstanceName) (*remoteexecution.Tree, error) { children := []*remoteexecution.Directory{} for _, node := range directory.Directories { @@ -320,6 +322,8 @@ func (rs *actionCacheAssetStore) directoryToTree(ctx 
context.Context, directory }, nil } +// Utility method to list all descendants of a DirectoryNode, used for converting +// a Directory into a Tree func (rs *actionCacheAssetStore) directoryNodeToDirectories(ctx context.Context, instance digest.InstanceName, node *remoteexecution.DirectoryNode) ([]*remoteexecution.Directory, error) { digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(node.GetDigest().GetHash())) if err != nil { diff --git a/pkg/storage/action_cache_asset_store_test.go b/pkg/storage/action_cache_asset_store_test.go index 11b8249..86e8afe 100644 --- a/pkg/storage/action_cache_asset_store_test.go +++ b/pkg/storage/action_cache_asset_store_test.go @@ -280,9 +280,9 @@ func TestActionCacheAssetStorePutRecursiveDirectory(t *testing.T) { }, } - rootDirectoryDigest, err := storage.ProtoToDigest(tree.Root) + _, rootDirectoryDigest, err := storage.ProtoSerialise(tree.Root) require.NoError(t, err) - treeDigest, err := storage.ProtoToDigest(tree) + _, treeDigest, err := storage.ProtoSerialise(tree) require.NoError(t, err) t.Logf("rootDirectoryDigest %v", rootDirectoryDigest) diff --git a/pkg/storage/asset_reference.go b/pkg/storage/asset_reference.go index 2d94814..cbff6a6 100644 --- a/pkg/storage/asset_reference.go +++ b/pkg/storage/asset_reference.go @@ -1,17 +1,12 @@ package storage import ( - "crypto/sha256" - "encoding/hex" "sort" remoteasset "github.com/bazelbuild/remote-apis/build/bazel/remote/asset/v1" - remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" "github.com/buildbarn/bb-remote-asset/pkg/proto/asset" "github.com/buildbarn/bb-remote-asset/pkg/qualifier" - "github.com/buildbarn/bb-storage/pkg/digest" - "google.golang.org/protobuf/proto" ) // NewAssetReference creates a new AssetReference from a URI and a list @@ -23,55 +18,3 @@ func NewAssetReference(uris []string, qualifiers []*remoteasset.Qualifier) *asse sort.Strings(uris) return &asset.AssetReference{Uris: uris, 
Qualifiers: sortedQualifiers.ToArray()} } - -// AssetReferenceToDigest converts an AssetReference into a bb-storage Digest of its -// wire format -func AssetReferenceToDigest(ar *asset.AssetReference, instance digest.InstanceName) (digest.Digest, error) { - wireFormat, err := proto.Marshal(ar) - if err != nil { - return digest.Digest{}, err - } - - hash := sha256.Sum256(wireFormat) - sizeBytes := int64(len(wireFormat)) - - // GetDigestFunction takes a length of a string repr of a hash, not the length - // of the byte array of the hash; multiply by 2 to convert to the former. - digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(hash)*2) - if err != nil { - return digest.Digest{}, err - } - - return digestFunction.NewDigest(hex.EncodeToString(hash[:]), sizeBytes) -} - -func assetReferenceToAction(ar *asset.AssetReference, directoryDigest *remoteexecution.Digest) (*remoteexecution.Action, *remoteexecution.Command, error) { - command := &remoteexecution.Command{ - Arguments: ar.Uris, - OutputPaths: []string{"out"}, - OutputDirectoryFormat: remoteexecution.Command_TREE_AND_DIRECTORY, - } - commandDigest, err := ProtoToDigest(command) - if err != nil { - return nil, nil, err - } - action := &remoteexecution.Action{ - CommandDigest: commandDigest, - InputRootDigest: directoryDigest, - } - return action, command, nil -} - -// ProtoToDigest converts an arbitrary proto to a remote execution Digest -func ProtoToDigest(pb proto.Message) (*remoteexecution.Digest, error) { - wireFormat, err := proto.Marshal(pb) - if err != nil { - return nil, err - } - hash := sha256.Sum256(wireFormat) - - return &remoteexecution.Digest{ - Hash: hex.EncodeToString(hash[:]), - SizeBytes: int64(len(wireFormat)), - }, nil -} diff --git a/pkg/storage/blob_access_asset_store.go b/pkg/storage/blob_access_asset_store.go index cde04c2..c7194f5 100644 --- a/pkg/storage/blob_access_asset_store.go +++ b/pkg/storage/blob_access_asset_store.go @@ -25,7 +25,7 @@ func 
NewBlobAccessAssetStore(ba blobstore.BlobAccess, maximumMessageSizeBytes in // Get a digest given a reference func (rs *blobAccessAssetStore) Get(ctx context.Context, ref *asset.AssetReference, instance digest.InstanceName) (*asset.Asset, error) { - refDigest, err := AssetReferenceToDigest(ref, instance) + refDigest, err := ProtoToDigest(ref, instance) if err != nil { return nil, err } @@ -41,7 +41,7 @@ func (rs *blobAccessAssetStore) Get(ctx context.Context, ref *asset.AssetReferen // Put a digest into the store referenced by a given reference func (rs *blobAccessAssetStore) Put(ctx context.Context, ref *asset.AssetReference, data *asset.Asset, instance digest.InstanceName) error { - refDigest, err := AssetReferenceToDigest(ref, instance) + refDigest, err := ProtoToDigest(ref, instance) if err != nil { return err } diff --git a/pkg/storage/blob_access_asset_store_test.go b/pkg/storage/blob_access_asset_store_test.go index 92bb72d..971d0a4 100644 --- a/pkg/storage/blob_access_asset_store_test.go +++ b/pkg/storage/blob_access_asset_store_test.go @@ -27,7 +27,7 @@ func TestBlobAccessAssetStorePut(t *testing.T) { uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}) assetData := storage.NewBlobAsset(blobDigest, timestamppb.Now()) - refDigest, err := storage.AssetReferenceToDigest(assetRef, instanceName) + refDigest, err := storage.ProtoToDigest(assetRef, instanceName) require.NoError(t, err) blobAccess := mock.NewMockBlobAccess(ctrl) @@ -54,7 +54,7 @@ func TestBlobAccessAssetStoreGet(t *testing.T) { blobDigest := &remoteexecution.Digest{Hash: "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", SizeBytes: 222} uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}) - refDigest, err := storage.AssetReferenceToDigest(assetRef, instanceName) + refDigest, err := storage.ProtoToDigest(assetRef, instanceName) require.NoError(t, 
err) buf := buffer.NewProtoBufferFromProto(&asset.Asset{Digest: blobDigest}, buffer.UserProvided) diff --git a/pkg/storage/digest.go b/pkg/storage/digest.go index 6b9ed98..02028f5 100644 --- a/pkg/storage/digest.go +++ b/pkg/storage/digest.go @@ -1,6 +1,15 @@ package storage -import remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" +import ( + "crypto/sha256" + "encoding/hex" + + remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + + "google.golang.org/protobuf/proto" + + "github.com/buildbarn/bb-storage/pkg/digest" +) // EmptyDigest is a REv2 Digest representing an object of size 0 hashed // with SHA256 @@ -8,3 +17,39 @@ var EmptyDigest = &remoteexecution.Digest{ Hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", SizeBytes: 0, } + +// ProtoSerialise serialises an arbitrary protobuf message into its wire format and +// a Remote Execution API Digest of the format. This is very useful for interacting +// with the Remote Execution API +func ProtoSerialise(pb proto.Message) ([]byte, *remoteexecution.Digest, error) { + wireFormat, err := proto.Marshal(pb) + if err != nil { + return nil, nil, err + } + + hash := sha256.Sum256(wireFormat) + + return wireFormat, + &remoteexecution.Digest{ + Hash: hex.EncodeToString(hash[:]), + SizeBytes: int64(len(wireFormat)), + }, nil +} + +// ProtoToDigest converts an arbitrary protobuf message into a Buildbarn-internal +// Digest of its content. +func ProtoToDigest(pb proto.Message, instance digest.InstanceName) (digest.Digest, error) { + _, reapiDigest, err := ProtoSerialise(pb) + if err != nil { + return digest.Digest{}, err + } + digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(reapiDigest.GetHash())) + if err != nil { + return digest.Digest{}, err + } + bbDigest, err := digestFunction.NewDigestFromProto(reapiDigest) + if err != nil { + return digest.Digest{}, err + } + return bbDigest, nil +}