From a116b54aca8d8b5a21461d37bb7e7235a21cb8fc Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 11 Jul 2023 11:10:45 +0200 Subject: [PATCH] Allow cancel of queued job --- prunner.go | 17 ++++++++++- prunner_test.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/prunner.go b/prunner.go index 5d4aea3..63cafc3 100644 --- a/prunner.go +++ b/prunner.go @@ -153,6 +153,13 @@ func (j *PipelineJob) deinitScheduler() { j.taskRunner = nil } +func (j *PipelineJob) markAsCanceled() { + j.Canceled = true + for i := range j.Tasks { + j.Tasks[i].Canceled = true + } +} + // jobTask is a single task invocation inside the PipelineJob type jobTask struct { definition.TaskDef @@ -937,7 +944,15 @@ func (r *PipelineRunner) cancelJobInternal(id uuid.UUID) error { } if job.Start == nil { - return errJobNotStarted + job.markAsCanceled() + + log. + WithField("component", "runner"). + WithField("pipeline", job.Pipeline). + WithField("jobID", job.ID). + Debugf("Marked job as canceled, since it was not started") + + return nil } log. diff --git a/prunner_test.go b/prunner_test.go index 3295fba..ff2d51b 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -270,6 +270,70 @@ func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) { } } +func TestPipelineRunner_CancelJob_WithQueuedJob(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"# that takes long"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + startedJobsIDs []string + wait = make(chan struct{}) + ) + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(t *task.Task) error { + jobID := t.Variables.Get(taskctl.JobIDVariableName) + startedJobsIDs = append(startedJobsIDs, jobID.(string)) + + // Wait until the job should proceed (wait channel is closed) + <-wait + + return nil + }, + } + }, nil, test.NewMockOutputStore()) + require.NoError(t, err) + + job1, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + job2, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + waitForStartedJobTask(t, pRunner, job1.ID, "sleep") + + // Make sure the queued job can be canceled + err = pRunner.CancelJob(job2.ID) + require.NoError(t, err) + + // Close the channel to let the first job proceed + close(wait) + + waitForCompletedJob(t, pRunner, job1.ID) + waitForCanceledJob(t, pRunner, job2.ID) + + assert.Equal(t, true, job2.Tasks.ByName("sleep").Canceled, "job task was marked as canceled") + + assert.Equal(t, []string{job1.ID.String()}, startedJobsIDs, "only job1 was started") + +} + func TestPipelineRunner_CancelJob_WithStoppedJob_ShouldNotThrowFatalError(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{ @@ -456,6 +520,18 @@ func waitForCompletedJob(t *testing.T, pRunner *PipelineRunner, jobID uuid.UUID) }, 1*time.Millisecond, "job completed") } +func waitForCanceledJob(t *testing.T, pRunner *PipelineRunner, jobID uuid.UUID) { + t.Helper() + + test.WaitForCondition(t, func() bool { + var canceled bool + _ = pRunner.ReadJob(jobID, func(j *PipelineJob) { + canceled = j.Canceled + }) + return canceled + }, 1*time.Millisecond, "job canceled") +} + func TestPipelineRunner_ShouldRemoveOldJobsWhenRetentionPeriodIsConfigured(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{