From d3335e6ee4798bf96e86660e486ca40b86e18122 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Mon, 16 Sep 2024 13:02:39 +1000 Subject: [PATCH 1/5] Various artifact cleanups * Use context within FormUploader * Separate out ArtifactUploadAction --- api/artifacts.go | 16 ++++--- internal/artifact/batch_creator.go | 5 +- internal/artifact/form_uploader.go | 61 ++++++++++--------------- internal/artifact/form_uploader_test.go | 14 +----- 4 files changed, 37 insertions(+), 59 deletions(-) diff --git a/api/artifacts.go b/api/artifacts.go index 3554971165..a3f7d6d8de 100644 --- a/api/artifacts.go +++ b/api/artifacts.go @@ -57,13 +57,15 @@ type ArtifactBatch struct { } type ArtifactUploadInstructions struct { - Data map[string]string `json:"data"` - Action struct { - URL string `json:"url,omitempty"` - Method string `json:"method"` - Path string `json:"path"` - FileInput string `json:"file_input"` - } + Data map[string]string `json:"data"` + Action ArtifactUploadAction `json:"action"` +} + +type ArtifactUploadAction struct { + URL string `json:"url,omitempty"` + Method string `json:"method"` + Path string `json:"path"` + FileInput string `json:"file_input"` } type ArtifactBatchCreateResponse struct { diff --git a/internal/artifact/batch_creator.go b/internal/artifact/batch_creator.go index 099dc5229e..e3a3a4e150 100644 --- a/internal/artifact/batch_creator.go +++ b/internal/artifact/batch_creator.go @@ -50,10 +50,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { // Split into the artifacts into chunks so we're not uploading a ton of // files at once. for i := 0; i < length; i += chunks { - j := i + chunks - if length < j { - j = length - } + j := min(i+chunks, length) // The artifacts that will be uploaded in this chunk theseArtifacts := a.conf.Artifacts[i:j] diff --git a/internal/artifact/form_uploader.go b/internal/artifact/form_uploader.go index 39a16aa1ed..a284b6a691 100644 --- a/internal/artifact/form_uploader.go +++ b/internal/artifact/form_uploader.go @@ -10,7 +10,6 @@ import ( "net/http" "net/http/httptrace" "net/http/httputil" - "regexp" "strings" // "net/http/httputil" @@ -23,10 +22,10 @@ import ( "github.com/buildkite/agent/v3/version" ) -var ArtifactPathVariableRegex = regexp.MustCompile("\\$\\{artifact\\:path\\}") +const artifactPathVariable = "${artifact:path}" // FormUploader uploads to S3 as a single signed POST, which have a hard limit of 5Gb. 
-var maxFormUploadedArtifactSize = int64(5368709120) +const maxFormUploadedArtifactSize = int64(5368709120) type FormUploaderConfig struct { // Whether or not HTTP calls should be debugged @@ -50,17 +49,15 @@ func NewFormUploader(l logger.Logger, c FormUploaderConfig) *FormUploader { // The FormUploader doens't specify a URL, as one is provided by Buildkite // after uploading -func (u *FormUploader) URL(artifact *api.Artifact) string { - return "" -} +func (u *FormUploader) URL(*api.Artifact) string { return "" } -func (u *FormUploader) Upload(_ context.Context, artifact *api.Artifact) error { +func (u *FormUploader) Upload(ctx context.Context, artifact *api.Artifact) error { if artifact.FileSize > maxFormUploadedArtifactSize { return errArtifactTooLarge{Size: artifact.FileSize} } // Create a HTTP request for uploading the file - request, err := createUploadRequest(u.logger, artifact) + request, err := createUploadRequest(ctx, u.logger, artifact) if err != nil { return err } @@ -100,49 +97,41 @@ func (u *FormUploader) Upload(_ context.Context, artifact *api.Artifact) error { // Perform the request u.logger.Debug("%s %s", request.Method, request.URL) response, err := client.Do(request) - - // Check for errors if err != nil { return err - } else { - // Be sure to close the response body at the end of - // this function - defer response.Body.Close() - - if u.conf.DebugHTTP { - responseDump, err := httputil.DumpResponse(response, true) - if err != nil { - u.logger.Debug("\nERR: %s\n%s", err, string(responseDump)) - } else { - u.logger.Debug("\n%s", string(responseDump)) - } - } - - if response.StatusCode/100 != 2 { - body := &bytes.Buffer{} - _, err := body.ReadFrom(response.Body) - if err != nil { - return err - } + } + defer response.Body.Close() - // Return a custom error with the response body from the page - message := fmt.Sprintf("%s (%d)", body, response.StatusCode) - return errors.New(message) + if u.conf.DebugHTTP { + responseDump, err := httputil.DumpResponse(response, true) + if err != nil { + u.logger.Debug("\nERR: %s\n%s", err, string(responseDump)) + } else { + u.logger.Debug("\n%s", string(responseDump)) } } + if response.StatusCode/100 != 2 { + body := &bytes.Buffer{} + _, err := body.ReadFrom(response.Body) + if err != nil { + return err + } + + return fmt.Errorf("%s (%d)", body, response.StatusCode) + } return nil } // Creates a new file upload http request with optional extra params -func createUploadRequest(_ logger.Logger, artifact *api.Artifact) (*http.Request, error) { +func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Artifact) (*http.Request, error) { streamer := newMultipartStreamer() // Set the post data for the request for key, val := range artifact.UploadInstructions.Data { // Replace the magical ${artifact:path} variable with the // artifact's path - newVal := ArtifactPathVariableRegex.ReplaceAllLiteralString(val, artifact.Path) + newVal := strings.ReplaceAll(val, artifactPathVariable, artifact.Path) // Write the new value to the form if err := streamer.WriteField(key, newVal); err != nil { @@ -173,7 +162,7 @@ func createUploadRequest(_ logger.Logger, artifact *api.Artifact) (*http.Request uri.Path = artifact.UploadInstructions.Action.Path // Create the request - req, err := http.NewRequest(artifact.UploadInstructions.Action.Method, uri.String(), streamer.Reader()) + req, err := http.NewRequestWithContext(ctx, artifact.UploadInstructions.Action.Method, uri.String(), streamer.Reader()) if err != nil { fh.Close() return nil, err diff --git 
a/internal/artifact/form_uploader_test.go b/internal/artifact/form_uploader_test.go
index c0ff17a75f..96b73ec046 100644
--- a/internal/artifact/form_uploader_test.go
+++ b/internal/artifact/form_uploader_test.go
@@ -96,12 +96,7 @@ func TestFormUploading(t *testing.T) {
 			Data: map[string]string{
 				"path": "${artifact:path}",
 			},
-			Action: struct {
-				URL       string "json:\"url,omitempty\""
-				Method    string "json:\"method\""
-				Path      string "json:\"path\""
-				FileInput string "json:\"file_input\""
-			}{
+			Action: api.ArtifactUploadAction{
 				URL:       server.URL,
 				Method:    "POST",
 				Path:      "buildkiteartifacts.com",
@@ -145,12 +140,7 @@ func TestFormUploadFileMissing(t *testing.T) {
 			Data: map[string]string{
 				"path": "${artifact:path}",
 			},
-			Action: struct {
-				URL       string "json:\"url,omitempty\""
-				Method    string "json:\"method\""
-				Path      string "json:\"path\""
-				FileInput string "json:\"file_input\""
-			}{
+			Action: api.ArtifactUploadAction{
 				URL:       server.URL,
 				Method:    "POST",
 				Path:      "buildkiteartifacts.com",

From a68303feb11a028b1ff2b8770a259c8333b12032 Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Thu, 19 Sep 2024 13:42:32 +1000
Subject: [PATCH 2/5] Rename FormUploader to BKUploader

---
 .../{form_uploader.go => bk_uploader.go}      | 20 ++++++++++---------
 ...m_uploader_test.go => bk_uploader_test.go} |  6 +++---
 internal/artifact/uploader.go                 |  2 +-
 3 files changed, 15 insertions(+), 13 deletions(-)
 rename internal/artifact/{form_uploader.go => bk_uploader.go} (92%)
 rename internal/artifact/{form_uploader_test.go => bk_uploader_test.go} (95%)

diff --git a/internal/artifact/form_uploader.go b/internal/artifact/bk_uploader.go
similarity index 92%
rename from internal/artifact/form_uploader.go
rename to internal/artifact/bk_uploader.go
index a284b6a691..29f1781ada 100644
--- a/internal/artifact/form_uploader.go
+++ b/internal/artifact/bk_uploader.go
@@ -24,34 +24,36 @@ import (
 
 const artifactPathVariable = "${artifact:path}"
 
-// FormUploader uploads to S3 as a single signed POST, which have a hard limit of 5Gb.
+// BKUploader uploads to S3 as a single signed POST, which has a hard limit of 5GB.
 const maxFormUploadedArtifactSize = int64(5368709120)
 
-type FormUploaderConfig struct {
+type BKUploaderConfig struct {
 	// Whether or not HTTP calls should be debugged
 	DebugHTTP bool
 }
 
-type FormUploader struct {
+// BKUploader uploads artifacts to Buildkite itself.
+type BKUploader struct {
 	// The configuration
-	conf FormUploaderConfig
+	conf BKUploaderConfig
 
 	// The logger instance to use
 	logger logger.Logger
 }
 
-func NewFormUploader(l logger.Logger, c FormUploaderConfig) *FormUploader {
-	return &FormUploader{
+// NewBKUploader creates a new Buildkite uploader.
+func NewBKUploader(l logger.Logger, c BKUploaderConfig) *BKUploader { + return &BKUploader{ logger: l, conf: c, } } -// The FormUploader doens't specify a URL, as one is provided by Buildkite +// The BKUploader doens't specify a URL, as one is provided by Buildkite // after uploading -func (u *FormUploader) URL(*api.Artifact) string { return "" } +func (u *BKUploader) URL(*api.Artifact) string { return "" } -func (u *FormUploader) Upload(ctx context.Context, artifact *api.Artifact) error { +func (u *BKUploader) Upload(ctx context.Context, artifact *api.Artifact) error { if artifact.FileSize > maxFormUploadedArtifactSize { return errArtifactTooLarge{Size: artifact.FileSize} } diff --git a/internal/artifact/form_uploader_test.go b/internal/artifact/bk_uploader_test.go similarity index 95% rename from internal/artifact/form_uploader_test.go rename to internal/artifact/bk_uploader_test.go index 96b73ec046..e430f757b7 100644 --- a/internal/artifact/form_uploader_test.go +++ b/internal/artifact/bk_uploader_test.go @@ -85,7 +85,7 @@ func TestFormUploading(t *testing.T) { err = os.WriteFile(abspath, []byte("llamas"), 0700) defer os.Remove(abspath) - uploader := NewFormUploader(logger.Discard, FormUploaderConfig{}) + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) artifact := &api.Artifact{ ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", Path: "llamas.txt", @@ -129,7 +129,7 @@ func TestFormUploadFileMissing(t *testing.T) { abspath := filepath.Join(temp, "llamas.txt") - uploader := NewFormUploader(logger.Discard, FormUploaderConfig{}) + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) artifact := &api.Artifact{ ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", Path: "llamas.txt", @@ -155,7 +155,7 @@ func TestFormUploadFileMissing(t *testing.T) { func TestFormUploadTooBig(t *testing.T) { ctx := context.Background() - uploader := NewFormUploader(logger.Discard, FormUploaderConfig{}) + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) const size = int64(6442450944) // 6Gb artifact := &api.Artifact{ ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go index c4cfc00b35..ea3d6edb4b 100644 --- a/internal/artifact/uploader.go +++ b/internal/artifact/uploader.go @@ -370,7 +370,7 @@ func (a *Uploader) createUploader() (_ uploader, err error) { switch { case a.conf.Destination == "": a.logger.Info("Uploading to default Buildkite artifact storage") - return NewFormUploader(a.logger, FormUploaderConfig{ + return NewBKUploader(a.logger, BKUploaderConfig{ DebugHTTP: a.conf.DebugHTTP, }), nil From f283cfbacb4bea41ff57b91099afba5e2a7ddac4 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Thu, 19 Sep 2024 13:42:32 +1000 Subject: [PATCH 3/5] Un-export collect method --- internal/artifact/uploader.go | 4 ++-- internal/artifact/uploader_test.go | 32 +++++++++++++++--------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go index ea3d6edb4b..8098b9fa69 100644 --- a/internal/artifact/uploader.go +++ b/internal/artifact/uploader.go @@ -75,7 +75,7 @@ func NewUploader(l logger.Logger, ac APIClient, c UploaderConfig) *Uploader { func (a *Uploader) Upload(ctx context.Context) error { // Create artifact structs for all the files we need to upload - artifacts, err := a.Collect(ctx) + artifacts, err := a.collect(ctx) if err != nil { return fmt.Errorf("collecting artifacts: %w", err) } @@ -109,7 +109,7 @@ func isDir(path string) bool { return fi.IsDir() } -func (a 
*Uploader) Collect(ctx context.Context) ([]*api.Artifact, error) { +func (a *Uploader) collect(ctx context.Context) ([]*api.Artifact, error) { wd, err := os.Getwd() if err != nil { return nil, fmt.Errorf("getting working directory: %w", err) diff --git a/internal/artifact/uploader_test.go b/internal/artifact/uploader_test.go index c1b32b9fc1..a7a50ba4dd 100644 --- a/internal/artifact/uploader_test.go +++ b/internal/artifact/uploader_test.go @@ -97,13 +97,13 @@ func TestCollect(t *testing.T) { ctxExpEnabled, _ := experiments.Enable(ctx, experiments.NormalisedUploadPaths) ctxExpDisabled := experiments.Disable(ctx, experiments.NormalisedUploadPaths) - artifactsWithoutExperimentEnabled, err := uploader.Collect(ctxExpDisabled) + artifactsWithoutExperimentEnabled, err := uploader.collect(ctxExpDisabled) if err != nil { t.Fatalf("[normalised-upload-paths disabled] uploader.Collect() error = %v", err) } assert.Equal(t, 5, len(artifactsWithoutExperimentEnabled)) - artifactsWithExperimentEnabled, err := uploader.Collect(ctxExpEnabled) + artifactsWithExperimentEnabled, err := uploader.collect(ctxExpEnabled) if err != nil { t.Fatalf("[normalised-upload-paths enabled] uploader.Collect() error = %v", err) } @@ -165,7 +165,7 @@ func TestCollectThatDoesntMatchAnyFiles(t *testing.T) { }, ";"), }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -185,7 +185,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnything(t *testing.T) { }, ";"), }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -209,7 +209,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnythingFollowingSymlinks(t *testing.T GlobResolveFollowSymlinks: true, }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -230,7 +230,7 @@ func TestCollectWithDuplicateMatches(t *testing.T) { }, ";"), }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -263,7 +263,7 @@ func TestCollectWithDuplicateMatchesFollowingSymlinks(t *testing.T) { GlobResolveFollowSymlinks: true, }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -296,7 +296,7 @@ func TestCollectMatchesUploadSymlinks(t *testing.T) { UploadSkipSymlinks: true, }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -389,13 +389,13 @@ func TestCollect_WithZZGlob(t *testing.T) { ctxExpEnabled, _ := experiments.Enable(ctx, experiments.NormalisedUploadPaths) ctxExpDisabled := experiments.Disable(ctx, experiments.NormalisedUploadPaths) - artifactsWithoutExperimentEnabled, err := uploader.Collect(ctxExpDisabled) + artifactsWithoutExperimentEnabled, err := uploader.collect(ctxExpDisabled) if err != nil { t.Fatalf("[normalised-upload-paths disabled] uploader.Collect() error = %v", err) } assert.Equal(t, 5, len(artifactsWithoutExperimentEnabled)) - artifactsWithExperimentEnabled, err := uploader.Collect(ctxExpEnabled) + artifactsWithExperimentEnabled, err := uploader.collect(ctxExpEnabled) if err != nil { t.Fatalf("[normalised-upload-paths enabled] uploader.Collect() error = %v", err) } @@ -457,7 +457,7 @@ func 
TestCollectThatDoesntMatchAnyFiles_WithZZGlob(t *testing.T) { }, ";"), }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -477,7 +477,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnything_WithZZGlob(t *testing.T) { }, ";"), }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -501,7 +501,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnythingFollowingSymlinks_WithZZGlob(t GlobResolveFollowSymlinks: true, }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -522,7 +522,7 @@ func TestCollectWithDuplicateMatches_WithZZGlob(t *testing.T) { }, ";"), }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -555,7 +555,7 @@ func TestCollectWithDuplicateMatchesFollowingSymlinks_WithZZGlob(t *testing.T) { GlobResolveFollowSymlinks: true, }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } @@ -588,7 +588,7 @@ func TestCollectMatchesUploadSymlinks_WithZZGlob(t *testing.T) { UploadSkipSymlinks: true, }) - artifacts, err := uploader.Collect(ctx) + artifacts, err := uploader.collect(ctx) if err != nil { t.Fatalf("uploader.Collect() error = %v", err) } From 6f5f9fc747143771fff78e8c38ce37e8ed5250d4 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Thu, 19 Sep 2024 13:44:25 +1000 Subject: [PATCH 4/5] Refactor uploader to use workers and channels --- internal/artifact/artifactory_uploader.go | 42 ++- internal/artifact/azure_blob_uploader.go | 32 +- internal/artifact/batch_creator.go | 16 +- internal/artifact/bk_uploader.go | 35 +- internal/artifact/bk_uploader_test.go | 80 ++-- internal/artifact/gs_uploader.go | 36 +- internal/artifact/s3_uploader.go | 31 +- internal/artifact/uploader.go | 435 ++++++++++++++-------- 8 files changed, 478 insertions(+), 229 deletions(-) diff --git a/internal/artifact/artifactory_uploader.go b/internal/artifact/artifactory_uploader.go index d99ea29d73..e2564b6367 100644 --- a/internal/artifact/artifactory_uploader.go +++ b/internal/artifact/artifactory_uploader.go @@ -99,36 +99,54 @@ func (u *ArtifactoryUploader) URL(artifact *api.Artifact) string { return url.String() } -func (u *ArtifactoryUploader) Upload(_ context.Context, artifact *api.Artifact) error { +func (u *ArtifactoryUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&artifactoryUploaderWork{ + ArtifactoryUploader: u, + artifact: artifact, + }}, nil +} + +type artifactoryUploaderWork struct { + *ArtifactoryUploader + artifact *api.Artifact +} + +func (u *artifactoryUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *artifactoryUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +func (u *artifactoryUploaderWork) DoWork(context.Context) error { // Open file from filesystem - u.logger.Debug("Reading file \"%s\"", artifact.AbsolutePath) - f, err := os.Open(artifact.AbsolutePath) + u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) + f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", artifact.AbsolutePath, err) + return fmt.Errorf("failed to open file %q (%w)", 
u.artifact.AbsolutePath, err) } // Upload the file to Artifactory. - u.logger.Debug("Uploading \"%s\" to `%s`", artifact.Path, u.URL(artifact)) + u.logger.Debug("Uploading %q to %q", u.artifact.Path, u.URL(u.artifact)) - req, err := http.NewRequest("PUT", u.URL(artifact), f) + req, err := http.NewRequest("PUT", u.URL(u.artifact), f) req.SetBasicAuth(u.user, u.password) if err != nil { return err } - md5Checksum, err := checksumFile(md5.New(), artifact.AbsolutePath) + md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath) if err != nil { return err } req.Header.Add("X-Checksum-MD5", md5Checksum) - sha1Checksum, err := checksumFile(sha1.New(), artifact.AbsolutePath) + sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath) if err != nil { return err } req.Header.Add("X-Checksum-SHA1", sha1Checksum) - sha256Checksum, err := checksumFile(sha256.New(), artifact.AbsolutePath) + sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath) if err != nil { return err } @@ -138,11 +156,7 @@ func (u *ArtifactoryUploader) Upload(_ context.Context, artifact *api.Artifact) if err != nil { return err } - if err := checkResponse(res); err != nil { - return err - } - - return nil + return checkResponse(res) } func checksumFile(hasher hash.Hash, path string) (string, error) { diff --git a/internal/artifact/azure_blob_uploader.go b/internal/artifact/azure_blob_uploader.go index 6be24dab1f..d2d66d4fba 100644 --- a/internal/artifact/azure_blob_uploader.go +++ b/internal/artifact/azure_blob_uploader.go @@ -88,18 +88,36 @@ func (u *AzureBlobUploader) URL(artifact *api.Artifact) string { return sasURL } -// Upload uploads an artifact file. -func (u *AzureBlobUploader) Upload(ctx context.Context, artifact *api.Artifact) error { - u.logger.Debug("Reading file %q", artifact.AbsolutePath) - f, err := os.Open(artifact.AbsolutePath) +func (u *AzureBlobUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&azureBlobUploaderWork{ + AzureBlobUploader: u, + artifact: artifact, + }}, nil +} + +type azureBlobUploaderWork struct { + *AzureBlobUploader + artifact *api.Artifact +} + +func (u *azureBlobUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *azureBlobUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +// DoWork uploads an artifact file. 
+func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error { + u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) + f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", artifact.AbsolutePath, err) + return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } defer f.Close() - blobName := path.Join(u.loc.BlobPath, artifact.Path) + blobName := path.Join(u.loc.BlobPath, u.artifact.Path) - u.logger.Debug("Uploading %s to %s", artifact.Path, u.loc.URL(blobName)) + u.logger.Debug("Uploading %s to %s", u.artifact.Path, u.loc.URL(blobName)) bbc := u.client.NewContainerClient(u.loc.ContainerName).NewBlockBlobClient(blobName) _, err = bbc.UploadFile(ctx, f, nil) diff --git a/internal/artifact/batch_creator.go b/internal/artifact/batch_creator.go index e3a3a4e150..34665df6a0 100644 --- a/internal/artifact/batch_creator.go +++ b/internal/artifact/batch_creator.go @@ -60,9 +60,10 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { // twice, it'll just return the previous data and skip the // upload) batch := &api.ArtifactBatch{ - ID: api.NewUUID(), - Artifacts: theseArtifacts, - UploadDestination: a.conf.UploadDestination, + ID: api.NewUUID(), + Artifacts: theseArtifacts, + UploadDestination: a.conf.UploadDestination, + MultipartSupported: true, } a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length) @@ -111,11 +112,12 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { } // Save the id and instructions to each artifact - index := 0 - for _, id := range creation.ArtifactIDs { + for index, id := range creation.ArtifactIDs { theseArtifacts[index].ID = id - theseArtifacts[index].UploadInstructions = creation.UploadInstructions - index += 1 + theseArtifacts[index].UploadInstructions = creation.InstructionsTemplate + if specific := creation.PerArtifactInstructions[id]; specific != nil { + theseArtifacts[index].UploadInstructions = specific + } } } diff --git a/internal/artifact/bk_uploader.go b/internal/artifact/bk_uploader.go index 29f1781ada..8aa9e9ab1e 100644 --- a/internal/artifact/bk_uploader.go +++ b/internal/artifact/bk_uploader.go @@ -49,17 +49,36 @@ func NewBKUploader(l logger.Logger, c BKUploaderConfig) *BKUploader { } } -// The BKUploader doens't specify a URL, as one is provided by Buildkite -// after uploading +// URL returns the empty string. BKUploader doesn't know the URL in advance, +// it is provided by Buildkite after uploading. func (u *BKUploader) URL(*api.Artifact) string { return "" } -func (u *BKUploader) Upload(ctx context.Context, artifact *api.Artifact) error { +// CreateWork checks the artifact size, then creates one worker. +func (u *BKUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { if artifact.FileSize > maxFormUploadedArtifactSize { - return errArtifactTooLarge{Size: artifact.FileSize} + return nil, errArtifactTooLarge{Size: artifact.FileSize} } + // TODO: create multiple workers for multipart uploads + return []workUnit{&bkUploaderWork{ + BKUploader: u, + artifact: artifact, + }}, nil +} + +type bkUploaderWork struct { + *BKUploader + artifact *api.Artifact +} + +func (u *bkUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *bkUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} - // Create a HTTP request for uploading the file - request, err := createUploadRequest(ctx, u.logger, artifact) +// DoWork tries the upload. 
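+// Retrying is left to the caller: uploadWorker wraps each work unit in a
+// roko retrier, so a single attempt is made here.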
+func (u *bkUploaderWork) DoWork(ctx context.Context) error { + request, err := createUploadRequest(ctx, u.logger, u.artifact) if err != nil { return err } @@ -87,13 +106,15 @@ func (u *BKUploader) Upload(ctx context.Context, artifact *api.Artifact) error { // know which one. trace := &httptrace.ClientTrace{ GotConn: func(connInfo httptrace.GotConnInfo) { - u.logger.Debug("artifact %s uploading to: %s", artifact.ID, connInfo.Conn.RemoteAddr()) + u.logger.Debug("artifact %s uploading to: %s", u.artifact.ID, connInfo.Conn.RemoteAddr()) }, } request = request.WithContext(httptrace.WithClientTrace(request.Context(), trace)) } // Create the client + // TODO: this uses the default transport, potentially ignoring many agent + // config options client := &http.Client{} // Perform the request diff --git a/internal/artifact/bk_uploader_test.go b/internal/artifact/bk_uploader_test.go index e430f757b7..7a888446e3 100644 --- a/internal/artifact/bk_uploader_test.go +++ b/internal/artifact/bk_uploader_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/fs" "net/http" "net/http/httptest" "os" @@ -80,37 +81,43 @@ func TestFormUploading(t *testing.T) { t.Fatalf("os.Getwd() error = %v", err) } - runtest := func(wd string) { - abspath := filepath.Join(wd, "llamas.txt") - err = os.WriteFile(abspath, []byte("llamas"), 0700) - defer os.Remove(abspath) - - uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) - artifact := &api.Artifact{ - ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", - Path: "llamas.txt", - AbsolutePath: abspath, - GlobPath: "llamas.txt", - ContentType: "text/plain", - UploadInstructions: &api.ArtifactUploadInstructions{ - Data: map[string]string{ - "path": "${artifact:path}", + for _, wd := range []string{temp, cwd} { + t.Run(wd, func(t *testing.T) { + abspath := filepath.Join(wd, "llamas.txt") + err = os.WriteFile(abspath, []byte("llamas"), 0700) + defer os.Remove(abspath) + + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) + artifact := &api.Artifact{ + ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", + Path: "llamas.txt", + AbsolutePath: abspath, + GlobPath: "llamas.txt", + ContentType: "text/plain", + UploadInstructions: &api.ArtifactUploadInstructions{ + Data: map[string]string{ + "path": "${artifact:path}", + }, + Action: api.ArtifactUploadAction{ + URL: server.URL, + Method: "POST", + Path: "buildkiteartifacts.com", + FileInput: "file", + }, }, - Action: api.ArtifactUploadAction{ - URL: server.URL, - Method: "POST", - Path: "buildkiteartifacts.com", - FileInput: "file", - }}, - } + } - if err := uploader.Upload(ctx, artifact); err != nil { - t.Errorf("uploader.Upload(artifact) = %v", err) - } - } + work, err := uploader.CreateWork(artifact) + if err != nil { + t.Fatalf("uploader.CreateWork(artifact) error = %v", err) + } - for _, wd := range []string{temp, cwd} { - runtest(wd) + for _, wu := range work { + if err := wu.DoWork(ctx); err != nil { + t.Errorf("bkUploaderWork.DoWork(artifact) = %v", err) + } + } + }) } } @@ -148,13 +155,19 @@ func TestFormUploadFileMissing(t *testing.T) { }}, } - if err := uploader.Upload(ctx, artifact); !os.IsNotExist(err) { - t.Errorf("uploader.Upload(artifact) = %v, want os.ErrNotExist", err) + work, err := uploader.CreateWork(artifact) + if err != nil { + t.Fatalf("uploader.CreateWork(artifact) error = %v", err) + } + + for _, wu := range work { + if err := wu.DoWork(ctx); !errors.Is(err, fs.ErrNotExist) { + t.Errorf("bkUploaderWork.DoWork(artifact) = %v, want %v", err, fs.ErrNotExist) + } } } func TestFormUploadTooBig(t *testing.T) { - ctx := 
context.Background() uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) const size = int64(6442450944) // 6Gb artifact := &api.Artifact{ @@ -167,7 +180,8 @@ func TestFormUploadTooBig(t *testing.T) { UploadInstructions: &api.ArtifactUploadInstructions{}, } - if err := uploader.Upload(ctx, artifact); !errors.Is(err, errArtifactTooLarge{Size: size}) { - t.Errorf("uploader.Upload(artifact) = %v, want errArtifactTooLarge", err) + wantErr := errArtifactTooLarge{Size: size} + if _, err := uploader.CreateWork(artifact); !errors.Is(err, wantErr) { + t.Fatalf("uploader.CreateWork(artifact) error = %v, want %v", err, wantErr) } } diff --git a/internal/artifact/gs_uploader.go b/internal/artifact/gs_uploader.go index 3938f19494..06445cc2e5 100644 --- a/internal/artifact/gs_uploader.go +++ b/internal/artifact/gs_uploader.go @@ -119,7 +119,25 @@ func (u *GSUploader) URL(artifact *api.Artifact) string { return artifactURL.String() } -func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error { +func (u *GSUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&gsUploaderWork{ + GSUploader: u, + artifact: artifact, + }}, nil +} + +type gsUploaderWork struct { + *GSUploader + artifact *api.Artifact +} + +func (u *gsUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *gsUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +func (u *gsUploaderWork) DoWork(_ context.Context) error { permission := os.Getenv("BUILDKITE_GS_ACL") // The dirtiest validation method ever... @@ -134,19 +152,19 @@ func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error { if permission == "" { u.logger.Debug("Uploading \"%s\" to bucket \"%s\" with default permission", - u.artifactPath(artifact), u.BucketName) + u.artifactPath(u.artifact), u.BucketName) } else { u.logger.Debug("Uploading \"%s\" to bucket \"%s\" with permission \"%s\"", - u.artifactPath(artifact), u.BucketName, permission) + u.artifactPath(u.artifact), u.BucketName, permission) } object := &storage.Object{ - Name: u.artifactPath(artifact), - ContentType: artifact.ContentType, - ContentDisposition: u.contentDisposition(artifact), + Name: u.artifactPath(u.artifact), + ContentType: u.artifact.ContentType, + ContentDisposition: u.contentDisposition(u.artifact), } - file, err := os.Open(artifact.AbsolutePath) + file, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return errors.New(fmt.Sprintf("Failed to open file \"%q\" (%v)", artifact.AbsolutePath, err)) + return errors.New(fmt.Sprintf("Failed to open file %q (%v)", u.artifact.AbsolutePath, err)) } call := u.service.Objects.Insert(u.BucketName, object) if permission != "" { @@ -155,7 +173,7 @@ func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error { if res, err := call.Media(file, googleapi.ContentType("")).Do(); err == nil { u.logger.Debug("Created object %v at location %v\n\n", res.Name, res.SelfLink) } else { - return errors.New(fmt.Sprintf("Failed to PUT file \"%s\" (%v)", u.artifactPath(artifact), err)) + return errors.New(fmt.Sprintf("Failed to PUT file %q (%v)", u.artifactPath(u.artifact), err)) } return nil diff --git a/internal/artifact/s3_uploader.go b/internal/artifact/s3_uploader.go index 3b9836bf2c..7cb41d3735 100644 --- a/internal/artifact/s3_uploader.go +++ b/internal/artifact/s3_uploader.go @@ -81,8 +81,25 @@ func (u *S3Uploader) URL(artifact *api.Artifact) string { return url.String() } -func (u *S3Uploader) Upload(_ context.Context, artifact 
*api.Artifact) error { +func (u *S3Uploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&s3UploaderWork{ + S3Uploader: u, + artifact: artifact, + }}, nil +} + +type s3UploaderWork struct { + *S3Uploader + artifact *api.Artifact +} + +func (u *s3UploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *s3UploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} +func (u *s3UploaderWork) DoWork(context.Context) error { permission, err := u.resolvePermission() if err != nil { return err @@ -92,19 +109,19 @@ func (u *S3Uploader) Upload(_ context.Context, artifact *api.Artifact) error { uploader := s3manager.NewUploaderWithClient(u.client) // Open file from filesystem - u.logger.Debug("Reading file \"%s\"", artifact.AbsolutePath) - f, err := os.Open(artifact.AbsolutePath) + u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) + f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", artifact.AbsolutePath, err) + return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } // Upload the file to S3. - u.logger.Debug("Uploading \"%s\" to bucket with permission `%s`", u.artifactPath(artifact), permission) + u.logger.Debug("Uploading %q to bucket with permission %q", u.artifactPath(u.artifact), permission) params := &s3manager.UploadInput{ Bucket: aws.String(u.BucketName), - Key: aws.String(u.artifactPath(artifact)), - ContentType: aws.String(artifact.ContentType), + Key: aws.String(u.artifactPath(u.artifact)), + ContentType: aws.String(u.artifact.ContentType), ACL: aws.String(permission), Body: f, } diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go index 8098b9fa69..d0c2062e88 100644 --- a/internal/artifact/uploader.go +++ b/internal/artifact/uploader.go @@ -20,7 +20,6 @@ import ( "github.com/buildkite/agent/v3/internal/experiments" "github.com/buildkite/agent/v3/internal/mime" "github.com/buildkite/agent/v3/logger" - "github.com/buildkite/agent/v3/pool" "github.com/buildkite/roko" "github.com/dustin/go-humanize" "github.com/mattn/go-zglob" @@ -86,7 +85,31 @@ func (a *Uploader) Upload(ctx context.Context) error { } a.logger.Info("Found %d files that match %q", len(artifacts), a.conf.Paths) - if err := a.upload(ctx, artifacts); err != nil { + + // Determine what uploader to use + uploader, err := a.createUploader() + if err != nil { + return fmt.Errorf("creating uploader: %w", err) + } + + // Set the URLs of the artifacts based on the uploader + for _, artifact := range artifacts { + artifact.URL = uploader.URL(artifact) + } + + // Batch-create the artifact records on Buildkite + batchCreator := NewArtifactBatchCreator(a.logger, a.apiClient, BatchCreatorConfig{ + JobID: a.conf.JobID, + Artifacts: artifacts, + UploadDestination: a.conf.Destination, + CreateArtifactsTimeout: 10 * time.Second, + }) + artifacts, err = batchCreator.Create(ctx) + if err != nil { + return err + } + + if err := a.upload(ctx, artifacts, uploader); err != nil { return fmt.Errorf("uploading artifacts: %w", err) } @@ -358,7 +381,7 @@ func (a *Uploader) build(path string, absolutePath string) (*api.Artifact, error // createUploader applies some heuristics to the destination to infer which // uploader to use. 
-func (a *Uploader) createUploader() (_ uploader, err error) { +func (a *Uploader) createUploader() (_ workCreator, err error) { var dest string defer func() { if err != nil || dest == "" { @@ -406,138 +429,165 @@ func (a *Uploader) createUploader() (_ uploader, err error) { } } -type uploader interface { +// workCreator implementations convert artifacts into units of work for uploading. +type workCreator interface { // The Artifact.URL property is populated with what ever is returned // from this method prior to uploading. URL(*api.Artifact) string - // The actual uploading of the file - Upload(context.Context, *api.Artifact) error + // CreateWork provide units of work for uploading an artifact. + CreateWork(*api.Artifact) ([]workUnit, error) } -func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact) error { - // Determine what uploader to use - uploader, err := a.createUploader() - if err != nil { - return fmt.Errorf("creating uploader: %w", err) - } +// workUnit implementations upload a whole artifact, or a part of an artifact, +// or could one day do some other work related to an artifact. +type workUnit interface { + // Artifact returns the artifact being worked on. + Artifact() *api.Artifact - // Set the URLs of the artifacts based on the uploader + // Description describes the unit of work. + Description() string + + // DoWork does the work. + DoWork(context.Context) error +} + +const ( + artifactStateFinished = "finished" + artifactStateError = "error" +) + +// Messages passed on channels between goroutines. + +// workUnitError is just a tuple (workUnit, error). +type workUnitError struct { + workUnit workUnit + err error +} + +// artifactWorkUnits is just a tuple (artifact, int). +type artifactWorkUnits struct { + artifact *api.Artifact + workUnits int +} + +// cancelCtx stores a context cancelled when part of an artifact upload has +// failed and needs to fail the whole artifact. +// Go readability notes: "Storing" a context? +// - In a long-lived struct: yeah nah 🙅. Read pkg.go.dev/context +// - Within a single operation ("upload", in this case): nah yeah 👍 +type cancelCtx struct { + ctx context.Context + cancel context.CancelCauseFunc +} + +func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, uploader workCreator) error { + perArtifactCtx := make(map[*api.Artifact]cancelCtx) for _, artifact := range artifacts { - artifact.URL = uploader.URL(artifact) + actx, cancel := context.WithCancelCause(ctx) + perArtifactCtx[artifact] = cancelCtx{actx, cancel} } - // Create the artifacts on Buildkite - batchCreator := NewArtifactBatchCreator(a.logger, a.apiClient, BatchCreatorConfig{ - JobID: a.conf.JobID, - Artifacts: artifacts, - UploadDestination: a.conf.Destination, - CreateArtifactsTimeout: 10 * time.Second, - }) + // workUnitStateCh: multiple worker goroutines --(work unit state)--> state updater + workUnitStateCh := make(chan workUnitError) + // artifactWorkUnitsCh: work unit creation --(# work units for artifact)--> state updater + artifactWorkUnitsCh := make(chan artifactWorkUnits) + // workUnitsCh: work unit creation --(work unit to be run)--> multiple worker goroutines + workUnitsCh := make(chan workUnit) + // stateUpdatesDoneCh: closed when all state updates are complete + stateUpdatesDoneCh := make(chan struct{}) + + // The status updater goroutine: updates batches of artifact states on + // Buildkite every few seconds. 
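+	// It is the only goroutine that touches the artifact-state bookkeeping,
+	// so no mutex is needed around it.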
+ var errs []error + go func() { + errs = a.statusUpdater(ctx, workUnitStateCh, artifactWorkUnitsCh, stateUpdatesDoneCh) + }() - artifacts, err = batchCreator.Create(ctx) - if err != nil { - return err + // Worker goroutines that work on work units. + var workerWG sync.WaitGroup + for range runtime.GOMAXPROCS(0) { + workerWG.Add(1) + go func() { + defer workerWG.Done() + a.uploadWorker(ctx, perArtifactCtx, workUnitsCh, workUnitStateCh) + }() } - // Prepare a concurrency pool to upload the artifacts - p := pool.New(pool.MaxConcurrencyLimit) - errs := []error{} - var errorsMutex sync.Mutex - - // Create a wait group so we can make sure the uploader waits for all - // the artifact states to upload before finishing - var stateUploaderWaitGroup sync.WaitGroup - stateUploaderWaitGroup.Add(1) + // Work creation: creates the work units for each artifact. + // This must happen after creating goroutines listening on the channels. + for _, artifact := range artifacts { + workUnits, err := uploader.CreateWork(artifact) + if err != nil { + a.logger.Error("Couldn't create upload workers for artifact %q: %v", artifact.Path, err) + return err + } - // A map to keep track of artifact states and how many we've uploaded - artifactStates := make(map[string]string) - artifactStatesUploaded := 0 - var artifactStatesMutex sync.Mutex + // Send the number of work units for this artifact to the state uploader. + select { + case <-ctx.Done(): + return ctx.Err() + case artifactWorkUnitsCh <- artifactWorkUnits{artifact: artifact, workUnits: len(workUnits)}: + } - // Spin up a gourtine that'll uploading artifact statuses every few - // seconds in batches - go func() { - for artifactStatesUploaded < len(artifacts) { - statesToUpload := make(map[string]string) - - // Grab all the states we need to upload, and remove - // them from the tracking map - // - // Since we mutate the artifactStates variable in - // multiple routines, we need to lock it to make sure - // nothing else is changing it at the same time. - artifactStatesMutex.Lock() - for id, state := range artifactStates { - statesToUpload[id] = state - delete(artifactStates, id) + // Send the work units themselves to the workers. + for _, workUnit := range workUnits { + select { + case <-ctx.Done(): + return ctx.Err() + case workUnitsCh <- workUnit: } - artifactStatesMutex.Unlock() + } + } - if len(statesToUpload) > 0 { - artifactStatesUploaded += len(statesToUpload) - for id, state := range statesToUpload { - a.logger.Debug("Artifact `%s` has state `%s`", id, state) - } + // All work units have been sent to workers, and all counts of pending work + // units have been sent to the state updater. + close(workUnitsCh) + close(artifactWorkUnitsCh) - timeout := 5 * time.Second - - // Update the states of the artifacts in bulk. - err := roko.NewRetrier( - roko.WithMaxAttempts(10), - roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)), - ).DoWithContext(ctx, func(r *roko.Retrier) error { - - ctxTimeout := ctx - if timeout != 0 { - var cancel func() - ctxTimeout, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } - - _, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload) - if err != nil { - a.logger.Warn("%s (%s)", err, r) - } - - // after four attempts (0, 1, 2, 3)... - if r.AttemptCount() == 3 { - // The short timeout has given us fast feedback on the first couple of attempts, - // but perhaps the server needs more time to complete the request, so fall back to - // the default HTTP client timeout. 
- a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) - timeout = 0 - } + a.logger.Debug("Waiting for uploads to complete...") - return err - }) - if err != nil { - a.logger.Error("Error uploading artifact states: %s", err) - - // Track the error that was raised. We need to - // acquire a lock since we mutate the errors - // slice in multiple routines. - errorsMutex.Lock() - errs = append(errs, err) - errorsMutex.Unlock() - } + // Wait for the workers to finish + workerWG.Wait() - a.logger.Debug("Uploaded %d artifact states (%d/%d)", len(statesToUpload), artifactStatesUploaded, len(artifacts)) - } + // Since the workers are done, all work unit states have been sent to the + // state updater. + close(workUnitStateCh) - // Check again for states to upload in a few seconds - time.Sleep(1 * time.Second) - } + a.logger.Debug("Uploads complete, waiting for upload status to be sent to Buildkite...") - stateUploaderWaitGroup.Done() - }() + // Wait for the statuses to finish uploading + <-stateUpdatesDoneCh - for _, artifact := range artifacts { - p.Spawn(func() { - // Show a nice message that we're starting to upload the file - a.logger.Info("Uploading artifact %s %s (%s)", artifact.ID, artifact.Path, humanize.IBytes(uint64(artifact.FileSize))) + if len(errs) > 0 { + err := errors.Join(errs...) + return fmt.Errorf("errors uploading artifacts: %w", err) + } - var state string + a.logger.Info("Artifact uploads completed successfully") + + return nil +} + +func (a *Uploader) uploadWorker( + ctx context.Context, + perArtifactCtx map[*api.Artifact]cancelCtx, + workUnitsCh <-chan workUnit, + workUnitStateCh chan<- workUnitError, +) { + for { + select { + case <-ctx.Done(): + return + + case workUnit, open := <-workUnitsCh: + if !open { + return // Done + } + artifact := workUnit.Artifact() + actx := perArtifactCtx[artifact].ctx + // Show a nice message that we're starting to upload the file + a.logger.Info("Uploading %s", workUnit.Description()) // Upload the artifact and then set the state depending // on whether or not it passed. We'll retry the upload @@ -545,55 +595,150 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact) error err := roko.NewRetrier( roko.WithMaxAttempts(10), roko.WithStrategy(roko.Constant(5*time.Second)), - ).DoWithContext(ctx, func(r *roko.Retrier) error { - if err := uploader.Upload(ctx, artifact); err != nil { + ).DoWithContext(actx, func(r *roko.Retrier) error { + if err := workUnit.DoWork(actx); err != nil { a.logger.Warn("%s (%s)", err, r) return err } return nil }) - // Did the upload eventually fail? + + // If it failed, abort any other work items for this artifact. if err != nil { - a.logger.Error("Error uploading artifact \"%s\": %s", artifact.Path, err) + a.logger.Info("Upload failed for %s", workUnit.Description()) + perArtifactCtx[workUnit.Artifact()].cancel(err) + } - // Track the error that was raised. We need to - // acquire a lock since we mutate the errors - // slice in multiple routines. - errorsMutex.Lock() - errs = append(errs, err) - errorsMutex.Unlock() + // Let the state updater know how the work went. 
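+			// The send also selects on ctx.Done(), so a worker can't block
+			// forever if the whole upload is being torn down.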
+ select { + case <-ctx.Done(): // the main context, not the artifact ctx + return // ctx.Err() - state = "error" - } else { - a.logger.Info("Successfully uploaded artifact \"%s\"", artifact.Path) - state = "finished" + case workUnitStateCh <- workUnitError{workUnit: workUnit, err: err}: } - - // Since we mutate the artifactStates variable in - // multiple routines, we need to lock it to make sure - // nothing else is changing it at the same time. - artifactStatesMutex.Lock() - artifactStates[artifact.ID] = state - artifactStatesMutex.Unlock() - }) + } } +} - a.logger.Debug("Waiting for uploads to complete...") +func (a *Uploader) statusUpdater( + ctx context.Context, + workUnitStateCh <-chan workUnitError, + artifactWorkUnitsCh <-chan artifactWorkUnits, + doneCh chan<- struct{}, +) []error { + defer close(doneCh) - // Wait for the pool to finish - p.Wait() + // Errors that caused an artifact upload to fail, or a batch fail to update. + var errs []error - a.logger.Debug("Uploads complete, waiting for upload status to be sent to buildkite...") + // artifact -> number of work units that are incomplete + pendingWorkUnits := make(map[*api.Artifact]int) - // Wait for the statuses to finish uploading - stateUploaderWaitGroup.Wait() + // States that haven't been updated on Buildkite yet. + statesToUpload := make(map[string]string) // artifact ID -> state - if len(errs) > 0 { - err := errors.Join(errs...) - return fmt.Errorf("errors uploading artifacts: %w", err) - } + // When this ticks, upload any pending artifact states. + updateTicker := time.NewTicker(1 * time.Second) - a.logger.Info("Artifact uploads completed successfully") + for { + select { + case <-ctx.Done(): + return errs - return nil + case <-updateTicker.C: + if len(statesToUpload) == 0 { // no news from the frontier + break + } + // Post an update + timeout := 5 * time.Second + + // Update the states of the artifacts in bulk. + err := roko.NewRetrier( + roko.WithMaxAttempts(10), + roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)), + ).DoWithContext(ctx, func(r *roko.Retrier) error { + ctxTimeout := ctx + if timeout != 0 { + var cancel func() + ctxTimeout, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + _, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload) + if err != nil { + a.logger.Warn("%s (%s)", err, r) + } + + // after four attempts (0, 1, 2, 3)... + if r.AttemptCount() == 3 { + // The short timeout has given us fast feedback on the first couple of attempts, + // but perhaps the server needs more time to complete the request, so fall back to + // the default HTTP client timeout. + a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) + timeout = 0 + } + + return err + }) + if err != nil { + a.logger.Error("Error updating artifact states: %s", err) + errs = append(errs, err) + } + + a.logger.Debug("Updated %d artifact states", len(statesToUpload)) + clear(statesToUpload) + + case awu, open := <-artifactWorkUnitsCh: + if !open { + // Set it to nil so this select branch never happens again. + artifactWorkUnitsCh = nil + // If both input channels are nil, we're done! + if workUnitStateCh == nil { + return errs + } + } + + // Track how many pending work units there should be per artifact. + // Use += in case some work units for this artifact already completed. 
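+			// (If some completed first, the count is temporarily negative,
+			// and the artifact finishes when the count returns to zero.)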
+			if awu.artifact == nil {
+				// The channel closed without delivering a value, so there is
+				// nothing to count; wait for the next message.
+				break
+			}
+			pendingWorkUnits[awu.artifact] += awu.workUnits
+			if pendingWorkUnits[awu.artifact] != 0 {
+				break
+			}
+			// The whole artifact is complete, add it to the next batch of
+			// states to upload.
+			statesToUpload[awu.artifact.ID] = artifactStateFinished
+			a.logger.Debug("Artifact `%s` has entered state `%s`", awu.artifact.ID, artifactStateFinished)
+
+		case workUnitState, open := <-workUnitStateCh:
+			if !open {
+				// Set it to nil so this select branch never happens again.
+				workUnitStateCh = nil
+				// If both input channels are nil, we're done!
+				if artifactWorkUnitsCh == nil {
+					return errs
+				}
+				// Nothing was received; wait for the next message.
+				break
+			}
+			artifact := workUnitState.workUnit.Artifact()
+			if workUnitState.err != nil {
+				// The work unit failed, so the whole artifact upload has failed.
+				errs = append(errs, workUnitState.err)
+				statesToUpload[artifact.ID] = artifactStateError
+				a.logger.Debug("Artifact `%s` has entered state `%s`", artifact.ID, artifactStateError)
+				break
+			}
+			// The work unit is complete - it's no longer pending.
+			pendingWorkUnits[artifact]--
+			if pendingWorkUnits[artifact] != 0 {
+				break
+			}
+			// No pending units remain, so the whole artifact is complete.
+			// Add it to the next batch of states to upload.
+			statesToUpload[artifact.ID] = artifactStateFinished
+			a.logger.Debug("Artifact `%s` has entered state `%s`", artifact.ID, artifactStateFinished)
+		}
+	}
+}
+
+func singleUnitDescription(artifact *api.Artifact) string {
+	return fmt.Sprintf("%s %s (%s)", artifact.ID, artifact.Path, humanize.IBytes(uint64(artifact.FileSize)))
+}

From 580c3501d89bb60270922a05f09838e212909b43 Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Thu, 19 Sep 2024 13:44:25 +1000
Subject: [PATCH 5/5] Support multipart uploads

---
 agent/agent_configuration.go              |  39 +-
 agent/api.go                              |   2 +-
 agent/job_runner.go                       |   4 +
 api/artifacts.go                          |  88 +++--
 clicommand/agent_start.go                 |  23 +-
 clicommand/artifact_upload.go             |   4 +
 clicommand/global.go                      |   6 +
 internal/artifact/api_client.go           |   2 +-
 internal/artifact/artifactory_uploader.go |  16 +-
 internal/artifact/azure_blob_uploader.go  |   6 +-
 internal/artifact/batch_creator.go        |   5 +-
 internal/artifact/bk_uploader.go          | 182 ++++++++--
 internal/artifact/bk_uploader_test.go     | 194 +++++++---
 internal/artifact/gs_uploader.go          |  10 +-
 internal/artifact/s3_uploader.go          |   9 +-
 internal/artifact/uploader.go             | 412 ++++++++++++----------
 16 files changed, 667 insertions(+), 335 deletions(-)

diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go
index 0655437e57..f27597de24 100644
--- a/agent/agent_configuration.go
+++ b/agent/agent_configuration.go
@@ -43,23 +43,24 @@ type AgentConfiguration struct {
 	VerificationJWKS             any    // The set of keys to verify jobs with
 	VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`)
 
-	ANSITimestamps             bool
-	TimestampLines             bool
-	HealthCheckAddr            string
-	DisconnectAfterJob         bool
-	DisconnectAfterIdleTimeout int
-	CancelGracePeriod          int
-	SignalGracePeriod          time.Duration
-	EnableJobLogTmpfile        bool
-	JobLogPath                 string
-	WriteJobLogsToStdout       bool
-	LogFormat                  string
-	Shell                      string
-	Profile                    string
-	RedactedVars               []string
-	AcquireJob                 string
-	TracingBackend             string
-	TracingServiceName         string
-	TraceContextEncoding       string
-	DisableWarningsFor         []string
+	ANSITimestamps               bool
+	TimestampLines               bool
+	HealthCheckAddr              string
+	DisconnectAfterJob           bool
+	DisconnectAfterIdleTimeout   int
+	CancelGracePeriod            int
+	SignalGracePeriod            time.Duration
+	EnableJobLogTmpfile          bool
+	JobLogPath                   string
+	WriteJobLogsToStdout         bool
+	LogFormat                    string
+	Shell                        string
+	
Profile string + RedactedVars []string + AcquireJob string + TracingBackend string + TracingServiceName string + TraceContextEncoding string + DisableWarningsFor []string + AllowMultipartArtifactUpload bool } diff --git a/agent/api.go b/agent/api.go index 422cec6a84..ac6bed6f60 100644 --- a/agent/api.go +++ b/agent/api.go @@ -37,7 +37,7 @@ type APIClient interface { StartJob(context.Context, *api.Job) (*api.Response, error) StepExport(context.Context, string, *api.StepExportRequest) (*api.StepExportResponse, *api.Response, error) StepUpdate(context.Context, string, *api.StepUpdate) (*api.Response, error) - UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error) + UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error) UploadChunk(context.Context, string, *api.Chunk) (*api.Response, error) UploadPipeline(context.Context, string, *api.PipelineChange, ...api.Header) (*api.Response, error) } diff --git a/agent/job_runner.go b/agent/job_runner.go index 7c31e0efbc..a5e5d6ca8e 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -534,6 +534,10 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) { env["BUILDKITE_KUBERNETES_EXEC"] = "true" } + if !r.conf.AgentConfiguration.AllowMultipartArtifactUpload { + env["BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD"] = "true" + } + // propagate CancelSignal to bootstrap, unless it's the default SIGTERM if r.conf.CancelSignal != process.SIGTERM { env["BUILDKITE_CANCEL_SIGNAL"] = r.conf.CancelSignal.String() diff --git a/api/artifacts.go b/api/artifacts.go index a3f7d6d8de..351345de9d 100644 --- a/api/artifacts.go +++ b/api/artifacts.go @@ -51,27 +51,47 @@ type Artifact struct { } type ArtifactBatch struct { - ID string `json:"id"` - Artifacts []*Artifact `json:"artifacts"` - UploadDestination string `json:"upload_destination"` + ID string `json:"id"` + Artifacts []*Artifact `json:"artifacts"` + UploadDestination string `json:"upload_destination"` + MultipartSupported bool `json:"multipart_supported,omitempty"` } +// ArtifactUploadInstructions describes how to upload an artifact to Buildkite +// artifact storage. type ArtifactUploadInstructions struct { - Data map[string]string `json:"data"` + // Used for a single-part upload. Action ArtifactUploadAction `json:"action"` + + // Used for a multi-part upload. + Actions []ArtifactUploadAction `json:"actions"` + + // Contains other data necessary for interpreting instructions. + Data map[string]string `json:"data"` } +// ArtifactUploadAction describes one action needed to upload an artifact or +// part of an artifact to Buildkite artifact storage. type ArtifactUploadAction struct { - URL string `json:"url,omitempty"` - Method string `json:"method"` - Path string `json:"path"` - FileInput string `json:"file_input"` + URL string `json:"url,omitempty"` + Method string `json:"method"` + Path string `json:"path"` + FileInput string `json:"file_input"` + PartNumber int `json:"part_number,omitempty"` } type ArtifactBatchCreateResponse struct { - ID string `json:"id"` - ArtifactIDs []string `json:"artifact_ids"` - UploadInstructions *ArtifactUploadInstructions `json:"upload_instructions"` + ID string `json:"id"` + ArtifactIDs []string `json:"artifact_ids"` + + // These instructions apply to all artifacts. The template contains + // variable interpolations such as ${artifact:path}. 
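+	// For example, a data value of "${artifact:path}" is replaced with the
+	// artifact's own path (such as "llamas.txt") before the form is sent.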
+	InstructionsTemplate *ArtifactUploadInstructions `json:"upload_instructions"`
+
+	// These instructions apply to specific artifacts, necessary for multipart
+	// uploads. They override InstructionsTemplate and should not contain
+	// interpolations. Map: artifact ID -> instructions for that artifact.
+	PerArtifactInstructions map[string]*ArtifactUploadInstructions `json:"per_artifact_instructions"`
 }
 
 // ArtifactSearchOptions specifies the optional parameters to the
@@ -84,18 +104,29 @@ type ArtifactSearchOptions struct {
 	IncludeDuplicates bool   `url:"include_duplicates,omitempty"`
 }
 
-type ArtifactBatchUpdateArtifact struct {
-	ID    string `json:"id"`
-	State string `json:"state"`
+// ArtifactState represents the state of a single artifact when calling UpdateArtifacts.
+type ArtifactState struct {
+	ID        string `json:"id"`
+	State     string `json:"state"`
+	Multipart bool   `json:"multipart,omitempty"`
+	// If this artifact was a multipart upload and is complete, we need the
+	// ETag from each uploaded part so that they can be joined together.
+	MultipartETags []ArtifactPartETag `json:"multipart_etags,omitempty"`
+}
+
+// ArtifactPartETag associates an ETag to a part number for a multipart upload.
+type ArtifactPartETag struct {
+	PartNumber int    `json:"part_number"`
+	ETag       string `json:"etag"`
 }
 
 type ArtifactBatchUpdateRequest struct {
-	Artifacts []*ArtifactBatchUpdateArtifact `json:"artifacts"`
+	Artifacts []ArtifactState `json:"artifacts"`
 }
 
 // CreateArtifacts takes a slice of artifacts, and creates them on Buildkite as a batch.
-func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
-	u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
+func (c *Client) CreateArtifacts(ctx context.Context, jobID string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
+	u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))
 
 	req, err := c.newRequest(ctx, "POST", u, batch)
 	if err != nil {
@@ -111,13 +142,11 @@ func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *Artif
 	return createResponse, resp, err
 }
 
-// Updates a particular artifact
-func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStates map[string]string) (*Response, error) {
-	u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
-	payload := ArtifactBatchUpdateRequest{}
-
-	for id, state := range artifactStates {
-		payload.Artifacts = append(payload.Artifacts, &ArtifactBatchUpdateArtifact{id, state})
+// UpdateArtifacts updates Buildkite with one or more artifact states.
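+// For a completed multipart upload, the state should carry the collected
+// part ETags (ArtifactState.MultipartETags) so the parts can be joined.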
+func (c *Client) UpdateArtifacts(ctx context.Context, jobID string, artifactStates []ArtifactState) (*Response, error) { + u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID)) + payload := ArtifactBatchUpdateRequest{ + Artifacts: artifactStates, } req, err := c.newRequest(ctx, "PUT", u, payload) @@ -125,17 +154,12 @@ func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStat return nil, err } - resp, err := c.doRequest(req, nil) - if err != nil { - return resp, err - } - - return resp, err + return c.doRequest(req, nil) } // SearchArtifacts searches Buildkite for a set of artifacts -func (c *Client) SearchArtifacts(ctx context.Context, buildId string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) { - u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildId)) +func (c *Client) SearchArtifacts(ctx context.Context, buildID string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) { + u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildID)) u, err := addOptions(u, opt) if err != nil { return nil, nil, err diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index 157bb2a22c..6ff12c1e7e 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -167,14 +167,15 @@ type AgentStartConfig struct { TracingServiceName string `cli:"tracing-service-name"` // Global flags - Debug bool `cli:"debug"` - LogLevel string `cli:"log-level"` - NoColor bool `cli:"no-color"` - Experiments []string `cli:"experiment" normalize:"list"` - Profile string `cli:"profile"` - StrictSingleHooks bool `cli:"strict-single-hooks"` - KubernetesExec bool `cli:"kubernetes-exec"` - TraceContextEncoding string `cli:"trace-context-encoding"` + Debug bool `cli:"debug"` + LogLevel string `cli:"log-level"` + NoColor bool `cli:"no-color"` + Experiments []string `cli:"experiment" normalize:"list"` + Profile string `cli:"profile"` + StrictSingleHooks bool `cli:"strict-single-hooks"` + KubernetesExec bool `cli:"kubernetes-exec"` + TraceContextEncoding string `cli:"trace-context-encoding"` + NoMultipartArtifactUpload bool `cli:"no-multipart-artifact-upload"` // API config DebugHTTP bool `cli:"debug-http"` @@ -704,6 +705,7 @@ var AgentStartCommand = cli.Command{ StrictSingleHooksFlag, KubernetesExecFlag, TraceContextEncodingFlag, + NoMultipartArtifactUploadFlag, // Deprecated flags which will be removed in v4 cli.StringSliceFlag{ @@ -994,7 +996,7 @@ var AgentStartCommand = cli.Command{ TracingBackend: cfg.TracingBackend, TracingServiceName: cfg.TracingServiceName, TraceContextEncoding: cfg.TraceContextEncoding, - VerificationFailureBehaviour: cfg.VerificationFailureBehavior, + AllowMultipartArtifactUpload: !cfg.NoMultipartArtifactUpload, KubernetesExec: cfg.KubernetesExec, SigningJWKSFile: cfg.SigningJWKSFile, @@ -1002,7 +1004,8 @@ var AgentStartCommand = cli.Command{ SigningAWSKMSKey: cfg.SigningAWSKMSKey, DebugSigning: cfg.DebugSigning, - VerificationJWKS: verificationJWKS, + VerificationJWKS: verificationJWKS, + VerificationFailureBehaviour: cfg.VerificationFailureBehavior, DisableWarningsFor: cfg.DisableWarningsFor, } diff --git a/clicommand/artifact_upload.go b/clicommand/artifact_upload.go index 73e3f1709b..98ba6345d6 100644 --- a/clicommand/artifact_upload.go +++ b/clicommand/artifact_upload.go @@ -88,6 +88,7 @@ type ArtifactUploadConfig struct { // Uploader flags GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"` UploadSkipSymlinks bool `cli:"upload-skip-symlinks"` + NoMultipartUpload bool 
`cli:"no-multipart-artifact-upload"` // deprecated FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"` @@ -138,6 +139,7 @@ var ArtifactUploadCommand = cli.Command{ LogLevelFlag, ExperimentsFlag, ProfileFlag, + NoMultipartArtifactUploadFlag, }, Action: func(c *cli.Context) error { ctx := context.Background() @@ -155,6 +157,8 @@ var ArtifactUploadCommand = cli.Command{ ContentType: cfg.ContentType, DebugHTTP: cfg.DebugHTTP, + AllowMultipart: !cfg.NoMultipartUpload, + // If the deprecated flag was set to true, pretend its replacement was set to true too // this works as long as the user only sets one of the two flags GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks), diff --git a/clicommand/global.go b/clicommand/global.go index fb5cd6eaf2..08902d98b0 100644 --- a/clicommand/global.go +++ b/clicommand/global.go @@ -100,6 +100,12 @@ var ( EnvVar: "BUILDKITE_KUBERNETES_EXEC", } + NoMultipartArtifactUploadFlag = cli.BoolFlag{ + Name: "no-multipart-artifact-upload", + Usage: "For Buildkite-hosted artifacts, disables the use of multipart uploads. Has no effect on uploads to other destinations such as custom cloud buckets", + EnvVar: "BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD", + } + ExperimentsFlag = cli.StringSliceFlag{ Name: "experiment", Value: &cli.StringSlice{}, diff --git a/internal/artifact/api_client.go b/internal/artifact/api_client.go index 6b4e72af78..f27bca2e10 100644 --- a/internal/artifact/api_client.go +++ b/internal/artifact/api_client.go @@ -10,5 +10,5 @@ import ( type APIClient interface { CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error) - UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error) + UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error) } diff --git a/internal/artifact/artifactory_uploader.go b/internal/artifact/artifactory_uploader.go index e2564b6367..f35da80ebb 100644 --- a/internal/artifact/artifactory_uploader.go +++ b/internal/artifact/artifactory_uploader.go @@ -117,12 +117,12 @@ func (u *artifactoryUploaderWork) Description() string { return singleUnitDescription(u.artifact) } -func (u *artifactoryUploaderWork) DoWork(context.Context) error { +func (u *artifactoryUploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) { // Open file from filesystem u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } // Upload the file to Artifactory. 
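The work-unit changes above and below are mechanical: every destination uploader's DoWork now returns an optional part ETag alongside the error, and only multipart part uploads produce one. A minimal sketch of the new contract for a single-request destination (exampleWork is a hypothetical type for illustration, not part of this patch):

package artifact

import (
	"context"
	"fmt"
	"os"

	"github.com/buildkite/agent/v3/api"
)

// exampleWork is a hypothetical work unit for a destination that uploads the
// whole artifact in a single request.
type exampleWork struct{ artifact *api.Artifact }

func (w *exampleWork) Artifact() *api.Artifact { return w.artifact }

func (w *exampleWork) Description() string { return singleUnitDescription(w.artifact) }

// DoWork returns a nil *api.ArtifactPartETag: only multipart part uploads
// produce an ETag, which Buildkite later uses to join the parts together.
func (w *exampleWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
	f, err := os.Open(w.artifact.AbsolutePath)
	if err != nil {
		return nil, fmt.Errorf("failed to open file %q (%w)", w.artifact.AbsolutePath, err)
	}
	defer f.Close()
	// ... send f to the destination, honouring ctx for cancellation ...
	return nil, nil
}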
@@ -131,32 +131,32 @@ func (u *artifactoryUploaderWork) DoWork(context.Context) error { req, err := http.NewRequest("PUT", u.URL(u.artifact), f) req.SetBasicAuth(u.user, u.password) if err != nil { - return err + return nil, err } md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-MD5", md5Checksum) sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-SHA1", sha1Checksum) sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-SHA256", sha256Checksum) res, err := u.client.Do(req) if err != nil { - return err + return nil, err } - return checkResponse(res) + return nil, checkResponse(res) } func checksumFile(hasher hash.Hash, path string) (string, error) { diff --git a/internal/artifact/azure_blob_uploader.go b/internal/artifact/azure_blob_uploader.go index d2d66d4fba..ae9bfc1b19 100644 --- a/internal/artifact/azure_blob_uploader.go +++ b/internal/artifact/azure_blob_uploader.go @@ -107,11 +107,11 @@ func (u *azureBlobUploaderWork) Description() string { } // DoWork uploads an artifact file. -func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error { +func (u *azureBlobUploaderWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) { u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } defer f.Close() @@ -121,5 +121,5 @@ func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error { bbc := u.client.NewContainerClient(u.loc.ContainerName).NewBlockBlobClient(blobName) _, err = bbc.UploadFile(ctx, f, nil) - return err + return nil, err } diff --git a/internal/artifact/batch_creator.go b/internal/artifact/batch_creator.go index 34665df6a0..f761ae950b 100644 --- a/internal/artifact/batch_creator.go +++ b/internal/artifact/batch_creator.go @@ -22,6 +22,9 @@ type BatchCreatorConfig struct { // CreateArtifactsTimeout, sets a context.WithTimeout around the CreateArtifacts API. // If it's zero, there's no context timeout and the default HTTP timeout will prevail. CreateArtifactsTimeout time.Duration + + // Whether to allow multipart uploads to the BK-hosted bucket. 
+ AllowMultipart bool } type BatchCreator struct { @@ -63,7 +66,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { ID: api.NewUUID(), Artifacts: theseArtifacts, UploadDestination: a.conf.UploadDestination, - MultipartSupported: true, + MultipartSupported: a.conf.AllowMultipart, } a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length) diff --git a/internal/artifact/bk_uploader.go b/internal/artifact/bk_uploader.go index 8aa9e9ab1e..b0e4dcbdf1 100644 --- a/internal/artifact/bk_uploader.go +++ b/internal/artifact/bk_uploader.go @@ -2,30 +2,39 @@ package artifact import ( "bytes" + "cmp" "context" _ "crypto/sha512" // import sha512 to make sha512 ssl certs work + "errors" "fmt" "io" "mime/multipart" "net/http" "net/http/httptrace" "net/http/httputil" - "strings" - - // "net/http/httputil" - "errors" "net/url" "os" + "slices" + "strings" "github.com/buildkite/agent/v3/api" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/version" + "github.com/dustin/go-humanize" ) const artifactPathVariable = "${artifact:path}" -// BKUploader uploads to S3 as a single signed POST, which have a hard limit of 5Gb. -const maxFormUploadedArtifactSize = int64(5368709120) +const ( + // BKUploader uploads to S3 either as: + // - a single signed POST, which has a hard limit of 5GB, or + // - as a signed multipart, which has a limit of 5GB per _part_, but we + // aren't supporting larger artifacts yet. + maxFormUploadedArtifactSize = int64(5 * 1024 * 1024 * 1024) + + // Multipart parts have a minimum size of 5MB. + minPartSize = int64(5 * 1024 * 1024) +) type BKUploaderConfig struct { // Whether or not HTTP calls should be debugged @@ -58,29 +67,151 @@ func (u *BKUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { if artifact.FileSize > maxFormUploadedArtifactSize { return nil, errArtifactTooLarge{Size: artifact.FileSize} } - // TODO: create multiple workers for multipart uploads - return []workUnit{&bkUploaderWork{ - BKUploader: u, - artifact: artifact, - }}, nil + actions := artifact.UploadInstructions.Actions + if len(actions) == 0 { + // Not multiple actions - use a single form upload. + return []workUnit{&bkFormUpload{ + BKUploader: u, + artifact: artifact, + }}, nil + } + + // Ensure the actions are sorted by part number. + slices.SortFunc(actions, func(a, b api.ArtifactUploadAction) int { + return cmp.Compare(a.PartNumber, b.PartNumber) + }) + + // Split the artifact across multiple parts. + chunks := int64(len(actions)) + chunkSize := artifact.FileSize / chunks + remainder := artifact.FileSize % chunks + var offset int64 + workUnits := make([]workUnit, 0, chunks) + for i, action := range actions { + size := chunkSize + if int64(i) < remainder { + // Spread the remainder across the first chunks. + size++ + } + workUnits = append(workUnits, &bkMultipartUpload{ + BKUploader: u, + artifact: artifact, + partCount: int(chunks), + action: &action, + offset: offset, + size: size, + }) + offset += size + } + // After that loop, `offset` should equal `artifact.FileSize`. + return workUnits, nil } -type bkUploaderWork struct { +// bkMultipartUpload uploads a single part of a multipart upload. 
+type bkMultipartUpload struct { + *BKUploader + artifact *api.Artifact + action *api.ArtifactUploadAction + partCount int + offset, size int64 +} + +func (u *bkMultipartUpload) Artifact() *api.Artifact { return u.artifact } + +func (u *bkMultipartUpload) Description() string { + return fmt.Sprintf("%s %s part %d/%d (~%s starting at ~%s)", + u.artifact.ID, + u.artifact.Path, + u.action.PartNumber, + u.partCount, + humanize.IBytes(uint64(u.size)), + humanize.IBytes(uint64(u.offset)), + ) +} + +func (u *bkMultipartUpload) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) { + f, err := os.Open(u.artifact.AbsolutePath) + if err != nil { + return nil, err + } + defer f.Close() + + if _, err := f.Seek(u.offset, io.SeekStart); err != nil { + return nil, err + } + lr := io.LimitReader(f, u.size) + + req, err := http.NewRequestWithContext(ctx, u.action.Method, u.action.URL, lr) + if err != nil { + return nil, err + } + // Content-Range offsets are 0-indexed and inclusive, + // for example: Content-Range: bytes 200-1000/67589 + req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", u.offset, u.offset+u.size-1, u.artifact.FileSize)) + req.Header.Set("Content-Type", u.artifact.ContentType) + req.Header.Add("User-Agent", version.UserAgent()) + + if u.conf.DebugHTTP { + dumpReqOut, err := httputil.DumpRequestOut(req, false) + if err != nil { + u.logger.Error("Couldn't dump outgoing request: %v", err) + } + u.logger.Debug("%s", dumpReqOut) + } + + // TODO: set all the usual http transport & client options... + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if u.conf.DebugHTTP { + dumpResp, err := httputil.DumpResponse(resp, true) + if err != nil { + u.logger.Error("Couldn't dump response: %v", err) + return nil, err + } + u.logger.Debug("%s", dumpResp) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response body: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return nil, fmt.Errorf("unsuccessful status %s: %s", resp.Status, body) + } + + etag := resp.Header.Get("Etag") + u.logger.Debug("Artifact %s part %d has ETag = %s", u.artifact.ID, u.action.PartNumber, etag) + if etag == "" { + return nil, errors.New("response missing ETag header") + } + + return &api.ArtifactPartETag{ + PartNumber: u.action.PartNumber, + ETag: etag, + }, nil +} + +// bkFormUpload uploads an artifact to a presigned URL in a single request using +// a request body encoded as multipart/form-data. +type bkFormUpload struct { *BKUploader artifact *api.Artifact } -func (u *bkUploaderWork) Artifact() *api.Artifact { return u.artifact } +func (u *bkFormUpload) Artifact() *api.Artifact { return u.artifact } -func (u *bkUploaderWork) Description() string { +func (u *bkFormUpload) Description() string { return singleUnitDescription(u.artifact) } // DoWork tries the upload.
-func (u *bkUploaderWork) DoWork(ctx context.Context) error { - request, err := createUploadRequest(ctx, u.logger, u.artifact) +func (u *bkFormUpload) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) { + request, err := createFormUploadRequest(ctx, u.logger, u.artifact) if err != nil { - return err + return nil, err } if u.conf.DebugHTTP { @@ -121,7 +252,7 @@ func (u *bkUploaderWork) DoWork(ctx context.Context) error { u.logger.Debug("%s %s", request.Method, request.URL) response, err := client.Do(request) if err != nil { - return err + return nil, err } defer response.Body.Close() @@ -138,17 +269,18 @@ func (u *bkUploaderWork) DoWork(ctx context.Context) error { body := &bytes.Buffer{} _, err := body.ReadFrom(response.Body) if err != nil { - return err + return nil, err } - return fmt.Errorf("%s (%d)", body, response.StatusCode) + return nil, fmt.Errorf("%s (%d)", body, response.StatusCode) } - return nil + return nil, nil } // Creates a new file upload http request with optional extra params -func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Artifact) (*http.Request, error) { +func createFormUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Artifact) (*http.Request, error) { streamer := newMultipartStreamer() + action := artifact.UploadInstructions.Action // Set the post data for the request for key, val := range artifact.UploadInstructions.Data { @@ -170,13 +302,13 @@ func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Art // It's important that we add the form field last because when // uploading to an S3 form, they are really nit-picky about the field // order, and the file needs to be the last one otherwise it doesn't work. - if err := streamer.WriteFile(artifact.UploadInstructions.Action.FileInput, artifact.Path, fh); err != nil { + if err := streamer.WriteFile(action.FileInput, artifact.Path, fh); err != nil { fh.Close() return nil, err } // Create the URL that we'll send data to - uri, err := url.Parse(artifact.UploadInstructions.Action.URL) + uri, err := url.Parse(action.URL) if err != nil { fh.Close() return nil, err @@ -185,7 +317,7 @@ func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Art uri.Path = artifact.UploadInstructions.Action.Path // Create the request - req, err := http.NewRequestWithContext(ctx, artifact.UploadInstructions.Action.Method, uri.String(), streamer.Reader()) + req, err := http.NewRequestWithContext(ctx, action.Method, uri.String(), streamer.Reader()) if err != nil { fh.Close() return nil, err diff --git a/internal/artifact/bk_uploader_test.go b/internal/artifact/bk_uploader_test.go index 7a888446e3..86c34f60ad 100644 --- a/internal/artifact/bk_uploader_test.go +++ b/internal/artifact/bk_uploader_test.go @@ -11,61 +11,63 @@ import ( "net/http/httptest" "os" "path/filepath" + "strconv" "testing" "github.com/buildkite/agent/v3/api" "github.com/buildkite/agent/v3/logger" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" ) func TestFormUploading(t *testing.T) { ctx := context.Background() server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - switch req.URL.Path { - case "/buildkiteartifacts.com": - if req.ContentLength <= 0 { - http.Error(rw, "zero or unknown Content-Length", http.StatusBadRequest) - return - } + if req.Method != "POST" || req.URL.Path != "/buildkiteartifacts.com" { + http.Error(rw, fmt.Sprintf("not found; (method, path) = (%q, %q), want POST /buildkiteartifacts.com", req.Method,
req.URL.Path), http.StatusNotFound) + return + } - if err := req.ParseMultipartForm(5 * 1024 * 1024); err != nil { - http.Error(rw, fmt.Sprintf("req.ParseMultipartForm() = %v", err), http.StatusBadRequest) - return - } + if req.ContentLength <= 0 { + http.Error(rw, "zero or unknown Content-Length", http.StatusBadRequest) + return + } - // Check the ${artifact:path} interpolation is working - path := req.FormValue("path") - if got, want := path, "llamas.txt"; got != want { - http.Error(rw, fmt.Sprintf("path = %q, want %q", got, want), http.StatusBadRequest) - return - } + if err := req.ParseMultipartForm(5 * 1024 * 1024); err != nil { + http.Error(rw, fmt.Sprintf("req.ParseMultipartForm() = %v", err), http.StatusBadRequest) + return + } - file, fh, err := req.FormFile("file") - if err != nil { - http.Error(rw, fmt.Sprintf(`req.FormFile("file") error = %v`, err), http.StatusBadRequest) - return - } - defer file.Close() + // Check the ${artifact:path} interpolation is working + path := req.FormValue("path") + if got, want := path, "llamas.txt"; got != want { + http.Error(rw, fmt.Sprintf("path = %q, want %q", got, want), http.StatusBadRequest) + return + } - b := &bytes.Buffer{} - if _, err := io.Copy(b, file); err != nil { - http.Error(rw, fmt.Sprintf("io.Copy() error = %v", err), http.StatusInternalServerError) - return - } + file, fh, err := req.FormFile("file") + if err != nil { + http.Error(rw, fmt.Sprintf(`req.FormFile("file") error = %v`, err), http.StatusBadRequest) + return + } + defer file.Close() - // Check the file is attached correctly - if got, want := b.String(), "llamas"; got != want { - http.Error(rw, fmt.Sprintf("uploaded file content = %q, want %q", got, want), http.StatusBadRequest) - return - } + b := &bytes.Buffer{} + if _, err := io.Copy(b, file); err != nil { + http.Error(rw, fmt.Sprintf("io.Copy() error = %v", err), http.StatusInternalServerError) + return + } - if got, want := fh.Filename, "llamas.txt"; got != want { - http.Error(rw, fmt.Sprintf("uploaded file name = %q, want %q", got, want), http.StatusInternalServerError) - return - } + // Check the file is attached correctly + if got, want := b.String(), "llamas"; got != want { + http.Error(rw, fmt.Sprintf("uploaded file content = %q, want %q", got, want), http.StatusBadRequest) + return + } - default: - http.Error(rw, fmt.Sprintf("not found; method = %q, path = %q", req.Method, req.URL.Path), http.StatusNotFound) + if got, want := fh.Filename, "llamas.txt"; got != want { + http.Error(rw, fmt.Sprintf("uploaded file name = %q, want %q", got, want), http.StatusInternalServerError) + return } })) defer server.Close() @@ -113,9 +115,117 @@ func TestFormUploading(t *testing.T) { } for _, wu := range work { - if err := wu.DoWork(ctx); err != nil { - t.Errorf("bkUploaderWork.DoWork(artifact) = %v", err) + if _, err := wu.DoWork(ctx); err != nil { + t.Errorf("bkUploaderWork.DoWork(ctx) = %v", err) + } + } + }) + } +} + +func TestMultipartUploading(t *testing.T) { + ctx := context.Background() + + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.Method != "PUT" || req.URL.Path != "/llamas3.txt" { + http.Error(rw, fmt.Sprintf("not found; (method, path) = (%q, %q), want PUT /llamas3.txt", req.Method, req.URL.Path), http.StatusNotFound) + return + } + partNum, err := strconv.Atoi(req.URL.Query().Get("partNumber")) + if err != nil { + http.Error(rw, fmt.Sprintf("strconv.Atoi(req.URL.Query().Get(partNumber)) error = %v", err), http.StatusBadRequest) + return + } + + if 
partNum < 1 || partNum > 3 { + http.Error(rw, fmt.Sprintf("partNumber %d out of range [1, 3]", partNum), http.StatusBadRequest) + return + } + + b, err := io.ReadAll(req.Body) + if err != nil { + http.Error(rw, fmt.Sprintf("io.ReadAll(req.Body) error = %v", err), http.StatusInternalServerError) + return + } + + if got, want := string(b), "llamas"; got != want { + http.Error(rw, fmt.Sprintf("req.Body = %q, want %q", got, want), http.StatusExpectationFailed) + return + } + + rw.Header().Set("ETag", fmt.Sprintf(`"part number %d"`, partNum)) + rw.WriteHeader(http.StatusCreated) + })) + defer server.Close() + + temp, err := os.MkdirTemp("", "agent") + if err != nil { + t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err) + } + defer os.Remove(temp) + + cwd, err := os.Getwd() + if err != nil { + t.Fatalf("os.Getwd() error = %v", err) + } + + for _, wd := range []string{temp, cwd} { + t.Run(wd, func(t *testing.T) { + abspath := filepath.Join(wd, "llamas3.txt") + if err := os.WriteFile(abspath, []byte("llamasllamasllamas"), 0700); err != nil { + t.Fatalf("os.WriteFile(llamas3.txt) error = %v", err) + } + defer os.Remove(abspath) + + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) + actions := []api.ArtifactUploadAction{ + {URL: server.URL + "/llamas3.txt?partNumber=1", Method: "PUT", PartNumber: 1}, + {URL: server.URL + "/llamas3.txt?partNumber=2", Method: "PUT", PartNumber: 2}, + {URL: server.URL + "/llamas3.txt?partNumber=3", Method: "PUT", PartNumber: 3}, + } + artifact := &api.Artifact{ + ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", + Path: "llamas3.txt", + AbsolutePath: abspath, + GlobPath: "llamas3.txt", + FileSize: 18, + ContentType: "text/plain", + UploadInstructions: &api.ArtifactUploadInstructions{ + Actions: actions, + }, + } + + work, err := uploader.CreateWork(artifact) + if err != nil { + t.Fatalf("uploader.CreateWork(artifact) error = %v", err) + } + + want := []workUnit{ + &bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[0], offset: 0, size: 6}, + &bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[1], offset: 6, size: 6}, + &bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[2], offset: 12, size: 6}, + } + + if diff := cmp.Diff(work, want, + cmp.AllowUnexported(bkMultipartUpload{}), + cmpopts.EquateComparable(uploader), + ); diff != "" { + t.Fatalf("CreateWork diff (-got +want):\n%s", diff) + } + + var gotEtags []api.ArtifactPartETag + for _, wu := range work { + etag, err := wu.DoWork(ctx) + if err != nil { + t.Errorf("wu.DoWork(ctx) error = %v", err) + continue + } + gotEtags = append(gotEtags, *etag) + } + + wantEtags := []api.ArtifactPartETag{ + {PartNumber: 1, ETag: `"part number 1"`}, + {PartNumber: 2, ETag: `"part number 2"`}, + {PartNumber: 3, ETag: `"part number 3"`}, + } + if diff := cmp.Diff(gotEtags, wantEtags); diff != "" { + t.Errorf("etags diff (-got +want):\n%s", diff) } }) } @@ -161,8 +271,8 @@ func TestFormUploadFileMissing(t *testing.T) { } for _, wu := range work { - if err := wu.DoWork(ctx); !errors.Is(err, fs.ErrNotExist) { - t.Errorf("bkUploaderWork.DoWork(artifact) = %v, want %v", err, fs.ErrNotExist) + if _, err := wu.DoWork(ctx); !errors.Is(err, fs.ErrNotExist) { + t.Errorf("bkUploaderWork.DoWork(ctx) = %v, want %v", err, fs.ErrNotExist) } } } diff --git a/internal/artifact/gs_uploader.go b/internal/artifact/gs_uploader.go index 06445cc2e5..11bfdaf446 100644 --- a/internal/artifact/gs_uploader.go +++ b/internal/artifact/gs_uploader.go @@ -137,7 +137,7 @@ func (u *gsUploaderWork) Description() string { return
singleUnitDescription(u.artifact) } -func (u *gsUploaderWork) DoWork(_ context.Context) error { +func (u *gsUploaderWork) DoWork(_ context.Context) (*api.ArtifactPartETag, error) { permission := os.Getenv("BUILDKITE_GS_ACL") // The dirtiest validation method ever... @@ -147,7 +147,7 @@ func (u *gsUploaderWork) DoWork(_ context.Context) error { permission != "projectPrivate" && permission != "publicRead" && permission != "publicReadWrite" { - return fmt.Errorf("Invalid GS ACL `%s`", permission) + return nil, fmt.Errorf("Invalid GS ACL `%s`", permission) } if permission == "" { @@ -164,7 +164,7 @@ func (u *gsUploaderWork) DoWork(_ context.Context) error { } file, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return errors.New(fmt.Sprintf("Failed to open file %q (%v)", u.artifact.AbsolutePath, err)) + return nil, errors.New(fmt.Sprintf("Failed to open file %q (%v)", u.artifact.AbsolutePath, err)) } call := u.service.Objects.Insert(u.BucketName, object) if permission != "" { @@ -173,10 +173,10 @@ func (u *gsUploaderWork) DoWork(_ context.Context) error { if res, err := call.Media(file, googleapi.ContentType("")).Do(); err == nil { u.logger.Debug("Created object %v at location %v\n\n", res.Name, res.SelfLink) } else { - return errors.New(fmt.Sprintf("Failed to PUT file %q (%v)", u.artifactPath(u.artifact), err)) + return nil, errors.New(fmt.Sprintf("Failed to PUT file %q (%v)", u.artifactPath(u.artifact), err)) } - return nil + return nil, nil } func (u *GSUploader) artifactPath(artifact *api.Artifact) string { diff --git a/internal/artifact/s3_uploader.go b/internal/artifact/s3_uploader.go index 7cb41d3735..b13b6c1718 100644 --- a/internal/artifact/s3_uploader.go +++ b/internal/artifact/s3_uploader.go @@ -99,10 +99,10 @@ func (u *s3UploaderWork) Description() string { return singleUnitDescription(u.artifact) } -func (u *s3UploaderWork) DoWork(context.Context) error { +func (u *s3UploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) { permission, err := u.resolvePermission() if err != nil { - return err + return nil, err } // Create an uploader with the session and default options @@ -112,7 +112,7 @@ func (u *s3UploaderWork) DoWork(context.Context) error { u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } // Upload the file to S3. 
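For context on BKUploader.CreateWork above: the splitting arithmetic gives each part fileSize/parts bytes and spreads the remainder one byte at a time across the first parts, so the part sizes always sum to the file size and every offset is covered exactly once. A standalone sketch of the same calculation (partSizes is illustrative only, not a function in this patch):

package main

import "fmt"

// partSizes mirrors the chunk arithmetic in BKUploader.CreateWork: each part
// gets fileSize/parts bytes, and the first fileSize%parts parts get one byte
// extra, so the sizes always sum to fileSize.
func partSizes(fileSize, parts int64) []int64 {
	chunkSize := fileSize / parts
	remainder := fileSize % parts
	sizes := make([]int64, parts)
	for i := range sizes {
		sizes[i] = chunkSize
		if int64(i) < remainder {
			sizes[i]++
		}
	}
	return sizes
}

func main() {
	fmt.Println(partSizes(18, 3)) // [6 6 6], as in TestMultipartUploading
	fmt.Println(partSizes(20, 3)) // [7 7 6], remainder spread over the first parts
}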
@@ -131,8 +131,7 @@ func (u *s3UploaderWork) DoWork(context.Context) error { } _, err = uploader.Upload(params) - - return err + return nil, err } func (u *S3Uploader) artifactPath(artifact *api.Artifact) string { diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go index d0c2062e88..baa584bc94 100644 --- a/internal/artifact/uploader.go +++ b/internal/artifact/uploader.go @@ -1,6 +1,7 @@ package artifact import ( + "cmp" "context" "crypto/sha1" "crypto/sha256" @@ -11,6 +12,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "strings" "sync" "time" @@ -51,6 +53,9 @@ type UploaderConfig struct { // Whether to not upload symlinks UploadSkipSymlinks bool + + // Whether to allow multipart uploads to the BK-hosted bucket + AllowMultipart bool } type Uploader struct { @@ -103,6 +108,7 @@ func (a *Uploader) Upload(ctx context.Context) error { Artifacts: artifacts, UploadDestination: a.conf.Destination, CreateArtifactsTimeout: 10 * time.Second, + AllowMultipart: a.conf.AllowMultipart, }) artifacts, err = batchCreator.Create(ctx) if err != nil { @@ -449,73 +455,75 @@ type workUnit interface { Description() string // DoWork does the work. - DoWork(context.Context) error + DoWork(context.Context) (*api.ArtifactPartETag, error) } -const ( - artifactStateFinished = "finished" - artifactStateError = "error" -) - -// Messages passed on channels between goroutines. - -// workUnitError is just a tuple (workUnit, error). -type workUnitError struct { +// workUnitResult is just a tuple (workUnit, partETag | error). +type workUnitResult struct { workUnit workUnit + partETag *api.ArtifactPartETag err error } -// artifactWorkUnits is just a tuple (artifact, int). -type artifactWorkUnits struct { - artifact *api.Artifact - workUnits int +// artifactUploadWorker contains the shared state between the worker goroutines +// and the state uploader. +type artifactUploadWorker struct { + *Uploader + + // Counts the worker goroutines. + wg sync.WaitGroup + + // A tracker for every artifact. + // The map is written at the start of upload, and other goroutines only read + // afterwards. + trackers map[*api.Artifact]*artifactTracker } -// cancelCtx stores a context cancelled when part of an artifact upload has -// failed and needs to fail the whole artifact. -// Go readability notes: "Storing" a context? -// - In a long-lived struct: yeah nah 🙅. Read pkg.go.dev/context -// - Within a single operation ("upload", in this case): nah yeah 👍 -type cancelCtx struct { +// artifactTracker tracks the amount of work pending for an artifact. +type artifactTracker struct { + // All the work for uploading this artifact. + work []workUnit + + // Normally storing a context in a struct is a bad idea. It's explicitly + // called out in pkg.go.dev/context as a no-no (unless you are required to + // store a context for some reason like interface implementation or + // backwards compatibility). + // + // The main reason is that contexts are intended to be associated with a + // clear chain of function calls (i.e. work), and having contexts stored in + // a struct somewhere means there's confusion over which context should be + // used for a given function call. (Do you use one passed in, or from a + // struct? Do you listen for Done on both? Which context values apply?) + // + // So here's how this context is situated within the chain of work, and + // how it will be used: + // + // This context applies to all the work units for this artifact. 
+ // It is a child context of the context passed to Uploader.upload, which + // should be exactly the same context that Uploader.upload passes to + // artifactUploadWorker. + // Canceling this context cancels all work associated with this artifact + // (across the fixed-size pool of worker goroutines). ctx context.Context cancel context.CancelCauseFunc -} - -func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, uploader workCreator) error { - perArtifactCtx := make(map[*api.Artifact]cancelCtx) - for _, artifact := range artifacts { - actx, cancel := context.WithCancelCause(ctx) - perArtifactCtx[artifact] = cancelCtx{actx, cancel} - } - // workUnitStateCh: multiple worker goroutines --(work unit state)--> state updater - workUnitStateCh := make(chan workUnitError) - // artifactWorkUnitsCh: work unit creation --(# work units for artifact)--> state updater - artifactWorkUnitsCh := make(chan artifactWorkUnits) - // workUnitsCh: work unit creation --(work unit to be run)--> multiple worker goroutines - workUnitsCh := make(chan workUnit) - // stateUpdatesDoneCh: closed when all state updates are complete - stateUpdatesDoneCh := make(chan struct{}) + // pendingWork is the number of incomplete units for this artifact. + // This is set once at the start of upload, and then decremented by the + // state updater goroutine as work units complete. + pendingWork int - // The status updater goroutine: updates batches of artifact states on - // Buildkite every few seconds. - var errs []error - go func() { - errs = a.statusUpdater(ctx, workUnitStateCh, artifactWorkUnitsCh, stateUpdatesDoneCh) - }() + // State that will be uploaded to BK when the artifact is finished or errored. + // Only the state updater goroutine writes this. + api.ArtifactState +} - // Worker goroutines that work on work units. - var workerWG sync.WaitGroup - for range runtime.GOMAXPROCS(0) { - workerWG.Add(1) - go func() { - defer workerWG.Done() - a.uploadWorker(ctx, perArtifactCtx, workUnitsCh, workUnitStateCh) - }() +func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, uploader workCreator) error { + worker := &artifactUploadWorker{ + Uploader: a, + trackers: make(map[*api.Artifact]*artifactTracker), } - // Work creation: creates the work units for each artifact. - // This must happen after creating goroutines listening on the channels. + // Create work and trackers for each artifact. for _, artifact := range artifacts { workUnits, err := uploader.CreateWork(artifact) if err != nil { @@ -523,44 +531,64 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload return err } - // Send the number of work units for this artifact to the state uploader. - select { - case <-ctx.Done(): - return ctx.Err() - case artifactWorkUnitsCh <- artifactWorkUnits{artifact: artifact, workUnits: len(workUnits)}: + actx, acancel := context.WithCancelCause(ctx) + worker.trackers[artifact] = &artifactTracker{ + ctx: actx, + cancel: acancel, + work: workUnits, + pendingWork: len(workUnits), + ArtifactState: api.ArtifactState{ + ID: artifact.ID, + Multipart: len(workUnits) > 1, + }, } + } + + // unitsCh: work unit creation --(work unit to be run)--> multiple worker goroutines + unitsCh := make(chan workUnit) + // resultsCh: multiple worker goroutines --(work unit result)--> state updater + resultsCh := make(chan workUnitResult) + // errCh: receives the final error from the status updater + errCh := make(chan error, 1) - // Send the work units themselves to the workers. 
- for _, workUnit := range workUnits { + // The status updater goroutine: updates batches of artifact states on + // Buildkite every few seconds. + go worker.stateUpdater(ctx, resultsCh, errCh) + + // Worker goroutines that work on work units. + for range runtime.GOMAXPROCS(0) { + worker.wg.Add(1) + go worker.doWorkUnits(ctx, unitsCh, resultsCh) + } + + // Send the work units for each artifact to the workers. + // This must happen after creating worker goroutines listening on unitsCh. + for _, tracker := range worker.trackers { + for _, workUnit := range tracker.work { select { case <-ctx.Done(): return ctx.Err() - case workUnitsCh <- workUnit: + case unitsCh <- workUnit: } } } - // All work units have been sent to workers, and all counts of pending work - // units have been sent to the state updater. - close(workUnitsCh) - close(artifactWorkUnitsCh) + // All work units have been sent to workers. + close(unitsCh) a.logger.Debug("Waiting for uploads to complete...") // Wait for the workers to finish - workerWG.Wait() + worker.wg.Wait() // Since the workers are done, all work unit states have been sent to the // state updater. - close(workUnitStateCh) + close(resultsCh) a.logger.Debug("Uploads complete, waiting for upload status to be sent to Buildkite...") // Wait for the statuses to finish uploading - <-stateUpdatesDoneCh - - if len(errs) > 0 { - err := errors.Join(errs...) + if err := <-errCh; err != nil { return fmt.Errorf("errors uploading artifacts: %w", err) } @@ -569,176 +597,194 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload return nil } -func (a *Uploader) uploadWorker( - ctx context.Context, - perArtifactCtx map[*api.Artifact]cancelCtx, - workUnitsCh <-chan workUnit, - workUnitStateCh chan<- workUnitError, -) { +func (a *artifactUploadWorker) doWorkUnits(ctx context.Context, unitsCh <-chan workUnit, resultsCh chan<- workUnitResult) { + defer a.wg.Done() + for { select { case <-ctx.Done(): return - case workUnit, open := <-workUnitsCh: + case workUnit, open := <-unitsCh: if !open { return // Done } - artifact := workUnit.Artifact() - actx := perArtifactCtx[artifact].ctx + tracker := a.trackers[workUnit.Artifact()] // Show a nice message that we're starting to upload the file a.logger.Info("Uploading %s", workUnit.Description()) // Upload the artifact and then set the state depending // on whether or not it passed. We'll retry the upload // a couple of times before giving up. - err := roko.NewRetrier( + r := roko.NewRetrier( roko.WithMaxAttempts(10), roko.WithStrategy(roko.Constant(5*time.Second)), - ).DoWithContext(actx, func(r *roko.Retrier) error { - if err := workUnit.DoWork(actx); err != nil { + ) + partETag, err := roko.DoFunc(tracker.ctx, r, func(r *roko.Retrier) (*api.ArtifactPartETag, error) { + etag, err := workUnit.DoWork(tracker.ctx) + if err != nil { a.logger.Warn("%s (%s)", err, r) - return err } - return nil + return etag, err }) // If it failed, abort any other work items for this artifact. if err != nil { - a.logger.Info("Upload failed for %s", workUnit.Description()) - perArtifactCtx[workUnit.Artifact()].cancel(err) + a.logger.Info("Upload failed for %s: %v", workUnit.Description(), err) + tracker.cancel(err) + // The error is also reported to the state updater below. } // Let the state updater know how the work went.
select { - case <-ctx.Done(): // the main context, not the artifact ctx - return // ctx.Err() + case <-ctx.Done(): // Note: the main context, not the artifact tracker context + return - case workUnitStateCh <- workUnitError{workUnit: workUnit, err: err}: + case resultsCh <- workUnitResult{workUnit: workUnit, partETag: partETag, err: err}: } } } -func (a *Uploader) statusUpdater( - ctx context.Context, - workUnitStateCh <-chan workUnitError, - artifactWorkUnitsCh <-chan artifactWorkUnits, - doneCh chan<- struct{}, -) []error { - defer close(doneCh) - - // Errors that caused an artifact upload to fail, or a batch fail to update. +func (a *artifactUploadWorker) stateUpdater(ctx context.Context, resultsCh <-chan workUnitResult, stateUpdaterErrCh chan<- error) { var errs []error + defer func() { + if len(errs) > 0 { + stateUpdaterErrCh <- errors.Join(errs...) + } + close(stateUpdaterErrCh) + }() - // artifact -> number of work units that are incomplete - pendingWorkUnits := make(map[*api.Artifact]int) - - // States that haven't been updated on Buildkite yet. - statesToUpload := make(map[string]string) // artifact ID -> state - - // When this ticks, upload any pending artifact states. + // When this ticks, upload any pending artifact states as a batch. updateTicker := time.NewTicker(1 * time.Second) +selectLoop: for { select { case <-ctx.Done(): - return errs + return case <-updateTicker.C: - if len(statesToUpload) == 0 { // no news from the frontier - break - } - // Post an update - timeout := 5 * time.Second - - // Update the states of the artifacts in bulk. - err := roko.NewRetrier( - roko.WithMaxAttempts(10), - roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)), - ).DoWithContext(ctx, func(r *roko.Retrier) error { - ctxTimeout := ctx - if timeout != 0 { - var cancel func() - ctxTimeout, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } - - _, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload) - if err != nil { - a.logger.Warn("%s (%s)", err, r) - } - - // after four attempts (0, 1, 2, 3)... - if r.AttemptCount() == 3 { - // The short timeout has given us fast feedback on the first couple of attempts, - // but perhaps the server needs more time to complete the request, so fall back to - // the default HTTP client timeout. - a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) - timeout = 0 - } - - return err - }) - if err != nil { - a.logger.Error("Error updating artifact states: %s", err) + // Note: updateStates marks each uploaded state as sent, so it is + // not sent again on the next tick. + if err := a.updateStates(ctx); err != nil { errs = append(errs, err) } - a.logger.Debug("Updated %d artifact states", len(statesToUpload)) - clear(statesToUpload) - - case awu, open := <-artifactWorkUnitsCh: + case result, open := <-resultsCh: if !open { - // Set it to nil so this select branch never happens again. - artifactWorkUnitsCh = nil - // If both input channels are nil, we're done! - if workUnitStateCh == nil { - return errs - } + // No more input: we're done! + break selectLoop } + artifact := result.workUnit.Artifact() + tracker := a.trackers[artifact] - // Track how many pending work units there should be per artifact. - // Use += in case some work units for this artifact already completed. - pendingWorkUnits[awu.artifact] += awu.workUnits - if pendingWorkUnits[awu.artifact] != 0 { - break - } - // The whole artifact is complete, add it to the next batch of - // states to upload.
- statesToUpload[awu.artifact.ID] = artifactStateFinished - a.logger.Debug("Artifact `%s` has entered state `%s`", awu.artifact.ID, artifactStateFinished) - - case workUnitState, open := <-workUnitStateCh: - if !open { - // Set it to nil so this select branch never happens again. - workUnitStateCh = nil - // If both input channels are nil, we're done! - if artifactWorkUnitsCh == nil { - return errs - } - } - artifact := workUnitState.workUnit.Artifact() - if workUnitState.err != nil { + if result.err != nil { // The work unit failed, so the whole artifact upload has failed. - errs = append(errs, workUnitState.err) - statesToUpload[artifact.ID] = artifactStateError - a.logger.Debug("Artifact `%s` has entered state `%s`", artifact.ID, artifactStateError) - break + errs = append(errs, result.err) + tracker.State = "error" + a.logger.Debug("Artifact %s has entered state %s", tracker.ID, tracker.State) + continue } + // The work unit is complete - it's no longer pending. + if partETag := result.partETag; partETag != nil { + tracker.MultipartETags = append(tracker.MultipartETags, *partETag) + } + + tracker.pendingWork-- + if tracker.pendingWork > 0 { + continue } + + // No pending units remain, so the whole artifact is complete. // Add it to the next batch of states to upload. - statesToUpload[artifact.ID] = artifactStateFinished - a.logger.Debug("Artifact `%s` has entered state `%s`", artifact.ID, artifactStateFinished) + tracker.State = "finished" + a.logger.Debug("Artifact %s has entered state %s", tracker.ID, tracker.State) + } + } + + // Upload any remaining states. + if err := a.updateStates(ctx); err != nil { + errs = append(errs, err) + } +} + +// updateStates uploads terminal artifact states to Buildkite in a batch. +func (a *artifactUploadWorker) updateStates(ctx context.Context) error { + // Only upload states that are finished or error. + var statesToUpload []api.ArtifactState + var trackersToMarkSent []*artifactTracker + for _, tracker := range a.trackers { + switch tracker.State { + case "finished", "error": + // Only send these states. + default: + continue } + // This artifact has reached a terminal state: queue its state for + // upload, and mark the tracker as sent after a successful update. + statesToUpload = append(statesToUpload, tracker.ArtifactState) + trackersToMarkSent = append(trackersToMarkSent, tracker) } + + if len(statesToUpload) == 0 { // no news from the frontier + return nil + } + + for _, state := range statesToUpload { + // Ensure ETags are in ascending order by part number. + // This is required by S3. + slices.SortFunc(state.MultipartETags, func(a, b api.ArtifactPartETag) int { + return cmp.Compare(a.PartNumber, b.PartNumber) + }) + } + + // Post the update + timeout := 5 * time.Second + + // Update the states of the artifacts in bulk. + err := roko.NewRetrier( + roko.WithMaxAttempts(10), + roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)), + ).DoWithContext(ctx, func(r *roko.Retrier) error { + ctxTimeout := ctx + if timeout != 0 { + var cancel func() + ctxTimeout, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + _, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload) + if err != nil { + a.logger.Warn("%s (%s)", err, r) + } + + // after four attempts (0, 1, 2, 3)...
+ if r.AttemptCount() == 3 { + // The short timeout has given us fast feedback on the first couple of attempts, + // but perhaps the server needs more time to complete the request, so fall back to + // the default HTTP client timeout. + a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) + timeout = 0 + } + + return err + }) + if err != nil { + a.logger.Error("Error updating artifact states: %v", err) + return err + } + + for _, tracker := range trackersToMarkSent { + // Don't send this state again. + tracker.State = "sent" + } + a.logger.Debug("Updated %d artifact states", len(statesToUpload)) + return nil } +// singleUnitDescription can be used by uploader implementations to describe +// artifact uploads consisting of a single work unit. func singleUnitDescription(artifact *api.Artifact) string { return fmt.Sprintf("%s %s (%s)", artifact.ID, artifact.Path, humanize.IBytes(uint64(artifact.FileSize))) }
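Putting the pieces together: once every part of a multipart artifact has completed, the state updater above sends a single ArtifactState carrying the collected part ETags, sorted by ascending part number as S3 requires. A sketch of the resulting update, assuming the api types from this patch (the ID and ETag values are illustrative, and apiClient, ctx and jobID are presumed in scope):

// Final state update for one multipart artifact, as assembled by the state
// updater. MultipartETags must be in ascending part-number order so the
// parts can be joined.
state := api.ArtifactState{
	ID:        "artifact-id", // illustrative
	State:     "finished",
	Multipart: true,
	MultipartETags: []api.ArtifactPartETag{
		{PartNumber: 1, ETag: `"etag-1"`},
		{PartNumber: 2, ETag: `"etag-2"`},
	},
}
if _, err := apiClient.UpdateArtifacts(ctx, jobID, []api.ArtifactState{state}); err != nil {
	// Surface the error; the agent collects these and joins them into one error.
}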