From a3d9e6999c6b1a8c2bc4ec935b18c7e1bebdc655 Mon Sep 17 00:00:00 2001
From: Ben Moskovitz
Date: Mon, 17 Jul 2023 13:31:45 +1000
Subject: [PATCH 1/3] Small JobRunner cleanup

---
 agent/job_runner.go | 161 +++++++++++++++++++++++---------------------
 1 file changed, 83 insertions(+), 78 deletions(-)

diff --git a/agent/job_runner.go b/agent/job_runner.go
index 710baeb45c..8e94433e9a 100644
--- a/agent/job_runner.go
+++ b/agent/job_runner.go
@@ -199,19 +199,10 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 		return nil, err
 	}
 
-	// The bootstrap-script gets parsed based on the operating system
-	cmd, err := shellwords.Split(conf.AgentConfiguration.BootstrapScript)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to split bootstrap-script (%q) into tokens: %v",
-			conf.AgentConfiguration.BootstrapScript, err)
-	}
-
 	// Our log streamer works off a buffer of output
 	runner.output = &process.Buffer{}
 	var outputWriter io.Writer = runner.output
 
-	pr, pw := io.Pipe()
-
 	// {stdout, stderr} -> processWriter
 	// processWriter = io.MultiWriter(allWriters...)
 	var allWriters []io.Writer
@@ -233,6 +224,8 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 		outputWriter = io.MultiWriter(outputWriter, tmpFile)
 	}
 
+	pr, pw := io.Pipe()
+
 	switch {
 	case conf.AgentConfiguration.ANSITimestamps:
 		// processWriter -> prefixer -> outputWriter
@@ -326,6 +319,12 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 			ClientCount: containerCount,
 		})
 	} else {
+		// The bootstrap-script gets parsed based on the operating system
+		cmd, err := shellwords.Split(conf.AgentConfiguration.BootstrapScript)
+		if err != nil {
+			return nil, fmt.Errorf("splitting bootstrap-script (%q) into tokens: %w", conf.AgentConfiguration.BootstrapScript, err)
+		}
+
 		runner.process = process.New(l, process.Config{
 			Path: cmd[0],
 			Args: cmd[1:],
@@ -390,29 +389,26 @@ func (r *JobRunner) Run(ctx context.Context) error {
 	}
 
 	// Default exit status is no exit status
-	exitStatus := ""
-	signal := ""
-	signalReason := ""
+	exit := processExit{}
 
 	// Before executing the bootstrap process with the received Job env,
 	// execute the pre-bootstrap hook (if present) for it to tell us
 	// whether it is happy to proceed.
-	environmentCommandOkay := true
+	shouldRunJob := true
 
 	if hook, _ := hook.Find(r.conf.AgentConfiguration.HooksPath, "pre-bootstrap"); hook != "" {
-		// Once we have a hook any failure to run it MUST be fatal to the job to guarantee a true
-		// positive result from the hook
-		okay, err := r.executePreBootstrapHook(ctx, hook)
-		if !okay {
-			environmentCommandOkay = false
+		// Once we have a hook any failure to run it MUST be fatal to the job to guarantee a true positive result from the hook
+		ok, err := r.executePreBootstrapHook(ctx, hook)
+		if !ok {
+			shouldRunJob = false
 
 			// Ensure the Job UI knows why this job resulted in failure
 			r.logStreamer.Process([]byte("pre-bootstrap hook rejected this job, see the buildkite-agent logs for more details"))
 			// But disclose more information in the agent logs
 			r.logger.Error("pre-bootstrap hook rejected this job: %s", err)
 
-			exitStatus = "-1"
-			signalReason = "agent_refused"
+			exit.Status = "-1"
+			exit.SignalReason = "agent_refused"
 		}
 	}
 
@@ -423,62 +419,22 @@ func (r *JobRunner) Run(ctx context.Context) error {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	if environmentCommandOkay {
-		// Kick off log streaming and job status checking when the process
-		// starts.
+	if shouldRunJob {
+		// Kick off log streaming and job status checking when the process starts.
 		wg.Add(2)
 		go r.jobLogStreamer(cctx, &wg)
 		go r.jobCancellationChecker(cctx, &wg)
 
-		// Run the process. This will block until it finishes.
-		if err := r.process.Run(cctx); err != nil {
-			// Send the error as output
-			r.logStreamer.Process([]byte(err.Error()))
-
-			// The process did not run at all, so make sure it fails
-			exitStatus = "-1"
-			signalReason = "process_run_error"
-		} else {
-			// Intended to capture situations where the job-exec (aka bootstrap) container did not
-			// start. Normally such errors are hidden in the Kubernetes events. Let's feed them up
-			// to the user as they may be the caused by errors in the pipeline definition.
-			if r.cancelled && !r.stopped {
-				k8sProcess, ok := r.process.(*kubernetes.Runner)
-				if ok && k8sProcess.ClientStateUnknown() {
-					r.logStreamer.Process([]byte(
-						"Some containers had unknown exit statuses. Perhaps they were in ImagePullBackOff.",
-					))
-				}
-			}
-
-			// Add the final output to the streamer
-			r.logStreamer.Process(r.output.ReadAndTruncate())
-
-			// Collect the finished process' exit status
-			exitStatus = fmt.Sprintf("%d", r.process.WaitStatus().ExitStatus())
-			if ws := r.process.WaitStatus(); ws.Signaled() {
-				signal = process.SignalString(ws.Signal())
-			}
-			if r.stopped {
-				// The agent is being gracefully stopped, and we signaled the job to end. Often due
-				// to pending host shutdown or EC2 spot instance termination
-				signalReason = "agent_stop"
-			} else if r.cancelled {
-				// The job was signaled because it was cancelled via the buildkite web UI
-				signalReason = "cancel"
-			}
-		}
+		exit = r.runJob(cctx)
 	}
 
 	// Store the finished at time
 	finishedAt := time.Now()
 
-	// Stop the header time streamer. This will block until all the chunks
-	// have been uploaded
+	// Stop the header time streamer. This will block until all the chunks have been uploaded
 	r.headerTimesStreamer.Stop()
 
-	// Stop the log streamer. This will block until all the chunks have
-	// been uploaded
+	// Stop the log streamer. This will block until all the chunks have been uploaded
 	r.logStreamer.Stop()
 
 	// Warn about failed chunks
@@ -502,10 +458,9 @@ func (r *JobRunner) Run(ctx context.Context) error {
 	}
 
 	// Write some metrics about the job run
-	jobMetrics := r.metrics.With(metrics.Tags{
-		"exit_code": exitStatus,
-	})
-	if exitStatus == "0" {
+	jobMetrics := r.metrics.With(metrics.Tags{"exit_code": exit.Status})
+
+	if exit.Status == "0" {
 		jobMetrics.Timing("jobs.duration.success", finishedAt.Sub(startedAt))
 		jobMetrics.Count("jobs.success", 1)
 	} else {
@@ -514,16 +469,66 @@ func (r *JobRunner) Run(ctx context.Context) error {
 	}
 
 	// Finish the build in the Buildkite Agent API
-	//
-	// Once we tell the API we're finished it might assign us new work, so make
-	// sure everything else is done first.
-	r.finishJob(ctx, finishedAt, exitStatus, signal, signalReason, r.logStreamer.FailedChunks())
+	// Once we tell the API we're finished it might assign us new work, so make sure everything else is done first.
+	r.finishJob(ctx, finishedAt, exit, r.logStreamer.FailedChunks())
 
 	r.logger.Info("Finished job %s", r.job.ID)
 
 	return nil
 }
 
+type processExit struct {
+	Status       string
+	Signal       string
+	SignalReason string
+}
+
+func (r *JobRunner) runJob(ctx context.Context) (exit processExit) {
+	// Run the process. This will block until it finishes.
+	if err := r.process.Run(ctx); err != nil {
+		// Send the error as output
+		r.logStreamer.Process([]byte(err.Error()))
+
+		// The process did not run at all, so make sure it fails
+		return processExit{
+			Status:       "-1",
+			SignalReason: "process_run_error",
+		}
+	}
+
+	// Intended to capture situations where the job-exec (aka bootstrap) container did not
+	// start. Normally such errors are hidden in the Kubernetes events. Let's feed them up
+	// to the user as they may be the caused by errors in the pipeline definition.
+	if r.cancelled && !r.stopped {
+		k8sProcess, ok := r.process.(*kubernetes.Runner)
+		if ok && k8sProcess.ClientStateUnknown() {
+			r.logStreamer.Process([]byte(
+				"Some containers had unknown exit statuses. Perhaps they were in ImagePullBackOff.",
+			))
+		}
+	}
+
+	// Add the final output to the streamer
+	r.logStreamer.Process(r.output.ReadAndTruncate())
+
+	// Collect the finished process' exit status
+	exit.Status = fmt.Sprintf("%d", r.process.WaitStatus().ExitStatus())
+
+	if ws := r.process.WaitStatus(); ws.Signaled() {
+		exit.Signal = process.SignalString(ws.Signal())
+	}
+
+	if r.stopped {
+		// The agent is being gracefully stopped, and we signaled the job to end. Often due
+		// to pending host shutdown or EC2 spot instance termination
+		exit.SignalReason = "agent_stop"
+	} else if r.cancelled {
+		// The job was signaled because it was cancelled via the buildkite web UI
+		exit.SignalReason = "cancel"
+	}
+
+	return exit
+}
+
 func (r *JobRunner) CancelAndStop() error {
 	r.cancelLock.Lock()
 	r.stopped = true
@@ -672,7 +677,7 @@ func (r *JobRunner) createEnvironment() ([]string, error) {
 
 	// PTY-mode is enabled by default in `start` and `bootstrap`, so we only need
 	// to propagate it if it's explicitly disabled.
-	if r.conf.AgentConfiguration.RunInPty == false {
+	if !r.conf.AgentConfiguration.RunInPty {
 		env["BUILDKITE_PTY"] = "false"
 	}
 
@@ -791,11 +796,11 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {
 
 // finishJob finishes the job in the Buildkite Agent API. If the FinishJob call
 // cannot return successfully, this will retry for a long time.
-func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exitStatus, signal, signalReason string, failedChunkCount int) error {
+func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit processExit, failedChunkCount int) error {
 	r.job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
-	r.job.ExitStatus = exitStatus
-	r.job.Signal = signal
-	r.job.SignalReason = signalReason
+	r.job.ExitStatus = exit.Status
+	r.job.Signal = exit.Signal
+	r.job.SignalReason = exit.SignalReason
 	r.job.ChunksFailedCount = failedChunkCount
 
 	r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s",

From b289fd81c48e688c69f22f535d5b52b649de90fe Mon Sep 17 00:00:00 2001
From: Ben Moskovitz
Date: Mon, 17 Jul 2023 14:20:25 +1000
Subject: [PATCH 2/3] Move JobRunner options into config struct

---
 agent/agent_worker.go                  |   7 +-
 .../job_runner_integration_test.go     |  39 +++----
 agent/job_runner.go                    | 103 ++++++++++--------
 3 files changed, 78 insertions(+), 71 deletions(-)

diff --git a/agent/agent_worker.go b/agent/agent_worker.go
index bbb83c46a3..094aae4fa3 100644
--- a/agent/agent_worker.go
+++ b/agent/agent_worker.go
@@ -647,10 +647,15 @@ func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job) error
 	})
 
 	// Now that we've got a job to do, we can start it.
-	jr, err := NewJobRunner(a.logger, jobMetricsScope, a.agent, acceptResponse, a.apiClient, JobRunnerConfig{
+	jr, err := NewJobRunner(JobRunnerConfig{
+		Job: acceptResponse,
+		APIClient: a.apiClient,
+		Logger: a.logger,
 		Debug: a.debug,
 		DebugHTTP: a.debugHTTP,
 		CancelSignal: a.cancelSig,
+		MetricsScope: jobMetricsScope,
+		JobStatusInterval: time.Duration(a.agent.JobStatusInterval) * time.Second,
 		AgentConfiguration: a.agentConfiguration,
 		AgentStdout: a.agentStdout,
 	})
diff --git a/agent/integration/job_runner_integration_test.go b/agent/integration/job_runner_integration_test.go
index 951eb43e9c..2e8046232c 100644
--- a/agent/integration/job_runner_integration_test.go
+++ b/agent/integration/job_runner_integration_test.go
@@ -15,13 +15,8 @@ import (
 )
 
 func TestJobRunner_WhenJobHasToken_ItOverridesAccessToken(t *testing.T) {
-	agentAccessToken := "llamasrock"
 	jobToken := "actually-llamas-are-only-okay"
 
-	ag := &api.AgentRegisterResponse{
-		AccessToken: agentAccessToken,
-	}
-
 	j := &api.Job{
 		ID: "my-job-id",
 		ChunksMaxSizeBytes: 1024,
 		Env: map[string]string{
 			"BUILDKITE_COMMAND": "echo hello world",
 			"BUILDKITE_AGENT_ACCESS_TOKEN": jobToken,
 		},
 	}
 
 	cfg := agent.AgentConfiguration{}
-
-	runJob(t, ag, j, cfg, func(c *bintest.Call) {
+	runJob(t, j, cfg, func(c *bintest.Call) {
 		if got, want := c.GetEnv("BUILDKITE_AGENT_ACCESS_TOKEN"), jobToken; got != want {
 			t.Errorf("c.GetEnv(BUILDKITE_AGENT_ACCESS_TOKEN) = %q, want %q", got, want)
 		}
 	})
 }
 
+// TODO 2023-07-17: What is this testing? How is it testing it?
+// Maybe that the job runner pulls the access token from the API client? but that's all handled in the `runJob` helper...
 func TestJobRunnerPassesAccessTokenToBootstrap(t *testing.T) {
-	ag := &api.AgentRegisterResponse{
-		AccessToken: "llamasrock",
-	}
-
 	j := &api.Job{
 		ID: "my-job-id",
 		ChunksMaxSizeBytes: 1024,
 		Env: map[string]string{
 			"BUILDKITE_COMMAND": "echo hello world",
 		},
 	}
 
 	cfg := agent.AgentConfiguration{}
-
-	runJob(t, ag, j, cfg, func(c *bintest.Call) {
+	runJob(t, j, cfg, func(c *bintest.Call) {
 		if got, want := c.GetEnv("BUILDKITE_AGENT_ACCESS_TOKEN"), "llamasrock"; got != want {
 			t.Errorf("c.GetEnv(BUILDKITE_AGENT_ACCESS_TOKEN) = %q, want %q", got, want)
 		}
 	})
 }
 
 func TestJobRunnerIgnoresPipelineChangesToProtectedVars(t *testing.T) {
-	ag := &api.AgentRegisterResponse{
-		AccessToken: "llamasrock",
-	}
-
 	j := &api.Job{
 		ID: "my-job-id",
 		ChunksMaxSizeBytes: 1024,
 		Env: map[string]string{
 			"BUILDKITE_COMMAND": "echo hello world",
 			"BUILDKITE_COMMAND_EVAL": "false",
 		},
 	}
 
-	cfg := agent.AgentConfiguration{
-		CommandEval: true,
-	}
-
-	runJob(t, ag, j, cfg, func(c *bintest.Call) {
+	cfg := agent.AgentConfiguration{CommandEval: true}
+	runJob(t, j, cfg, func(c *bintest.Call) {
 		if got, want := c.GetEnv("BUILDKITE_COMMAND_EVAL"), "true"; got != want {
 			t.Errorf("c.GetEnv(BUILDKITE_COMMAND_EVAL) = %q, want %q", got, want)
 		}
 		c.Exit(0)
 	})
-
 }
 
-func runJob(t *testing.T, ag *api.AgentRegisterResponse, j *api.Job, cfg agent.AgentConfiguration, bootstrap func(c *bintest.Call)) {
+func runJob(t *testing.T, j *api.Job, cfg agent.AgentConfiguration, bootstrap func(c *bintest.Call)) {
 	// create a mock agent API
 	server := createTestAgentEndpoint(t, "my-job-id")
 	defer server.Close()
@@ -117,11 +100,15 @@ func runJob(t *testing.T, ag *api.AgentRegisterResponse, j *api.Job, cfg agent.A
 	client := api.NewClient(l, api.Config{
 		Endpoint: server.URL,
-		Token: ag.AccessToken,
+		Token: "llamasrock",
 	})
 
-	jr, err := agent.NewJobRunner(l, scope, ag, j, client, agent.JobRunnerConfig{
+	jr, err := agent.NewJobRunner(agent.JobRunnerConfig{
+		Job: j,
+		APIClient: client,
+		Logger: l,
 		AgentConfiguration: cfg,
+		MetricsScope: scope,
 	})
 	if err != nil {
 		t.Fatalf("agent.NewJobRunner() error = %v", err)
 	}
diff --git a/agent/job_runner.go b/agent/job_runner.go
index 8e94433e9a..a666ad04d8 100644
--- a/agent/job_runner.go
+++ b/agent/job_runner.go
@@ -75,6 +75,21 @@ type JobRunnerConfig struct {
 	// The configuration of the agent from the CLI
 	AgentConfiguration AgentConfiguration
 
+	// The logger to use
+	Logger logger.Logger
+
+	// How often to check if the job has been cancelled
+	JobStatusInterval time.Duration
+
+	// A scope for metrics within a job
+	MetricsScope *metrics.Scope
+
+	// The job to run
+	Job *api.Job
+
+	// The APIClient that will be used when updating the job
+	APIClient APIClient
+
 	// What signal to use for worker cancellation
 	CancelSignal process.Signal
 
@@ -100,9 +115,6 @@ type JobRunner struct {
 	// The logger to use
 	logger logger.Logger
 
-	// The registered agent API record running this job
-	agent *api.AgentRegisterResponse
-
 	// The job being run
 	job *api.Job
 
@@ -149,32 +161,35 @@ type jobAPI interface {
 var _ jobRunner = (*JobRunner)(nil)
 
 // Initializes the job runner
-func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) {
-	runner := &JobRunner{
-		agent: ag,
-		job: job,
-		logger: l,
+func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
+	r := &JobRunner{
+		job: conf.Job,
+		logger: conf.Logger,
 		conf: conf,
-		metrics: scope,
-		apiClient: apiClient,
+		metrics: conf.MetricsScope,
+		apiClient: conf.APIClient,
+	}
+
+	if conf.JobStatusInterval == 0 {
+		conf.JobStatusInterval = 1 * time.Second
 	}
 
 	// If the accept response has a token attached, we should use that instead of the Agent Access Token that
 	// our current apiClient is using
-	if job.Token != "" {
-		clientConf := apiClient.Config()
-		clientConf.Token = job.Token
-		runner.apiClient = api.NewClient(l, clientConf)
+	if r.job.Token != "" {
+		clientConf := r.apiClient.Config()
+		clientConf.Token = r.job.Token
+		r.apiClient = api.NewClient(r.logger, clientConf)
 	}
 
 	// Create our header times struct
-	runner.headerTimesStreamer = newHeaderTimesStreamer(l, runner.onUploadHeaderTime)
+	r.headerTimesStreamer = newHeaderTimesStreamer(r.logger, r.onUploadHeaderTime)
 
 	// The log streamer that will take the output chunks, and send them to
 	// the Buildkite Agent API
-	runner.logStreamer = NewLogStreamer(l, runner.onUploadChunk, LogStreamerConfig{
+	r.logStreamer = NewLogStreamer(r.logger, r.onUploadChunk, LogStreamerConfig{
 		Concurrency: 3,
-		MaxChunkSizeBytes: job.ChunksMaxSizeBytes,
+		MaxChunkSizeBytes: r.job.ChunksMaxSizeBytes,
 	})
 
 	// TempDir is not guaranteed to exist
@@ -187,21 +202,21 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 	}
 
 	// Prepare a file to receive the given job environment
-	if file, err := os.CreateTemp(tempDir, fmt.Sprintf("job-env-%s", job.ID)); err != nil {
-		return runner, err
+	if file, err := os.CreateTemp(tempDir, fmt.Sprintf("job-env-%s", r.job.ID)); err != nil {
+		return r, err
 	} else {
-		l.Debug("[JobRunner] Created env file: %s", file.Name())
-		runner.envFile = file
+		r.logger.Debug("[JobRunner] Created env file: %s", file.Name())
+		r.envFile = file
 	}
 
-	env, err := runner.createEnvironment()
+	env, err := r.createEnvironment()
 	if err != nil {
 		return nil, err
 	}
 
 	// Our log streamer works off a buffer of output
-	runner.output = &process.Buffer{}
-	var outputWriter io.Writer = runner.output
+	r.output = &process.Buffer{}
+	var outputWriter io.Writer = r.output
 
 	// {stdout, stderr} -> processWriter
 	// processWriter = io.MultiWriter(allWriters...)
 	var allWriters []io.Writer
@@ -214,7 +229,7 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 		jobLogDir := ""
 		if conf.AgentConfiguration.JobLogPath != "" {
 			jobLogDir = conf.AgentConfiguration.JobLogPath
-			l.Debug("[JobRunner] Job Log Path: %s", jobLogDir)
+			r.logger.Debug("[JobRunner] Job Log Path: %s", jobLogDir)
 		}
 		tmpFile, err = os.CreateTemp(jobLogDir, "buildkite_job_log")
 		if err != nil {
@@ -247,9 +262,9 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 
 		go func() {
 			// Use a scanner to process output line by line
-			err := process.NewScanner(l).ScanLines(pr, func(line string) {
+			err := process.NewScanner(r.logger).ScanLines(pr, func(line string) {
 				// Send to our header streamer and determine if it's a header
-				isHeader := runner.headerTimesStreamer.Scan(line)
+				isHeader := r.headerTimesStreamer.Scan(line)
 
 				// Prefix non-header log lines with timestamps
 				if !(isHeaderExpansion(line) || isHeader) {
@@ -260,7 +275,7 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 				_, _ = outputWriter.Write([]byte(line + "\n"))
 			})
 			if err != nil {
-				l.Error("[JobRunner] Encountered error %v", err)
+				r.logger.Error("[JobRunner] Encountered error %v", err)
 			}
 		}()
 
@@ -273,11 +288,11 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 
 		// Use a scanner to process output for headers only
 		go func() {
-			err := process.NewScanner(l).ScanLines(pr, func(line string) {
-				runner.headerTimesStreamer.Scan(line)
+			err := process.NewScanner(r.logger).ScanLines(pr, func(line string) {
+				r.headerTimesStreamer.Scan(line)
 			})
 			if err != nil {
-				l.Error("[JobRunner] Encountered error %v", err)
+				r.logger.Error("[JobRunner] Encountered error %v", err)
 			}
 		}()
 	}
@@ -285,11 +300,11 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 	if conf.AgentConfiguration.WriteJobLogsToStdout {
 		if conf.AgentConfiguration.LogFormat == "json" {
 			log := newJobLogger(
-				conf.AgentStdout, logger.StringField("org", job.Env["BUILDKITE_ORGANIZATION_SLUG"]),
-				logger.StringField("pipeline", job.Env["BUILDKITE_PIPELINE_SLUG"]),
-				logger.StringField("branch", job.Env["BUILDKITE_BRANCH"]),
-				logger.StringField("queue", job.Env["BUILDKITE_AGENT_META_DATA_QUEUE"]),
-				logger.StringField("job_id", job.ID),
+				conf.AgentStdout, logger.StringField("org", r.job.Env["BUILDKITE_ORGANIZATION_SLUG"]),
+				logger.StringField("pipeline", r.job.Env["BUILDKITE_PIPELINE_SLUG"]),
+				logger.StringField("branch", r.job.Env["BUILDKITE_BRANCH"]),
+				logger.StringField("queue", r.job.Env["BUILDKITE_AGENT_META_DATA_QUEUE"]),
+				logger.StringField("job_id", r.job.ID),
 			)
 			allWriters = append(allWriters, log)
 		} else {
@@ -312,8 +327,8 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 		if err != nil {
 			return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
 		}
-		runner.process = kubernetes.New(l, kubernetes.Config{
-			AccessToken: apiClient.Config().Token,
+		r.process = kubernetes.New(r.logger, kubernetes.Config{
+			AccessToken: r.apiClient.Config().Token,
 			Stdout: processWriter,
 			Stderr: processWriter,
 			ClientCount: containerCount,
@@ -325,7 +340,7 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 		return nil, fmt.Errorf("splitting bootstrap-script (%q) into tokens: %w", conf.AgentConfiguration.BootstrapScript, err)
 	}
 
-	runner.process = process.New(l, process.Config{
+	r.process = process.New(r.logger, process.Config{
 		Path: cmd[0],
 		Args: cmd[1:],
 		Dir: conf.AgentConfiguration.BuildPath,
@@ -339,18 +354,18 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 
 	// Close the writer end of the pipe when the process finishes
 	go func() {
-		<-runner.process.Done()
+		<-r.process.Done()
 		if err := pw.Close(); err != nil {
-			l.Error("%v", err)
+			r.logger.Error("%v", err)
 		}
 		if tmpFile != nil {
 			if err := os.Remove(tmpFile.Name()); err != nil {
-				l.Error("%v", err)
+				r.logger.Error("%v", err)
 			}
 		}
 	}()
 
-	return runner, nil
+	return r, nil
 }
 
 // Runs the job
@@ -917,7 +932,7 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro
 
 		// Sleep for a bit, or until the job is finished
 		select {
-		case <-time.After(time.Duration(r.agent.JobStatusInterval) * time.Second):
+		case <-time.After(time.Duration(r.conf.JobStatusInterval) * time.Second):
 		case <-ctx.Done():
 			return
 		case <-r.process.Done():

From 5420967e7ca4850f6071394feb1457a7b10db732 Mon Sep 17 00:00:00 2001
From: Ben Moskovitz
Date: Mon, 17 Jul 2023 16:38:11 +1000
Subject: [PATCH 3/3] Go a little less ham on the JobRunnerConfig thing

---
 agent/agent_worker.go                  |  4 +-
 .../job_runner_integration_test.go     |  4 +-
 agent/job_runner.go                    | 88 ++++++++-----------
 3 files changed, 39 insertions(+), 57 deletions(-)

diff --git a/agent/agent_worker.go b/agent/agent_worker.go
index 094aae4fa3..e32976ea78 100644
--- a/agent/agent_worker.go
+++ b/agent/agent_worker.go
@@ -647,10 +647,8 @@ func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job) error
 	})
 
 	// Now that we've got a job to do, we can start it.
-	jr, err := NewJobRunner(JobRunnerConfig{
+	jr, err := NewJobRunner(a.logger, a.apiClient, JobRunnerConfig{
 		Job: acceptResponse,
-		APIClient: a.apiClient,
-		Logger: a.logger,
 		Debug: a.debug,
 		DebugHTTP: a.debugHTTP,
 		CancelSignal: a.cancelSig,
diff --git a/agent/integration/job_runner_integration_test.go b/agent/integration/job_runner_integration_test.go
index 2e8046232c..f3c1bfea56 100644
--- a/agent/integration/job_runner_integration_test.go
+++ b/agent/integration/job_runner_integration_test.go
@@ -103,10 +103,8 @@ func runJob(t *testing.T, j *api.Job, cfg agent.AgentConfiguration, bootstrap fu
 		Token: "llamasrock",
 	})
 
-	jr, err := agent.NewJobRunner(agent.JobRunnerConfig{
+	jr, err := agent.NewJobRunner(l, client, agent.JobRunnerConfig{
 		Job: j,
-		APIClient: client,
-		Logger: l,
 		AgentConfiguration: cfg,
 		MetricsScope: scope,
 	})
diff --git a/agent/job_runner.go b/agent/job_runner.go
index a666ad04d8..3427122101 100644
--- a/agent/job_runner.go
+++ b/agent/job_runner.go
@@ -75,9 +75,6 @@ type JobRunnerConfig struct {
 	// The configuration of the agent from the CLI
 	AgentConfiguration AgentConfiguration
 
-	// The logger to use
-	Logger logger.Logger
-
 	// How often to check if the job has been cancelled
 	JobStatusInterval time.Duration
 
@@ -87,9 +84,6 @@ type JobRunnerConfig struct {
 	// The job to run
 	Job *api.Job
 
-	// The APIClient that will be used when updating the job
-	APIClient APIClient
-
 	// What signal to use for worker cancellation
 	CancelSignal process.Signal
 
@@ -115,15 +109,9 @@ type JobRunner struct {
 	// The logger to use
 	logger logger.Logger
 
-	// The job being run
-	job *api.Job
-
 	// The APIClient that will be used when updating the job
 	apiClient APIClient
 
-	// A scope for metrics within a job
-	metrics *metrics.Scope
-
 	// The internal process of the job
 	process jobAPI
 
@@ -161,13 +149,11 @@ type jobAPI interface {
 var _ jobRunner = (*JobRunner)(nil)
 
 // Initializes the job runner
-func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
+func NewJobRunner(l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) {
 	r := &JobRunner{
-		job: conf.Job,
-		logger: conf.Logger,
+		logger: l,
 		conf: conf,
-		metrics: conf.MetricsScope,
-		apiClient: conf.APIClient,
+		apiClient: apiClient,
 	}
 
 	if conf.JobStatusInterval == 0 {
@@ -176,9 +162,9 @@ func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
 
 	// If the accept response has a token attached, we should use that instead of the Agent Access Token that
 	// our current apiClient is using
-	if r.job.Token != "" {
+	if r.conf.Job.Token != "" {
 		clientConf := r.apiClient.Config()
-		clientConf.Token = r.job.Token
+		clientConf.Token = r.conf.Job.Token
 		r.apiClient = api.NewClient(r.logger, clientConf)
 	}
 
@@ -189,7 +175,7 @@ func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
 	// the Buildkite Agent API
 	r.logStreamer = NewLogStreamer(r.logger, r.onUploadChunk, LogStreamerConfig{
 		Concurrency: 3,
-		MaxChunkSizeBytes: r.job.ChunksMaxSizeBytes,
+		MaxChunkSizeBytes: r.conf.Job.ChunksMaxSizeBytes,
 	})
 
 	// TempDir is not guaranteed to exist
@@ -202,7 +188,7 @@ func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
 	}
 
 	// Prepare a file to receive the given job environment
-	if file, err := os.CreateTemp(tempDir, fmt.Sprintf("job-env-%s", r.job.ID)); err != nil {
+	if file, err := os.CreateTemp(tempDir, fmt.Sprintf("job-env-%s", r.conf.Job.ID)); err != nil {
 		return r, err
 	} else {
 		r.logger.Debug("[JobRunner] Created env file: %s", file.Name())
@@ -300,11 +286,11 @@ func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
 	if conf.AgentConfiguration.WriteJobLogsToStdout {
 		if conf.AgentConfiguration.LogFormat == "json" {
 			log := newJobLogger(
-				conf.AgentStdout, logger.StringField("org", r.job.Env["BUILDKITE_ORGANIZATION_SLUG"]),
-				logger.StringField("pipeline", r.job.Env["BUILDKITE_PIPELINE_SLUG"]),
-				logger.StringField("branch", r.job.Env["BUILDKITE_BRANCH"]),
-				logger.StringField("queue", r.job.Env["BUILDKITE_AGENT_META_DATA_QUEUE"]),
-				logger.StringField("job_id", r.job.ID),
+				conf.AgentStdout, logger.StringField("org", r.conf.Job.Env["BUILDKITE_ORGANIZATION_SLUG"]),
+				logger.StringField("pipeline", r.conf.Job.Env["BUILDKITE_PIPELINE_SLUG"]),
+				logger.StringField("branch", r.conf.Job.Env["BUILDKITE_BRANCH"]),
+				logger.StringField("queue", r.conf.Job.Env["BUILDKITE_AGENT_META_DATA_QUEUE"]),
+				logger.StringField("job_id", r.conf.Job.ID),
 			)
 			allWriters = append(allWriters, log)
 		} else {
@@ -370,7 +356,7 @@ func NewJobRunner(conf JobRunnerConfig) (jobRunner, error) {
 
 // Runs the job
 func (r *JobRunner) Run(ctx context.Context) error {
-	r.logger.Info("Starting job %s", r.job.ID)
+	r.logger.Info("Starting job %s", r.conf.Job.ID)
 
 	ctx, done := status.AddItem(ctx, "Job Runner", "", nil)
 	defer done()
@@ -386,12 +372,12 @@ func (r *JobRunner) Run(ctx context.Context) error {
 
 	// If this agent successfully grabs the job from the API, publish metric for
 	// how long this job was in the queue for, if we can calculate that
-	if r.job.RunnableAt != "" {
-		runnableAt, err := time.Parse(time.RFC3339Nano, r.job.RunnableAt)
+	if r.conf.Job.RunnableAt != "" {
+		runnableAt, err := time.Parse(time.RFC3339Nano, r.conf.Job.RunnableAt)
 		if err != nil {
-			r.logger.Error("Metric submission failed to parse %s", r.job.RunnableAt)
+			r.logger.Error("Metric submission failed to parse %s", r.conf.Job.RunnableAt)
 		} else {
-			r.metrics.Timing("queue.duration", startedAt.Sub(runnableAt))
+			r.conf.MetricsScope.Timing("queue.duration", startedAt.Sub(runnableAt))
 		}
 	}
 
@@ -473,7 +459,7 @@ func (r *JobRunner) Run(ctx context.Context) error {
 	}
 
 	// Write some metrics about the job run
-	jobMetrics := r.metrics.With(metrics.Tags{"exit_code": exit.Status})
+	jobMetrics := r.conf.MetricsScope.With(metrics.Tags{"exit_code": exit.Status})
 
 	if exit.Status == "0" {
 		jobMetrics.Timing("jobs.duration.success", finishedAt.Sub(startedAt))
@@ -487,7 +473,7 @@ func (r *JobRunner) Run(ctx context.Context) error {
 	// Once we tell the API we're finished it might assign us new work, so make sure everything else is done first.
 	r.finishJob(ctx, finishedAt, exit, r.logStreamer.FailedChunks())
 
-	r.logger.Info("Finished job %s", r.job.ID)
+	r.logger.Info("Finished job %s", r.conf.Job.ID)
 
 	return nil
 }
@@ -569,7 +555,7 @@ func (r *JobRunner) Cancel() error {
 		reason = " (agent stopping)"
 	}
 	r.logger.Info("Canceling job %s with a grace period of %ds%s",
-		r.job.ID, r.conf.AgentConfiguration.CancelGracePeriod, reason)
+		r.conf.Job.ID, r.conf.AgentConfiguration.CancelGracePeriod, reason)
 
 	r.cancelled = true
 
@@ -581,7 +567,7 @@ func (r *JobRunner) Cancel() error {
 	select {
 	// Grace period for cancelling
 	case <-time.After(time.Second * time.Duration(r.conf.AgentConfiguration.CancelGracePeriod)):
-		r.logger.Info("Job %s hasn't stopped in time, terminating", r.job.ID)
+		r.logger.Info("Job %s hasn't stopped in time, terminating", r.conf.Job.ID)
 
 		// Terminate the process as we've exceeded our context
 		return r.process.Terminate()
@@ -599,7 +585,7 @@ func (r *JobRunner) createEnvironment() ([]string, error) {
 	// sent by Buildkite. The variables below should always take
 	// precedence.
 	env := make(map[string]string)
-	for key, value := range r.job.Env {
+	for key, value := range r.conf.Job.Env {
 		env[key] = value
 	}
 
@@ -625,7 +611,7 @@ func (r *JobRunner) createEnvironment() ([]string, error) {
 
 	// Check if the user has defined any protected env
 	for k := range ProtectedEnv {
-		if _, exists := r.job.Env[k]; exists {
+		if _, exists := r.conf.Job.Env[k]; exists {
 			ignoredEnv = append(ignoredEnv, k)
 		}
 	}
@@ -786,13 +772,13 @@ func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (b
 // Buildkite, we won't bother retrying. For example, a "no such host" will
 // retry, but an HTTP response from Buildkite that isn't retryable won't.
 func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {
-	r.job.StartedAt = startedAt.UTC().Format(time.RFC3339Nano)
+	r.conf.Job.StartedAt = startedAt.UTC().Format(time.RFC3339Nano)
 
 	return roko.NewRetrier(
 		roko.WithMaxAttempts(7),
 		roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
 	).DoWithContext(ctx, func(rtr *roko.Retrier) error {
-		response, err := r.apiClient.StartJob(ctx, r.job)
+		response, err := r.apiClient.StartJob(ctx, r.conf.Job)
 
 		if err != nil {
 			if response != nil && api.IsRetryableStatus(response) {
@@ -812,14 +798,14 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {
 // finishJob finishes the job in the Buildkite Agent API. If the FinishJob call
 // cannot return successfully, this will retry for a long time.
 func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit processExit, failedChunkCount int) error {
-	r.job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
-	r.job.ExitStatus = exit.Status
-	r.job.Signal = exit.Signal
-	r.job.SignalReason = exit.SignalReason
-	r.job.ChunksFailedCount = failedChunkCount
+	r.conf.Job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
+	r.conf.Job.ExitStatus = exit.Status
+	r.conf.Job.Signal = exit.Signal
+	r.conf.Job.SignalReason = exit.SignalReason
+	r.conf.Job.ChunksFailedCount = failedChunkCount
 
 	r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s",
-		r.job.ExitStatus, r.job.Signal, r.job.SignalReason)
+		r.conf.Job.ExitStatus, r.conf.Job.Signal, r.conf.Job.SignalReason)
 
 	ctx, cancel := context.WithTimeout(ctx, 48*time.Hour)
 	defer cancel()
@@ -829,7 +815,7 @@ func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit pr
 		roko.WithJitter(),
 		roko.WithStrategy(roko.Constant(1*time.Second)),
 	).DoWithContext(ctx, func(retrier *roko.Retrier) error {
-		response, err := r.apiClient.FinishJob(ctx, r.job)
+		response, err := r.apiClient.FinishJob(ctx, r.conf.Job)
 		if err != nil {
 			// If the API returns with a 422, that means that we
 			// succesfully tried to finish the job, but Buildkite
@@ -917,14 +903,14 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro
 		setStat("📡 Fetching job state from Buildkite")
 
 		// Re-get the job and check its status to see if it's been cancelled
-		jobState, _, err := r.apiClient.GetJobState(ctx, r.job.ID)
+		jobState, _, err := r.apiClient.GetJobState(ctx, r.conf.Job.ID)
 		if err != nil {
 			// We don't really care if it fails, we'll just
 			// try again soon anyway
-			r.logger.Warn("Problem with getting job state %s (%s)", r.job.ID, err)
+			r.logger.Warn("Problem with getting job state %s (%s)", r.conf.Job.ID, err)
 		} else if jobState.State == "canceling" || jobState.State == "canceled" {
 			if err := r.Cancel(); err != nil {
-				r.logger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.job.ID, err)
+				r.logger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.conf.Job.ID, err)
 			}
 		}
 
@@ -946,7 +932,7 @@ func (r *JobRunner) onUploadHeaderTime(ctx context.Context, cursor, total int, t
 		roko.WithMaxAttempts(10),
 		roko.WithStrategy(roko.Constant(5*time.Second)),
 	).DoWithContext(ctx, func(retrier *roko.Retrier) error {
-		response, err := r.apiClient.SaveHeaderTimes(ctx, r.job.ID, &api.HeaderTimes{Times: times})
+		response, err := r.apiClient.SaveHeaderTimes(ctx, r.conf.Job.ID, &api.HeaderTimes{Times: times})
 		if err != nil {
 			if response != nil && (response.StatusCode >= 400 && response.StatusCode <= 499) {
 				r.logger.Warn("Buildkite rejected the header times (%s)", err)
@@ -979,7 +965,7 @@ func (r *JobRunner) onUploadChunk(ctx context.Context, chunk *LogStreamerChunk)
 		roko.WithStrategy(roko.Constant(5*time.Second)),
 		roko.WithJitter(),
 	).DoWithContext(ctx, func(retrier *roko.Retrier) error {
-		response, err := r.apiClient.UploadChunk(ctx, r.job.ID, &api.Chunk{
+		response, err := r.apiClient.UploadChunk(ctx, r.conf.Job.ID, &api.Chunk{
 			Data: chunk.Data,
 			Sequence: chunk.Order,
 			Offset: chunk.Offset,