Skip to content

Commit

Permalink
Allow cancel of queued job
Browse files Browse the repository at this point in the history
  • Loading branch information
hlubek committed Jul 11, 2023
1 parent 4863774 commit a116b54
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 1 deletion.
17 changes: 16 additions & 1 deletion prunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ func (j *PipelineJob) deinitScheduler() {
j.taskRunner = nil
}

func (j *PipelineJob) markAsCanceled() {
j.Canceled = true
for i := range j.Tasks {
j.Tasks[i].Canceled = true
}
}

// jobTask is a single task invocation inside the PipelineJob
type jobTask struct {
definition.TaskDef
Expand Down Expand Up @@ -937,7 +944,15 @@ func (r *PipelineRunner) cancelJobInternal(id uuid.UUID) error {
}

if job.Start == nil {
return errJobNotStarted
job.markAsCanceled()

log.
WithField("component", "runner").
WithField("pipeline", job.Pipeline).
WithField("jobID", job.ID).
Debugf("Marked job as canceled, since it was not started")

return nil
}

log.
Expand Down
76 changes: 76 additions & 0 deletions prunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,70 @@ func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) {
}
}

func TestPipelineRunner_CancelJob_WithQueuedJob(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
"long_running": {
// Concurrency of 1 is the default for a single concurrent execution
Concurrency: 1,
QueueLimit: nil,
Tasks: map[string]definition.TaskDef{
"sleep": {
Script: []string{"# that takes long"},
},
},
SourcePath: "fixtures",
},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
startedJobsIDs []string
wait = make(chan struct{})
)

pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
return &test.MockRunner{
OnRun: func(t *task.Task) error {
jobID := t.Variables.Get(taskctl.JobIDVariableName)
startedJobsIDs = append(startedJobsIDs, jobID.(string))

// Wait until the job should proceed (wait channel is closed)
<-wait

return nil
},
}
}, nil, test.NewMockOutputStore())
require.NoError(t, err)

job1, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{})
require.NoError(t, err)

job2, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{})
require.NoError(t, err)

waitForStartedJobTask(t, pRunner, job1.ID, "sleep")

// Make sure the queued job can be canceled
err = pRunner.CancelJob(job2.ID)
require.NoError(t, err)

// Close the channel to let the first job proceed
close(wait)

waitForCompletedJob(t, pRunner, job1.ID)
waitForCanceledJob(t, pRunner, job2.ID)

assert.Equal(t, true, job2.Tasks.ByName("sleep").Canceled, "job task was marked as canceled")

assert.Equal(t, []string{job1.ID.String()}, startedJobsIDs, "only job1 was started")

}

func TestPipelineRunner_CancelJob_WithStoppedJob_ShouldNotThrowFatalError(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
Expand Down Expand Up @@ -456,6 +520,18 @@ func waitForCompletedJob(t *testing.T, pRunner *PipelineRunner, jobID uuid.UUID)
}, 1*time.Millisecond, "job completed")
}

func waitForCanceledJob(t *testing.T, pRunner *PipelineRunner, jobID uuid.UUID) {
t.Helper()

test.WaitForCondition(t, func() bool {
var canceled bool
_ = pRunner.ReadJob(jobID, func(j *PipelineJob) {
canceled = j.Canceled
})
return canceled
}, 1*time.Millisecond, "job canceled")
}

func TestPipelineRunner_ShouldRemoveOldJobsWhenRetentionPeriodIsConfigured(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
Expand Down

0 comments on commit a116b54

Please sign in to comment.