diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go index 0655437e57..f27597de24 100644 --- a/agent/agent_configuration.go +++ b/agent/agent_configuration.go @@ -43,23 +43,24 @@ type AgentConfiguration struct { VerificationJWKS any // The set of keys to verify jobs with VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`) - ANSITimestamps bool - TimestampLines bool - HealthCheckAddr string - DisconnectAfterJob bool - DisconnectAfterIdleTimeout int - CancelGracePeriod int - SignalGracePeriod time.Duration - EnableJobLogTmpfile bool - JobLogPath string - WriteJobLogsToStdout bool - LogFormat string - Shell string - Profile string - RedactedVars []string - AcquireJob string - TracingBackend string - TracingServiceName string - TraceContextEncoding string - DisableWarningsFor []string + ANSITimestamps bool + TimestampLines bool + HealthCheckAddr string + DisconnectAfterJob bool + DisconnectAfterIdleTimeout int + CancelGracePeriod int + SignalGracePeriod time.Duration + EnableJobLogTmpfile bool + JobLogPath string + WriteJobLogsToStdout bool + LogFormat string + Shell string + Profile string + RedactedVars []string + AcquireJob string + TracingBackend string + TracingServiceName string + TraceContextEncoding string + DisableWarningsFor []string + AllowMultipartArtifactUpload bool } diff --git a/agent/api.go b/agent/api.go index 422cec6a84..ac6bed6f60 100644 --- a/agent/api.go +++ b/agent/api.go @@ -37,7 +37,7 @@ type APIClient interface { StartJob(context.Context, *api.Job) (*api.Response, error) StepExport(context.Context, string, *api.StepExportRequest) (*api.StepExportResponse, *api.Response, error) StepUpdate(context.Context, string, *api.StepUpdate) (*api.Response, error) - UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error) + UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error) UploadChunk(context.Context, string, *api.Chunk) (*api.Response, error) UploadPipeline(context.Context, string, *api.PipelineChange, ...api.Header) (*api.Response, error) } diff --git a/agent/job_runner.go b/agent/job_runner.go index 7c31e0efbc..a5e5d6ca8e 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -534,6 +534,10 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) { env["BUILDKITE_KUBERNETES_EXEC"] = "true" } + if !r.conf.AgentConfiguration.AllowMultipartArtifactUpload { + env["BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD"] = "true" + } + // propagate CancelSignal to bootstrap, unless it's the default SIGTERM if r.conf.CancelSignal != process.SIGTERM { env["BUILDKITE_CANCEL_SIGNAL"] = r.conf.CancelSignal.String() diff --git a/api/artifacts.go b/api/artifacts.go index 3554971165..351345de9d 100644 --- a/api/artifacts.go +++ b/api/artifacts.go @@ -51,25 +51,47 @@ type Artifact struct { } type ArtifactBatch struct { - ID string `json:"id"` - Artifacts []*Artifact `json:"artifacts"` - UploadDestination string `json:"upload_destination"` + ID string `json:"id"` + Artifacts []*Artifact `json:"artifacts"` + UploadDestination string `json:"upload_destination"` + MultipartSupported bool `json:"multipart_supported,omitempty"` } +// ArtifactUploadInstructions describes how to upload an artifact to Buildkite +// artifact storage. 
 type ArtifactUploadInstructions struct {
-	Data map[string]string `json:"data"`
-	Action struct {
-		URL       string `json:"url,omitempty"`
-		Method    string `json:"method"`
-		Path      string `json:"path"`
-		FileInput string `json:"file_input"`
-	}
+	// Used for a single-part upload.
+	Action ArtifactUploadAction `json:"action"`
+
+	// Used for a multi-part upload.
+	Actions []ArtifactUploadAction `json:"actions"`
+
+	// Contains other data necessary for interpreting instructions.
+	Data map[string]string `json:"data"`
+}
+
+// ArtifactUploadAction describes one action needed to upload an artifact or
+// part of an artifact to Buildkite artifact storage.
+type ArtifactUploadAction struct {
+	URL        string `json:"url,omitempty"`
+	Method     string `json:"method"`
+	Path       string `json:"path"`
+	FileInput  string `json:"file_input"`
+	PartNumber int    `json:"part_number,omitempty"`
 }
 
 type ArtifactBatchCreateResponse struct {
-	ID          string   `json:"id"`
-	ArtifactIDs []string `json:"artifact_ids"`
-	UploadInstructions *ArtifactUploadInstructions `json:"upload_instructions"`
+	ID          string   `json:"id"`
+	ArtifactIDs []string `json:"artifact_ids"`
+
+	// These instructions apply to all artifacts. The template contains
+	// variable interpolations such as ${artifact:path}.
+	InstructionsTemplate *ArtifactUploadInstructions `json:"upload_instructions"`
+
+	// These instructions apply to specific artifacts, and are necessary for
+	// multipart uploads. They override InstructionsTemplate and should not
+	// contain interpolations. Map: artifact ID -> instructions for that artifact.
+	PerArtifactInstructions map[string]*ArtifactUploadInstructions `json:"per_artifact_instructions"`
 }
 
 // ArtifactSearchOptions specifies the optional parameters to the
@@ -82,18 +104,29 @@ type ArtifactSearchOptions struct {
 	IncludeDuplicates bool `url:"include_duplicates,omitempty"`
 }
 
-type ArtifactBatchUpdateArtifact struct {
-	ID    string `json:"id"`
-	State string `json:"state"`
+// ArtifactState represents the state of a single artifact, when calling UpdateArtifacts.
+type ArtifactState struct {
+	ID        string `json:"id"`
+	State     string `json:"state"`
+	Multipart bool   `json:"multipart,omitempty"`
+	// If this artifact was a multipart upload and is complete, we need the
+	// ETag from each uploaded part so that they can be joined together.
+	MultipartETags []ArtifactPartETag `json:"multipart_etags,omitempty"`
+}
+
+// ArtifactPartETag associates an ETag to a part number for a multipart upload.
+type ArtifactPartETag struct {
+	PartNumber int    `json:"part_number"`
+	ETag       string `json:"etag"`
 }
 
 type ArtifactBatchUpdateRequest struct {
-	Artifacts []*ArtifactBatchUpdateArtifact `json:"artifacts"`
+	Artifacts []ArtifactState `json:"artifacts"`
 }
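Note: to make the new request shape concrete, here is a minimal sketch (not part of this diff) of how a caller might report a completed multipart upload with these types. The helper name `finishMultipart` and the `"finished"` state string are assumptions for illustration; only `api.ArtifactState`, `api.ArtifactPartETag`, and `Client.UpdateArtifacts` come from this change.

```go
// Hypothetical helper: report a completed multipart upload back to Buildkite.
// Assumes the artifact's state string is "finished" on success.
func finishMultipart(ctx context.Context, client *api.Client, jobID, artifactID string, etags []api.ArtifactPartETag) error {
	states := []api.ArtifactState{{
		ID:             artifactID,
		State:          "finished", // assumed state value
		Multipart:      true,
		MultipartETags: etags, // one ETag per part, in part-number order
	}}
	_, err := client.UpdateArtifacts(ctx, jobID, states)
	return err
}
```

 // CreateArtifacts takes a slice of artifacts, and creates them on Buildkite as a batch.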
-func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) { - u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId)) +func (c *Client) CreateArtifacts(ctx context.Context, jobID string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) { + u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID)) req, err := c.newRequest(ctx, "POST", u, batch) if err != nil { @@ -109,13 +142,11 @@ func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *Artif return createResponse, resp, err } -// Updates a particular artifact -func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStates map[string]string) (*Response, error) { - u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId)) - payload := ArtifactBatchUpdateRequest{} - - for id, state := range artifactStates { - payload.Artifacts = append(payload.Artifacts, &ArtifactBatchUpdateArtifact{id, state}) +// UpdateArtifacts updates Buildkite with one or more artifact states. +func (c *Client) UpdateArtifacts(ctx context.Context, jobID string, artifactStates []ArtifactState) (*Response, error) { + u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID)) + payload := ArtifactBatchUpdateRequest{ + Artifacts: artifactStates, } req, err := c.newRequest(ctx, "PUT", u, payload) @@ -123,17 +154,12 @@ func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStat return nil, err } - resp, err := c.doRequest(req, nil) - if err != nil { - return resp, err - } - - return resp, err + return c.doRequest(req, nil) } // SearchArtifacts searches Buildkite for a set of artifacts -func (c *Client) SearchArtifacts(ctx context.Context, buildId string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) { - u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildId)) +func (c *Client) SearchArtifacts(ctx context.Context, buildID string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) { + u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildID)) u, err := addOptions(u, opt) if err != nil { return nil, nil, err diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index 157bb2a22c..6ff12c1e7e 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -167,14 +167,15 @@ type AgentStartConfig struct { TracingServiceName string `cli:"tracing-service-name"` // Global flags - Debug bool `cli:"debug"` - LogLevel string `cli:"log-level"` - NoColor bool `cli:"no-color"` - Experiments []string `cli:"experiment" normalize:"list"` - Profile string `cli:"profile"` - StrictSingleHooks bool `cli:"strict-single-hooks"` - KubernetesExec bool `cli:"kubernetes-exec"` - TraceContextEncoding string `cli:"trace-context-encoding"` + Debug bool `cli:"debug"` + LogLevel string `cli:"log-level"` + NoColor bool `cli:"no-color"` + Experiments []string `cli:"experiment" normalize:"list"` + Profile string `cli:"profile"` + StrictSingleHooks bool `cli:"strict-single-hooks"` + KubernetesExec bool `cli:"kubernetes-exec"` + TraceContextEncoding string `cli:"trace-context-encoding"` + NoMultipartArtifactUpload bool `cli:"no-multipart-artifact-upload"` // API config DebugHTTP bool `cli:"debug-http"` @@ -704,6 +705,7 @@ var AgentStartCommand = cli.Command{ StrictSingleHooksFlag, KubernetesExecFlag, TraceContextEncodingFlag, + NoMultipartArtifactUploadFlag, // Deprecated flags which will be removed in v4 cli.StringSliceFlag{ @@ -994,7 +996,7 @@ var 
AgentStartCommand = cli.Command{ TracingBackend: cfg.TracingBackend, TracingServiceName: cfg.TracingServiceName, TraceContextEncoding: cfg.TraceContextEncoding, - VerificationFailureBehaviour: cfg.VerificationFailureBehavior, + AllowMultipartArtifactUpload: !cfg.NoMultipartArtifactUpload, KubernetesExec: cfg.KubernetesExec, SigningJWKSFile: cfg.SigningJWKSFile, @@ -1002,7 +1004,8 @@ var AgentStartCommand = cli.Command{ SigningAWSKMSKey: cfg.SigningAWSKMSKey, DebugSigning: cfg.DebugSigning, - VerificationJWKS: verificationJWKS, + VerificationJWKS: verificationJWKS, + VerificationFailureBehaviour: cfg.VerificationFailureBehavior, DisableWarningsFor: cfg.DisableWarningsFor, } diff --git a/clicommand/artifact_upload.go b/clicommand/artifact_upload.go index 73e3f1709b..98ba6345d6 100644 --- a/clicommand/artifact_upload.go +++ b/clicommand/artifact_upload.go @@ -88,6 +88,7 @@ type ArtifactUploadConfig struct { // Uploader flags GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"` UploadSkipSymlinks bool `cli:"upload-skip-symlinks"` + NoMultipartUpload bool `cli:"no-multipart-artifact-upload"` // deprecated FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"` @@ -138,6 +139,7 @@ var ArtifactUploadCommand = cli.Command{ LogLevelFlag, ExperimentsFlag, ProfileFlag, + NoMultipartArtifactUploadFlag, }, Action: func(c *cli.Context) error { ctx := context.Background() @@ -155,6 +157,8 @@ var ArtifactUploadCommand = cli.Command{ ContentType: cfg.ContentType, DebugHTTP: cfg.DebugHTTP, + AllowMultipart: !cfg.NoMultipartUpload, + // If the deprecated flag was set to true, pretend its replacement was set to true too // this works as long as the user only sets one of the two flags GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks), diff --git a/clicommand/global.go b/clicommand/global.go index fb5cd6eaf2..08902d98b0 100644 --- a/clicommand/global.go +++ b/clicommand/global.go @@ -100,6 +100,12 @@ var ( EnvVar: "BUILDKITE_KUBERNETES_EXEC", } + NoMultipartArtifactUploadFlag = cli.BoolFlag{ + Name: "no-multipart-artifact-upload", + Usage: "For Buildkite-hosted artifacts, disables the use of multipart uploads. 
Has no effect on uploads to other destinations such as custom cloud buckets", + EnvVar: "BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD", + } + ExperimentsFlag = cli.StringSliceFlag{ Name: "experiment", Value: &cli.StringSlice{}, diff --git a/internal/artifact/api_client.go b/internal/artifact/api_client.go index 6b4e72af78..f27bca2e10 100644 --- a/internal/artifact/api_client.go +++ b/internal/artifact/api_client.go @@ -10,5 +10,5 @@ import ( type APIClient interface { CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error) - UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error) + UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error) } diff --git a/internal/artifact/artifactory_uploader.go b/internal/artifact/artifactory_uploader.go index d99ea29d73..f35da80ebb 100644 --- a/internal/artifact/artifactory_uploader.go +++ b/internal/artifact/artifactory_uploader.go @@ -99,50 +99,64 @@ func (u *ArtifactoryUploader) URL(artifact *api.Artifact) string { return url.String() } -func (u *ArtifactoryUploader) Upload(_ context.Context, artifact *api.Artifact) error { +func (u *ArtifactoryUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&artifactoryUploaderWork{ + ArtifactoryUploader: u, + artifact: artifact, + }}, nil +} + +type artifactoryUploaderWork struct { + *ArtifactoryUploader + artifact *api.Artifact +} + +func (u *artifactoryUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *artifactoryUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +func (u *artifactoryUploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) { // Open file from filesystem - u.logger.Debug("Reading file \"%s\"", artifact.AbsolutePath) - f, err := os.Open(artifact.AbsolutePath) + u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) + f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } // Upload the file to Artifactory. 
- u.logger.Debug("Uploading \"%s\" to `%s`", artifact.Path, u.URL(artifact)) + u.logger.Debug("Uploading %q to %q", u.artifact.Path, u.URL(u.artifact)) - req, err := http.NewRequest("PUT", u.URL(artifact), f) + req, err := http.NewRequest("PUT", u.URL(u.artifact), f) req.SetBasicAuth(u.user, u.password) if err != nil { - return err + return nil, err } - md5Checksum, err := checksumFile(md5.New(), artifact.AbsolutePath) + md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-MD5", md5Checksum) - sha1Checksum, err := checksumFile(sha1.New(), artifact.AbsolutePath) + sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-SHA1", sha1Checksum) - sha256Checksum, err := checksumFile(sha256.New(), artifact.AbsolutePath) + sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath) if err != nil { - return err + return nil, err } req.Header.Add("X-Checksum-SHA256", sha256Checksum) res, err := u.client.Do(req) if err != nil { - return err - } - if err := checkResponse(res); err != nil { - return err + return nil, err } - - return nil + return nil, checkResponse(res) } func checksumFile(hasher hash.Hash, path string) (string, error) { diff --git a/internal/artifact/azure_blob_uploader.go b/internal/artifact/azure_blob_uploader.go index 6be24dab1f..ae9bfc1b19 100644 --- a/internal/artifact/azure_blob_uploader.go +++ b/internal/artifact/azure_blob_uploader.go @@ -88,20 +88,38 @@ func (u *AzureBlobUploader) URL(artifact *api.Artifact) string { return sasURL } -// Upload uploads an artifact file. -func (u *AzureBlobUploader) Upload(ctx context.Context, artifact *api.Artifact) error { - u.logger.Debug("Reading file %q", artifact.AbsolutePath) - f, err := os.Open(artifact.AbsolutePath) +func (u *AzureBlobUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&azureBlobUploaderWork{ + AzureBlobUploader: u, + artifact: artifact, + }}, nil +} + +type azureBlobUploaderWork struct { + *AzureBlobUploader + artifact *api.Artifact +} + +func (u *azureBlobUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *azureBlobUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +// DoWork uploads an artifact file. 
+func (u *azureBlobUploaderWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) { + u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) + f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } defer f.Close() - blobName := path.Join(u.loc.BlobPath, artifact.Path) + blobName := path.Join(u.loc.BlobPath, u.artifact.Path) - u.logger.Debug("Uploading %s to %s", artifact.Path, u.loc.URL(blobName)) + u.logger.Debug("Uploading %s to %s", u.artifact.Path, u.loc.URL(blobName)) bbc := u.client.NewContainerClient(u.loc.ContainerName).NewBlockBlobClient(blobName) _, err = bbc.UploadFile(ctx, f, nil) - return err + return nil, err } diff --git a/internal/artifact/batch_creator.go b/internal/artifact/batch_creator.go index 099dc5229e..f761ae950b 100644 --- a/internal/artifact/batch_creator.go +++ b/internal/artifact/batch_creator.go @@ -22,6 +22,9 @@ type BatchCreatorConfig struct { // CreateArtifactsTimeout, sets a context.WithTimeout around the CreateArtifacts API. // If it's zero, there's no context timeout and the default HTTP timeout will prevail. CreateArtifactsTimeout time.Duration + + // Whether to allow multipart uploads to the BK-hosted bucket. + AllowMultipart bool } type BatchCreator struct { @@ -50,10 +53,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { // Split into the artifacts into chunks so we're not uploading a ton of // files at once. for i := 0; i < length; i += chunks { - j := i + chunks - if length < j { - j = length - } + j := min(i+chunks, length) // The artifacts that will be uploaded in this chunk theseArtifacts := a.conf.Artifacts[i:j] @@ -63,9 +63,10 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { // twice, it'll just return the previous data and skip the // upload) batch := &api.ArtifactBatch{ - ID: api.NewUUID(), - Artifacts: theseArtifacts, - UploadDestination: a.conf.UploadDestination, + ID: api.NewUUID(), + Artifacts: theseArtifacts, + UploadDestination: a.conf.UploadDestination, + MultipartSupported: a.conf.AllowMultipart, } a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length) @@ -114,11 +115,12 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { } // Save the id and instructions to each artifact - index := 0 - for _, id := range creation.ArtifactIDs { + for index, id := range creation.ArtifactIDs { theseArtifacts[index].ID = id - theseArtifacts[index].UploadInstructions = creation.UploadInstructions - index += 1 + theseArtifacts[index].UploadInstructions = creation.InstructionsTemplate + if specific := creation.PerArtifactInstructions[id]; specific != nil { + theseArtifacts[index].UploadInstructions = specific + } } } diff --git a/internal/artifact/bk_uploader.go b/internal/artifact/bk_uploader.go new file mode 100644 index 0000000000..b0e4dcbdf1 --- /dev/null +++ b/internal/artifact/bk_uploader.go @@ -0,0 +1,419 @@ +package artifact + +import ( + "bytes" + "cmp" + "context" + _ "crypto/sha512" // import sha512 to make sha512 ssl certs work + "errors" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/http/httptrace" + "net/http/httputil" + "net/url" + "os" + "slices" + "strings" + + "github.com/buildkite/agent/v3/api" + "github.com/buildkite/agent/v3/logger" + "github.com/buildkite/agent/v3/version" + "github.com/dustin/go-humanize" +) + +const artifactPathVariable = 
"${artifact:path}" + +const ( + // BKUploader uploads to S3 either as: + // - a single signed POST, which has a hard limit of 5GB, or + // - as a signed multipart, which has a limit of 5GB per _part_, but we + // aren't supporting larger artifacts yet. + maxFormUploadedArtifactSize = int64(5 * 1024 * 1024 * 1024) + + // Multipart parts have a minimum size of 5MB. + minPartSize = int64(5 * 1024 * 1024) +) + +type BKUploaderConfig struct { + // Whether or not HTTP calls should be debugged + DebugHTTP bool +} + +// BKUploader uploads artifacts to Buildkite itself. +type BKUploader struct { + // The configuration + conf BKUploaderConfig + + // The logger instance to use + logger logger.Logger +} + +// NewBKUploader creates a new Buildkite uploader. +func NewBKUploader(l logger.Logger, c BKUploaderConfig) *BKUploader { + return &BKUploader{ + logger: l, + conf: c, + } +} + +// URL returns the empty string. BKUploader doesn't know the URL in advance, +// it is provided by Buildkite after uploading. +func (u *BKUploader) URL(*api.Artifact) string { return "" } + +// CreateWork checks the artifact size, then creates one worker. +func (u *BKUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + if artifact.FileSize > maxFormUploadedArtifactSize { + return nil, errArtifactTooLarge{Size: artifact.FileSize} + } + actions := artifact.UploadInstructions.Actions + if len(actions) == 0 { + // Not multiple actions - use a single form upload. + return []workUnit{&bkFormUpload{ + BKUploader: u, + artifact: artifact, + }}, nil + } + + // Ensure the actions are sorted by part number. + slices.SortFunc(actions, func(a, b api.ArtifactUploadAction) int { + return cmp.Compare(a.PartNumber, b.PartNumber) + }) + + // Split the artifact across multiple parts. + chunks := int64(len(actions)) + chunkSize := artifact.FileSize / chunks + remainder := artifact.FileSize % chunks + var offset int64 + workUnits := make([]workUnit, 0, chunks) + for i, action := range actions { + size := chunkSize + if int64(i) < remainder { + // Spread the remainder across the first chunks. + size++ + } + workUnits = append(workUnits, &bkMultipartUpload{ + BKUploader: u, + artifact: artifact, + partCount: int(chunks), + action: &action, + offset: offset, + size: size, + }) + offset += size + } + // After that loop, `offset` should equal `artifact.FileSize`. + return workUnits, nil +} + +// bkMultipartUpload uploads a single part of a multipart upload. 
+type bkMultipartUpload struct {
+	*BKUploader
+	artifact     *api.Artifact
+	action       *api.ArtifactUploadAction
+	partCount    int
+	offset, size int64
+}
+
+func (u *bkMultipartUpload) Artifact() *api.Artifact { return u.artifact }
+
+func (u *bkMultipartUpload) Description() string {
+	return fmt.Sprintf("%s %s part %d/%d (~%s starting at ~%s)",
+		u.artifact.ID,
+		u.artifact.Path,
+		u.action.PartNumber,
+		u.partCount,
+		humanize.IBytes(uint64(u.size)),
+		humanize.IBytes(uint64(u.offset)),
+	)
+}
+
+func (u *bkMultipartUpload) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
+	f, err := os.Open(u.artifact.AbsolutePath)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	if _, err := f.Seek(u.offset, io.SeekStart); err != nil {
+		return nil, err
+	}
+	lr := io.LimitReader(f, u.size)
+
+	req, err := http.NewRequestWithContext(ctx, u.action.Method, u.action.URL, lr)
+	if err != nil {
+		return nil, err
+	}
+	// Content-Ranges are 0-indexed and inclusive
+	// example: Content-Range: bytes 200-1000/67589
+	req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", u.offset, u.offset+u.size-1, u.artifact.FileSize))
+	req.Header.Set("Content-Type", u.artifact.ContentType)
+	req.Header.Add("User-Agent", version.UserAgent())
+
+	if u.conf.DebugHTTP {
+		dumpReqOut, err := httputil.DumpRequestOut(req, false)
+		if err != nil {
+			u.logger.Error("Couldn't dump outgoing request: %v", err)
+		}
+		u.logger.Debug("%s", dumpReqOut)
+	}
+
+	// TODO: set all the usual http transport & client options...
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if u.conf.DebugHTTP {
+		dumpResp, err := httputil.DumpResponse(resp, true)
+		if err != nil {
+			u.logger.Error("Couldn't dump response: %v", err)
+			return nil, err
+		}
+		u.logger.Debug("%s", dumpResp)
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("read response body: %v", err)
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		return nil, fmt.Errorf("unsuccessful status %s: %s", resp.Status, body)
+	}
+
+	etag := resp.Header.Get("Etag")
+	u.logger.Debug("Artifact %s part %d has ETag = %s", u.artifact.ID, u.action.PartNumber, etag)
+	if etag == "" {
+		return nil, errors.New("response missing ETag header")
+	}
+
+	return &api.ArtifactPartETag{
+		PartNumber: u.action.PartNumber,
+		ETag:       etag,
+	}, nil
+}
+
+// bkFormUpload uploads an artifact to a presigned URL in a single request using
+// a request body encoded as multipart/form-data.
+type bkFormUpload struct {
+	*BKUploader
+	artifact *api.Artifact
+}
+
+func (u *bkFormUpload) Artifact() *api.Artifact { return u.artifact }
+
+func (u *bkFormUpload) Description() string {
+	return singleUnitDescription(u.artifact)
+}
+
+// DoWork tries the upload.
+func (u *bkFormUpload) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
+	request, err := createFormUploadRequest(ctx, u.logger, u.artifact)
+	if err != nil {
+		return nil, err
+	}
+
+	if u.conf.DebugHTTP {
+		// If the request is a multi-part form, then it's probably a
+		// file upload, in which case we don't want to spew the
+		// file contents into the debug log (especially if it's been
+		// gzipped)
+		var requestDump []byte
+		if strings.Contains(request.Header.Get("Content-Type"), "multipart/form-data") {
+			requestDump, err = httputil.DumpRequestOut(request, false)
+		} else {
+			requestDump, err = httputil.DumpRequestOut(request, true)
+		}
+
+		if err != nil {
+			u.logger.Debug("\nERR: %s\n%s", err, string(requestDump))
+		} else {
+			u.logger.Debug("\n%s", string(requestDump))
+		}
+
+		// configure the HTTP request to log the server IP. The IPs for s3.amazonaws.com
+		// rotate every 5 seconds, and if one of them is misbehaving it may be helpful to
+		// know which one.
+		trace := &httptrace.ClientTrace{
+			GotConn: func(connInfo httptrace.GotConnInfo) {
+				u.logger.Debug("artifact %s uploading to: %s", u.artifact.ID, connInfo.Conn.RemoteAddr())
+			},
+		}
+		request = request.WithContext(httptrace.WithClientTrace(request.Context(), trace))
+	}
+
+	// Create the client
+	// TODO: this uses the default transport, potentially ignoring many agent
+	// config options
+	client := &http.Client{}
+
+	// Perform the request
+	u.logger.Debug("%s %s", request.Method, request.URL)
+	response, err := client.Do(request)
+	if err != nil {
+		return nil, err
+	}
+	defer response.Body.Close()
+
+	if u.conf.DebugHTTP {
+		responseDump, err := httputil.DumpResponse(response, true)
+		if err != nil {
+			u.logger.Debug("\nERR: %s\n%s", err, string(responseDump))
+		} else {
+			u.logger.Debug("\n%s", string(responseDump))
+		}
+	}
+
+	if response.StatusCode/100 != 2 {
+		body := &bytes.Buffer{}
+		_, err := body.ReadFrom(response.Body)
+		if err != nil {
+			return nil, err
+		}
+
+		return nil, fmt.Errorf("%s (%d)", body, response.StatusCode)
+	}
+	return nil, nil
+}
+
+// Creates a new file upload http request with optional extra params
+func createFormUploadRequest(ctx context.Context, _ logger.Logger, artifact *api.Artifact) (*http.Request, error) {
+	streamer := newMultipartStreamer()
+	action := artifact.UploadInstructions.Action
+
+	// Set the post data for the request
+	for key, val := range artifact.UploadInstructions.Data {
+		// Replace the magical ${artifact:path} variable with the
+		// artifact's path
+		newVal := strings.ReplaceAll(val, artifactPathVariable, artifact.Path)
+
+		// Write the new value to the form
+		if err := streamer.WriteField(key, newVal); err != nil {
+			return nil, err
+		}
+	}
+
+	fh, err := os.Open(artifact.AbsolutePath)
+	if err != nil {
+		return nil, err
+	}
+
+	// It's important that we add the form field last because when
+	// uploading to an S3 form, they are really nit-picky about the field
+	// order, and the file needs to be the last one, otherwise it doesn't work.
+	if err := streamer.WriteFile(action.FileInput, artifact.Path, fh); err != nil {
+		fh.Close()
+		return nil, err
+	}
+
+	// Create the URL that we'll send data to
+	uri, err := url.Parse(action.URL)
+	if err != nil {
+		fh.Close()
+		return nil, err
+	}
+
+	uri.Path = action.Path
+
+	// Create the request
+	req, err := http.NewRequestWithContext(ctx, action.Method, uri.String(), streamer.Reader())
+	if err != nil {
+		fh.Close()
+		return nil, err
+	}
+
+	// Setup the content type and length that s3 requires
+	req.Header.Add("Content-Type", streamer.ContentType)
+	// Letting the server know the agent version can be helpful for debugging
+	req.Header.Add("User-Agent", version.UserAgent())
+	req.ContentLength = streamer.Len()
+
+	return req, nil
+}
+
+// A wrapper around the complexities of streaming a multipart file and fields to
+// an http endpoint that infuriatingly requires a Content-Length
+// Derived from https://github.com/technoweenie/multipartstreamer
+type multipartStreamer struct {
+	ContentType   string
+	bodyBuffer    *bytes.Buffer
+	bodyWriter    *multipart.Writer
+	closeBuffer   *bytes.Buffer
+	reader        io.ReadCloser
+	contentLength int64
+}
+
+// newMultipartStreamer initializes a new MultipartStreamer.
+func newMultipartStreamer() *multipartStreamer {
+	m := &multipartStreamer{
+		bodyBuffer: new(bytes.Buffer),
+	}
+
+	m.bodyWriter = multipart.NewWriter(m.bodyBuffer)
+	boundary := m.bodyWriter.Boundary()
+	m.ContentType = "multipart/form-data; boundary=" + boundary
+
+	closeBoundary := fmt.Sprintf("\r\n--%s--\r\n", boundary)
+	m.closeBuffer = bytes.NewBufferString(closeBoundary)
+
+	return m
+}
+
+// WriteField writes a form field to the multipart.Writer.
+func (m *multipartStreamer) WriteField(key, value string) error {
+	return m.bodyWriter.WriteField(key, value)
+}
+
+// WriteFile writes the multi-part preamble which will be followed by file data.
+// This can only be called once, and must be the last thing written to the streamer.
+func (m *multipartStreamer) WriteFile(key, artifactPath string, fh http.File) error {
+	if m.reader != nil {
+		return errors.New("WriteFile can't be called multiple times")
+	}
+
+	// Set up a reader that combines the body, the file and the closer in a stream
+	m.reader = &multipartReadCloser{
+		Reader: io.MultiReader(m.bodyBuffer, fh, m.closeBuffer),
+		fh:     fh,
+	}
+
+	stat, err := fh.Stat()
+	if err != nil {
+		return err
+	}
+
+	m.contentLength = stat.Size()
+
+	_, err = m.bodyWriter.CreateFormFile(key, artifactPath)
+	return err
+}
+
+// Len calculates the byte size of the multipart content.
+func (m *multipartStreamer) Len() int64 {
+	return m.contentLength + int64(m.bodyBuffer.Len()) + int64(m.closeBuffer.Len())
+}
+
+// Reader gets an io.ReadCloser for passing to an http.Request.
+func (m *multipartStreamer) Reader() io.ReadCloser {
+	return m.reader
+}
+
+type multipartReadCloser struct {
+	io.Reader
+	fh http.File
+}
+
+func (mrc *multipartReadCloser) Close() error {
+	return mrc.fh.Close()
+}
+
+type errArtifactTooLarge struct {
+	Size int64
+}
+
+func (e errArtifactTooLarge) Error() string {
+	// TODO: Clean up error strings
+	// https://github.com/golang/go/wiki/CodeReviewComments#error-strings
+	return fmt.Sprintf("File size (%d bytes) exceeds the maximum supported by Buildkite's default artifact storage (5GB). Alternative artifact storage options may support larger files.", e.Size)
+}
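Note: the streamer's contract is easiest to see end to end — write the form fields, write the file last, then hand `Reader()` and `Len()` to the request. A minimal sketch under assumed inputs (the function name `buildUpload`, the `uploadURL` value, and the field names are hypothetical):

```go
// Hypothetical sketch of the multipartStreamer contract; uploadURL and fh
// are assumed to be supplied by the caller.
func buildUpload(uploadURL string, fh http.File) (*http.Request, error) {
	streamer := newMultipartStreamer()
	if err := streamer.WriteField("path", "llamas.txt"); err != nil {
		return nil, err
	}
	// The file must be written last: WriteFile emits the final form part, and
	// the file bytes are streamed (not buffered) when the request body is read.
	if err := streamer.WriteFile("file", "llamas.txt", fh); err != nil {
		return nil, err
	}
	req, err := http.NewRequest("POST", uploadURL, streamer.Reader())
	if err != nil {
		return nil, err
	}
	req.Header.Add("Content-Type", streamer.ContentType)
	req.ContentLength = streamer.Len() // known up front, as the endpoint demands
	return req, nil
}
```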
diff --git a/internal/artifact/bk_uploader_test.go b/internal/artifact/bk_uploader_test.go
new file mode 100644
index 0000000000..86c34f60ad
--- /dev/null
+++ b/internal/artifact/bk_uploader_test.go
@@ -0,0 +1,297 @@
+package artifact
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"strconv"
+	"testing"
+
+	"github.com/buildkite/agent/v3/api"
+	"github.com/buildkite/agent/v3/logger"
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+)
+
+func TestFormUploading(t *testing.T) {
+	ctx := context.Background()
+
+	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.Method != "POST" || req.URL.Path != "/buildkiteartifacts.com" {
+			http.Error(rw, fmt.Sprintf("not found; (method, path) = (%q, %q), want POST /buildkiteartifacts.com", req.Method, req.URL.Path), http.StatusNotFound)
+			return
+		}
+
+		if req.ContentLength <= 0 {
+			http.Error(rw, "zero or unknown Content-Length", http.StatusBadRequest)
+			return
+		}
+
+		if err := req.ParseMultipartForm(5 * 1024 * 1024); err != nil {
+			http.Error(rw, fmt.Sprintf("req.ParseMultipartForm() = %v", err), http.StatusBadRequest)
+			return
+		}
+
+		// Check the ${artifact:path} interpolation is working
+		path := req.FormValue("path")
+		if got, want := path, "llamas.txt"; got != want {
+			http.Error(rw, fmt.Sprintf("path = %q, want %q", got, want), http.StatusBadRequest)
+			return
+		}
+
+		file, fh, err := req.FormFile("file")
+		if err != nil {
+			http.Error(rw, fmt.Sprintf(`req.FormFile("file") error = %v`, err), http.StatusBadRequest)
+			return
+		}
+		defer file.Close()
+
+		b := &bytes.Buffer{}
+		if _, err := io.Copy(b, file); err != nil {
+			http.Error(rw, fmt.Sprintf("io.Copy() error = %v", err), http.StatusInternalServerError)
+			return
+		}
+
+		// Check the file is attached correctly
+		if got, want := b.String(), "llamas"; got != want {
+			http.Error(rw, fmt.Sprintf("uploaded file content = %q, want %q", got, want), http.StatusBadRequest)
+			return
+		}
+
+		if got, want := fh.Filename, "llamas.txt"; got != want {
+			http.Error(rw, fmt.Sprintf("uploaded file name = %q, want %q", got, want), http.StatusInternalServerError)
+			return
+		}
+	}))
+	defer server.Close()
+
+	temp, err := os.MkdirTemp("", "agent")
+	if err != nil {
+		t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err)
+	}
+	defer os.Remove(temp)
+
+	cwd, err := os.Getwd()
+	if err != nil {
+		t.Fatalf("os.Getwd() error = %v", err)
+	}
+
+	for _, wd := range []string{temp, cwd} {
+		t.Run(wd, func(t *testing.T) {
+			abspath := filepath.Join(wd, "llamas.txt")
+			if err := os.WriteFile(abspath, []byte("llamas"), 0700); err != nil {
+				t.Fatalf("os.WriteFile(%q) error = %v", abspath, err)
+			}
+			defer os.Remove(abspath)
+
+			uploader := NewBKUploader(logger.Discard, BKUploaderConfig{})
+			artifact := &api.Artifact{
+				ID:           "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx",
+				Path:         "llamas.txt",
+				AbsolutePath: abspath,
+				GlobPath:     "llamas.txt",
+				ContentType:  "text/plain",
+				UploadInstructions: &api.ArtifactUploadInstructions{
+					Data: map[string]string{
+						"path": "${artifact:path}",
+					},
+					Action: api.ArtifactUploadAction{
+						URL:       server.URL,
+						Method:    "POST",
+						Path:      "buildkiteartifacts.com",
+						FileInput: "file",
+					},
+				},
+			}
+
+			work, err := uploader.CreateWork(artifact)
+			if err != nil {
+				t.Fatalf("uploader.CreateWork(artifact) error = %v", err)
+			}
+
+			for _, wu := range work {
+				if _, err := wu.DoWork(ctx); err != nil {
+					t.Errorf("bkUploaderWork.DoWork(ctx) = %v", err)
+				}
+			}
+		})
+	}
+}
+
+func TestMultipartUploading(t *testing.T) {
+	ctx := context.Background()
+
+	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.Method != "PUT" || req.URL.Path != "/llamas3.txt" {
+			http.Error(rw, fmt.Sprintf("not found; (method, path) = (%q, %q), want PUT /llamas3.txt", req.Method, req.URL.Path), http.StatusNotFound)
+			return
+		}
+		partNum, err := strconv.Atoi(req.URL.Query().Get("partNumber"))
+		if err != nil {
+			http.Error(rw, fmt.Sprintf("strconv.Atoi(req.URL.Query().Get(partNumber)) error = %v", err), http.StatusBadRequest)
+			return
+		}
+
+		if partNum < 1 || partNum > 3 {
+			http.Error(rw, fmt.Sprintf("partNumber %d out of range [1, 3]", partNum), http.StatusBadRequest)
+			return
+		}
+
+		b, err := io.ReadAll(req.Body)
+		if err != nil {
+			http.Error(rw, fmt.Sprintf("io.ReadAll(req.Body) error = %v", err), http.StatusInternalServerError)
+			return
+		}
+
+		if got, want := string(b), "llamas"; got != want {
+			http.Error(rw, fmt.Sprintf("req.Body = %q, want %q", got, want), http.StatusExpectationFailed)
+			return
+		}
+
+		rw.Header().Set("ETag", fmt.Sprintf(`"part number %d"`, partNum))
+		rw.WriteHeader(http.StatusCreated)
+	}))
+	defer server.Close()
+
+	temp, err := os.MkdirTemp("", "agent")
+	if err != nil {
+		t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err)
+	}
+	defer os.Remove(temp)
+
+	cwd, err := os.Getwd()
+	if err != nil {
+		t.Fatalf("os.Getwd() error = %v", err)
+	}
+
+	for _, wd := range []string{temp, cwd} {
+		t.Run(wd, func(t *testing.T) {
+			abspath := filepath.Join(wd, "llamas3.txt")
+			if err := os.WriteFile(abspath, []byte("llamasllamasllamas"), 0700); err != nil {
+				t.Fatalf("os.WriteFile(%q) error = %v", abspath, err)
+			}
+			defer os.Remove(abspath)
+
+			uploader := NewBKUploader(logger.Discard, BKUploaderConfig{})
+			actions := []api.ArtifactUploadAction{
+				{URL: server.URL + "/llamas3.txt?partNumber=1", Method: "PUT", PartNumber: 1},
+				{URL: server.URL + "/llamas3.txt?partNumber=2", Method: "PUT", PartNumber: 2},
+				{URL: server.URL + "/llamas3.txt?partNumber=3", Method: "PUT", PartNumber: 3},
+			}
+			artifact := &api.Artifact{
+				ID:           "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx",
+				Path:         "llamas3.txt",
+				AbsolutePath: abspath,
+				GlobPath:     "llamas3.txt",
+				FileSize:     18,
+				ContentType:  "text/plain",
+				UploadInstructions: &api.ArtifactUploadInstructions{
+					Actions: actions,
+				},
+			}
+
+			work, err := uploader.CreateWork(artifact)
+			if err != nil {
+				t.Fatalf("uploader.CreateWork(artifact) error = %v", err)
+			}
+
+			want := []workUnit{
+				&bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[0], offset: 0, size: 6},
+				&bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[1], offset: 6, size: 6},
+				&bkMultipartUpload{BKUploader: uploader, artifact: artifact, partCount: 3, action: &actions[2], offset: 12, size: 6},
+			}
+
+			if diff := cmp.Diff(work, want,
+				cmp.AllowUnexported(bkMultipartUpload{}),
+				cmpopts.EquateComparable(uploader),
+			); diff != "" {
+				t.Fatalf("CreateWork diff (-got +want):\n%s", diff)
+			}
+
+			var gotEtags []api.ArtifactPartETag
+			for _, wu := range work {
+				etag, err := wu.DoWork(ctx)
+				if err != nil {
+					t.Errorf("bkUploaderWork.DoWork(ctx) = %v", err)
+					continue
+				}
+				gotEtags = append(gotEtags, *etag)
+			}
+
+			wantEtags := []api.ArtifactPartETag{
+				{PartNumber: 1, ETag: `"part number 1"`},
+				{PartNumber: 2, ETag: `"part number 2"`},
+				{PartNumber: 3, ETag: `"part number 3"`},
+			}
+			if diff := cmp.Diff(gotEtags, wantEtags); diff != "" {
+				t.Errorf("etags diff (-got +want):\n%s", diff)
+			}
+		})
+	}
+}
+
+func TestFormUploadFileMissing(t
*testing.T) { + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + http.Error(rw, "Not found", http.StatusNotFound) + })) + defer server.Close() + + temp, err := os.MkdirTemp("", "agent") + if err != nil { + t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err) + } + defer os.Remove(temp) + + abspath := filepath.Join(temp, "llamas.txt") + + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) + artifact := &api.Artifact{ + ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", + Path: "llamas.txt", + AbsolutePath: abspath, + GlobPath: "llamas.txt", + ContentType: "text/plain", + UploadInstructions: &api.ArtifactUploadInstructions{ + Data: map[string]string{ + "path": "${artifact:path}", + }, + Action: api.ArtifactUploadAction{ + URL: server.URL, + Method: "POST", + Path: "buildkiteartifacts.com", + FileInput: "file", + }}, + } + + work, err := uploader.CreateWork(artifact) + if err != nil { + t.Fatalf("uploader.CreateWork(artifact) error = %v", err) + } + + for _, wu := range work { + if _, err := wu.DoWork(ctx); !errors.Is(err, fs.ErrNotExist) { + t.Errorf("bkUploaderWork.DoWork(ctx) = %v, want %v", err, fs.ErrNotExist) + } + } +} + +func TestFormUploadTooBig(t *testing.T) { + uploader := NewBKUploader(logger.Discard, BKUploaderConfig{}) + const size = int64(6442450944) // 6Gb + artifact := &api.Artifact{ + ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", + Path: "llamas.txt", + AbsolutePath: "/llamas.txt", + GlobPath: "llamas.txt", + ContentType: "text/plain", + FileSize: size, + UploadInstructions: &api.ArtifactUploadInstructions{}, + } + + wantErr := errArtifactTooLarge{Size: size} + if _, err := uploader.CreateWork(artifact); !errors.Is(err, wantErr) { + t.Fatalf("uploader.CreateWork(artifact) error = %v, want %v", err, wantErr) + } +} diff --git a/internal/artifact/form_uploader.go b/internal/artifact/form_uploader.go deleted file mode 100644 index 39a16aa1ed..0000000000 --- a/internal/artifact/form_uploader.go +++ /dev/null @@ -1,275 +0,0 @@ -package artifact - -import ( - "bytes" - "context" - _ "crypto/sha512" // import sha512 to make sha512 ssl certs work - "fmt" - "io" - "mime/multipart" - "net/http" - "net/http/httptrace" - "net/http/httputil" - "regexp" - "strings" - - // "net/http/httputil" - "errors" - "net/url" - "os" - - "github.com/buildkite/agent/v3/api" - "github.com/buildkite/agent/v3/logger" - "github.com/buildkite/agent/v3/version" -) - -var ArtifactPathVariableRegex = regexp.MustCompile("\\$\\{artifact\\:path\\}") - -// FormUploader uploads to S3 as a single signed POST, which have a hard limit of 5Gb. 
-var maxFormUploadedArtifactSize = int64(5368709120) - -type FormUploaderConfig struct { - // Whether or not HTTP calls should be debugged - DebugHTTP bool -} - -type FormUploader struct { - // The configuration - conf FormUploaderConfig - - // The logger instance to use - logger logger.Logger -} - -func NewFormUploader(l logger.Logger, c FormUploaderConfig) *FormUploader { - return &FormUploader{ - logger: l, - conf: c, - } -} - -// The FormUploader doens't specify a URL, as one is provided by Buildkite -// after uploading -func (u *FormUploader) URL(artifact *api.Artifact) string { - return "" -} - -func (u *FormUploader) Upload(_ context.Context, artifact *api.Artifact) error { - if artifact.FileSize > maxFormUploadedArtifactSize { - return errArtifactTooLarge{Size: artifact.FileSize} - } - - // Create a HTTP request for uploading the file - request, err := createUploadRequest(u.logger, artifact) - if err != nil { - return err - } - - if u.conf.DebugHTTP { - // If the request is a multi-part form, then it's probably a - // file upload, in which case we don't want to spewing out the - // file contents into the debug log (especially if it's been - // gzipped) - var requestDump []byte - if strings.Contains(request.Header.Get("Content-Type"), "multipart/form-data") { - requestDump, err = httputil.DumpRequestOut(request, false) - } else { - requestDump, err = httputil.DumpRequestOut(request, true) - } - - if err != nil { - u.logger.Debug("\nERR: %s\n%s", err, string(requestDump)) - } else { - u.logger.Debug("\n%s", string(requestDump)) - } - - // configure the HTTP request to log the server IP. The IPs for s3.amazonaws.com - // rotate every 5 seconds, and if one of them is misbehaving it may be helpful to - // know which one. - trace := &httptrace.ClientTrace{ - GotConn: func(connInfo httptrace.GotConnInfo) { - u.logger.Debug("artifact %s uploading to: %s", artifact.ID, connInfo.Conn.RemoteAddr()) - }, - } - request = request.WithContext(httptrace.WithClientTrace(request.Context(), trace)) - } - - // Create the client - client := &http.Client{} - - // Perform the request - u.logger.Debug("%s %s", request.Method, request.URL) - response, err := client.Do(request) - - // Check for errors - if err != nil { - return err - } else { - // Be sure to close the response body at the end of - // this function - defer response.Body.Close() - - if u.conf.DebugHTTP { - responseDump, err := httputil.DumpResponse(response, true) - if err != nil { - u.logger.Debug("\nERR: %s\n%s", err, string(responseDump)) - } else { - u.logger.Debug("\n%s", string(responseDump)) - } - } - - if response.StatusCode/100 != 2 { - body := &bytes.Buffer{} - _, err := body.ReadFrom(response.Body) - if err != nil { - return err - } - - // Return a custom error with the response body from the page - message := fmt.Sprintf("%s (%d)", body, response.StatusCode) - return errors.New(message) - } - } - - return nil -} - -// Creates a new file upload http request with optional extra params -func createUploadRequest(_ logger.Logger, artifact *api.Artifact) (*http.Request, error) { - streamer := newMultipartStreamer() - - // Set the post data for the request - for key, val := range artifact.UploadInstructions.Data { - // Replace the magical ${artifact:path} variable with the - // artifact's path - newVal := ArtifactPathVariableRegex.ReplaceAllLiteralString(val, artifact.Path) - - // Write the new value to the form - if err := streamer.WriteField(key, newVal); err != nil { - return nil, err - } - } - - fh, err := 
os.Open(artifact.AbsolutePath) - if err != nil { - return nil, err - } - - // It's important that we add the form field last because when - // uploading to an S3 form, they are really nit-picky about the field - // order, and the file needs to be the last one other it doesn't work. - if err := streamer.WriteFile(artifact.UploadInstructions.Action.FileInput, artifact.Path, fh); err != nil { - fh.Close() - return nil, err - } - - // Create the URL that we'll send data to - uri, err := url.Parse(artifact.UploadInstructions.Action.URL) - if err != nil { - fh.Close() - return nil, err - } - - uri.Path = artifact.UploadInstructions.Action.Path - - // Create the request - req, err := http.NewRequest(artifact.UploadInstructions.Action.Method, uri.String(), streamer.Reader()) - if err != nil { - fh.Close() - return nil, err - } - - // Setup the content type and length that s3 requires - req.Header.Add("Content-Type", streamer.ContentType) - // Letting the server know the agent version can be helpful for debugging - req.Header.Add("User-Agent", version.UserAgent()) - req.ContentLength = streamer.Len() - - return req, nil -} - -// A wrapper around the complexities of streaming a multipart file and fields to -// an http endpoint that infuriatingly requires a Content-Length -// Derived from https://github.com/technoweenie/multipartstreamer -type multipartStreamer struct { - ContentType string - bodyBuffer *bytes.Buffer - bodyWriter *multipart.Writer - closeBuffer *bytes.Buffer - reader io.ReadCloser - contentLength int64 -} - -// newMultipartStreamer initializes a new MultipartStreamer. -func newMultipartStreamer() *multipartStreamer { - m := &multipartStreamer{ - bodyBuffer: new(bytes.Buffer), - } - - m.bodyWriter = multipart.NewWriter(m.bodyBuffer) - boundary := m.bodyWriter.Boundary() - m.ContentType = "multipart/form-data; boundary=" + boundary - - closeBoundary := fmt.Sprintf("\r\n--%s--\r\n", boundary) - m.closeBuffer = bytes.NewBufferString(closeBoundary) - - return m -} - -// WriteField writes a form field to the multipart.Writer. -func (m *multipartStreamer) WriteField(key, value string) error { - return m.bodyWriter.WriteField(key, value) -} - -// WriteFile writes the multi-part preamble which will be followed by file data -// This can only be called once and must be the last thing written to the streamer -func (m *multipartStreamer) WriteFile(key, artifactPath string, fh http.File) error { - if m.reader != nil { - return errors.New("WriteFile can't be called multiple times") - } - - // Set up a reader that combines the body, the file and the closer in a stream - m.reader = &multipartReadCloser{ - Reader: io.MultiReader(m.bodyBuffer, fh, m.closeBuffer), - fh: fh, - } - - stat, err := fh.Stat() - if err != nil { - return err - } - - m.contentLength = stat.Size() - - _, err = m.bodyWriter.CreateFormFile(key, artifactPath) - return err -} - -// Len calculates the byte size of the multipart content. -func (m *multipartStreamer) Len() int64 { - return m.contentLength + int64(m.bodyBuffer.Len()) + int64(m.closeBuffer.Len()) -} - -// Reader gets an io.ReadCloser for passing to an http.Request. 
-func (m *multipartStreamer) Reader() io.ReadCloser { - return m.reader -} - -type multipartReadCloser struct { - io.Reader - fh http.File -} - -func (mrc *multipartReadCloser) Close() error { - return mrc.fh.Close() -} - -type errArtifactTooLarge struct { - Size int64 -} - -func (e errArtifactTooLarge) Error() string { - // TODO: Clean up error strings - // https://github.com/golang/go/wiki/CodeReviewComments#error-strings - return fmt.Sprintf("File size (%d bytes) exceeds the maximum supported by Buildkite's default artifact storage (5Gb). Alternative artifact storage options may support larger files.", e.Size) -} diff --git a/internal/artifact/form_uploader_test.go b/internal/artifact/form_uploader_test.go deleted file mode 100644 index c0ff17a75f..0000000000 --- a/internal/artifact/form_uploader_test.go +++ /dev/null @@ -1,183 +0,0 @@ -package artifact - -import ( - "bytes" - "context" - "errors" - "fmt" - "io" - "net/http" - "net/http/httptest" - "os" - "path/filepath" - "testing" - - "github.com/buildkite/agent/v3/api" - "github.com/buildkite/agent/v3/logger" -) - -func TestFormUploading(t *testing.T) { - ctx := context.Background() - - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - switch req.URL.Path { - case "/buildkiteartifacts.com": - if req.ContentLength <= 0 { - http.Error(rw, "zero or unknown Content-Length", http.StatusBadRequest) - return - } - - if err := req.ParseMultipartForm(5 * 1024 * 1024); err != nil { - http.Error(rw, fmt.Sprintf("req.ParseMultipartForm() = %v", err), http.StatusBadRequest) - return - } - - // Check the ${artifact:path} interpolation is working - path := req.FormValue("path") - if got, want := path, "llamas.txt"; got != want { - http.Error(rw, fmt.Sprintf("path = %q, want %q", got, want), http.StatusBadRequest) - return - } - - file, fh, err := req.FormFile("file") - if err != nil { - http.Error(rw, fmt.Sprintf(`req.FormFile("file") error = %v`, err), http.StatusBadRequest) - return - } - defer file.Close() - - b := &bytes.Buffer{} - if _, err := io.Copy(b, file); err != nil { - http.Error(rw, fmt.Sprintf("io.Copy() error = %v", err), http.StatusInternalServerError) - return - } - - // Check the file is attached correctly - if got, want := b.String(), "llamas"; got != want { - http.Error(rw, fmt.Sprintf("uploaded file content = %q, want %q", got, want), http.StatusBadRequest) - return - } - - if got, want := fh.Filename, "llamas.txt"; got != want { - http.Error(rw, fmt.Sprintf("uploaded file name = %q, want %q", got, want), http.StatusInternalServerError) - return - } - - default: - http.Error(rw, fmt.Sprintf("not found; method = %q, path = %q", req.Method, req.URL.Path), http.StatusNotFound) - } - })) - defer server.Close() - - temp, err := os.MkdirTemp("", "agent") - if err != nil { - t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err) - } - defer os.Remove(temp) - - cwd, err := os.Getwd() - if err != nil { - t.Fatalf("os.Getwd() error = %v", err) - } - - runtest := func(wd string) { - abspath := filepath.Join(wd, "llamas.txt") - err = os.WriteFile(abspath, []byte("llamas"), 0700) - defer os.Remove(abspath) - - uploader := NewFormUploader(logger.Discard, FormUploaderConfig{}) - artifact := &api.Artifact{ - ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", - Path: "llamas.txt", - AbsolutePath: abspath, - GlobPath: "llamas.txt", - ContentType: "text/plain", - UploadInstructions: &api.ArtifactUploadInstructions{ - Data: map[string]string{ - "path": "${artifact:path}", - }, - Action: struct { - URL string 
"json:\"url,omitempty\"" - Method string "json:\"method\"" - Path string "json:\"path\"" - FileInput string "json:\"file_input\"" - }{ - URL: server.URL, - Method: "POST", - Path: "buildkiteartifacts.com", - FileInput: "file", - }}, - } - - if err := uploader.Upload(ctx, artifact); err != nil { - t.Errorf("uploader.Upload(artifact) = %v", err) - } - } - - for _, wd := range []string{temp, cwd} { - runtest(wd) - } -} - -func TestFormUploadFileMissing(t *testing.T) { - ctx := context.Background() - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - http.Error(rw, "Not found", http.StatusNotFound) - })) - defer server.Close() - - temp, err := os.MkdirTemp("", "agent") - if err != nil { - t.Fatalf(`os.MkdirTemp("", "agent") error = %v`, err) - } - defer os.Remove(temp) - - abspath := filepath.Join(temp, "llamas.txt") - - uploader := NewFormUploader(logger.Discard, FormUploaderConfig{}) - artifact := &api.Artifact{ - ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", - Path: "llamas.txt", - AbsolutePath: abspath, - GlobPath: "llamas.txt", - ContentType: "text/plain", - UploadInstructions: &api.ArtifactUploadInstructions{ - Data: map[string]string{ - "path": "${artifact:path}", - }, - Action: struct { - URL string "json:\"url,omitempty\"" - Method string "json:\"method\"" - Path string "json:\"path\"" - FileInput string "json:\"file_input\"" - }{ - URL: server.URL, - Method: "POST", - Path: "buildkiteartifacts.com", - FileInput: "file", - }}, - } - - if err := uploader.Upload(ctx, artifact); !os.IsNotExist(err) { - t.Errorf("uploader.Upload(artifact) = %v, want os.ErrNotExist", err) - } -} - -func TestFormUploadTooBig(t *testing.T) { - ctx := context.Background() - uploader := NewFormUploader(logger.Discard, FormUploaderConfig{}) - const size = int64(6442450944) // 6Gb - artifact := &api.Artifact{ - ID: "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx", - Path: "llamas.txt", - AbsolutePath: "/llamas.txt", - GlobPath: "llamas.txt", - ContentType: "text/plain", - FileSize: size, - UploadInstructions: &api.ArtifactUploadInstructions{}, - } - - if err := uploader.Upload(ctx, artifact); !errors.Is(err, errArtifactTooLarge{Size: size}) { - t.Errorf("uploader.Upload(artifact) = %v, want errArtifactTooLarge", err) - } -} diff --git a/internal/artifact/gs_uploader.go b/internal/artifact/gs_uploader.go index 3938f19494..11bfdaf446 100644 --- a/internal/artifact/gs_uploader.go +++ b/internal/artifact/gs_uploader.go @@ -119,7 +119,25 @@ func (u *GSUploader) URL(artifact *api.Artifact) string { return artifactURL.String() } -func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error { +func (u *GSUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&gsUploaderWork{ + GSUploader: u, + artifact: artifact, + }}, nil +} + +type gsUploaderWork struct { + *GSUploader + artifact *api.Artifact +} + +func (u *gsUploaderWork) Artifact() *api.Artifact { return u.artifact } + +func (u *gsUploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +func (u *gsUploaderWork) DoWork(_ context.Context) (*api.ArtifactPartETag, error) { permission := os.Getenv("BUILDKITE_GS_ACL") // The dirtiest validation method ever... 
@@ -129,24 +147,24 @@ func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error { permission != "projectPrivate" && permission != "publicRead" && permission != "publicReadWrite" { - return fmt.Errorf("Invalid GS ACL `%s`", permission) + return nil, fmt.Errorf("Invalid GS ACL `%s`", permission) } if permission == "" { u.logger.Debug("Uploading \"%s\" to bucket \"%s\" with default permission", - u.artifactPath(artifact), u.BucketName) + u.artifactPath(u.artifact), u.BucketName) } else { u.logger.Debug("Uploading \"%s\" to bucket \"%s\" with permission \"%s\"", - u.artifactPath(artifact), u.BucketName, permission) + u.artifactPath(u.artifact), u.BucketName, permission) } object := &storage.Object{ - Name: u.artifactPath(artifact), - ContentType: artifact.ContentType, - ContentDisposition: u.contentDisposition(artifact), + Name: u.artifactPath(u.artifact), + ContentType: u.artifact.ContentType, + ContentDisposition: u.contentDisposition(u.artifact), } - file, err := os.Open(artifact.AbsolutePath) + file, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return errors.New(fmt.Sprintf("Failed to open file \"%q\" (%v)", artifact.AbsolutePath, err)) + return nil, errors.New(fmt.Sprintf("Failed to open file %q (%v)", u.artifact.AbsolutePath, err)) } call := u.service.Objects.Insert(u.BucketName, object) if permission != "" { @@ -155,10 +173,10 @@ func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error { if res, err := call.Media(file, googleapi.ContentType("")).Do(); err == nil { u.logger.Debug("Created object %v at location %v\n\n", res.Name, res.SelfLink) } else { - return errors.New(fmt.Sprintf("Failed to PUT file \"%s\" (%v)", u.artifactPath(artifact), err)) + return nil, errors.New(fmt.Sprintf("Failed to PUT file %q (%v)", u.artifactPath(u.artifact), err)) } - return nil + return nil, nil } func (u *GSUploader) artifactPath(artifact *api.Artifact) string { diff --git a/internal/artifact/s3_uploader.go b/internal/artifact/s3_uploader.go index 3b9836bf2c..b13b6c1718 100644 --- a/internal/artifact/s3_uploader.go +++ b/internal/artifact/s3_uploader.go @@ -81,30 +81,47 @@ func (u *S3Uploader) URL(artifact *api.Artifact) string { return url.String() } -func (u *S3Uploader) Upload(_ context.Context, artifact *api.Artifact) error { +func (u *S3Uploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) { + return []workUnit{&s3UploaderWork{ + S3Uploader: u, + artifact: artifact, + }}, nil +} + +type s3UploaderWork struct { + *S3Uploader + artifact *api.Artifact +} + +func (u *s3UploaderWork) Artifact() *api.Artifact { return u.artifact } +func (u *s3UploaderWork) Description() string { + return singleUnitDescription(u.artifact) +} + +func (u *s3UploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) { permission, err := u.resolvePermission() if err != nil { - return err + return nil, err } // Create an uploader with the session and default options uploader := s3manager.NewUploaderWithClient(u.client) // Open file from filesystem - u.logger.Debug("Reading file \"%s\"", artifact.AbsolutePath) - f, err := os.Open(artifact.AbsolutePath) + u.logger.Debug("Reading file %q", u.artifact.AbsolutePath) + f, err := os.Open(u.artifact.AbsolutePath) if err != nil { - return fmt.Errorf("failed to open file %q (%w)", artifact.AbsolutePath, err) + return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err) } // Upload the file to S3. 
- u.logger.Debug("Uploading \"%s\" to bucket with permission `%s`", u.artifactPath(artifact), permission) + u.logger.Debug("Uploading %q to bucket with permission %q", u.artifactPath(u.artifact), permission) params := &s3manager.UploadInput{ Bucket: aws.String(u.BucketName), - Key: aws.String(u.artifactPath(artifact)), - ContentType: aws.String(artifact.ContentType), + Key: aws.String(u.artifactPath(u.artifact)), + ContentType: aws.String(u.artifact.ContentType), ACL: aws.String(permission), Body: f, } @@ -114,8 +131,7 @@ func (u *S3Uploader) Upload(_ context.Context, artifact *api.Artifact) error { } _, err = uploader.Upload(params) - - return err + return nil, err } func (u *S3Uploader) artifactPath(artifact *api.Artifact) string { diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go index c4cfc00b35..baa584bc94 100644 --- a/internal/artifact/uploader.go +++ b/internal/artifact/uploader.go @@ -1,6 +1,7 @@ package artifact import ( + "cmp" "context" "crypto/sha1" "crypto/sha256" @@ -11,6 +12,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "strings" "sync" "time" @@ -20,7 +22,6 @@ import ( "github.com/buildkite/agent/v3/internal/experiments" "github.com/buildkite/agent/v3/internal/mime" "github.com/buildkite/agent/v3/logger" - "github.com/buildkite/agent/v3/pool" "github.com/buildkite/roko" "github.com/dustin/go-humanize" "github.com/mattn/go-zglob" @@ -52,6 +53,9 @@ type UploaderConfig struct { // Whether to not upload symlinks UploadSkipSymlinks bool + + // Whether to allow multipart uploads to the BK-hosted bucket + AllowMultipart bool } type Uploader struct { @@ -75,7 +79,7 @@ func NewUploader(l logger.Logger, ac APIClient, c UploaderConfig) *Uploader { func (a *Uploader) Upload(ctx context.Context) error { // Create artifact structs for all the files we need to upload - artifacts, err := a.Collect(ctx) + artifacts, err := a.collect(ctx) if err != nil { return fmt.Errorf("collecting artifacts: %w", err) } @@ -86,7 +90,32 @@ func (a *Uploader) Upload(ctx context.Context) error { } a.logger.Info("Found %d files that match %q", len(artifacts), a.conf.Paths) - if err := a.upload(ctx, artifacts); err != nil { + + // Determine what uploader to use + uploader, err := a.createUploader() + if err != nil { + return fmt.Errorf("creating uploader: %w", err) + } + + // Set the URLs of the artifacts based on the uploader + for _, artifact := range artifacts { + artifact.URL = uploader.URL(artifact) + } + + // Batch-create the artifact records on Buildkite + batchCreator := NewArtifactBatchCreator(a.logger, a.apiClient, BatchCreatorConfig{ + JobID: a.conf.JobID, + Artifacts: artifacts, + UploadDestination: a.conf.Destination, + CreateArtifactsTimeout: 10 * time.Second, + AllowMultipart: a.conf.AllowMultipart, + }) + artifacts, err = batchCreator.Create(ctx) + if err != nil { + return err + } + + if err := a.upload(ctx, artifacts, uploader); err != nil { return fmt.Errorf("uploading artifacts: %w", err) } @@ -109,7 +138,7 @@ func isDir(path string) bool { return fi.IsDir() } -func (a *Uploader) Collect(ctx context.Context) ([]*api.Artifact, error) { +func (a *Uploader) collect(ctx context.Context) ([]*api.Artifact, error) { wd, err := os.Getwd() if err != nil { return nil, fmt.Errorf("getting working directory: %w", err) @@ -358,7 +387,7 @@ func (a *Uploader) build(path string, absolutePath string) (*api.Artifact, error // createUploader applies some heuristics to the destination to infer which // uploader to use. 
@@ -358,7 +387,7 @@ func (a *Uploader) build(path string, absolutePath string) (*api.Artifact, error) {
 
 // createUploader applies some heuristics to the destination to infer which
 // uploader to use.
-func (a *Uploader) createUploader() (_ uploader, err error) {
+func (a *Uploader) createUploader() (_ workCreator, err error) {
 	var dest string
 	defer func() {
 		if err != nil || dest == "" {
@@ -370,7 +399,7 @@ func (a *Uploader) createUploader() (_ uploader, err error) {
 	switch {
 	case a.conf.Destination == "":
 		a.logger.Info("Uploading to default Buildkite artifact storage")
-		return NewFormUploader(a.logger, FormUploaderConfig{
+		return NewBKUploader(a.logger, BKUploaderConfig{
 			DebugHTTP: a.conf.DebugHTTP,
 		}), nil
 
@@ -406,194 +435,356 @@ func (a *Uploader) createUploader() (_ uploader, err error) {
 	}
 }
 
-type uploader interface {
+// workCreator implementations convert artifacts into units of work for uploading.
+type workCreator interface {
 	// The Artifact.URL property is populated with what ever is returned
 	// from this method prior to uploading.
 	URL(*api.Artifact) string
 
-	// The actual uploading of the file
-	Upload(context.Context, *api.Artifact) error
+	// CreateWork provides units of work for uploading an artifact.
+	CreateWork(*api.Artifact) ([]workUnit, error)
 }
 
-func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact) error {
-	// Determine what uploader to use
-	uploader, err := a.createUploader()
-	if err != nil {
-		return fmt.Errorf("creating uploader: %w", err)
+// workUnit implementations upload a whole artifact, or a part of an artifact,
+// or could one day do some other work related to an artifact.
+type workUnit interface {
+	// Artifact returns the artifact being worked on.
+	Artifact() *api.Artifact
+
+	// Description describes the unit of work.
+	Description() string
+
+	// DoWork does the work.
+	DoWork(context.Context) (*api.ArtifactPartETag, error)
+}
+
+// workUnitResult is just a tuple (workUnit, partETag | error).
+type workUnitResult struct {
+	workUnit workUnit
+	partETag *api.ArtifactPartETag
+	err      error
+}
+
+// artifactUploadWorker contains the shared state between the worker goroutines
+// and the state uploader.
+type artifactUploadWorker struct {
+	*Uploader
+
+	// Counts the worker goroutines.
+	wg sync.WaitGroup
+
+	// A tracker for every artifact.
+	// The map is written at the start of upload, and other goroutines only read
+	// afterwards.
+	trackers map[*api.Artifact]*artifactTracker
+}
+
+// artifactTracker tracks the amount of work pending for an artifact.
+type artifactTracker struct {
+	// All the work for uploading this artifact.
+	work []workUnit
+
+	// Normally storing a context in a struct is a bad idea. It's explicitly
+	// called out in pkg.go.dev/context as a no-no (unless you are required to
+	// store a context for some reason like interface implementation or
+	// backwards compatibility).
+	//
+	// The main reason is that contexts are intended to be associated with a
	// clear chain of function calls (i.e. work), and having contexts stored in
+	// a struct somewhere means there's confusion over which context should be
+	// used for a given function call. (Do you use one passed in, or from a
+	// struct? Do you listen for Done on both? Which context values apply?)
+	//
+	// So here's how this context is situated within the chain of work, and
+	// how it will be used:
+	//
+	// This context applies to all the work units for this artifact.
+	// It is a child context of the context passed to Uploader.upload, which
+	// should be exactly the same context that Uploader.upload passes to
+	// artifactUploadWorker.
+	// Canceling this context cancels all work associated with this artifact
+	// (across the fixed-size pool of worker goroutines).
+	ctx    context.Context
+	cancel context.CancelCauseFunc
+
+	// pendingWork is the number of incomplete units for this artifact.
+	// This is set once at the start of upload, and then decremented by the
+	// state updater goroutine as work units complete.
+	pendingWork int
+
+	// State that will be uploaded to BK when the artifact is finished or errored.
+	// Only the state updater goroutine writes this.
+	api.ArtifactState
+}
+
+func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, uploader workCreator) error {
+	worker := &artifactUploadWorker{
+		Uploader: a,
+		trackers: make(map[*api.Artifact]*artifactTracker),
 	}
 
-	// Set the URLs of the artifacts based on the uploader
+	// Create work and trackers for each artifact.
 	for _, artifact := range artifacts {
-		artifact.URL = uploader.URL(artifact)
+		workUnits, err := uploader.CreateWork(artifact)
+		if err != nil {
+			a.logger.Error("Couldn't create upload workers for artifact %q: %v", artifact.Path, err)
+			return err
+		}
+
+		actx, acancel := context.WithCancelCause(ctx)
+		worker.trackers[artifact] = &artifactTracker{
+			ctx:         actx,
+			cancel:      acancel,
+			work:        workUnits,
+			pendingWork: len(workUnits),
+			ArtifactState: api.ArtifactState{
+				ID:        artifact.ID,
+				Multipart: len(workUnits) > 1,
+			},
+		}
 	}
 
-	// Create the artifacts on Buildkite
-	batchCreator := NewArtifactBatchCreator(a.logger, a.apiClient, BatchCreatorConfig{
-		JobID:                  a.conf.JobID,
-		Artifacts:              artifacts,
-		UploadDestination:      a.conf.Destination,
-		CreateArtifactsTimeout: 10 * time.Second,
-	})
+	// unitsCh: work unit creation --(work unit to be run)--> multiple worker goroutines
+	unitsCh := make(chan workUnit)
+	// resultsCh: multiple worker goroutines --(work unit result)--> state updater
+	resultsCh := make(chan workUnitResult)
+	// errCh: receives the final error from the status updater
+	errCh := make(chan error, 1)
 
-	artifacts, err = batchCreator.Create(ctx)
-	if err != nil {
-		return err
+	// The status updater goroutine: updates batches of artifact states on
+	// Buildkite every few seconds.
+	go worker.stateUpdater(ctx, resultsCh, errCh)
+
+	// Worker goroutines that work on work units.
+	for range runtime.GOMAXPROCS(0) {
+		worker.wg.Add(1)
+		go worker.doWorkUnits(ctx, unitsCh, resultsCh)
 	}
 
-	// Prepare a concurrency pool to upload the artifacts
-	p := pool.New(pool.MaxConcurrencyLimit)
-	errs := []error{}
-	var errorsMutex sync.Mutex
-
-	// Create a wait group so we can make sure the uploader waits for all
-	// the artifact states to upload before finishing
-	var stateUploaderWaitGroup sync.WaitGroup
-	stateUploaderWaitGroup.Add(1)
-
-	// A map to keep track of artifact states and how many we've uploaded
-	artifactStates := make(map[string]string)
-	artifactStatesUploaded := 0
-	var artifactStatesMutex sync.Mutex
-
-	// Spin up a gourtine that'll uploading artifact statuses every few
-	// seconds in batches
-	go func() {
-		for artifactStatesUploaded < len(artifacts) {
-			statesToUpload := make(map[string]string)
-
-			// Grab all the states we need to upload, and remove
-			// them from the tracking map
-			//
-			// Since we mutate the artifactStates variable in
-			// multiple routines, we need to lock it to make sure
-			// nothing else is changing it at the same time.
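The new upload method is a fan-out/fan-in pipeline: one sender feeds unitsCh, a fixed pool of GOMAXPROCS workers drains it, and a single state-updater goroutine consumes resultsCh, making it the only writer of tracker state. A stripped-down sketch of that shape with placeholder payloads (hypothetical names, not the agent's types):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	unitsCh := make(chan int)      // stands in for chan workUnit
	resultsCh := make(chan string) // stands in for chan workUnitResult
	done := make(chan struct{})

	// Single consumer goroutine: the only place shared state would change.
	go func() {
		defer close(done)
		for r := range resultsCh {
			fmt.Println("result:", r)
		}
	}()

	// Fixed-size worker pool, as in the diff.
	var wg sync.WaitGroup
	for range runtime.GOMAXPROCS(0) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for u := range unitsCh {
				resultsCh <- fmt.Sprintf("unit %d done", u)
			}
		}()
	}

	for u := range 4 {
		unitsCh <- u
	}
	close(unitsCh)   // no more work: workers drain and exit
	wg.Wait()        // all results have been sent
	close(resultsCh) // lets the consumer finish
	<-done
}

The real code also threads contexts through both loops so a failure can cancel an artifact's remaining work; that is omitted here to keep the shape visible.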
-			artifactStatesMutex.Lock()
-			for id, state := range artifactStates {
-				statesToUpload[id] = state
-				delete(artifactStates, id)
+	// Send the work units for each artifact to the workers.
+	// This must happen after creating worker goroutines listening on unitsCh.
+	for _, tracker := range worker.trackers {
+		for _, workUnit := range tracker.work {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case unitsCh <- workUnit:
 			}
-			artifactStatesMutex.Unlock()
+		}
+	}
 
-			if len(statesToUpload) > 0 {
-				artifactStatesUploaded += len(statesToUpload)
-				for id, state := range statesToUpload {
-					a.logger.Debug("Artifact `%s` has state `%s`", id, state)
-				}
+	// All work units have been sent to workers.
+	close(unitsCh)
 
-				timeout := 5 * time.Second
-
-				// Update the states of the artifacts in bulk.
-				err := roko.NewRetrier(
-					roko.WithMaxAttempts(10),
-					roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)),
-				).DoWithContext(ctx, func(r *roko.Retrier) error {
-
-					ctxTimeout := ctx
-					if timeout != 0 {
-						var cancel func()
-						ctxTimeout, cancel = context.WithTimeout(ctx, timeout)
-						defer cancel()
-					}
-
-					_, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload)
-					if err != nil {
-						a.logger.Warn("%s (%s)", err, r)
-					}
-
-					// after four attempts (0, 1, 2, 3)...
-					if r.AttemptCount() == 3 {
-						// The short timeout has given us fast feedback on the first couple of attempts,
-						// but perhaps the server needs more time to complete the request, so fall back to
-						// the default HTTP client timeout.
-						a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout)
-						timeout = 0
-					}
-
-					return err
-				})
-				if err != nil {
-					a.logger.Error("Error uploading artifact states: %s", err)
-
-					// Track the error that was raised. We need to
-					// acquire a lock since we mutate the errors
-					// slice in multiple routines.
-					errorsMutex.Lock()
-					errs = append(errs, err)
-					errorsMutex.Unlock()
-				}
+	a.logger.Debug("Waiting for uploads to complete...")
 
-				a.logger.Debug("Uploaded %d artifact states (%d/%d)", len(statesToUpload), artifactStatesUploaded, len(artifacts))
-			}
+	// Wait for the workers to finish
+	worker.wg.Wait()
 
-			// Check again for states to upload in a few seconds
-			time.Sleep(1 * time.Second)
-		}
+	// Since the workers are done, all work unit states have been sent to the
+	// state updater.
+	close(resultsCh)
 
-		stateUploaderWaitGroup.Done()
-	}()
+	a.logger.Debug("Uploads complete, waiting for upload status to be sent to Buildkite...")
 
-	for _, artifact := range artifacts {
-		p.Spawn(func() {
-			// Show a nice message that we're starting to upload the file
-			a.logger.Info("Uploading artifact %s %s (%s)", artifact.ID, artifact.Path, humanize.IBytes(uint64(artifact.FileSize)))
+	// Wait for the statuses to finish uploading
+	if err := <-errCh; err != nil {
+		return fmt.Errorf("errors uploading artifacts: %w", err)
+	}
 
-			var state string
+	a.logger.Info("Artifact uploads completed successfully")
+
+	return nil
+}
+
+func (a *artifactUploadWorker) doWorkUnits(ctx context.Context, unitsCh <-chan workUnit, resultsCh chan<- workUnitResult) {
+	defer a.wg.Done()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case workUnit, open := <-unitsCh:
+			if !open {
+				return // Done
+			}
+			tracker := a.trackers[workUnit.Artifact()]
+			// Show a nice message that we're starting to upload the file
+			a.logger.Info("Uploading %s", workUnit.Description())
 			// Upload the artifact and then set the state depending
 			// on whether or not it passed. We'll retry the upload
 			// a couple of times before giving up.
-			err := roko.NewRetrier(
+			r := roko.NewRetrier(
 				roko.WithMaxAttempts(10),
 				roko.WithStrategy(roko.Constant(5*time.Second)),
-			).DoWithContext(ctx, func(r *roko.Retrier) error {
-				if err := uploader.Upload(ctx, artifact); err != nil {
+			)
+			partETag, err := roko.DoFunc(tracker.ctx, r, func(r *roko.Retrier) (*api.ArtifactPartETag, error) {
+				etag, err := workUnit.DoWork(tracker.ctx)
+				if err != nil {
 					a.logger.Warn("%s (%s)", err, r)
-					return err
 				}
-				return nil
+				return etag, err
 			})
-			// Did the upload eventually fail?
+
+			// If it failed, abort any other work items for this artifact.
 			if err != nil {
-				a.logger.Error("Error uploading artifact \"%s\": %s", artifact.Path, err)
+				a.logger.Info("Upload failed for %s: %v", workUnit.Description(), err)
+				tracker.cancel(err)
+				// then the error is sent to the status updater
+			}
+
+			// Let the state updater know how the work went.
+			select {
+			case <-ctx.Done(): // Note: the main context, not the artifact tracker context
+				return
 
-				// Track the error that was raised. We need to
-				// acquire a lock since we mutate the errors
-				// slice in multiple routines.
-				errorsMutex.Lock()
+			case resultsCh <- workUnitResult{workUnit: workUnit, partETag: partETag, err: err}:
+			}
+		}
+	}
+}
+
+func (a *artifactUploadWorker) stateUpdater(ctx context.Context, resultsCh <-chan workUnitResult, stateUpdaterErrCh chan<- error) {
+	var errs []error
+	defer func() {
+		if len(errs) > 0 {
+			stateUpdaterErrCh <- errors.Join(errs...)
+		}
+		close(stateUpdaterErrCh)
+	}()
+
+	// When this ticks, upload any pending artifact states as a batch.
+	updateTicker := time.NewTicker(1 * time.Second)
+
+selectLoop:
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case <-updateTicker.C:
+			// Note: updateStates marks trackers as sent once their states have been uploaded.
+			if err := a.updateStates(ctx); err != nil {
 				errs = append(errs, err)
-				errorsMutex.Unlock()
+			}
+
+		case result, open := <-resultsCh:
+			if !open {
+				// No more input: we're done!
+				break selectLoop
+			}
+			artifact := result.workUnit.Artifact()
+			tracker := a.trackers[artifact]
+
+			if result.err != nil {
+				// The work unit failed, so the whole artifact upload has failed.
+				errs = append(errs, result.err)
+				tracker.State = "error"
+				a.logger.Debug("Artifact %s has entered state %s", tracker.ID, tracker.State)
+				continue
+			}
 
-				state = "error"
-			} else {
-				a.logger.Info("Successfully uploaded artifact \"%s\"", artifact.Path)
-				state = "finished"
+			// The work unit is complete - it's no longer pending.
+			if partETag := result.partETag; partETag != nil {
+				tracker.MultipartETags = append(tracker.MultipartETags, *partETag)
 			}
 
-			// Since we mutate the artifactStates variable in
-			// multiple routines, we need to lock it to make sure
-			// nothing else is changing it at the same time.
-			artifactStatesMutex.Lock()
-			artifactStates[artifact.ID] = state
-			artifactStatesMutex.Unlock()
-		})
-	}
+			tracker.pendingWork--
+			if tracker.pendingWork > 0 {
+				continue
+			}
 
-	a.logger.Debug("Waiting for uploads to complete...")
+			// No pending units remain, so the whole artifact is complete.
+			// Add it to the next batch of states to upload.
+			tracker.State = "finished"
+			a.logger.Debug("Artifact %s has entered state %s", tracker.ID, tracker.State)
+		}
+	}
 
-	// Wait for the pool to finish
-	p.Wait()
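doWorkUnits retries each unit with roko.DoFunc, the generic variant of the retrier that returns a value (here, the part ETag) alongside the error. A small usage sketch, assuming only the roko calls already present in this diff, with the interval shortened so the sketch runs quickly:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/buildkite/roko"
)

func main() {
	attempts := 0

	// Same retrier shape as doWorkUnits, but with a short interval for the
	// sketch (the diff uses 5 * time.Second).
	r := roko.NewRetrier(
		roko.WithMaxAttempts(10),
		roko.WithStrategy(roko.Constant(10*time.Millisecond)),
	)
	etag, err := roko.DoFunc(context.Background(), r, func(r *roko.Retrier) (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("transient upload error")
		}
		return `"abc123"`, nil // e.g. an ETag returned by a part upload
	})
	fmt.Println(etag, err, attempts)
}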
+	// Upload any remaining states.
+	if err := a.updateStates(ctx); err != nil {
+		errs = append(errs, err)
+	}
+	return
+}
 
-	a.logger.Debug("Uploads complete, waiting for upload status to be sent to buildkite...")
+// updateStates uploads terminal artifact states to Buildkite in a batch.
+func (a *artifactUploadWorker) updateStates(ctx context.Context) error {
+	// Only upload states that are finished or error.
+	var statesToUpload []api.ArtifactState
+	var trackersToMarkSent []*artifactTracker
+	for _, tracker := range a.trackers {
+		switch tracker.State {
+		case "finished", "error":
+			// Only send these states.
+		default:
+			continue
+		}
+		// This artifact is complete; queue its state for this batch.
+		statesToUpload = append(statesToUpload, tracker.ArtifactState)
+		trackersToMarkSent = append(trackersToMarkSent, tracker)
+	}
 
-	// Wait for the statuses to finish uploading
-	stateUploaderWaitGroup.Wait()
+	if len(statesToUpload) == 0 { // no news from the frontier
+		return nil
+	}
 
-	if len(errs) > 0 {
-		err := errors.Join(errs...)
-		return fmt.Errorf("errors uploading artifacts: %w", err)
+	for _, state := range statesToUpload {
+		// Ensure ETags are in ascending order by part number.
+		// This is required by S3.
+		slices.SortFunc(state.MultipartETags, func(a, b api.ArtifactPartETag) int {
+			return cmp.Compare(a.PartNumber, b.PartNumber)
+		})
 	}
 
-	a.logger.Info("Artifact uploads completed successfully")
+	// Post the update
+	timeout := 5 * time.Second
+
+	// Update the states of the artifacts in bulk.
+	err := roko.NewRetrier(
+		roko.WithMaxAttempts(10),
+		roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)),
+	).DoWithContext(ctx, func(r *roko.Retrier) error {
+		ctxTimeout := ctx
+		if timeout != 0 {
+			var cancel func()
+			ctxTimeout, cancel = context.WithTimeout(ctx, timeout)
+			defer cancel()
+		}
+
+		_, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload)
+		if err != nil {
+			a.logger.Warn("%s (%s)", err, r)
+		}
+
+		// after four attempts (0, 1, 2, 3)...
+		if r.AttemptCount() == 3 {
+			// The short timeout has given us fast feedback on the first couple of attempts,
+			// but perhaps the server needs more time to complete the request, so fall back to
+			// the default HTTP client timeout.
+			a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout)
+			timeout = 0
+		}
+		return err
+	})
+	if err != nil {
+		a.logger.Error("Error updating artifact states: %v", err)
+		return err
+	}
+
+	for _, tracker := range trackersToMarkSent {
+		// Don't send this state again.
+		tracker.State = "sent"
+	}
+	a.logger.Debug("Updated %d artifact states", len(statesToUpload))
 
 	return nil
 }
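updateStates sorts each artifact's ETags by part number before posting, since S3 requires parts in ascending order to complete a multipart upload. The same idiom in isolation, with PartETag standing in for api.ArtifactPartETag:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// PartETag mirrors the shape of api.ArtifactPartETag for this sketch.
type PartETag struct {
	PartNumber int
	ETag       string
}

func main() {
	// Parts can finish out of order because work units run concurrently.
	etags := []PartETag{
		{PartNumber: 3, ETag: `"c"`},
		{PartNumber: 1, ETag: `"a"`},
		{PartNumber: 2, ETag: `"b"`},
	}

	// Ascending part number, as required when completing the upload.
	slices.SortFunc(etags, func(a, b PartETag) int {
		return cmp.Compare(a.PartNumber, b.PartNumber)
	})

	fmt.Println(etags) // [{1 "a"} {2 "b"} {3 "c"}]
}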
+
+// singleUnitDescription can be used by uploader implementations to describe
+// artifact uploads consisting of a single work unit.
+func singleUnitDescription(artifact *api.Artifact) string {
+	return fmt.Sprintf("%s %s (%s)", artifact.ID, artifact.Path, humanize.IBytes(uint64(artifact.FileSize)))
+}
diff --git a/internal/artifact/uploader_test.go b/internal/artifact/uploader_test.go
index c1b32b9fc1..a7a50ba4dd 100644
--- a/internal/artifact/uploader_test.go
+++ b/internal/artifact/uploader_test.go
@@ -97,13 +97,13 @@ func TestCollect(t *testing.T) {
 	ctxExpEnabled, _ := experiments.Enable(ctx, experiments.NormalisedUploadPaths)
 	ctxExpDisabled := experiments.Disable(ctx, experiments.NormalisedUploadPaths)
 
-	artifactsWithoutExperimentEnabled, err := uploader.Collect(ctxExpDisabled)
+	artifactsWithoutExperimentEnabled, err := uploader.collect(ctxExpDisabled)
 	if err != nil {
 		t.Fatalf("[normalised-upload-paths disabled] uploader.Collect() error = %v", err)
 	}
 	assert.Equal(t, 5, len(artifactsWithoutExperimentEnabled))
 
-	artifactsWithExperimentEnabled, err := uploader.Collect(ctxExpEnabled)
+	artifactsWithExperimentEnabled, err := uploader.collect(ctxExpEnabled)
 	if err != nil {
 		t.Fatalf("[normalised-upload-paths enabled] uploader.Collect() error = %v", err)
 	}
@@ -165,7 +165,7 @@ func TestCollectThatDoesntMatchAnyFiles(t *testing.T) {
 		}, ";"),
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -185,7 +185,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnything(t *testing.T) {
 		}, ";"),
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -209,7 +209,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnythingFollowingSymlinks(t *testing.T) {
 		GlobResolveFollowSymlinks: true,
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -230,7 +230,7 @@ func TestCollectWithDuplicateMatches(t *testing.T) {
 		}, ";"),
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -263,7 +263,7 @@ func TestCollectWithDuplicateMatchesFollowingSymlinks(t *testing.T) {
 		GlobResolveFollowSymlinks: true,
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -296,7 +296,7 @@ func TestCollectMatchesUploadSymlinks(t *testing.T) {
 		UploadSkipSymlinks: true,
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -389,13 +389,13 @@ func TestCollect_WithZZGlob(t *testing.T) {
 	ctxExpEnabled, _ := experiments.Enable(ctx, experiments.NormalisedUploadPaths)
 	ctxExpDisabled := experiments.Disable(ctx, experiments.NormalisedUploadPaths)
 
-	artifactsWithoutExperimentEnabled, err := uploader.Collect(ctxExpDisabled)
+	artifactsWithoutExperimentEnabled, err := uploader.collect(ctxExpDisabled)
 	if err != nil {
 		t.Fatalf("[normalised-upload-paths disabled] uploader.Collect() error = %v", err)
 	}
 	assert.Equal(t, 5, len(artifactsWithoutExperimentEnabled))
 
-	artifactsWithExperimentEnabled, err := uploader.Collect(ctxExpEnabled)
+	artifactsWithExperimentEnabled, err := uploader.collect(ctxExpEnabled)
 	if err != nil {
 		t.Fatalf("[normalised-upload-paths enabled] uploader.Collect() error = %v", err)
 	}
@@ -457,7 +457,7 @@ func TestCollectThatDoesntMatchAnyFiles_WithZZGlob(t *testing.T) {
 		}, ";"),
 	})
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -477,7 +477,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnything_WithZZGlob(t *testing.T) {
 		}, ";"),
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -501,7 +501,7 @@ func TestCollectWithSomeGlobsThatDontMatchAnythingFollowingSymlinks_WithZZGlob(t *testing.T) {
 		GlobResolveFollowSymlinks: true,
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -522,7 +522,7 @@ func TestCollectWithDuplicateMatches_WithZZGlob(t *testing.T) {
 		}, ";"),
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -555,7 +555,7 @@ func TestCollectWithDuplicateMatchesFollowingSymlinks_WithZZGlob(t *testing.T) {
 		GlobResolveFollowSymlinks: true,
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
@@ -588,7 +588,7 @@ func TestCollectMatchesUploadSymlinks_WithZZGlob(t *testing.T) {
 		UploadSkipSymlinks: true,
 	})
 
-	artifacts, err := uploader.Collect(ctx)
+	artifacts, err := uploader.collect(ctx)
 	if err != nil {
 		t.Fatalf("uploader.Collect() error = %v", err)
 	}
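For reference, singleUnitDescription (added above) renders an upload description as ID, path, and a humanized size. A quick sketch of the output format, reusing the 6442450944-byte size from the removed TestFormUploadTooBig:

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	id, path, size := "artifact-id", "llamas.txt", uint64(6442450944)
	// Same format string as singleUnitDescription.
	fmt.Printf("%s %s (%s)\n", id, path, humanize.IBytes(size))
	// Output: artifact-id llamas.txt (6.0 GiB)
}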