Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add extra header to signed url #4971

Merged
merged 24 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions flyteadmin/dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/base32"
"encoding/base64"
"encoding/hex"
"fmt"
"net/url"
"reflect"
Expand Down Expand Up @@ -49,11 +48,13 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
// If it doesn't exist, then proceed as normal.

if len(req.Project) == 0 || len(req.Domain) == 0 {
logger.Errorf(ctx, "project and domain are required parameters")
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "project and domain are required parameters")
}

// At least one of the hash or manually given prefix must be provided.
if len(req.FilenameRoot) == 0 && len(req.ContentMd5) == 0 {
logger.Errorf(ctx, "content_md5 or filename_root is a required parameter")
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"content_md5 or filename_root is a required parameter")
}
Expand All @@ -63,10 +64,12 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
knownLocation, err := createStorageLocation(ctx, s.dataStore, s.cfg.Upload,
req.Project, req.Domain, req.FilenameRoot, req.Filename)
if err != nil {
logger.Errorf(ctx, "failed to create storage location. Error %v", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create storage location, Error: %v", err)
}
metadata, err := s.dataStore.Head(ctx, knownLocation)
if err != nil {
logger.Errorf(ctx, "failed to check if file exists at location [%s], Error: %v", knownLocation.String(), err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to check if file exists at location [%s], Error: %v", knownLocation.String(), err)
}
if metadata.Exists() {
Expand All @@ -76,12 +79,9 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
if len(req.ContentMd5) == 0 {
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
// Re-encode the hash 3-ways to support matching, hex, base32 and base64
hexDigest := hex.EncodeToString(req.ContentMd5)
base32Digest := base32.StdEncoding.EncodeToString(req.ContentMd5)
base64Digest := base64.StdEncoding.EncodeToString(req.ContentMd5)
if hexDigest != metadata.Etag() && base32Digest != metadata.Etag() && base64Digest != metadata.Etag() {
logger.Debugf(ctx, "File already exists at location [%v] but hashes do not match", knownLocation)
if len(metadata.ContentMD5()) != 0 && base64Digest != metadata.ContentMD5() {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(ctx, "File already exists at location [%v] but hashes do not match", knownLocation)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
logger.Debugf(ctx, "File already exists at location [%v] but allowing rewrite", knownLocation)
Expand Down Expand Up @@ -117,23 +117,28 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
storagePath, err := createStorageLocation(ctx, s.dataStore, s.cfg.Upload,
req.Project, req.Domain, prefix, req.Filename)
if err != nil {
logger.Errorf(ctx, "failed to create shardedStorageLocation. Error %v", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create shardedStorageLocation, Error: %v", err)
}

logger.Infof(ctx, "CreateSignedURL for", storagePath)
resp, err := s.dataStore.CreateSignedURL(ctx, storagePath, storage.SignedURLProperties{
Scope: stow.ClientMethodPut,
ExpiresIn: req.ExpiresIn.AsDuration(),
ContentMD5: md5,
Scope: stow.ClientMethodPut,
ExpiresIn: req.ExpiresIn.AsDuration(),
ContentMD5: md5,
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
AddMetadata: req.AddMetadata,
})

if err != nil {
logger.Errorf(ctx, "failed to create signed url. Error:", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create a signed url. Error: %v", err)
}

return &service.CreateUploadLocationResponse{
SignedUrl: resp.URL.String(),
NativeUrl: storagePath.String(),
ExpiresAt: timestamppb.New(time.Now().Add(req.ExpiresIn.AsDuration())),
Headers: getExtraHeaders(storagePath, md5, req.AddMetadata),
}, nil
}

Expand Down Expand Up @@ -268,6 +273,21 @@ func (s Service) validateCreateDownloadLinkRequest(req *service.CreateDownloadLi
return req, nil
}

// getExtraHeaders builds the set of HTTP headers returned alongside a signed
// upload URL.
//
// The result always carries "Content-MD5" (set to contentMd5) and
// "Content-Length". When addMetadata is true, a provider-specific metadata
// header holding contentMd5 is added, selected by the scheme prefix of
// reference: "s3://", "gs://" or "abfs://". References with any other prefix
// receive only the two base headers.
//
// NOTE(review): "Content-Length" is computed from the length of the contentMd5
// string, not from the size of the payload being uploaded — confirm this is
// intended.
func getExtraHeaders(reference storage.DataReference, contentMd5 string, addMetadata bool) map[string]string {
	extra := map[string]string{
		"Content-Length": strconv.Itoa(len(contentMd5)),
		"Content-MD5":    contentMd5,
	}
	if !addMetadata {
		return extra
	}

	location := reference.String()
	switch {
	case strings.HasPrefix(location, "s3://"):
		extra[fmt.Sprintf("x-amz-meta-%s", stow.FlyteContentMD5)] = contentMd5
	case strings.HasPrefix(location, "gs://"):
		extra[fmt.Sprintf("x-goog-meta-%s", stow.FlyteContentMD5)] = contentMd5
	case strings.HasPrefix(location, "abfs://"):
		extra[fmt.Sprintf("x-ms-meta-%s", stow.FlyteContentMD5)] = contentMd5
		// Azure Put Blob needs the blob type stated explicitly, see
		// https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob?tabs=microsoft-entra-id#remarks
		extra["x-ms-blob-type"] = "BlockBlob"
	}
	return extra
}

// createStorageLocation creates a location in the storage destination to maximize read/write performance in most
// block stores. The final location should look something like: s3://<my bucket>/<file name>
func createStorageLocation(ctx context.Context, store *storage.DataStore,
Expand Down
14 changes: 14 additions & 0 deletions flyteadmin/dataproxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,17 @@ func TestService_Error(t *testing.T) {
assert.Error(t, err, "no task executions")
})
}

// TestGetExtraHeader checks that getExtraHeaders, called with addMetadata set,
// returns the base headers plus the provider-specific metadata header for
// s3://, gs:// and abfs:// references.
func TestGetExtraHeader(t *testing.T) {
	cases := []struct {
		reference storage.DataReference
		expected  map[string]string
	}{
		{
			reference: "s3://bucket/key",
			expected:  map[string]string{"Content-MD5": "md5", "Content-Length": "3", "x-amz-meta-flyteContentMD5": "md5"},
		},
		{
			reference: "gs://bucket/key",
			expected:  map[string]string{"Content-MD5": "md5", "Content-Length": "3", "x-goog-meta-flyteContentMD5": "md5"},
		},
		{
			reference: "abfs://bucket/key",
			expected:  map[string]string{"Content-MD5": "md5", "Content-Length": "3", "x-ms-meta-flyteContentMD5": "md5", "x-ms-blob-type": "BlockBlob"},
		},
	}

	for _, tc := range cases {
		got := getExtraHeaders(tc.reference, "md5", true)
		assert.Equal(t, tc.expected, got)
	}
}
2 changes: 2 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,5 @@ replace (
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20230905202853-d090da108d2f
sigs.k8s.io/controller-runtime => sigs.k8s.io/controller-runtime v0.16.2
)

replace github.com/flyteorg/stow => github.com/pingsutw/stow v0.3.6-0.20240228091138-cafd670a5c8d
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down Expand Up @@ -1096,6 +1094,8 @@ github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingsutw/stow v0.3.6-0.20240228091138-cafd670a5c8d h1:hI+GRcUHmxokSEO6bya98+fEGRt0fZIFr9WFAhx0XYU=
github.com/pingsutw/stow v0.3.6-0.20240228091138-cafd670a5c8d/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
17 changes: 17 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/service/dataproxy_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading