Merge pull request #150 from nayihz/feat-startup-policy
feat: support startup policy
k8s-ci-robot committed Jun 3, 2024
2 parents 02039b4 + e10d6ba commit f55ce01
Showing 11 changed files with 185 additions and 21 deletions.
16 changes: 16 additions & 0 deletions api/leaderworkerset/v1/leaderworkerset_types.go
@@ -109,6 +109,12 @@ type LeaderWorkerSetSpec struct {
// when a revision is made to the leaderWorkerTemplate.
// +optional
RolloutStrategy RolloutStrategy `json:"rolloutStrategy,omitempty"`

// StartupPolicy determines the startup policy for the worker statefulset.
// +kubebuilder:default=LeaderCreated
// +kubebuilder:validation:Enum={LeaderCreated,LeaderReady}
// +optional
StartupPolicy StartupPolicyType `json:"startupPolicy"`
}

// Template of the leader/worker pods, the group will include at least one leader pod.
@@ -227,6 +233,16 @@ const (
DefaultRestartPolicy RestartPolicyType = "Default"
)

type StartupPolicyType string

const (
// LeaderReady creates the workers statefulset after the leader pod is ready.
LeaderReadyStartupPolicy StartupPolicyType = "LeaderReady"

// LeaderCreated creates the workers statefulset immediately after the leader pod is created.
LeaderCreatedStartupPolicy StartupPolicyType = "LeaderCreated"
)

// LeaderWorkerSetStatus defines the observed state of LeaderWorkerSet
type LeaderWorkerSetStatus struct {
// Conditions track the condition of the leaderworkerset.
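For illustration, a minimal sketch of opting into the new field from Go (assuming the module's sigs.k8s.io/lws/api/leaderworkerset/v1 import path; required fields such as leaderWorkerTemplate are elided):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
)

func main() {
	// LeaderReady defers each worker StatefulSet until its leader pod is Ready;
	// the default, LeaderCreated, keeps the previous behavior.
	lws := leaderworkerset.LeaderWorkerSet{
		ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
		Spec: leaderworkerset.LeaderWorkerSetSpec{
			StartupPolicy: leaderworkerset.LeaderReadyStartupPolicy,
		},
	}
	fmt.Println(lws.Spec.StartupPolicy) // prints: LeaderReady
}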
@@ -15340,6 +15340,14 @@ spec:
                required:
                - type
                type: object
              startupPolicy:
                default: LeaderCreated
                description: StartupPolicy determines the startup policy for the worker
                  statefulset.
                enum:
                - LeaderCreated
                - LeaderReady
                type: string
            required:
            - leaderWorkerTemplate
            type: object
9 changes: 6 additions & 3 deletions go.mod
@@ -1,6 +1,6 @@
module sigs.k8s.io/lws

go 1.22
go 1.22.0

require (
github.com/google/go-cmp v0.6.0
@@ -12,7 +12,8 @@ require (
k8s.io/apimachinery v0.29.5
k8s.io/client-go v0.29.5
k8s.io/code-generator v0.29.5
k8s.io/klog/v2 v2.110.1
k8s.io/klog/v2 v2.120.1
k8s.io/kubernetes v1.29.5
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/structured-merge-diff/v4 v4.4.1
@@ -60,6 +61,7 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
@@ -73,7 +75,8 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/component-base v0.29.5 // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
13 changes: 8 additions & 5 deletions go.sum
@@ -15,7 +15,6 @@ github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ=
@@ -200,13 +199,17 @@ k8s.io/component-base v0.29.5 h1:Ptj8AzG+p8c2a839XriHwxakDpZH9uvIgYz+o1agjg8=
k8s.io/component-base v0.29.5/go.mod h1:9nBUoPxW/yimISIgAG7sJDrUGJlu7t8HnDafIrOdU8Q=
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 h1:pWEwq4Asjm4vjW7vcsmijwBhOr1/shsbSYiWXmNGlks=
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 h1:NGrVE502P0s0/1hudf8zjgwki1X/TByhmAoILTarmzo=
k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70/go.mod h1:VH3AT8AaQOqiGjMF9p0/IM1Dj+82ZwjfxUP1IxaHE+8=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-aggregator v0.28.1 h1:rvG4llYnQKHjj6YjjoBPEJxfD1uH0DJwkrJTNKGAaCs=
k8s.io/kube-aggregator v0.28.1/go.mod h1:JaLizMe+AECSpO2OmrWVsvnG0V3dX1RpW+Wq/QHbu18=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/kubernetes v1.29.5 h1:G+i73mlMcmqRge1STYospiN8X9FYHGeBOer/e2uGJ1k=
k8s.io/kubernetes v1.29.5/go.mod h1:28sDhcb87LX5z3GWAKYmLrhrifxi4W9bEWua4DRTIvk=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
3 changes: 0 additions & 3 deletions pkg/controllers/leaderworkerset_controller.go
@@ -371,9 +371,6 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
return false, err
}

if err != nil {
return false, err
}
var ready, updated bool
if statefulsetutils.StatefulsetReady(sts) && podutils.PodRunningAndReady(leaderPod) {
ready = true
6 changes: 6 additions & 0 deletions pkg/controllers/pod_controller.go
@@ -32,6 +32,7 @@ import (
coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/klog/v2"
k8spodutils "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -102,6 +103,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}

// logic for handling leader pod
if leaderWorkerSet.Spec.StartupPolicy == leaderworkerset.LeaderReadyStartupPolicy && !k8spodutils.IsPodReady(&pod) {
log.V(2).Info("deferring creation of the worker statefulset because the leader pod is not ready")
return ctrl.Result{}, nil
}

statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet)
if err != nil {
return ctrl.Result{}, err
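The early return above is the entire mechanism: under LeaderReady the pod reconciler exits before the worker StatefulSet apply configuration is constructed, and reconciles again once the leader pod's status changes. A sketch of the gate as a standalone predicate (the helper name is hypothetical; the PR inlines the check in Reconcile):

package controllers

import (
	corev1 "k8s.io/api/core/v1"
	k8spodutils "k8s.io/kubernetes/pkg/api/v1/pod"

	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
)

// shouldCreateWorkerStatefulSet mirrors the check added to PodReconciler.Reconcile:
// LeaderCreated (the default) applies the worker StatefulSet as soon as the leader
// pod exists; LeaderReady waits until the leader pod's Ready condition is True.
func shouldCreateWorkerStatefulSet(lws *leaderworkerset.LeaderWorkerSet, leaderPod *corev1.Pod) bool {
	if lws.Spec.StartupPolicy == leaderworkerset.LeaderReadyStartupPolicy {
		return k8spodutils.IsPodReady(leaderPod)
	}
	return true
}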
44 changes: 42 additions & 2 deletions test/integration/controllers/leaderworkerset_test.go
@@ -23,7 +23,6 @@ import (
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
v1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -271,7 +270,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
var scale v1.Scale
var scale autoscalingv1.Scale
gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
gomega.Expect(int32(scale.Spec.Replicas)).To(gomega.Equal(*lws.Spec.Replicas))
gomega.Expect(int32(scale.Status.Replicas)).To(gomega.Equal(lws.Status.Replicas))
@@ -1512,6 +1511,47 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
},
},
}),
ginkgo.Entry("create a leaderworkerset with spec.startupPolicyy=LeaderReady", &testCase{
makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper {
return testing.BuildLeaderWorkerSet(nsName).Replica(4).StartupPolicy(leaderworkerset.LeaderReadyStartupPolicy)
},
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectWorkerStatefulSetsNotCreated(ctx, k8sClient, lws)
},
},
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.SetLeaderPodsToReady(ctx, k8sClient, lws, 0, 2)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectSpecifiedWorkerStatefulSetsCreated(ctx, k8sClient, lws, 0, 2)
testing.ExpectSpecifiedWorkerStatefulSetsNotCreated(ctx, k8sClient, lws, 2, 4)
},
},
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.SetLeaderPodsToReady(ctx, k8sClient, lws, 2, 4)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectSpecifiedWorkerStatefulSetsCreated(ctx, k8sClient, lws, 2, 4)
},
},
},
}),
ginkgo.Entry("create a leaderworkerset with spec.startupPolicyy=LeaderCreated", &testCase{
makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper {
return testing.BuildLeaderWorkerSet(nsName).Replica(4).StartupPolicy(leaderworkerset.LeaderCreatedStartupPolicy)
},
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectSpecifiedWorkerStatefulSetsCreated(ctx, k8sClient, lws, 0, 4)
},
},
},
}),
) // end of DescribeTable
}) // end of Describe

31 changes: 31 additions & 0 deletions test/integration/webhooks/leaderworkerset_test.go
@@ -108,6 +108,22 @@ var _ = ginkgo.Describe("leaderworkerset defaulting, creation and update", func(
return testutils.BuildLeaderWorkerSet(ns.Name).Replica(2).Size(2).RestartPolicy(leaderworkerset.RecreateGroupOnPodRestart)
},
}),
ginkgo.Entry("defaulting logic applies when spec.startpolicy is not set", &testDefaultingCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).Replica(2).Size(2)
},
getExpectedLWS: func(lws *leaderworkerset.LeaderWorkerSet) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).Replica(2).Size(2).StartupPolicy(leaderworkerset.LeaderCreatedStartupPolicy)
},
}),
ginkgo.Entry("defaulting logic applies when spec.startpolicy is set", &testDefaultingCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).Replica(2).Size(2).StartupPolicy(leaderworkerset.LeaderReadyStartupPolicy)
},
getExpectedLWS: func(lws *leaderworkerset.LeaderWorkerSet) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).Replica(2).Size(2).StartupPolicy(leaderworkerset.LeaderReadyStartupPolicy)
},
}),
ginkgo.Entry("apply default rollout strategy", &testDefaultingCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).RolloutStrategy(leaderworkerset.RolloutStrategy{}) // unset rollout strategy
@@ -197,6 +213,12 @@ var _ = ginkgo.Describe("leaderworkerset defaulting, creation and update", func(
},
lwsCreationShouldFail: true,
}),
ginkgo.Entry("creation with invalid startpolicy should fail", &testValidationCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).StartupPolicy("invalidValue")
},
lwsCreationShouldFail: true,
}),
ginkgo.Entry("creation with invalid subGroupSize should fail", &testValidationCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).Size(2).SubGroupSize(-1)
Expand Down Expand Up @@ -227,6 +249,15 @@ var _ = ginkgo.Describe("leaderworkerset defaulting, creation and update", func(
},
updateShouldFail: true,
}),
ginkgo.Entry("update with invalid startpolicy should fail", &testValidationCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).StartupPolicy(leaderworkerset.LeaderReadyStartupPolicy)
},
updateLeaderWorkerSet: func(lws *leaderworkerset.LeaderWorkerSet) {
lws.Spec.StartupPolicy = "invalidValue"
},
updateShouldFail: true,
}),
ginkgo.Entry("number of size can not be updated", &testValidationCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *testutils.LeaderWorkerSetWrapper {
return testutils.BuildLeaderWorkerSet(ns.Name).Replica(1).Size(1)
25 changes: 21 additions & 4 deletions test/testutils/util.go
@@ -237,11 +237,10 @@ func SetPodGroupsToReady(ctx context.Context, k8sClient client.Client, lws *lead
}
}

// SetPodGroupToReady set one podGroup(leaderPod+workerStatefulset) of leaderWorkerSet to ready state, workerPods not included.
func SetPodGroupToReady(ctx context.Context, k8sClient client.Client, statefulsetName string, lws *leaderworkerset.LeaderWorkerSet) {
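// SetLeaderPodToReady marks the named leader pod as Ready by updating its status conditions.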
func SetLeaderPodToReady(ctx context.Context, k8sClient client.Client, podName string, lws *leaderworkerset.LeaderWorkerSet) {
gomega.Eventually(func() error {
var leaderPod corev1.Pod
if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: statefulsetName}, &leaderPod); err != nil {
if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: podName}, &leaderPod); err != nil {
return err
}

@@ -256,7 +255,7 @@ func SetPodGroupToReady(ctx context.Context, k8sClient client.Client, statefulse

gomega.Eventually(func() error {
var leaderPod corev1.Pod
if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: statefulsetName}, &leaderPod); err != nil {
if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: podName}, &leaderPod); err != nil {
return err
}

@@ -268,7 +267,11 @@ func SetPodGroupToReady(ctx context.Context, k8sClient client.Client, statefulse
leaderPod.Status.Conditions = append(leaderPod.Status.Conditions, condition)
return k8sClient.Status().Update(ctx, &leaderPod)
}, Timeout, Interval).Should(gomega.Succeed())
}

// SetPodGroupToReady set one podGroup(leaderPod+workerStatefulset) of leaderWorkerSet to ready state, workerPods not included.
func SetPodGroupToReady(ctx context.Context, k8sClient client.Client, statefulsetName string, lws *leaderworkerset.LeaderWorkerSet) {
SetLeaderPodToReady(ctx, k8sClient, statefulsetName, lws)
gomega.Eventually(func() error {
var sts appsv1.StatefulSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: statefulsetName, Namespace: lws.Namespace}, &sts); err != nil {
@@ -492,3 +495,17 @@ func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace)
}
return nil
}

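// SetLeaderPodsToReady marks the leader pods with ordinals in [start, end) as Ready.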
func SetLeaderPodsToReady(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, start, end int) {
var leaderSts appsv1.StatefulSet
gomega.Eventually(func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts); err != nil {
return err
}
return nil
}, Timeout, Interval).Should(gomega.Succeed())

for i := start; i < end; i++ {
SetLeaderPodToReady(ctx, k8sClient, fmt.Sprintf("%s-%d", leaderSts.Name, i), lws)
}
}
37 changes: 37 additions & 0 deletions test/testutils/validators.go
@@ -27,6 +27,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -373,3 +374,39 @@ func ValidateLatestEvent(ctx context.Context, k8sClient client.Client, eventReas

}, Timeout, Interval).Should(gomega.BeNil())
}

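// ExpectWorkerStatefulSetsNotCreated verifies that only the leader StatefulSet exists,
// i.e. no worker StatefulSets have been created for the LeaderWorkerSet.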
func ExpectWorkerStatefulSetsNotCreated(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) {
ginkgo.By("checking no worker statefulset created")
gomega.Consistently(func() bool {
var statefulSetList appsv1.StatefulSetList
if err := k8sClient.List(ctx, &statefulSetList, client.InNamespace(lws.Namespace), &client.MatchingLabels{leaderworkerset.SetNameLabelKey: lws.Name}); err != nil {
return false
}
return len(statefulSetList.Items) == 1 && statefulSetList.Items[0].Name == lws.Name
}, Timeout, Interval).Should(gomega.Equal(true))
}

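// ExpectSpecifiedWorkerStatefulSetsCreated waits for the worker StatefulSets with
// ordinals in [start, end) to exist.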
func ExpectSpecifiedWorkerStatefulSetsCreated(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, start, end int) {
gomega.Eventually(func() error {
var sts appsv1.StatefulSet
for i := start; i < end; i++ {
if err := k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%d", lws.Name, i), Namespace: lws.Namespace}, &sts); err != nil {
return err
}
}
return nil
}, Timeout, Interval).Should(gomega.BeNil())
}

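// ExpectSpecifiedWorkerStatefulSetsNotCreated verifies that the worker StatefulSets
// with ordinals in [start, end) consistently do not exist.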
func ExpectSpecifiedWorkerStatefulSetsNotCreated(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, start, end int) {
gomega.Consistently(func() bool {
var sts appsv1.StatefulSet
for i := start; i < end; i++ {
if err := k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%d", lws.Name, i), Namespace: lws.Namespace}, &sts); err == nil ||
!apierrors.IsNotFound(err) {
return false
}
}
return true
}, Timeout, Interval).Should(gomega.Equal(true))
}