From 580c3501d89bb60270922a05f09838e212909b43 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Thu, 19 Sep 2024 13:44:25 +1000 Subject: [PATCH] Support multipart uploads --- agent/agent_configuration.go | 39 +- agent/api.go | 2 +- agent/job_runner.go | 4 + api/artifacts.go | 88 +++-- clicommand/agent_start.go | 23 +- clicommand/artifact_upload.go | 4 + clicommand/global.go | 6 + internal/artifact/api_client.go | 2 +- internal/artifact/artifactory_uploader.go | 16 +- internal/artifact/azure_blob_uploader.go | 6 +- internal/artifact/batch_creator.go | 5 +- internal/artifact/bk_uploader.go | 182 ++++++++-- internal/artifact/bk_uploader_test.go | 194 +++++++--- internal/artifact/gs_uploader.go | 10 +- internal/artifact/s3_uploader.go | 9 +- internal/artifact/uploader.go | 412 ++++++++++++---------- 16 files changed, 667 insertions(+), 335 deletions(-) diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go index 0655437e57..f27597de24 100644 --- a/agent/agent_configuration.go +++ b/agent/agent_configuration.go @@ -43,23 +43,24 @@ type AgentConfiguration struct { VerificationJWKS any // The set of keys to verify jobs with VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`) - ANSITimestamps bool - TimestampLines bool - HealthCheckAddr string - DisconnectAfterJob bool - DisconnectAfterIdleTimeout int - CancelGracePeriod int - SignalGracePeriod time.Duration - EnableJobLogTmpfile bool - JobLogPath string - WriteJobLogsToStdout bool - LogFormat string - Shell string - Profile string - RedactedVars []string - AcquireJob string - TracingBackend string - TracingServiceName string - TraceContextEncoding string - DisableWarningsFor []string + ANSITimestamps bool + TimestampLines bool + HealthCheckAddr string + DisconnectAfterJob bool + DisconnectAfterIdleTimeout int + CancelGracePeriod int + SignalGracePeriod time.Duration + EnableJobLogTmpfile bool + JobLogPath string + WriteJobLogsToStdout bool + LogFormat string + Shell string + Profile string + RedactedVars []string + AcquireJob string + TracingBackend string + TracingServiceName string + TraceContextEncoding string + DisableWarningsFor []string + AllowMultipartArtifactUpload bool } diff --git a/agent/api.go b/agent/api.go index 422cec6a84..ac6bed6f60 100644 --- a/agent/api.go +++ b/agent/api.go @@ -37,7 +37,7 @@ type APIClient interface { StartJob(context.Context, *api.Job) (*api.Response, error) StepExport(context.Context, string, *api.StepExportRequest) (*api.StepExportResponse, *api.Response, error) StepUpdate(context.Context, string, *api.StepUpdate) (*api.Response, error) - UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error) + UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error) UploadChunk(context.Context, string, *api.Chunk) (*api.Response, error) UploadPipeline(context.Context, string, *api.PipelineChange, ...api.Header) (*api.Response, error) } diff --git a/agent/job_runner.go b/agent/job_runner.go index 7c31e0efbc..a5e5d6ca8e 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -534,6 +534,10 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) { env["BUILDKITE_KUBERNETES_EXEC"] = "true" } + if !r.conf.AgentConfiguration.AllowMultipartArtifactUpload { + env["BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD"] = "true" + } + // propagate CancelSignal to bootstrap, unless it's the default SIGTERM if r.conf.CancelSignal != process.SIGTERM { env["BUILDKITE_CANCEL_SIGNAL"] = 
r.conf.CancelSignal.String()
diff --git a/api/artifacts.go b/api/artifacts.go
index a3f7d6d8de..351345de9d 100644
--- a/api/artifacts.go
+++ b/api/artifacts.go
@@ -51,27 +51,47 @@ type Artifact struct {
 }
 
 type ArtifactBatch struct {
-	ID                string      `json:"id"`
-	Artifacts         []*Artifact `json:"artifacts"`
-	UploadDestination string      `json:"upload_destination"`
+	ID                 string      `json:"id"`
+	Artifacts          []*Artifact `json:"artifacts"`
+	UploadDestination  string      `json:"upload_destination"`
+	MultipartSupported bool        `json:"multipart_supported,omitempty"`
 }
 
+// ArtifactUploadInstructions describes how to upload an artifact to Buildkite
+// artifact storage.
 type ArtifactUploadInstructions struct {
-	Data   map[string]string    `json:"data"`
+	// Used for a single-part upload.
 	Action ArtifactUploadAction `json:"action"`
+
+	// Used for a multi-part upload.
+	Actions []ArtifactUploadAction `json:"actions"`
+
+	// Contains other data necessary for interpreting instructions.
+	Data map[string]string `json:"data"`
 }
 
+// ArtifactUploadAction describes one action needed to upload an artifact or
+// part of an artifact to Buildkite artifact storage.
 type ArtifactUploadAction struct {
-	URL       string `json:"url,omitempty"`
-	Method    string `json:"method"`
-	Path      string `json:"path"`
-	FileInput string `json:"file_input"`
+	URL        string `json:"url,omitempty"`
+	Method     string `json:"method"`
+	Path       string `json:"path"`
+	FileInput  string `json:"file_input"`
+	PartNumber int    `json:"part_number,omitempty"`
 }
 
 type ArtifactBatchCreateResponse struct {
-	ID                 string                      `json:"id"`
-	ArtifactIDs        []string                    `json:"artifact_ids"`
-	UploadInstructions *ArtifactUploadInstructions `json:"upload_instructions"`
+	ID          string   `json:"id"`
+	ArtifactIDs []string `json:"artifact_ids"`
+
+	// These instructions apply to all artifacts. The template contains
+	// variable interpolations such as ${artifact:path}.
+	InstructionsTemplate *ArtifactUploadInstructions `json:"upload_instructions"`
+
+	// These instructions apply to specific artifacts, necessary for multipart
+	// uploads. They override InstructionsTemplate and should not contain
+	// interpolations. Map: artifact ID -> instructions for that artifact.
+	PerArtifactInstructions map[string]*ArtifactUploadInstructions `json:"per_artifact_instructions"`
 }
 
 // ArtifactSearchOptions specifies the optional parameters to the
@@ -84,18 +104,29 @@ type ArtifactSearchOptions struct {
 	IncludeDuplicates bool   `url:"include_duplicates,omitempty"`
 }
 
-type ArtifactBatchUpdateArtifact struct {
-	ID    string `json:"id"`
-	State string `json:"state"`
+// ArtifactState represents the state of a single artifact, when calling UpdateArtifacts.
+type ArtifactState struct {
+	ID        string `json:"id"`
+	State     string `json:"state"`
+	Multipart bool   `json:"multipart,omitempty"`
+	// If this artifact was a multipart upload and is complete, we need the
+	// ETag from each uploaded part so that they can be joined together.
+	MultipartETags []ArtifactPartETag `json:"multipart_etags,omitempty"`
+}
+
+// ArtifactPartETag associates an ETag to a part number for a multipart upload.
+type ArtifactPartETag struct {
+	PartNumber int    `json:"part_number"`
+	ETag       string `json:"etag"`
 }
 
 type ArtifactBatchUpdateRequest struct {
-	Artifacts []*ArtifactBatchUpdateArtifact `json:"artifacts"`
+	Artifacts []ArtifactState `json:"artifacts"`
 }
 
 // CreateArtifacts takes a slice of artifacts, and creates them on Buildkite as a batch.
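+// If the batch sets MultipartSupported, the response may include per-artifact
+// multipart instructions in addition to the usual instructions template.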
-func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) { - u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId)) +func (c *Client) CreateArtifacts(ctx context.Context, jobID string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) { + u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID)) req, err := c.newRequest(ctx, "POST", u, batch) if err != nil { @@ -111,13 +142,11 @@ func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *Artif return createResponse, resp, err } -// Updates a particular artifact -func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStates map[string]string) (*Response, error) { - u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId)) - payload := ArtifactBatchUpdateRequest{} - - for id, state := range artifactStates { - payload.Artifacts = append(payload.Artifacts, &ArtifactBatchUpdateArtifact{id, state}) +// UpdateArtifacts updates Buildkite with one or more artifact states. +func (c *Client) UpdateArtifacts(ctx context.Context, jobID string, artifactStates []ArtifactState) (*Response, error) { + u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID)) + payload := ArtifactBatchUpdateRequest{ + Artifacts: artifactStates, } req, err := c.newRequest(ctx, "PUT", u, payload) @@ -125,17 +154,12 @@ func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStat return nil, err } - resp, err := c.doRequest(req, nil) - if err != nil { - return resp, err - } - - return resp, err + return c.doRequest(req, nil) } // SearchArtifacts searches Buildkite for a set of artifacts -func (c *Client) SearchArtifacts(ctx context.Context, buildId string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) { - u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildId)) +func (c *Client) SearchArtifacts(ctx context.Context, buildID string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) { + u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildID)) u, err := addOptions(u, opt) if err != nil { return nil, nil, err diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index 157bb2a22c..6ff12c1e7e 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -167,14 +167,15 @@ type AgentStartConfig struct { TracingServiceName string `cli:"tracing-service-name"` // Global flags - Debug bool `cli:"debug"` - LogLevel string `cli:"log-level"` - NoColor bool `cli:"no-color"` - Experiments []string `cli:"experiment" normalize:"list"` - Profile string `cli:"profile"` - StrictSingleHooks bool `cli:"strict-single-hooks"` - KubernetesExec bool `cli:"kubernetes-exec"` - TraceContextEncoding string `cli:"trace-context-encoding"` + Debug bool `cli:"debug"` + LogLevel string `cli:"log-level"` + NoColor bool `cli:"no-color"` + Experiments []string `cli:"experiment" normalize:"list"` + Profile string `cli:"profile"` + StrictSingleHooks bool `cli:"strict-single-hooks"` + KubernetesExec bool `cli:"kubernetes-exec"` + TraceContextEncoding string `cli:"trace-context-encoding"` + NoMultipartArtifactUpload bool `cli:"no-multipart-artifact-upload"` // API config DebugHTTP bool `cli:"debug-http"` @@ -704,6 +705,7 @@ var AgentStartCommand = cli.Command{ StrictSingleHooksFlag, KubernetesExecFlag, TraceContextEncodingFlag, + NoMultipartArtifactUploadFlag, // Deprecated flags which will be removed in v4 cli.StringSliceFlag{ @@ -994,7 +996,7 @@ var 
AgentStartCommand = cli.Command{ TracingBackend: cfg.TracingBackend, TracingServiceName: cfg.TracingServiceName, TraceContextEncoding: cfg.TraceContextEncoding, - VerificationFailureBehaviour: cfg.VerificationFailureBehavior, + AllowMultipartArtifactUpload: !cfg.NoMultipartArtifactUpload, KubernetesExec: cfg.KubernetesExec, SigningJWKSFile: cfg.SigningJWKSFile, @@ -1002,7 +1004,8 @@ var AgentStartCommand = cli.Command{ SigningAWSKMSKey: cfg.SigningAWSKMSKey, DebugSigning: cfg.DebugSigning, - VerificationJWKS: verificationJWKS, + VerificationJWKS: verificationJWKS, + VerificationFailureBehaviour: cfg.VerificationFailureBehavior, DisableWarningsFor: cfg.DisableWarningsFor, } diff --git a/clicommand/artifact_upload.go b/clicommand/artifact_upload.go index 73e3f1709b..98ba6345d6 100644 --- a/clicommand/artifact_upload.go +++ b/clicommand/artifact_upload.go @@ -88,6 +88,7 @@ type ArtifactUploadConfig struct { // Uploader flags GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"` UploadSkipSymlinks bool `cli:"upload-skip-symlinks"` + NoMultipartUpload bool `cli:"no-multipart-artifact-upload"` // deprecated FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"` @@ -138,6 +139,7 @@ var ArtifactUploadCommand = cli.Command{ LogLevelFlag, ExperimentsFlag, ProfileFlag, + NoMultipartArtifactUploadFlag, }, Action: func(c *cli.Context) error { ctx := context.Background() @@ -155,6 +157,8 @@ var ArtifactUploadCommand = cli.Command{ ContentType: cfg.ContentType, DebugHTTP: cfg.DebugHTTP, + AllowMultipart: !cfg.NoMultipartUpload, + // If the deprecated flag was set to true, pretend its replacement was set to true too // this works as long as the user only sets one of the two flags GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks), diff --git a/clicommand/global.go b/clicommand/global.go index fb5cd6eaf2..08902d98b0 100644 --- a/clicommand/global.go +++ b/clicommand/global.go @@ -100,6 +100,12 @@ var ( EnvVar: "BUILDKITE_KUBERNETES_EXEC", } + NoMultipartArtifactUploadFlag = cli.BoolFlag{ + Name: "no-multipart-artifact-upload", + Usage: "For Buildkite-hosted artifacts, disables the use of multipart uploads. 
Has no effect on uploads to other destinations such as custom cloud buckets", + EnvVar: "BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD", + } + ExperimentsFlag = cli.StringSliceFlag{ Name: "experiment", Value: &cli.StringSlice{}, diff --git a/internal/artifact/api_client.go b/internal/artifact/api_client.go index 6b4e72af78..f27bca2e10 100644 --- a/internal/artifact/api_client.go +++ b/internal/artifact/api_client.go @@ -10,5 +10,5 @@ import ( type APIClient interface { CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error) - UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error) + UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error) } diff --git a/internal/artifact/artifactory_uploader.go b/internal/artifact/artifactory_uploader.go index e2564b6367..f35da80ebb 100644 --- a/internal/artifact/artifactory_uploader.go +++ b/internal/artifact/artifactory_uploader.go @@ -117,12 +117,12 @@ func (u *artifactoryUploaderWork) Description() string { return singleUnitDescription(u.artifact) } -func (u *artifactoryUploaderWork) DoWork(context.Context) error { +func (u *artifactoryUploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) { // Open file from filesystem u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } // Upload the file to Artifactory. @@ -131,32 +131,32 @@ func (u *artifactoryUploaderWork) DoWork(context.Context) error { req, err := http.NewRequest("PUT", u.URL(u.artifact), f) req.SetBasicAuth(u.user, u.password) if err != nil { - return err + return nil, err } md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-MD5", md5Checksum) sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-SHA1", sha1Checksum) sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-SHA256", sha256Checksum) res, err := u.client.Do(req) if err != nil { - return err + return nil, err } - return checkResponse(res) + return nil, checkResponse(res) } func checksumFile(hasher hash.Hash, path string) (string, error) { diff --git a/internal/artifact/azure_blob_uploader.go b/internal/artifact/azure_blob_uploader.go index d2d66d4fba..ae9bfc1b19 100644 --- a/internal/artifact/azure_blob_uploader.go +++ b/internal/artifact/azure_blob_uploader.go @@ -107,11 +107,11 @@ func (u *azureBlobUploaderWork) Description() string { } // DoWork uploads an artifact file. 
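+// Azure Block Blob uploads are a single unit of work, so this work unit never
+// returns a part ETag.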
-func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error {
+func (u *azureBlobUploaderWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
 	u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
 	f, err := os.Open(u.artifact.AbsolutePath)
 	if err != nil {
-		return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
+		return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
 	}
 	defer f.Close()
 
@@ -121,5 +121,5 @@ func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error {
 	bbc := u.client.NewContainerClient(u.loc.ContainerName).NewBlockBlobClient(blobName)
 
 	_, err = bbc.UploadFile(ctx, f, nil)
-	return err
+	return nil, err
 }
diff --git a/internal/artifact/batch_creator.go b/internal/artifact/batch_creator.go
index 34665df6a0..f761ae950b 100644
--- a/internal/artifact/batch_creator.go
+++ b/internal/artifact/batch_creator.go
@@ -22,6 +22,9 @@ type BatchCreatorConfig struct {
 	// CreateArtifactsTimeout, sets a context.WithTimeout around the CreateArtifacts API.
 	// If it's zero, there's no context timeout and the default HTTP timeout will prevail.
 	CreateArtifactsTimeout time.Duration
+
+	// Whether to allow multipart uploads to the BK-hosted bucket.
+	AllowMultipart bool
 }
 
 type BatchCreator struct {
@@ -63,7 +66,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
 			ID:                api.NewUUID(),
 			Artifacts:         theseArtifacts,
 			UploadDestination: a.conf.UploadDestination,
-			MultipartSupported: true,
+			MultipartSupported: a.conf.AllowMultipart,
 		}
 
 		a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length)
diff --git a/internal/artifact/bk_uploader.go b/internal/artifact/bk_uploader.go
index 8aa9e9ab1e..b0e4dcbdf1 100644
--- a/internal/artifact/bk_uploader.go
+++ b/internal/artifact/bk_uploader.go
@@ -2,30 +2,39 @@ package artifact
 
 import (
 	"bytes"
+	"cmp"
 	"context"
 	_ "crypto/sha512" // import sha512 to make sha512 ssl certs work
+	"errors"
 	"fmt"
 	"io"
 	"mime/multipart"
 	"net/http"
 	"net/http/httptrace"
 	"net/http/httputil"
-	"strings"
-
-	// "net/http/httputil"
-	"errors"
 	"net/url"
 	"os"
+	"slices"
+	"strings"
 
 	"github.com/buildkite/agent/v3/api"
 	"github.com/buildkite/agent/v3/logger"
 	"github.com/buildkite/agent/v3/version"
+	"github.com/dustin/go-humanize"
 )
 
const artifactPathVariable = "${artifact:path}"
 
-// BKUploader uploads to S3 as a single signed POST, which have a hard limit of 5Gb.
-const maxFormUploadedArtifactSize = int64(5368709120)
+const (
+	// BKUploader uploads to S3 either as:
+	// - a single signed POST, which has a hard limit of 5GB, or
+	// - a signed multipart, which has a limit of 5GB per _part_, but we
+	//   aren't supporting larger artifacts yet.
+	maxFormUploadedArtifactSize = int64(5 * 1024 * 1024 * 1024)
+
+	// Multipart parts have a minimum size of 5MB.
+	minPartSize = int64(5 * 1024 * 1024)
+)
 
 type BKUploaderConfig struct {
 	// Whether or not HTTP calls should be debugged
@@ -58,29 +67,151 @@ func (u *BKUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) {
 	if artifact.FileSize > maxFormUploadedArtifactSize {
 		return nil, errArtifactTooLarge{Size: artifact.FileSize}
 	}
-	// TODO: create multiple workers for multipart uploads
-	return []workUnit{&bkUploaderWork{
-		BKUploader: u,
-		artifact:   artifact,
-	}}, nil
+	actions := artifact.UploadInstructions.Actions
+	if len(actions) == 0 {
+		// No multipart actions - use a single form upload.
+		return []workUnit{&bkFormUpload{
+			BKUploader: u,
+			artifact:   artifact,
+		}}, nil
+	}
+
+	// Ensure the actions are sorted by part number.
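+	// (The offset arithmetic below assumes ascending part order, so sort in
+	// case the API returned the actions unsorted.)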
+	slices.SortFunc(actions, func(a, b api.ArtifactUploadAction) int {
+		return cmp.Compare(a.PartNumber, b.PartNumber)
+	})
+
+	// Split the artifact across multiple parts.
+	chunks := int64(len(actions))
+	chunkSize := artifact.FileSize / chunks
+	remainder := artifact.FileSize % chunks
+	var offset int64
+	workUnits := make([]workUnit, 0, chunks)
+	for i, action := range actions {
+		size := chunkSize
+		if int64(i) < remainder {
+			// Spread the remainder across the first chunks.
+			size++
+		}
+		workUnits = append(workUnits, &bkMultipartUpload{
+			BKUploader: u,
+			artifact:   artifact,
+			partCount:  int(chunks),
+			action:     &action,
+			offset:     offset,
+			size:       size,
+		})
+		offset += size
+	}
+	// After that loop, `offset` should equal `artifact.FileSize`.
+	return workUnits, nil
 }
 
-type bkUploaderWork struct {
+// bkMultipartUpload uploads a single part of a multipart upload.
+type bkMultipartUpload struct {
+	*BKUploader
+	artifact     *api.Artifact
+	action       *api.ArtifactUploadAction
+	partCount    int
+	offset, size int64
+}
+
+func (u *bkMultipartUpload) Artifact() *api.Artifact { return u.artifact }
+
+func (u *bkMultipartUpload) Description() string {
+	return fmt.Sprintf("%s %s part %d/%d (~%s starting at ~%s)",
+		u.artifact.ID,
+		u.artifact.Path,
+		u.action.PartNumber,
+		u.partCount,
+		humanize.IBytes(uint64(u.size)),
+		humanize.IBytes(uint64(u.offset)),
+	)
+}
+
+func (u *bkMultipartUpload) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
+	f, err := os.Open(u.artifact.AbsolutePath)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	if _, err := f.Seek(u.offset, io.SeekStart); err != nil {
+		return nil, err
+	}
+	lr := io.LimitReader(f, u.size)
+
+	req, err := http.NewRequestWithContext(ctx, u.action.Method, u.action.URL, lr)
+	if err != nil {
+		return nil, err
+	}
+	// Content-Ranges are 0-indexed and inclusive
+	// example: Content-Range: bytes 200-1000/67589
+	req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", u.offset, u.offset+u.size-1, u.artifact.FileSize))
+	req.Header.Set("Content-Type", u.artifact.ContentType)
+	req.Header.Add("User-Agent", version.UserAgent())
+
+	if u.conf.DebugHTTP {
+		dumpReqOut, err := httputil.DumpRequestOut(req, false)
+		if err != nil {
+			u.logger.Error("Couldn't dump outgoing request: %v", err)
+		}
+		u.logger.Debug("%s", dumpReqOut)
+	}
+
+	// TODO: set all the usual http transport & client options...
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if u.conf.DebugHTTP {
+		dumpResp, err := httputil.DumpResponse(resp, true)
+		if err != nil {
+			u.logger.Error("Couldn't dump response: %v", err)
+			return nil, err
+		}
+		u.logger.Debug("%s", dumpResp)
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("read response body: %w", err)
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		return nil, fmt.Errorf("unsuccessful status %s: %s", resp.Status, body)
+	}
+
+	etag := resp.Header.Get("Etag")
+	u.logger.Debug("Artifact %s part %d has ETag = %s", u.artifact.ID, u.action.PartNumber, etag)
+	if etag == "" {
+		return nil, errors.New("response missing ETag header")
+	}
+
+	return &api.ArtifactPartETag{
+		PartNumber: u.action.PartNumber,
+		ETag:       etag,
+	}, nil
+}
+
+// bkFormUpload uploads an artifact to a presigned URL in a single request using
+// a request body encoded as multipart/form-data.
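+// CreateWork falls back to this path when the upload instructions contain no
+// multipart actions.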
+type bkFormUpload struct {
 	*BKUploader
 	artifact *api.Artifact
 }
 
-func (u *bkUploaderWork) Artifact() *api.Artifact { return u.artifact }
+func (u *bkFormUpload) Artifact() *api.Artifact { return u.artifact }
 
-func (u *bkUploaderWork) Description() string {
+func (u *bkFormUpload) Description() string {
 	return singleUnitDescription(u.artifact)
 }
 
 // DoWork tries the upload.
-func (u *bkUploaderWork) DoWork(ctx context.Context) error {
-	request, err := createUploadRequest(ctx, u.logger, u.artifact)
+func (u *bkFormUpload) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
+	request, err := createFormUploadRequest(ctx, u.logger, u.artifact)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if u.conf.DebugHTTP {
@@ -121,7 +252,7 @@ func (u *bkUploaderWork) DoWork(ctx context.Context) error {
 	u.logger.Debug("%s %s", request.Method, request.URL)
 	response, err := client.Do(request)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer response.Body.Close()
 
@@ -138,17 +269,18 @@ func (u *bkUploaderWork) DoWork(ctx context.Context) error {
 		body := &bytes.Buffer{}
 		_, err := body.ReadFrom(response.Body)
 		if err != nil {
-			return err
+			return nil, err
 		}
 
-		return fmt.Errorf("%s (%d)", body, response.StatusCode)
+		return nil, fmt.Errorf("%s (%d)", body, response.StatusCode)
 	}
 
-	return nil
+	return nil, nil
 }
 
 // Creates a new file upload http request with optional extra params
-func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Artifact) (*http.Request, error) {
+func createFormUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Artifact) (*http.Request, error) {
 	streamer := newMultipartStreamer()
+	action := artifact.UploadInstructions.Action
 
 	// Set the post data for the request
 	for key, val := range artifact.UploadInstructions.Data {
@@ -170,13 +302,13 @@ func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Art
 	// It's important that we add the form field last because when
 	// uploading to an S3 form, they are really nit-picky about the field
 	// order, and the file needs to be the last one otherwise it doesn't work.
-	if err := streamer.WriteFile(artifact.UploadInstructions.Action.FileInput, artifact.Path, fh); err != nil {
+	if err := streamer.WriteFile(action.FileInput, artifact.Path, fh); err != nil {
 		fh.Close()
 		return nil, err
 	}
 
 	// Create the URL that we'll send data to
-	uri, err := url.Parse(artifact.UploadInstructions.Action.URL)
+	uri, err := url.Parse(action.URL)
 	if err != nil {
 		fh.Close()
 		return nil, err
@@ -185,7 +317,7 @@ func createUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Art
 	uri.Path = artifact.UploadInstructions.Action.Path
 
 	// Create the request
-	req, err := http.NewRequestWithContext(ctx, artifact.UploadInstructions.Action.Method, uri.String(), streamer.Reader())
+	req, err := http.NewRequestWithContext(ctx, action.Method, uri.String(), streamer.Reader())
 	if err != nil {
 		fh.Close()
 		return nil, err
diff --git a/internal/artifact/bk_uploader_test.go b/internal/artifact/bk_uploader_test.go
index 7a888446e3..86c34f60ad 100644
--- a/internal/artifact/bk_uploader_test.go
+++ b/internal/artifact/bk_uploader_test.go
@@ -11,61 +11,63 @@ import (
 	"net/http/httptest"
 	"os"
 	"path/filepath"
+	"strconv"
 	"testing"
 
 	"github.com/buildkite/agent/v3/api"
 	"github.com/buildkite/agent/v3/logger"
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 )
 
 func TestFormUploading(t *testing.T) {
 	ctx := context.Background()
 
 	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		switch req.URL.Path {
-		case "/buildkiteartifacts.com":
-			if req.ContentLength <= 0 {
-				http.Error(rw, "zero or unknown Content-Length", http.StatusBadRequest)
-				return
-			}
+		if req.Method != "POST" || req.URL.Path != "/buildkiteartifacts.com" {
+			http.Error(rw, fmt.Sprintf("not found; (method, path) = (%q, %q), want POST /buildkiteartifacts.com", req.Method, req.URL.Path), http.StatusNotFound)
+			return
+		}
 
-			if err := req.ParseMultipartForm(5 * 1024 * 1024); err != nil {
-				http.Error(rw, fmt.Sprintf("req.ParseMultipartForm() = %v", err), http.StatusBadRequest)
-				return
-			}
+		if req.ContentLength <= 0 {
+			http.Error(rw, "zero or unknown Content-Length", http.StatusBadRequest)
+			return
+		}
 
-			// Check the ${artifact:path} interpolation is working
-			path := req.FormValue("path")
-			if got, want := path, "llamas.txt"; got != want {
-				http.Error(rw, fmt.Sprintf("path = %q, want %q", got, want), http.StatusBadRequest)
-				return
-			}
+		if err := req.ParseMultipartForm(5 * 1024 * 1024); err != nil {
+			http.Error(rw, fmt.Sprintf("req.ParseMultipartForm() = %v", err), http.StatusBadRequest)
+			return
+		}
 
-			file, fh, err := req.FormFile("file")
-			if err != nil {
-				http.Error(rw, fmt.Sprintf(`req.FormFile("file") error = %v`, err), http.StatusBadRequest)
-				return
-			}
-			defer file.Close()
+		// Check the ${artifact:path} interpolation is working
+		path := req.FormValue("path")
+		if got, want := path, "llamas.txt"; got != want {
+			http.Error(rw, fmt.Sprintf("path = %q, want %q", got, want), http.StatusBadRequest)
+			return
+		}
 
-			b := &bytes.Buffer{}
-			if _, err := io.Copy(b, file); err != nil {
-				http.Error(rw, fmt.Sprintf("io.Copy() error = %v", err), http.StatusInternalServerError)
-				return
-			}
+		file, fh, err := req.FormFile("file")
+		if err != nil {
+			http.Error(rw, fmt.Sprintf(`req.FormFile("file") error = %v`, err), http.StatusBadRequest)
+			return
+		}
+		defer file.Close()
 
+		b := &bytes.Buffer{}
+		if _, err := io.Copy(b, file); err != nil {
+			http.Error(rw, fmt.Sprintf("io.Copy() error = %v", err), http.StatusInternalServerError)
+			return
+		}
 
-			// Check the file is attached correctly
-			if got, want := b.String(), "llamas"; got != want {
-				http.Error(rw, fmt.Sprintf("uploaded file content = %q, want %q", got, want), http.StatusBadRequest)
-				return
-			}
+		// Check the file is attached correctly
+		if got, want := b.String(), "llamas"; got != want {
+			http.Error(rw, fmt.Sprintf("uploaded file content = %q, want %q", got, want), http.StatusBadRequest)
+			return
+		}
 
-			if got, want := fh.Filename, "llamas.txt"; got != want {
-				http.Error(rw, fmt.Sprintf("uploaded file name = %q, want %q", got, want), http.StatusInternalServerError)
-				return
-			}
+		if got, want := fh.Filename, "llamas.txt"; got != want {
+			http.Error(rw, fmt.Sprintf("uploaded file name = %q, want %q", got, want), http.StatusInternalServerError)
+			return
 		}
-
-		default:
-			http.Error(rw, fmt.Sprintf("not found; method = %q, path = %q", req.Method, req.URL.Path), http.StatusNotFound)
 	}))
 	defer server.Close()
@@ -113,9 +115,117 @@ func TestFormUploading(t *testing.T) {
 	}
 
 	for _, wu := range work {
-		if err := wu.DoWork(ctx); err != nil {
-			t.Errorf("bkUploaderWork.DoWork(artifact) = %v", err)
+		if _, err := wu.DoWork(ctx); err != nil {
+			t.Errorf("bkUploaderWork.DoWork(ctx) = %v", err)
+		}
+	}
+	})
+	}
+}
+
+func TestMultipartUploading(t *testing.T) {
+	ctx := context.Background()
+
+	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.Method != "PUT" || req.URL.Path != "/llamas3.txt" {
+			http.Error(rw, fmt.Sprintf("not found; (method, path) = (%q, %q), want PUT /llamas3.txt", req.Method, req.URL.Path), http.StatusNotFound)
+			return
+		}
+		partNum, err := strconv.Atoi(req.URL.Query().Get("partNumber"))
+		if err != nil {
+			http.Error(rw, fmt.Sprintf("strconv.Atoi(req.URL.Query().Get(partNumber)) error = %v", err), http.StatusBadRequest)
+			return
+		}
+
+		if partNum < 1 || partNum > 3 {
+			http.Error(rw, fmt.Sprintf("partNumber %d out of range [1, 3]", partNum), http.StatusBadRequest)
+			return
+		}
+
+		b, err := io.ReadAll(req.Body)
+		if err != nil {
+			http.Error(rw, fmt.Sprintf("io.ReadAll(req.Body) error = %v", err), http.StatusInternalServerError)
+			return
+		}
+
+		if got, want := string(b), "llamas"; got != want {
+			http.Error(rw, fmt.Sprintf("req.Body = %q, want %q", got, want), http.StatusExpectationFailed)
+			return
+		}
+
+		rw.Header().Set("ETag", fmt.Sprintf(`"part number %d"`, partNum))
+		rw.WriteHeader(http.StatusCreated)
+	}))
+	defer server.Close()
+
+	temp, err := os.MkdirTemp("", "agent")
+	if err != nil {
+		t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err)
+	}
+	defer os.Remove(temp)
+
+	cwd, err := os.Getwd()
+	if err != nil {
+		t.Fatalf("os.Getwd() error = %v", err)
+	}
+
+	for _, wd := range []string{temp, cwd} {
+		t.Run(wd, func(t *testing.T) {
+			abspath := filepath.Join(wd, "llamas3.txt")
+			if err := os.WriteFile(abspath, []byte("llamasllamasllamas"), 0700); err != nil {
+				t.Fatalf("os.WriteFile(%q) error = %v", abspath, err)
+			}
+			defer os.Remove(abspath)
+
+			uploader := NewBKUploader(logger.Discard, BKUploaderConfig{})
+			actions := []api.ArtifactUploadAction{
+				{URL: server.URL + "/llamas3.txt?partNumber=1", Method: "PUT", PartNumber: 1},
+				{URL: server.URL + "/llamas3.txt?partNumber=2", Method: "PUT", PartNumber: 2},
+				{URL: server.URL + "/llamas3.txt?partNumber=3", Method: "PUT", PartNumber: 3},
+			}
+			artifact := &api.Artifact{
+				ID:           "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx",
+				Path:         "llamas3.txt",
+				AbsolutePath: abspath,
+				GlobPath:     "llamas3.txt",
+				FileSize:     18,
+				ContentType:  "text/plain",
+				UploadInstructions: &api.ArtifactUploadInstructions{
+					Actions: actions,
+				},
+			}
+
+			work, err := uploader.CreateWork(artifact)
+			if err != nil {
+				t.Fatalf("uploader.CreateWork(artifact) error = %v", err)
+			}
+
+			want := []workUnit{
+				&bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[0], offset: 0, size: 6},
+				&bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[1], offset: 6, size: 6},
+				&bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[2], offset: 12, size: 6},
+			}
+
+			if diff := cmp.Diff(work, want,
+				cmp.AllowUnexported(bkMultipartUpload{}),
+				cmpopts.EquateComparable(uploader),
+			); diff != "" {
+				t.Fatalf("CreateWork diff (-got +want):\n%s", diff)
+			}
+
+			var gotEtags []api.ArtifactPartETag
+			for _, wu := range work {
+				etag, err := wu.DoWork(ctx)
+				if err != nil {
+					t.Errorf("bkUploaderWork.DoWork(ctx) = %v", err)
+					continue
+				}
+				gotEtags = append(gotEtags, *etag)
+			}
+
+			wantEtags := []api.ArtifactPartETag{
+				{PartNumber: 1, ETag: `"part number 1"`},
+				{PartNumber: 2, ETag: `"part number 2"`},
+				{PartNumber: 3, ETag: `"part number 3"`},
+			}
+			if diff := cmp.Diff(gotEtags, wantEtags); diff != "" {
+				t.Errorf("etags diff (-got +want):\n%s", diff)
 			}
 		})
 	}
@@ -161,8 +271,8 @@
 	}
 
 	for _, wu := range work {
-		if err := wu.DoWork(ctx); !errors.Is(err, fs.ErrNotExist) {
-			t.Errorf("bkUploaderWork.DoWork(artifact) = %v, want %v", err, fs.ErrNotExist)
+		if _, err := wu.DoWork(ctx); !errors.Is(err, fs.ErrNotExist) {
+			t.Errorf("bkUploaderWork.DoWork(ctx) = %v, want %v", err, fs.ErrNotExist)
 		}
 	}
 }
diff --git a/internal/artifact/gs_uploader.go b/internal/artifact/gs_uploader.go
index 06445cc2e5..11bfdaf446 100644
--- a/internal/artifact/gs_uploader.go
+++ b/internal/artifact/gs_uploader.go
@@ -137,7 +137,7 @@ func (u *gsUploaderWork) Description() string {
 	return singleUnitDescription(u.artifact)
 }
 
-func (u *gsUploaderWork) DoWork(_ context.Context) error {
+func (u *gsUploaderWork) DoWork(_ context.Context) (*api.ArtifactPartETag, error) {
 	permission := os.Getenv("BUILDKITE_GS_ACL")
 
 	// The dirtiest validation method ever...
@@ -147,7 +147,7 @@ func (u *gsUploaderWork) DoWork(_ context.Context) error {
 		permission != "projectPrivate" &&
 		permission != "publicRead" &&
 		permission != "publicReadWrite" {
-		return fmt.Errorf("Invalid GS ACL `%s`", permission)
+		return nil, fmt.Errorf("Invalid GS ACL `%s`", permission)
 	}
 
 	if permission == "" {
@@ -164,7 +164,7 @@ func (u *gsUploaderWork) DoWork(_ context.Context) error {
 	}
 	file, err := os.Open(u.artifact.AbsolutePath)
 	if err != nil {
-		return errors.New(fmt.Sprintf("Failed to open file %q (%v)", u.artifact.AbsolutePath, err))
+		return nil, fmt.Errorf("Failed to open file %q (%v)", u.artifact.AbsolutePath, err)
 	}
 	call := u.service.Objects.Insert(u.BucketName, object)
 	if permission != "" {
@@ -173,10 +173,10 @@ func (u *gsUploaderWork) DoWork(_ context.Context) error {
 	if res, err := call.Media(file, googleapi.ContentType("")).Do(); err == nil {
 		u.logger.Debug("Created object %v at location %v\n\n", res.Name, res.SelfLink)
 	} else {
-		return errors.New(fmt.Sprintf("Failed to PUT file %q (%v)", u.artifactPath(u.artifact), err))
+		return nil, fmt.Errorf("Failed to PUT file %q (%v)", u.artifactPath(u.artifact), err)
 	}
 
-	return nil
+	return nil, nil
 }
 
 func (u *GSUploader) artifactPath(artifact *api.Artifact) string {
diff --git a/internal/artifact/s3_uploader.go b/internal/artifact/s3_uploader.go
index 7cb41d3735..b13b6c1718 100644
--- a/internal/artifact/s3_uploader.go
+++ b/internal/artifact/s3_uploader.go
@@ -99,10 +99,10 @@ func (u *s3UploaderWork) Description() string {
 	return singleUnitDescription(u.artifact)
 }
 
-func (u *s3UploaderWork) DoWork(context.Context) error {
+func (u *s3UploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) {
 	permission, err := u.resolvePermission()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	// Create an uploader with the session and default options
@@ -112,7 +112,7 @@ func (u *s3UploaderWork) DoWork(context.Context) error {
 	u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
 	f, err := os.Open(u.artifact.AbsolutePath)
 	if err != nil {
-		return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
+		return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
 	}
 
 	// Upload the file to S3.
@@ -131,8 +131,7 @@ func (u *s3UploaderWork) DoWork(context.Context) error {
 	}
 
 	_, err = uploader.Upload(params)
-
-	return err
+	return nil, err
 }
 
 func (u *S3Uploader) artifactPath(artifact *api.Artifact) string {
diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go
index d0c2062e88..baa584bc94 100644
--- a/internal/artifact/uploader.go
+++ b/internal/artifact/uploader.go
@@ -1,6 +1,7 @@
 package artifact
 
 import (
+	"cmp"
 	"context"
 	"crypto/sha1"
 	"crypto/sha256"
@@ -11,6 +12,7 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"slices"
 	"strings"
 	"sync"
 	"time"
@@ -51,6 +53,9 @@ type UploaderConfig struct {
 
 	// Whether to not upload symlinks
 	UploadSkipSymlinks bool
+
+	// Whether to allow multipart uploads to the BK-hosted bucket
+	AllowMultipart bool
 }
 
 type Uploader struct {
@@ -103,6 +108,7 @@ func (a *Uploader) Upload(ctx context.Context) error {
 		Artifacts:              artifacts,
 		UploadDestination:      a.conf.Destination,
 		CreateArtifactsTimeout: 10 * time.Second,
+		AllowMultipart:         a.conf.AllowMultipart,
 	})
 	artifacts, err = batchCreator.Create(ctx)
 	if err != nil {
@@ -449,73 +455,75 @@ type workUnit interface {
 	Description() string
 
 	// DoWork does the work.
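+	// Work units for multipart parts return the part's ETag on success;
+	// single-unit uploads return a nil ETag.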
-	DoWork(context.Context) error
+	DoWork(context.Context) (*api.ArtifactPartETag, error)
 }
 
-const (
-	artifactStateFinished = "finished"
-	artifactStateError    = "error"
-)
-
-// Messages passed on channels between goroutines.
-
-// workUnitError is just a tuple (workUnit, error).
-type workUnitError struct {
+// workUnitResult is just a tuple (workUnit, partETag | error).
+type workUnitResult struct {
 	workUnit workUnit
+	partETag *api.ArtifactPartETag
 	err      error
 }
 
-// artifactWorkUnits is just a tuple (artifact, int).
-type artifactWorkUnits struct {
-	artifact  *api.Artifact
-	workUnits int
+// artifactUploadWorker contains the shared state between the worker goroutines
+// and the state updater.
+type artifactUploadWorker struct {
+	*Uploader
+
+	// Counts the worker goroutines.
+	wg sync.WaitGroup
+
+	// A tracker for every artifact.
+	// The map is written at the start of upload, and other goroutines only
+	// read it afterwards.
+	trackers map[*api.Artifact]*artifactTracker
 }
 
-// cancelCtx stores a context cancelled when part of an artifact upload has
-// failed and needs to fail the whole artifact.
-// Go readability notes: "Storing" a context?
-// - In a long-lived struct: yeah nah 🙅. Read pkg.go.dev/context
-// - Within a single operation ("upload", in this case): nah yeah 👍
-type cancelCtx struct {
+// artifactTracker tracks the amount of work pending for an artifact.
+type artifactTracker struct {
+	// All the work for uploading this artifact.
+	work []workUnit
+
+	// Normally storing a context in a struct is a bad idea. It's explicitly
+	// called out in pkg.go.dev/context as a no-no (unless you are required to
+	// store a context for some reason like interface implementation or
+	// backwards compatibility).
+	//
+	// The main reason is that contexts are intended to be associated with a
+	// clear chain of function calls (i.e. work), and having contexts stored in
+	// a struct somewhere means there's confusion over which context should be
+	// used for a given function call. (Do you use one passed in, or from a
+	// struct? Do you listen for Done on both? Which context values apply?)
+	//
+	// So here's how this context is situated within the chain of work, and
+	// how it will be used:
+	//
+	// This context applies to all the work units for this artifact.
+	// It is a child context of the context passed to Uploader.upload, which
+	// should be exactly the same context that Uploader.upload passes to
+	// artifactUploadWorker.
+	// Canceling this context cancels all work associated with this artifact
+	// (across the fixed-size pool of worker goroutines).
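+	// The cancel cause is the first work-unit error for this artifact (see
+	// doWorkUnits), which aborts the artifact's remaining parts.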
ctx context.Context cancel context.CancelCauseFunc -} - -func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, uploader workCreator) error { - perArtifactCtx := make(map[*api.Artifact]cancelCtx) - for _, artifact := range artifacts { - actx, cancel := context.WithCancelCause(ctx) - perArtifactCtx[artifact] = cancelCtx{actx, cancel} - } - // workUnitStateCh: multiple worker goroutines --(work unit state)--> state updater - workUnitStateCh := make(chan workUnitError) - // artifactWorkUnitsCh: work unit creation --(# work units for artifact)--> state updater - artifactWorkUnitsCh := make(chan artifactWorkUnits) - // workUnitsCh: work unit creation --(work unit to be run)--> multiple worker goroutines - workUnitsCh := make(chan workUnit) - // stateUpdatesDoneCh: closed when all state updates are complete - stateUpdatesDoneCh := make(chan struct{}) + // pendingWork is the number of incomplete units for this artifact. + // This is set once at the start of upload, and then decremented by the + // state updater goroutine as work units complete. + pendingWork int - // The status updater goroutine: updates batches of artifact states on - // Buildkite every few seconds. - var errs []error - go func() { - errs = a.statusUpdater(ctx, workUnitStateCh, artifactWorkUnitsCh, stateUpdatesDoneCh) - }() + // State that will be uploaded to BK when the artifact is finished or errored. + // Only the state updater goroutine writes this. + api.ArtifactState +} - // Worker goroutines that work on work units. - var workerWG sync.WaitGroup - for range runtime.GOMAXPROCS(0) { - workerWG.Add(1) - go func() { - defer workerWG.Done() - a.uploadWorker(ctx, perArtifactCtx, workUnitsCh, workUnitStateCh) - }() +func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, uploader workCreator) error { + worker := &artifactUploadWorker{ + Uploader: a, + trackers: make(map[*api.Artifact]*artifactTracker), } - // Work creation: creates the work units for each artifact. - // This must happen after creating goroutines listening on the channels. + // Create work and trackers for each artifact. for _, artifact := range artifacts { workUnits, err := uploader.CreateWork(artifact) if err != nil { @@ -523,44 +531,64 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload return err } - // Send the number of work units for this artifact to the state uploader. - select { - case <-ctx.Done(): - return ctx.Err() - case artifactWorkUnitsCh <- artifactWorkUnits{artifact: artifact, workUnits: len(workUnits)}: + actx, acancel := context.WithCancelCause(ctx) + worker.trackers[artifact] = &artifactTracker{ + ctx: actx, + cancel: acancel, + work: workUnits, + pendingWork: len(workUnits), + ArtifactState: api.ArtifactState{ + ID: artifact.ID, + Multipart: len(workUnits) > 1, + }, } + } + + // unitsCh: work unit creation --(work unit to be run)--> multiple worker goroutines + unitsCh := make(chan workUnit) + // resultsCh: multiple worker goroutines --(work unit result)--> state updater + resultsCh := make(chan workUnitResult) + // errCh: receives the final error from the status updater + errCh := make(chan error, 1) - // Send the work units themselves to the workers. - for _, workUnit := range workUnits { + // The status updater goroutine: updates batches of artifact states on + // Buildkite every few seconds. + go worker.stateUpdater(ctx, resultsCh, errCh) + + // Worker goroutines that work on work units. 
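+	// One worker per GOMAXPROCS slot keeps part uploads parallel while
+	// bounding concurrency.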
+	for range runtime.GOMAXPROCS(0) {
+		worker.wg.Add(1)
+		go worker.doWorkUnits(ctx, unitsCh, resultsCh)
+	}
+
+	// Send the work units for each artifact to the workers.
+	// This must happen after creating the worker goroutines listening on unitsCh.
+	for _, tracker := range worker.trackers {
+		for _, workUnit := range tracker.work {
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
-			case workUnitsCh <- workUnit:
+			case unitsCh <- workUnit:
 			}
 		}
 	}
 
-	// All work units have been sent to workers, and all counts of pending work
-	// units have been sent to the state updater.
-	close(workUnitsCh)
-	close(artifactWorkUnitsCh)
+	// All work units have been sent to workers.
+	close(unitsCh)
 
 	a.logger.Debug("Waiting for uploads to complete...")
 
 	// Wait for the workers to finish
-	workerWG.Wait()
+	worker.wg.Wait()
 
 	// Since the workers are done, all work unit states have been sent to the
 	// state updater.
-	close(workUnitStateCh)
+	close(resultsCh)
 
 	a.logger.Debug("Uploads complete, waiting for upload status to be sent to Buildkite...")
 
 	// Wait for the statuses to finish uploading
-	<-stateUpdatesDoneCh
-
-	if len(errs) > 0 {
-		err := errors.Join(errs...)
+	if err := <-errCh; err != nil {
 		return fmt.Errorf("errors uploading artifacts: %w", err)
 	}
 
@@ -569,176 +597,194 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload
 	return nil
 }
 
-func (a *Uploader) uploadWorker(
-	ctx context.Context,
-	perArtifactCtx map[*api.Artifact]cancelCtx,
-	workUnitsCh <-chan workUnit,
-	workUnitStateCh chan<- workUnitError,
-) {
+func (a *artifactUploadWorker) doWorkUnits(ctx context.Context, unitsCh <-chan workUnit, resultsCh chan<- workUnitResult) {
+	defer a.wg.Done()
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
 
-		case workUnit, open := <-workUnitsCh:
+		case workUnit, open := <-unitsCh:
 			if !open {
 				return // Done
 			}
-			artifact := workUnit.Artifact()
-			actx := perArtifactCtx[artifact].ctx
+			tracker := a.trackers[workUnit.Artifact()]
 
 			// Show a nice message that we're starting to upload the file
 			a.logger.Info("Uploading %s", workUnit.Description())
 
 			// Upload the artifact and then set the state depending
 			// on whether or not it passed. We'll retry the upload
 			// a couple of times before giving up.
-			err := roko.NewRetrier(
+			r := roko.NewRetrier(
 				roko.WithMaxAttempts(10),
 				roko.WithStrategy(roko.Constant(5*time.Second)),
-			).DoWithContext(actx, func(r *roko.Retrier) error {
-				if err := workUnit.DoWork(actx); err != nil {
+			)
+			partETag, err := roko.DoFunc(tracker.ctx, r, func(r *roko.Retrier) (*api.ArtifactPartETag, error) {
+				etag, err := workUnit.DoWork(tracker.ctx)
+				if err != nil {
 					a.logger.Warn("%s (%s)", err, r)
-					return err
 				}
-				return nil
+				return etag, err
 			})
 
 			// If it failed, abort any other work items for this artifact.
 			if err != nil {
-				a.logger.Info("Upload failed for %s", workUnit.Description())
-				perArtifactCtx[workUnit.Artifact()].cancel(err)
+				a.logger.Info("Upload failed for %s: %v", workUnit.Description(), err)
+				tracker.cancel(err)
+				// The error is then also reported to the state updater below.
 			}
 
 			// Let the state updater know how the work went.
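+			// (A nil partETag with a nil error is normal: single-unit
+			// uploads have no part ETag to report.)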
 			select {
-			case <-ctx.Done(): // the main context, not the artifact ctx
-				return // ctx.Err()
+			case <-ctx.Done(): // Note: the main context, not the artifact tracker context
+				return
 
-			case workUnitStateCh <- workUnitError{workUnit: workUnit, err: err}:
+			case resultsCh <- workUnitResult{workUnit: workUnit, partETag: partETag, err: err}:
 			}
 		}
 	}
 }
 
-func (a *Uploader) statusUpdater(
-	ctx context.Context,
-	workUnitStateCh <-chan workUnitError,
-	artifactWorkUnitsCh <-chan artifactWorkUnits,
-	doneCh chan<- struct{},
-) []error {
-	defer close(doneCh)
-
-	// Errors that caused an artifact upload to fail, or a batch fail to update.
+func (a *artifactUploadWorker) stateUpdater(ctx context.Context, resultsCh <-chan workUnitResult, stateUpdaterErrCh chan<- error) {
 	var errs []error
+	defer func() {
+		if len(errs) > 0 {
+			stateUpdaterErrCh <- errors.Join(errs...)
+		}
+		close(stateUpdaterErrCh)
+	}()
 
-	// artifact -> number of work units that are incomplete
-	pendingWorkUnits := make(map[*api.Artifact]int)
-
-	// States that haven't been updated on Buildkite yet.
-	statesToUpload := make(map[string]string) // artifact ID -> state
-
-	// When this ticks, upload any pending artifact states.
+	// When this ticks, upload any pending artifact states as a batch.
 	updateTicker := time.NewTicker(1 * time.Second)
 
+selectLoop:
 	for {
 		select {
 		case <-ctx.Done():
-			return errs
+			return
 
 		case <-updateTicker.C:
-			if len(statesToUpload) == 0 { // no news from the frontier
-				break
-			}
-			// Post an update
-			timeout := 5 * time.Second
-
-			// Update the states of the artifacts in bulk.
-			err := roko.NewRetrier(
-				roko.WithMaxAttempts(10),
-				roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)),
-			).DoWithContext(ctx, func(r *roko.Retrier) error {
-				ctxTimeout := ctx
-				if timeout != 0 {
-					var cancel func()
-					ctxTimeout, cancel = context.WithTimeout(ctx, timeout)
-					defer cancel()
-				}
-
-				_, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload)
-				if err != nil {
-					a.logger.Warn("%s (%s)", err, r)
-				}
-
-				// after four attempts (0, 1, 2, 3)...
-				if r.AttemptCount() == 3 {
-					// The short timeout has given us fast feedback on the first couple of attempts,
-					// but perhaps the server needs more time to complete the request, so fall back to
-					// the default HTTP client timeout.
-					a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout)
-					timeout = 0
-				}
-
-				return err
-			})
-			if err != nil {
-				a.logger.Error("Error updating artifact states: %s", err)
+			// Note: updateStates marks each completed tracker as sent, so
+			// its state is not uploaded twice.
+			if err := a.updateStates(ctx); err != nil {
 				errs = append(errs, err)
 			}
-			a.logger.Debug("Updated %d artifact states", len(statesToUpload))
-			clear(statesToUpload)
-
-		case awu, open := <-artifactWorkUnitsCh:
+		case result, open := <-resultsCh:
 			if !open {
-				// Set it to nil so this select branch never happens again.
-				artifactWorkUnitsCh = nil
-				// If both input channels are nil, we're done!
-				if workUnitStateCh == nil {
-					return errs
-				}
+				// No more input: we're done!
+				break selectLoop
 			}
+			artifact := result.workUnit.Artifact()
+			tracker := a.trackers[artifact]
 
-			// Track how many pending work units there should be per artifact.
-			// Use += in case some work units for this artifact already completed.
-			pendingWorkUnits[awu.artifact] += awu.workUnits
-			if pendingWorkUnits[awu.artifact] != 0 {
-				break
-			}
-			// The whole artifact is complete, add it to the next batch of
-			// states to upload.
-			statesToUpload[awu.artifact.ID] = artifactStateFinished
-			a.logger.Debug("Artifact `%s` has entered state `%s`", awu.artifact.ID, artifactStateFinished)
-
-		case workUnitState, open := <-workUnitStateCh:
-			if !open {
-				// Set it to nil so this select branch never happens again.
-				workUnitStateCh = nil
-				// If both input channels are nil, we're done!
-				if artifactWorkUnitsCh == nil {
-					return errs
-				}
-			}
-			artifact := workUnitState.workUnit.Artifact()
-			if workUnitState.err != nil {
+			if result.err != nil {
 				// The work unit failed, so the whole artifact upload has failed.
-				errs = append(errs, workUnitState.err)
-				statesToUpload[artifact.ID] = artifactStateError
-				a.logger.Debug("Artifact `%s` has entered state `%s`", artifact.ID, artifactStateError)
-				break
+				errs = append(errs, result.err)
+				tracker.State = "error"
+				a.logger.Debug("Artifact %s has entered state %s", tracker.ID, tracker.State)
+				continue
 			}
+
 			// The work unit is complete - it's no longer pending.
-			pendingWorkUnits[artifact]--
-			if pendingWorkUnits[artifact] != 0 {
-				break
+			if partETag := result.partETag; partETag != nil {
+				tracker.MultipartETags = append(tracker.MultipartETags, *partETag)
+			}
+
+			tracker.pendingWork--
+			if tracker.pendingWork > 0 {
+				continue
 			}
+
+			// No pending units remain, so the whole artifact is complete.
 			// Add it to the next batch of states to upload.
-			statesToUpload[artifact.ID] = artifactStateFinished
-			a.logger.Debug("Artifact `%s` has entered state `%s`", artifact.ID, artifactStateFinished)
+			tracker.State = "finished"
+			a.logger.Debug("Artifact %s has entered state %s", tracker.ID, tracker.State)
+		}
+	}
+
+	// Upload any remaining states.
+	if err := a.updateStates(ctx); err != nil {
+		errs = append(errs, err)
+	}
+	return
+}
+
+// updateStates uploads terminal artifact states to Buildkite in a batch.
+func (a *artifactUploadWorker) updateStates(ctx context.Context) error {
+	// Only upload states that are finished or error.
+	var statesToUpload []api.ArtifactState
+	var trackersToMarkSent []*artifactTracker
+	for _, tracker := range a.trackers {
+		switch tracker.State {
+		case "finished", "error":
+			// Only send these states.
+		default:
+			continue
+		}
+		// This artifact is in a terminal state, so queue its state for upload.
+		statesToUpload = append(statesToUpload, tracker.ArtifactState)
+		trackersToMarkSent = append(trackersToMarkSent, tracker)
+	}
+
+	if len(statesToUpload) == 0 { // no news from the frontier
+		return nil
+	}
+
+	for _, state := range statesToUpload {
+		// Ensure ETags are in ascending order by part number.
+		// This is required by S3.
+		slices.SortFunc(state.MultipartETags, func(a, b api.ArtifactPartETag) int {
+			return cmp.Compare(a.PartNumber, b.PartNumber)
+		})
+	}
+
+	// Post the update
+	timeout := 5 * time.Second
+
+	// Update the states of the artifacts in bulk.
+	err := roko.NewRetrier(
+		roko.WithMaxAttempts(10),
+		roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)),
+	).DoWithContext(ctx, func(r *roko.Retrier) error {
+		ctxTimeout := ctx
+		if timeout != 0 {
+			var cancel func()
+			ctxTimeout, cancel = context.WithTimeout(ctx, timeout)
+			defer cancel()
+		}
+
+		_, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload)
+		if err != nil {
+			a.logger.Warn("%s (%s)", err, r)
+		}
+
+		// after four attempts (0, 1, 2, 3)...
+ if r.AttemptCount() == 3 { + // The short timeout has given us fast feedback on the first couple of attempts, + // but perhaps the server needs more time to complete the request, so fall back to + // the default HTTP client timeout. + a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) + timeout = 0 + } + + return err + }) + if err != nil { + a.logger.Error("Error updating artifact states: %v", err) + return err + } + + for _, tracker := range trackersToMarkSent { + // Don't send this state again. + tracker.State = "sent" + } + a.logger.Debug("Updated %d artifact states", len(statesToUpload)) + return nil } +// singleUnitDescription can be used by uploader implementations to describe +// artifact uploads consisting of a single work unit. func singleUnitDescription(artifact *api.Artifact) string { return fmt.Sprintf("%s %s (%s)", artifact.ID, artifact.Path, humanize.IBytes(uint64(artifact.FileSize))) }
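The part-splitting arithmetic in BKUploader.CreateWork above can be checked in
isolation. The following is a minimal standalone sketch of the same
chunkSize/remainder logic (splitParts is a hypothetical helper, not part of
this patch): the remainder is handed out one extra byte at a time to the
leading parts, so offsets stay contiguous and the sizes sum to the file size.

package main

import "fmt"

// splitParts mirrors BKUploader.CreateWork: divide fileSize into the given
// number of parts, spreading the remainder across the first parts so that
// every byte is covered exactly once.
func splitParts(fileSize, parts int64) (offsets, sizes []int64) {
	chunkSize := fileSize / parts
	remainder := fileSize % parts
	var offset int64
	for i := int64(0); i < parts; i++ {
		size := chunkSize
		if i < remainder {
			size++ // the first `remainder` parts each take one extra byte
		}
		offsets = append(offsets, offset)
		sizes = append(sizes, size)
		offset += size
	}
	// Here offset == fileSize, matching the invariant noted in CreateWork.
	return offsets, sizes
}

func main() {
	// 20 bytes over 3 parts: sizes 7, 7, 6 at offsets 0, 7, 14.
	offsets, sizes := splitParts(20, 3)
	fmt.Println(offsets, sizes) // [0 7 14] [7 7 6]
}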