Skip to content

Commit

Permalink
Support job deletion in CPL-only k8s tests (#227)
Browse files Browse the repository at this point in the history
* Rename mock ctrl because we want that name for the imports

* support job deletion in local tests

* Test for job restart
  • Loading branch information
l0kix2 authored Apr 12, 2024
1 parent 9a3fb26 commit 058ce31
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 1 deletion.
137 changes: 137 additions & 0 deletions pkg/components/init_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package components

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/labeller"
"github.com/ytsaurus/yt-k8s-operator/pkg/testutil"
"github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig"
)

const (
ytsaurusName = "testsaurus"
domain = "testdomain"

scriptBefore = "SCRIPT"
scriptAfter = "UPDATED SCRIPT"
)

var (
waitTimeout = 5 * time.Second
waitTick = 300 * time.Millisecond
)

func prepareTest(t *testing.T, namespace string) (*testutil.TestHelper, *apiproxy.Ytsaurus, *ytconfig.Generator) {
h := testutil.NewTestHelper(t, namespace, filepath.Join("..", "..", "config", "crd", "bases"))
h.Start(func(mgr ctrl.Manager) error { return nil })

ytsaurusResource := testutil.BuildMinimalYtsaurus(namespace, ytsaurusName)
// Deploy of ytsaurus spec is required, so it could set valid owner references for child resources.
testutil.DeployObject(h, &ytsaurusResource)

scheme := runtime.NewScheme()
utilruntime.Must(ytv1.AddToScheme(scheme))
fakeRecorder := record.NewFakeRecorder(100)

ytsaurus := apiproxy.NewYtsaurus(&ytsaurusResource, h.GetK8sClient(), fakeRecorder, scheme)
cfgen := ytconfig.NewGenerator(ytsaurus.GetResource(), domain)
return h, ytsaurus, cfgen
}

func syncJobUntilReady(t *testing.T, job *InitJob) {
ctx := context.Background()

require.Eventually(
t,
func() bool {
err := job.Fetch(ctx)
require.NoError(t, err)
st, err := job.Sync(ctx, false)
require.NoError(t, err)
return st.SyncStatus == SyncStatusReady
},
waitTimeout,
waitTick,
)
}

func newTestJob(ytsaurus *apiproxy.Ytsaurus) *InitJob {
k8sName := "dummy-name"
return NewInitJob(
&labeller.Labeller{
ObjectMeta: &metav1.ObjectMeta{
Name: k8sName,
Namespace: ytsaurus.GetResource().Namespace,
},
ComponentLabel: "ms",
ComponentName: k8sName,
},
ytsaurus.APIProxy(),
ytsaurus,
[]corev1.LocalObjectReference{},
"dummy",
consts.ClientConfigFileName,
"dummy-image",
func() ([]byte, error) { return []byte("dummy-cfg"), nil },
)
}

func TestJobRestart(t *testing.T) {
ctx := context.Background()

namespace := "testjobrestart"
h, ytsaurus, _ := prepareTest(t, namespace)
// TODO: separate helper so no need to remember to call stop
defer h.Stop()

job := newTestJob(ytsaurus)
job.SetInitScript(scriptBefore)
syncJobUntilReady(t, job)

err := job.prepareRestart(ctx, false)
require.NoError(t, err)

t.Log("Ensure job is deleted on restart")
require.Eventually(t,
func() bool {
batchJob := batchv1.Job{}
err = ytsaurus.APIProxy().Client().Get(ctx, client.ObjectKey{
Name: "ms-init-job-dummy",
Namespace: namespace,
}, &batchJob)
return apierrors.IsNotFound(err)
},
waitTimeout,
100*time.Millisecond,
)

t.Log("Wait for restart to be prepared.")
require.Eventually(
t,
func() bool {
job = newTestJob(ytsaurus)
err = job.Fetch(ctx)
require.NoError(t, err)
return job.isRestartPrepared()
},
waitTimeout,
waitTick,
)
}
16 changes: 15 additions & 1 deletion pkg/testutil/testhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewTestHelper(t *testing.T, namespace, crdDirectoryPath string) *TestHelper
cancel: testCancel,
k8sTestEnv: k8sTestEnv,
Namespace: namespace,
ticker: time.NewTicker(1 * time.Second),
ticker: time.NewTicker(200 * time.Millisecond),
}
}

Expand Down Expand Up @@ -182,6 +182,20 @@ func MarkAllJobsCompleted(h *TestHelper) {
job.Status.Succeeded = 1
UpdateObjectStatus(h, job)
}
if !job.DeletionTimestamp.IsZero() {
h.t.Logf(
"found job %s, with deletion ts %s and finalizers: %s. Deleting as gc would do in real cluster.",
job.Name,
job.DeletionTimestamp,
job.Finalizers,
)
job.Finalizers = []string{}
UpdateObject(h, &batchv1.Job{}, job)
err = h.k8sClient.Delete(context.Background(), job)
if err != nil && !apierrors.IsNotFound(err) {
panic(fmt.Sprintf("failed to delete job %s", job))
}
}
}
}

Expand Down

0 comments on commit 058ce31

Please sign in to comment.