Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto-cancel PipelineRuns on PR close #1867

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/formatting/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package formatting
import (
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
)

// PipelineRunStatus return status of PR success failed or skipped.
func PipelineRunStatus(pr *tektonv1.PipelineRun) string {
if len(pr.Status.Conditions) == 0 {
return "neutral"
}
if pr.Status.GetCondition(apis.ConditionSucceeded).GetReason() == tektonv1.PipelineRunSpecStatusCancelled {
return "cancelled"
}
if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
return "failure"
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/formatting/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"gotest.tools/v3/assert"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
knativeduckv1 "knative.dev/pkg/apis/duck/v1"
)

Expand All @@ -30,6 +31,23 @@ func TestPipelineRunStatus(t *testing.T) {
},
},
},
{
name: "cancelled",
pr: &tektonv1.PipelineRun{
Status: tektonv1.PipelineRunStatus{
Status: knativeduckv1.Status{
Conditions: knativeduckv1.Conditions{
{
Status: corev1.ConditionTrue,
Reason: tektonv1.PipelineRunSpecStatusCancelled,
Message: "Cancelled",
Type: apis.ConditionSucceeded,
},
},
},
},
},
},
{
name: "failure",
pr: &tektonv1.PipelineRun{
Expand Down
15 changes: 8 additions & 7 deletions pkg/params/triggertype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ func StringToType(s string) Trigger {
}

const (
OkToTest Trigger = "ok-to-test"
Retest Trigger = "retest"
Push Trigger = "push"
PullRequest Trigger = "pull_request"
LabelUpdate Trigger = "label_update"
Cancel Trigger = "cancel"
CheckSuiteRerequested Trigger = "check-suite-rerequested"
CheckRunRerequested Trigger = "check-run-rerequested"
Incoming Trigger = "incoming"
CheckSuiteRerequested Trigger = "check-suite-rerequested"
Comment Trigger = "comment"
Incoming Trigger = "incoming"
LabelUpdate Trigger = "label_update"
OkToTest Trigger = "ok-to-test"
PullRequestClosed Trigger = "pull_request_closed"
PullRequest Trigger = "pull_request" // it's should be "pull_request_opened_updated" but let's keep it simple.
Push Trigger = "push"
Retest Trigger = "retest"
)
105 changes: 63 additions & 42 deletions pkg/pipelineascode/cancel_pipelineruns.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,47 @@ import (
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/triggertype"
)

type matchingCond func(pr tektonv1.PipelineRun) bool

var cancelMergePatch = map[string]interface{}{
"spec": map[string]interface{}{
"status": tektonv1.PipelineRunSpecStatusCancelledRunFinally,
},
}

// cancelInProgress cancels all PipelineRuns associated with a given repository and pull request,
func (p *PacRun) cancelAllInProgressBelongingToPullRequest(ctx context.Context, repo *v1alpha1.Repository) error {
labelSelector := getLabelSelector(map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes(p.event.Repository),
keys.PullRequest: strconv.Itoa(int(p.event.PullRequestNumber)),
})
prs, err := p.run.Clients.Tekton.TektonV1().PipelineRuns(repo.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return fmt.Errorf("failed to list pipelineRuns : %w", err)
}

if len(prs.Items) == 0 {
msg := fmt.Sprintf("no pipelinerun found for repository: %v and pullRequest %v",
p.event.Repository, p.event.PullRequestNumber)
p.eventEmitter.EmitMessage(repo, zap.InfoLevel, "RepositoryPipelineRun", msg)
return nil
}

p.cancelPipelineRuns(ctx, prs, repo, func(_ tektonv1.PipelineRun) bool {
return true
})

return nil
}

// cancelInProgressMatchingPR cancels all PipelineRuns associated with a given repository and pull request,
// except for the one that triggered the cancellation. It first checks if the cancellation is in progress
// and if the repository has a concurrency limit. If a concurrency limit is set, it returns an error as
// cancellation is not supported with concurrency limits. It then retrieves the original pull request name
// from the annotations and lists all PipelineRuns with matching labels. For each PipelineRun that is not
// already done, cancelled, or gracefully stopped, it patches the PipelineRun to cancel it.
func (p *PacRun) cancelInProgress(ctx context.Context, matchPR *tektonv1.PipelineRun, repo *v1alpha1.Repository) error {
func (p *PacRun) cancelInProgressMatchingPR(ctx context.Context, matchPR *tektonv1.PipelineRun, repo *v1alpha1.Repository) error {
if matchPR == nil {
return nil
}
Expand Down Expand Up @@ -67,51 +95,28 @@ func (p *PacRun) cancelInProgress(ctx context.Context, matchPR *tektonv1.Pipelin
if err != nil {
return fmt.Errorf("failed to list pipelineRuns : %w", err)
}
var wg sync.WaitGroup
for _, pr := range prs.Items {
if pr.GetName() == matchPR.GetName() {
continue
}

p.cancelPipelineRuns(ctx, prs, repo, func(pr tektonv1.PipelineRun) bool {
// skip our own for cancellation
if sourceBranch, ok := pr.GetAnnotations()[keys.SourceBranch]; ok {
// NOTE(chmouel): Every PR has their own branch and so is every push to different branch
// it means we only cancel pipelinerun of the same name that runs to
// the unique branch. Note: HeadBranch is the branch from where the PR
// comes from in git jargon.
if sourceBranch != p.event.HeadBranch {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is not from the same branch, annotation source-branch: %s event headbranch: %s", pr.GetNamespace(), pr.GetName(), sourceBranch, p.event.HeadBranch)
continue
return false
}
}

if pr.IsPending() {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is pending", pr.GetNamespace(), pr.GetName())
}

if pr.IsDone() {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is done", pr.GetNamespace(), pr.GetName())
continue
}
if pr.IsCancelled() || pr.IsGracefullyCancelled() || pr.IsGracefullyStopped() {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is already in %v state", pr.GetNamespace(), pr.GetName(), pr.Spec.Status)
continue
}

p.logger.Infof("cancel-in-progress: cancelling pipelinerun %v/%v", pr.GetNamespace(), pr.GetName())
wg.Add(1)
go func(ctx context.Context, pr tektonv1.PipelineRun) {
defer wg.Done()
if _, err := action.PatchPipelineRun(ctx, p.logger, "cancel patch", p.run.Clients.Tekton, &pr, cancelMergePatch); err != nil {
errMsg := fmt.Sprintf("failed to cancel pipelineRun %s/%s: %s", pr.GetNamespace(), pr.GetName(), err.Error())
p.eventEmitter.EmitMessage(repo, zap.ErrorLevel, "RepositoryPipelineRun", errMsg)
}
}(ctx, pr)
}
wg.Wait()

return pr.GetName() != matchPR.GetName()
})
return nil
}

func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Repository) error {
// cancelPipelineRunsOpsComment cancels all PipelineRuns associated with a given repository and pull request.
// when the user issue a cancel comment.
func (p *PacRun) cancelPipelineRunsOpsComment(ctx context.Context, repo *v1alpha1.Repository) error {
labelSelector := getLabelSelector(map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes(p.event.Repository),
keys.SHA: formatting.CleanValueKubernetes(p.event.SHA),
Expand All @@ -137,22 +142,40 @@ func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Reposito
return nil
}

var wg sync.WaitGroup
for _, pr := range prs.Items {
p.cancelPipelineRuns(ctx, prs, repo, func(pr tektonv1.PipelineRun) bool {
if p.event.TargetCancelPipelineRun != "" {
if prName, ok := pr.GetAnnotations()[keys.OriginalPRName]; !ok || prName != p.event.TargetCancelPipelineRun {
continue
return false
}
}
if pr.IsDone() {
p.logger.Infof("pipelinerun %v/%v is done, skipping cancellation", pr.GetNamespace(), pr.GetName())
return true
})

return nil
}

func (p *PacRun) cancelPipelineRuns(ctx context.Context, prs *tektonv1.PipelineRunList, repo *v1alpha1.Repository, condition matchingCond) {
var wg sync.WaitGroup
for _, pr := range prs.Items {
if !condition(pr) {
continue
}

if pr.IsCancelled() || pr.IsGracefullyCancelled() || pr.IsGracefullyStopped() {
p.logger.Infof("pipelinerun %v/%v is already in %v state", pr.GetNamespace(), pr.GetName(), pr.Spec.Status)
p.logger.Infof("cancel-in-progress: skipping cancelling pipelinerun %v/%v, already in %v state", pr.GetNamespace(), pr.GetName(), pr.Spec.Status)
continue
}

if pr.IsDone() {
p.logger.Infof("cancel-in-progress: skipping cancelling pipelinerun %v/%v, already done", pr.GetNamespace(), pr.GetName())
continue
}

if pr.IsPending() {
p.logger.Infof("cancel-in-progress: skipping cancelling pipelinerun %v/%v in pending state", pr.GetNamespace(), pr.GetName())
}

p.logger.Infof("cancel-in-progress: cancelling pipelinerun %v/%v", pr.GetNamespace(), pr.GetName())
wg.Add(1)
go func(ctx context.Context, pr tektonv1.PipelineRun) {
defer wg.Done()
Expand All @@ -163,8 +186,6 @@ func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Reposito
}(ctx, pr)
}
wg.Wait()

return nil
}

func getLabelSelector(labelsMap map[string]string) string {
Expand Down
96 changes: 92 additions & 4 deletions pkg/pipelineascode/cancel_pipelineruns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
}
)

func TestCancelPipelinerun(t *testing.T) {
func TestCancelPipelinerunOpsComment(t *testing.T) {
observer, _ := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()
tests := []struct {
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestCancelPipelinerun(t *testing.T) {
},
}
pac := NewPacs(tt.event, nil, cs, &info.PacOpts{}, nil, logger, nil)
err := pac.cancelPipelineRuns(ctx, tt.repo)
err := pac.cancelPipelineRunsOpsComment(ctx, tt.repo)
assert.NilError(t, err)

got, err := cs.Clients.Tekton.TektonV1().PipelineRuns("foo").List(ctx, metav1.ListOptions{})
Expand All @@ -318,7 +318,7 @@ func TestCancelPipelinerun(t *testing.T) {
}
}

func TestCancelInProgress(t *testing.T) {
func TestCancelInProgressMatchingPR(t *testing.T) {
observer, catcher := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()
tests := []struct {
Expand Down Expand Up @@ -789,7 +789,7 @@ func TestCancelInProgress(t *testing.T) {
if len(tt.pipelineRuns) > 0 {
firstPr = tt.pipelineRuns[0]
}
err := pac.cancelInProgress(ctx, firstPr, tt.repo)
err := pac.cancelInProgressMatchingPR(ctx, firstPr, tt.repo)
if tt.wantErrString != "" {
assert.ErrorContains(t, err, tt.wantErrString)
return
Expand Down Expand Up @@ -818,6 +818,94 @@ func TestCancelInProgress(t *testing.T) {
}
}

func TestCancelAllInProgressBelongingToPullRequest(t *testing.T) {
observer, _ := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()

tests := []struct {
name string
event *info.Event
repo *v1alpha1.Repository
pipelineRuns []*pipelinev1.PipelineRun
cancelledPipelineRuns map[string]bool
}{
{
name: "cancel all in progress PipelineRuns",
event: &info.Event{
Repository: "foo",
TriggerTarget: "pull_request",
PullRequestNumber: pullReqNumber,
},
repo: fooRepo,
pipelineRuns: []*pipelinev1.PipelineRun{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-foo-1",
Namespace: "foo",
Labels: fooRepoLabels,
},
Spec: pipelinev1.PipelineRunSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-foo-2",
Namespace: "foo",
Labels: fooRepoLabels,
},
Spec: pipelinev1.PipelineRunSpec{},
},
},
cancelledPipelineRuns: map[string]bool{
"pr-foo-1": true,
"pr-foo-2": true,
},
},
{
name: "no PipelineRuns to cancel",
event: &info.Event{
Repository: "foo",
TriggerTarget: "pull_request",
PullRequestNumber: pullReqNumber,
},
repo: fooRepo,
pipelineRuns: []*pipelinev1.PipelineRun{},
cancelledPipelineRuns: map[string]bool{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, _ := rtesting.SetupFakeContext(t)

tdata := testclient.Data{
PipelineRuns: tt.pipelineRuns,
}
stdata, _ := testclient.SeedTestData(t, ctx, tdata)
cs := &params.Run{
Clients: clients.Clients{
Log: logger,
Tekton: stdata.Pipeline,
Kube: stdata.Kube,
},
}
pac := NewPacs(tt.event, nil, cs, &info.PacOpts{}, nil, logger, nil)
err := pac.cancelAllInProgressBelongingToPullRequest(ctx, tt.repo)
assert.NilError(t, err)

got, err := cs.Clients.Tekton.TektonV1().PipelineRuns("foo").List(ctx, metav1.ListOptions{})
assert.NilError(t, err)

for _, pr := range got.Items {
if _, ok := tt.cancelledPipelineRuns[pr.Name]; ok {
assert.Equal(t, string(pr.Spec.Status), pipelinev1.PipelineRunSpecStatusCancelledRunFinally)
} else {
assert.Assert(t, string(pr.Spec.Status) != pipelinev1.PipelineRunSpecStatusCancelledRunFinally)
}
}
})
}
}

func TestGetLabelSelector(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipelineascode/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *PacRun) matchRepoPR(ctx context.Context) ([]matcher.Match, *v1alpha1.Re
}

if p.event.CancelPipelineRuns {
return nil, repo, p.cancelPipelineRuns(ctx, repo)
return nil, repo, p.cancelPipelineRunsOpsComment(ctx, repo)
}

matchedPRs, err := p.getPipelineRunsFromRepo(ctx, repo)
Expand Down
Loading
Loading