Skip to content

Commit

Permalink
Make sure we collect the right prun in the Queue
Browse files Browse the repository at this point in the history
When initializing the queues, we need to make sure we only collect the
pruns that we want to for pending and running queues.

This filter out as well the non existent ones, in case if it was deleted
but we have the old one referencing it in its state.

Signed-off-by: Chmouel Boudjnah <[email protected]>
  • Loading branch information
chmouel authored and savitaashture committed Nov 5, 2024
1 parent 9dd1a16 commit cf499ef
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 25 deletions.
4 changes: 3 additions & 1 deletion pkg/reconciler/queue_pipelineruns.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
"github.com/openshift-pipelines/pipelines-as-code/pkg/sync"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -50,7 +52,7 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo
return nil
}

orderedList := strings.Split(order, ",")
orderedList := sync.FilterPipelineRunByState(ctx, r.run.Clients.Tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
acquired, err := r.qm.AddListToRunningQueue(repo, orderedList)
if err != nil {
return fmt.Errorf("failed to add to queue: %s: %w", pr.GetName(), err)
Expand Down
35 changes: 32 additions & 3 deletions pkg/sync/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,35 @@ func getQueueKey(run *tektonv1.PipelineRun) string {
return fmt.Sprintf("%s/%s", run.Namespace, run.Name)
}

// FilterPipelineRunByState filters the given list of PipelineRun names to only include those
// that are in a "queued" state and have a pending status. It retrieves the PipelineRun objects
// from the Tekton API and checks their annotations and status to determine if they should be included.
//
// Returns A list of PipelineRun names that are in a "queued" state and have a pending status.
func FilterPipelineRunByState(ctx context.Context, tekton versioned2.Interface, orderList []string, wantedStatus, wantedState string) []string {
orderedList := []string{}
for _, prName := range orderList {
prKey := strings.Split(prName, "/")
pr, err := tekton.TektonV1().PipelineRuns(prKey[0]).Get(ctx, prKey[1], v1.GetOptions{})
if err != nil {
continue
}

state, exist := pr.GetAnnotations()[keys.State]
if !exist {
continue
}

if state == wantedState {
if wantedStatus != "" && pr.Spec.Status != tektonv1.PipelineRunSpecStatus(wantedStatus) {
continue
}
orderedList = append(orderedList, prName)
}
}
return orderedList
}

// InitQueues rebuild all the queues for all repository if concurrency is defined before
// reconciler started reconciling them.
func (qm *QueueManager) InitQueues(ctx context.Context, tekton versioned2.Interface, pac versioned.Interface) error {
Expand Down Expand Up @@ -194,7 +223,8 @@ func (qm *QueueManager) InitQueues(ctx context.Context, tekton versioned2.Interf
// if the pipelineRun doesn't have order label then wait
return nil
}
orderedList := strings.Split(order, ",")
orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), "", kubeinteraction.StateStarted)

_, err = qm.AddListToRunningQueue(&repo, orderedList)
if err != nil {
qm.logger.Error("failed to init queue for repo: ", repo.GetName())
Expand All @@ -219,8 +249,7 @@ func (qm *QueueManager) InitQueues(ctx context.Context, tekton versioned2.Interf
// if the pipelineRun doesn't have order label then wait
return nil
}
orderedList := strings.Split(order, ",")

orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
if err := qm.AddToPendingQueue(&repo, orderedList); err != nil {
qm.logger.Error("failed to init queue for repo: ", repo.GetName())
}
Expand Down
113 changes: 92 additions & 21 deletions pkg/sync/queue_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sync

import (
"fmt"
"testing"
"time"

Expand All @@ -16,6 +17,7 @@ import (
"go.uber.org/zap"
zapobserver "go.uber.org/zap/zaptest/observer"
"gotest.tools/v3/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
rtesting "knative.dev/pkg/reconciler/testing"
Expand All @@ -30,7 +32,7 @@ func TestSomeoneElseSetPendingWithNoConcurrencyLimit(t *testing.T) {
// unset concurrency limit
repo.Spec.ConcurrencyLimit = nil

pr := newTestPR("first", time.Now(), nil, nil)
pr := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{})
// set to pending
pr.Status.Conditions = duckv1.Conditions{
{
Expand All @@ -52,7 +54,7 @@ func TestAddToPendingQueueDirectly(t *testing.T) {
// unset concurrency limit
repo.Spec.ConcurrencyLimit = nil

pr := newTestPR("first", time.Now(), nil, nil)
pr := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{})
// set to pending
pr.Status.Conditions = duckv1.Conditions{
{
Expand All @@ -77,7 +79,7 @@ func TestNewQueueManagerForList(t *testing.T) {
repo := newTestRepo(1)

// first pipelineRun
prFirst := newTestPR("first", time.Now(), nil, nil)
prFirst := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{})

// added to queue, as there is only one should start
started, err := qm.AddListToRunningQueue(repo, []string{getQueueKey(prFirst)})
Expand All @@ -89,8 +91,8 @@ func TestNewQueueManagerForList(t *testing.T) {

// adding another 2 pipelineRun, limit is 1 so this will be added to pending queue and
// then one will be started
prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil)
prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil)
prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil, tektonv1.PipelineRunSpec{})
prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{})

started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prSecond), getQueueKey(prThird)})
assert.NilError(t, err)
Expand All @@ -99,8 +101,8 @@ func TestNewQueueManagerForList(t *testing.T) {
assert.Equal(t, started[0], getQueueKey(prSecond))

// adding 2 more, will be going to pending queue
prFourth := newTestPR("fourth", time.Now().Add(5*time.Second), nil, nil)
prFifth := newTestPR("fifth", time.Now().Add(4*time.Second), nil, nil)
prFourth := newTestPR("fourth", time.Now().Add(5*time.Second), nil, nil, tektonv1.PipelineRunSpec{})
prFifth := newTestPR("fifth", time.Now().Add(4*time.Second), nil, nil, tektonv1.PipelineRunSpec{})

started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prFourth), getQueueKey(prFifth)})
assert.NilError(t, err)
Expand All @@ -112,9 +114,9 @@ func TestNewQueueManagerForList(t *testing.T) {
// changing the concurrency limit to 2
repo.Spec.ConcurrencyLimit = intPtr(2)

prSixth := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil)
prSeventh := newTestPR("seventh", time.Now().Add(5*time.Second), nil, nil)
prEight := newTestPR("eight", time.Now().Add(4*time.Second), nil, nil)
prSixth := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{})
prSeventh := newTestPR("seventh", time.Now().Add(5*time.Second), nil, nil, tektonv1.PipelineRunSpec{})
prEight := newTestPR("eight", time.Now().Add(4*time.Second), nil, nil, tektonv1.PipelineRunSpec{})

started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prSixth), getQueueKey(prSeventh), getQueueKey(prEight)})
assert.NilError(t, err)
Expand All @@ -132,9 +134,9 @@ func TestNewQueueManagerReListing(t *testing.T) {
// repository for which pipelineRun are created
repo := newTestRepo(2)

prFirst := newTestPR("first", time.Now(), nil, nil)
prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil)
prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil)
prFirst := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{})
prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil, tektonv1.PipelineRunSpec{})
prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{})

// added to queue, as there is only one should start
started, err := qm.AddListToRunningQueue(repo, []string{getQueueKey(prFirst), getQueueKey(prSecond), getQueueKey(prThird)})
Expand All @@ -158,9 +160,9 @@ func TestNewQueueManagerReListing(t *testing.T) {
assert.Equal(t, qm.QueuedPipelineRuns(repo)[0], "test-ns/third")

// a new request comes
prFourth := newTestPR("fourth", time.Now(), nil, nil)
prFifth := newTestPR("fifth", time.Now().Add(1*time.Second), nil, nil)
prSixths := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil)
prFourth := newTestPR("fourth", time.Now(), nil, nil, tektonv1.PipelineRunSpec{})
prFifth := newTestPR("fifth", time.Now().Add(1*time.Second), nil, nil, tektonv1.PipelineRunSpec{})
prSixths := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{})

started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prFourth), getQueueKey(prFifth), getQueueKey(prSixths)})
assert.NilError(t, err)
Expand All @@ -184,7 +186,7 @@ func newTestRepo(limit int) *v1alpha1.Repository {

var intPtr = func(val int) *int { return &val }

func newTestPR(name string, time time.Time, labels, annotations map[string]string) *tektonv1.PipelineRun {
func newTestPR(name string, time time.Time, labels, annotations map[string]string, spec tektonv1.PipelineRunSpec) *tektonv1.PipelineRun {
return &tektonv1.PipelineRun{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -194,7 +196,7 @@ func newTestPR(name string, time time.Time, labels, annotations map[string]strin
Labels: labels,
Annotations: annotations,
},
Spec: tektonv1.PipelineRunSpec{},
Spec: spec,
Status: tektonv1.PipelineRunStatus{},
}
}
Expand Down Expand Up @@ -222,9 +224,13 @@ func TestQueueManager_InitQueues(t *testing.T) {
keys.ExecutionOrder: "test-ns/first,test-ns/second,test-ns/third",
keys.State: kubeinteraction.StateStarted,
}
firstPR := newTestPR("first", cw.Now(), startedLabel, startedAnnotations)
secondPR := newTestPR("second", cw.Now().Add(5*time.Second), queuedLabel, queuedAnnotations)
thirdPR := newTestPR("third", cw.Now().Add(3*time.Second), queuedLabel, queuedAnnotations)
firstPR := newTestPR("first", cw.Now(), startedLabel, startedAnnotations, tektonv1.PipelineRunSpec{})
secondPR := newTestPR("second", cw.Now().Add(5*time.Second), queuedLabel, queuedAnnotations, tektonv1.PipelineRunSpec{
Status: tektonv1.PipelineRunSpecStatusPending,
})
thirdPR := newTestPR("third", cw.Now().Add(3*time.Second), queuedLabel, queuedAnnotations, tektonv1.PipelineRunSpec{
Status: tektonv1.PipelineRunSpecStatusPending,
})

tdata := testclient.Data{
Repositories: []*v1alpha1.Repository{repo},
Expand Down Expand Up @@ -255,3 +261,68 @@ func TestQueueManager_InitQueues(t *testing.T) {
runs = qm.QueuedPipelineRuns(repo)
assert.Equal(t, len(runs), 1)
}

func TestFilterPipelineRunByInProgress(t *testing.T) {
ctx, _ := rtesting.SetupFakeContext(t)
ns := "test-ns"

// Create a fake Tekton client
pipelineRuns := []*tektonv1.PipelineRun{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr1",
Namespace: ns,
Annotations: map[string]string{
keys.State: kubeinteraction.StateQueued,
},
},
Spec: tektonv1.PipelineRunSpec{
Status: tektonv1.PipelineRunSpecStatusPending,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr2",
Namespace: ns,
Annotations: map[string]string{
keys.State: kubeinteraction.StateCompleted,
},
},
Spec: tektonv1.PipelineRunSpec{
Status: tektonv1.PipelineRunSpecStatusPending,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr3",
Namespace: ns,
Annotations: map[string]string{
keys.State: kubeinteraction.StateQueued,
},
},
Spec: tektonv1.PipelineRunSpec{
Status: tektonv1.PipelineRunSpecStatusCancelled,
},
},
}

tdata := testclient.Data{
Namespaces: []*corev1.Namespace{
{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
},
},
},
PipelineRuns: pipelineRuns,
}

orderList := []string{}
for _, pr := range pipelineRuns {
orderList = append(orderList, fmt.Sprintf("%s/%s", ns, pr.GetName()))
}
stdata, _ := testclient.SeedTestData(t, ctx, tdata)
filtered := FilterPipelineRunByState(ctx, stdata.Pipeline, orderList, tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
expected := []string{"test-ns/pr1"}
assert.DeepEqual(t, filtered, expected)
}

0 comments on commit cf499ef

Please sign in to comment.