Skip to content

Commit

Permalink
chore: cascade deletion on isbsvc pvc (numaproj#2305)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored and SaniyaKalamkar committed Jan 19, 2025
1 parent 8127748 commit d469485
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 2 deletions.
11 changes: 11 additions & 0 deletions pkg/apis/numaflow/v1alpha1/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,14 @@ func isSidecarSupported() bool {
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.29
}

// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
func IsPVCRetentionPolicySupported() bool {
v := os.Getenv(EnvK8sServerVersion)
if v == "" {
return true // default to true if the env var is not found
}
// e.g. 1.31
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.27
}
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ func (j JetStreamBufferService) GetStatefulSetSpec(req GetJetStreamStatefulSetSp
}
j.AbstractPodTemplate.ApplyToPodSpec(podSpec)
spec := appv1.StatefulSetSpec{
PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType,
},
PodManagementPolicy: appv1.ParallelPodManagement,
Replicas: &replicas,
ServiceName: req.ServiceName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
Expand Down Expand Up @@ -74,6 +75,9 @@ func TestJetStreamGetStatefulSetSpec(t *testing.T) {
},
}
spec := s.GetStatefulSetSpec(req)
assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy)
assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted)
assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled)
assert.True(t, len(spec.VolumeClaimTemplates) > 0)
})

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/redis_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ redis_exporter`},
nr.AbstractPodTemplate.ApplyToPodSpec(podSpec)

spec := appv1.StatefulSetSpec{
PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType,
},
Replicas: &replicas,
ServiceName: req.ServiceName,
Selector: &metav1.LabelSelector{
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
Expand Down Expand Up @@ -66,6 +67,9 @@ func TestRedisGetStatefulSetSpec(t *testing.T) {
},
}
spec := s.GetStatefulSetSpec(req)
assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy)
assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted)
assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled)
assert.True(t, len(spec.VolumeClaimTemplates) > 0)
assert.True(t, len(spec.Template.Spec.InitContainers) > 0)
assert.NotNil(t, spec.Template.Spec.SecurityContext)
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,11 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
func (r *jetStreamInstaller) Uninstall(ctx context.Context) error {
// Clean up metrics
_ = reconciler.JetStreamISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name)
return r.uninstallPVCs(ctx)
// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
if !dfv1.IsPVCRetentionPolicySupported() {
return r.uninstallPVCs(ctx)
}
return nil
}

func (r *jetStreamInstaller) uninstallPVCs(ctx context.Context) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/native_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,11 @@ func (r *redisInstaller) createStatefulSet(ctx context.Context) error {
func (r *redisInstaller) Uninstall(ctx context.Context) error {
// Clean up metrics
_ = reconciler.RedisISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name)
return r.uninstallPVCs(ctx)
// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
if !dfv1.IsPVCRetentionPolicySupported() {
return r.uninstallPVCs(ctx)
}
return nil
}

func (r *redisInstaller) uninstallPVCs(ctx context.Context) error {
Expand Down

0 comments on commit d469485

Please sign in to comment.