From d469485235d5145276cbc04c9cded41b60078b0c Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Wed, 25 Dec 2024 11:44:20 -0800 Subject: [PATCH] chore: cascade deletion on isbsvc pvc (#2305) --- pkg/apis/numaflow/v1alpha1/deprecated.go | 11 +++++++++++ .../numaflow/v1alpha1/jetstream_buffer_service.go | 4 ++++ .../v1alpha1/jetstream_buffer_service_test.go | 4 ++++ pkg/apis/numaflow/v1alpha1/redis_buffer_service.go | 4 ++++ .../numaflow/v1alpha1/redis_buffer_service_test.go | 4 ++++ pkg/reconciler/isbsvc/installer/jetstream.go | 6 +++++- pkg/reconciler/isbsvc/installer/native_redis.go | 6 +++++- 7 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/deprecated.go b/pkg/apis/numaflow/v1alpha1/deprecated.go index 9fd3152115..978c8d60b7 100644 --- a/pkg/apis/numaflow/v1alpha1/deprecated.go +++ b/pkg/apis/numaflow/v1alpha1/deprecated.go @@ -31,3 +31,14 @@ func isSidecarSupported() bool { k8sVersion, _ := strconv.ParseFloat(v, 32) return k8sVersion >= 1.29 } + +// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27 +func IsPVCRetentionPolicySupported() bool { + v := os.Getenv(EnvK8sServerVersion) + if v == "" { + return true // default to true if the env var is not found + } + // e.g. 1.31 + k8sVersion, _ := strconv.ParseFloat(v, 32) + return k8sVersion >= 1.27 +} diff --git a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go index cd0a1440df..b39d78ea65 100644 --- a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go +++ b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go @@ -255,6 +255,10 @@ func (j JetStreamBufferService) GetStatefulSetSpec(req GetJetStreamStatefulSetSp } j.AbstractPodTemplate.ApplyToPodSpec(podSpec) spec := appv1.StatefulSetSpec{ + PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ + WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType, + WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType, + }, PodManagementPolicy: appv1.ParallelPodManagement, Replicas: &replicas, ServiceName: req.ServiceName, diff --git a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go index becf05d86a..de87ab6ec5 100644 --- a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go +++ b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) @@ -74,6 +75,9 @@ func TestJetStreamGetStatefulSetSpec(t *testing.T) { }, } spec := s.GetStatefulSetSpec(req) + assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy) + assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted) + assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled) assert.True(t, len(spec.VolumeClaimTemplates) > 0) }) diff --git a/pkg/apis/numaflow/v1alpha1/redis_buffer_service.go b/pkg/apis/numaflow/v1alpha1/redis_buffer_service.go index 258388ab09..c9632cecad 100644 --- a/pkg/apis/numaflow/v1alpha1/redis_buffer_service.go +++ b/pkg/apis/numaflow/v1alpha1/redis_buffer_service.go @@ -338,6 +338,10 @@ redis_exporter`}, nr.AbstractPodTemplate.ApplyToPodSpec(podSpec) spec := appv1.StatefulSetSpec{ + PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ + WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType, + WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType, + }, Replicas: &replicas, ServiceName: req.ServiceName, Selector: &metav1.LabelSelector{ diff --git a/pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go b/pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go index 0fd1c821fd..348fce305f 100644 --- a/pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go +++ b/pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) @@ -66,6 +67,9 @@ func TestRedisGetStatefulSetSpec(t *testing.T) { }, } spec := s.GetStatefulSetSpec(req) + assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy) + assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted) + assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled) assert.True(t, len(spec.VolumeClaimTemplates) > 0) assert.True(t, len(spec.Template.Spec.InitContainers) > 0) assert.NotNil(t, spec.Template.Spec.SecurityContext) diff --git a/pkg/reconciler/isbsvc/installer/jetstream.go b/pkg/reconciler/isbsvc/installer/jetstream.go index f0faa09caa..e9d92ce82c 100644 --- a/pkg/reconciler/isbsvc/installer/jetstream.go +++ b/pkg/reconciler/isbsvc/installer/jetstream.go @@ -518,7 +518,11 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { func (r *jetStreamInstaller) Uninstall(ctx context.Context) error { // Clean up metrics _ = reconciler.JetStreamISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name) - return r.uninstallPVCs(ctx) + // TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27 + if !dfv1.IsPVCRetentionPolicySupported() { + return r.uninstallPVCs(ctx) + } + return nil } func (r *jetStreamInstaller) uninstallPVCs(ctx context.Context) error { diff --git a/pkg/reconciler/isbsvc/installer/native_redis.go b/pkg/reconciler/isbsvc/installer/native_redis.go index 495d24b03b..84b6ea4a78 100644 --- a/pkg/reconciler/isbsvc/installer/native_redis.go +++ b/pkg/reconciler/isbsvc/installer/native_redis.go @@ -585,7 +585,11 @@ func (r *redisInstaller) createStatefulSet(ctx context.Context) error { func (r *redisInstaller) Uninstall(ctx context.Context) error { // Clean up metrics _ = reconciler.RedisISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name) - return r.uninstallPVCs(ctx) + // TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27 + if !dfv1.IsPVCRetentionPolicySupported() { + return r.uninstallPVCs(ctx) + } + return nil } func (r *redisInstaller) uninstallPVCs(ctx context.Context) error {