Skip to content

Commit 42b277d

Browse files
committed
Set ownerRef to managed VMClusters
1 parent 129d0b6 commit 42b277d

File tree

5 files changed

+86
-21
lines changed

5 files changed

+86
-21
lines changed

api/operator/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/controller/operator/factory/vmdistributedcluster/vmdistributedcluster.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ import (
3232
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
3333
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3434
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+
"k8s.io/apimachinery/pkg/runtime"
3536
"k8s.io/apimachinery/pkg/types"
3637
"k8s.io/apimachinery/pkg/util/wait"
3738
"sigs.k8s.io/controller-runtime/pkg/client"
39+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3840

3941
vmv1alpha1 "github.com/VictoriaMetrics/operator/api/operator/v1alpha1"
4042
vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
@@ -45,7 +47,7 @@ const (
4547
)
4648

4749
// CreateOrUpdate handles VM deployment reconciliation.
48-
func CreateOrUpdate(ctx context.Context, cr *vmv1alpha1.VMDistributedCluster, rclient client.Client, vmclusterWaitReadyDeadline, httpTimeout time.Duration) error {
50+
func CreateOrUpdate(ctx context.Context, cr *vmv1alpha1.VMDistributedCluster, rclient client.Client, scheme *runtime.Scheme, vmclusterWaitReadyDeadline, httpTimeout time.Duration) error {
4951
// Store the previous CR for comparison
5052
var prevCR *vmv1alpha1.VMDistributedCluster
5153
if cr.ParsedLastAppliedSpec != nil {
@@ -132,15 +134,22 @@ func CreateOrUpdate(ctx context.Context, cr *vmv1alpha1.VMDistributedCluster, rc
132134
if err = rclient.Get(ctx, types.NamespacedName{Name: vmClusterObj.Name, Namespace: vmClusterObj.Namespace}, vmClusterObj); k8serrors.IsNotFound(err) {
133135
needsToBeCreated = true
134136
}
135-
mergedSpec, modified, err := ApplyOverrideSpec(vmClusterObj.Spec, zoneRefOrSpec.OverrideSpec)
137+
138+
// Update the VMCluster when overrideSpec needs to be applied or ownerref set
139+
mergedSpec, modifiedSpec, err := ApplyOverrideSpec(vmClusterObj.Spec, zoneRefOrSpec.OverrideSpec)
136140
if err != nil {
137141
return fmt.Errorf("failed to apply override spec for vmcluster %s at index %d: %w", vmClusterObj.Name, i, err)
138142
}
139-
if !needsToBeCreated && !modified {
143+
modifiedOwnerRef, err := setOwnerRefIfNeeded(cr, vmClusterObj, i, scheme)
144+
if err != nil {
145+
return fmt.Errorf("failed to set owner reference for vmcluster %s at index %d: %w", vmClusterObj.Name, i, err)
146+
}
147+
if !needsToBeCreated && !modifiedSpec && !modifiedOwnerRef {
140148
continue
141149
}
142150

143151
vmClusterObj.Spec = mergedSpec
152+
144153
if needsToBeCreated {
145154
if err := rclient.Create(ctx, vmClusterObj); err != nil {
146155
return fmt.Errorf("failed to create vmcluster %s at index %d after applying override spec: %w", vmClusterObj.Name, i, err)
@@ -179,6 +188,9 @@ func validateVMClusterRefOrSpec(i int, refOrSpec vmv1alpha1.VMClusterRefOrSpec)
179188
if refOrSpec.Spec == nil && refOrSpec.Ref == nil {
180189
return fmt.Errorf("VMClusterRefOrSpec at index %d must have either Ref or Spec set, got: %+v", i, refOrSpec)
181190
}
191+
if refOrSpec.Spec != nil && refOrSpec.OverrideSpec != nil {
192+
return fmt.Errorf("VMClusterRefOrSpec at index %d cannot have both Spec and OverrideSpec set, got: %+v", i, refOrSpec)
193+
}
182194
if refOrSpec.Spec != nil && refOrSpec.Name == "" {
183195
return fmt.Errorf("VMClusterRefOrSpec.Name must be set when Spec is provided for index %d", i)
184196
}
@@ -558,3 +570,22 @@ func fetchVMAgentDiskBufferMetric(ctx context.Context, httpClient *http.Client,
558570
}
559571
return 0, fmt.Errorf("metric %s not found", VMAgentQueueMetricName)
560572
}
573+
574+
func setOwnerRefIfNeeded(cr *vmv1alpha1.VMDistributedCluster, vmClusterObj *vmv1beta1.VMCluster, index int, scheme *runtime.Scheme) (bool, error) {
575+
ref := metav1.OwnerReference{
576+
APIVersion: cr.APIVersion,
577+
Kind: cr.Kind,
578+
UID: cr.GetUID(),
579+
Name: cr.GetName(),
580+
}
581+
if ok, err := controllerutil.HasOwnerReference([]metav1.OwnerReference{ref}, vmClusterObj, scheme); err != nil {
582+
return false, fmt.Errorf("failed to check owner reference for vmcluster %s at index %d: %w", vmClusterObj.Name, index, err)
583+
} else if !ok {
584+
// Set owner reference for the VMCluster to the VMDistributedCluster
585+
if err := controllerutil.SetOwnerReference(cr, vmClusterObj, scheme); err != nil {
586+
return false, fmt.Errorf("failed to set owner reference for vmcluster %s at index %d: %w", vmClusterObj.Name, index, err)
587+
}
588+
return true, nil
589+
}
590+
return false, nil
591+
}

internal/controller/operator/factory/vmdistributedcluster/vmdistributedcluster_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,18 @@ func beforeEach() testData {
322322
}
323323

324324
func TestCreateOrUpdate_ErrorHandling(t *testing.T) {
325+
scheme := runtime.NewScheme()
326+
_ = vmv1alpha1.AddToScheme(scheme)
327+
_ = vmv1beta1.AddToScheme(scheme)
328+
_ = corev1.AddToScheme(scheme)
329+
325330
t.Run("Paused CR should do nothing", func(t *testing.T) {
326331
data := beforeEach()
327332
data.cr.Spec.Paused = true
328333
rclient := data.trackingClient
329334
ctx := context.TODO()
330335

331-
err := CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
336+
err := CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
332337
assert.NoError(t, err) // No error as it's paused
333338
assert.Empty(t, rclient.Actions)
334339
})
@@ -339,7 +344,7 @@ func TestCreateOrUpdate_ErrorHandling(t *testing.T) {
339344
rclient := data.trackingClient
340345
ctx := context.TODO()
341346

342-
err := CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
347+
err := CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
343348
assert.Error(t, err)
344349
assert.Contains(t, err.Error(), "failed to fetch global vmagent")
345350
})
@@ -350,7 +355,7 @@ func TestCreateOrUpdate_ErrorHandling(t *testing.T) {
350355
rclient := data.trackingClient
351356
ctx := context.TODO()
352357

353-
err := CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
358+
err := CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
354359
assert.Error(t, err)
355360
assert.Contains(t, err.Error(), "failed to fetch vmusers")
356361
})
@@ -361,7 +366,7 @@ func TestCreateOrUpdate_ErrorHandling(t *testing.T) {
361366
rclient := data.trackingClient
362367
ctx := context.TODO()
363368

364-
err := CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
369+
err := CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
365370
assert.Error(t, err)
366371
assert.Contains(t, err.Error(), "failed to fetch vmclusters")
367372
})
@@ -379,23 +384,23 @@ func TestCreateOrUpdate_ErrorHandling(t *testing.T) {
379384
Spec: &vmv1beta1.VMClusterSpec{},
380385
},
381386
}
382-
err := CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
387+
err := CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
383388
assert.Error(t, err)
384389
assert.Contains(t, err.Error(), "either VMClusterRefOrSpec.Spec or VMClusterRefOrSpec.Ref must be set for zone at index 0")
385390

386391
// Neither Ref nor Spec set
387392
data.cr.Spec.Zones = []vmv1alpha1.VMClusterRefOrSpec{
388393
{},
389394
}
390-
err = CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
395+
err = CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
391396
assert.Error(t, err)
392397
assert.Contains(t, err.Error(), "VMClusterRefOrSpec.Spec or VMClusterRefOrSpec.Ref must be set for zone at index 0")
393398

394399
// Spec provided but Name missing
395400
data.cr.Spec.Zones = []vmv1alpha1.VMClusterRefOrSpec{
396401
{Spec: &vmv1beta1.VMClusterSpec{}},
397402
}
398-
err = CreateOrUpdate(ctx, data.cr, rclient, vmclusterWaitReadyDeadline, httpTimeout)
403+
err = CreateOrUpdate(ctx, data.cr, rclient, scheme, vmclusterWaitReadyDeadline, httpTimeout)
399404
assert.Error(t, err)
400405
assert.Contains(t, err.Error(), "VMClusterRefOrSpec.Name must be set when Spec is provided for zone at index 0")
401406
})

internal/controller/operator/vmclusterdistributed_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (r *VMDistributedClusterReconciler) Reconcile(ctx context.Context, req ctrl
9595
// }
9696
r.Client.Scheme().Default(instance)
9797
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
98-
if err := vmdistributedcluster.CreateOrUpdate(ctx, instance, r, vmclusterWaitReadyDeadline, httpTimeout); err != nil {
98+
if err := vmdistributedcluster.CreateOrUpdate(ctx, instance, r, r.OriginScheme, vmclusterWaitReadyDeadline, httpTimeout); err != nil {
9999
return result, fmt.Errorf("vmdistributedcluster %s update failed: %w", instance.Name, err)
100100
}
101101

test/e2e/vmdistributedcluster_test.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,18 @@ func cleanupVMClusters(ctx context.Context, k8sClient client.Client, vmclusters
8686
}
8787
}
8888

89+
func verifyOwnerReferences(ctx context.Context, cr *vmv1alpha1.VMDistributedCluster, vmclusters []vmv1beta1.VMCluster, namespace string) {
90+
var fetchedVMCluster vmv1beta1.VMCluster
91+
for _, vmcluster := range vmclusters {
92+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: vmcluster.Name, Namespace: namespace}, &fetchedVMCluster)).To(Succeed())
93+
Expect(fetchedVMCluster.GetOwnerReferences()).To(HaveLen(1))
94+
ownerRef := fetchedVMCluster.GetOwnerReferences()[0]
95+
Expect(ownerRef.Kind).To(Equal("VMDistributedCluster"))
96+
Expect(ownerRef.APIVersion).To(Equal("operator.victoriametrics.com/v1alpha1"))
97+
Expect(ownerRef.Name).To(Equal(cr.Name))
98+
}
99+
}
100+
89101
var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster"), func() {
90102
var ctx context.Context
91103

@@ -210,7 +222,7 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
210222
}
211223

212224
Context("create", func() {
213-
DescribeTable("should create vmdistributedcluster with VMAgent", func(cr *vmv1alpha1.VMDistributedCluster, vmclusters []vmv1beta1.VMCluster, vmagents map[string]*vmv1beta1.VMAgent, verify func(cr *vmv1alpha1.VMDistributedCluster)) {
225+
DescribeTable("should create vmdistributedcluster with VMAgent", func(cr *vmv1alpha1.VMDistributedCluster, vmclusters []vmv1beta1.VMCluster, vmagents map[string]*vmv1beta1.VMAgent, verify func(cr *vmv1alpha1.VMDistributedCluster, createdVMClusters []vmv1beta1.VMCluster)) {
214226
beforeEach()
215227
DeferCleanup(func() {
216228
cleanupVMClusters(ctx, k8sClient, vmclusters, namespacedName)
@@ -241,7 +253,7 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
241253
if verify != nil {
242254
var createdCluster vmv1alpha1.VMDistributedCluster
243255
Expect(k8sClient.Get(ctx, namespacedName, &createdCluster)).To(Succeed())
244-
verify(&createdCluster)
256+
verify(&createdCluster, vmclusters)
245257
}
246258
},
247259
Entry("with single VMCluster", &vmv1alpha1.VMDistributedCluster{
@@ -291,12 +303,13 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
291303
{URL: "http://localhost:8428/api/v1/write"},
292304
},
293305
},
294-
}}, func(cr *vmv1alpha1.VMDistributedCluster) {
306+
}}, func(cr *vmv1alpha1.VMDistributedCluster, createdVMClusters []vmv1beta1.VMCluster) {
295307
Expect(cr.Status.VMClusterInfo).To(HaveLen(1))
296308
names := []string{
297309
cr.Status.VMClusterInfo[0].VMClusterName,
298310
}
299311
Expect(names).To(ContainElements("vmcluster-1"))
312+
verifyOwnerReferences(ctx, cr, createdVMClusters, namespace)
300313
}),
301314
Entry("with multiple VMClusters and VMAgent pairs", &vmv1alpha1.VMDistributedCluster{
302315
ObjectMeta: metav1.ObjectMeta{
@@ -379,13 +392,14 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
379392
{URL: "http://localhost:8428/api/v1/write"},
380393
},
381394
},
382-
}}, func(cr *vmv1alpha1.VMDistributedCluster) {
395+
}}, func(cr *vmv1alpha1.VMDistributedCluster, createdVMClusters []vmv1beta1.VMCluster) {
383396
Expect(cr.Status.VMClusterInfo).To(HaveLen(2))
384397
names := []string{
385398
cr.Status.VMClusterInfo[0].VMClusterName,
386399
cr.Status.VMClusterInfo[1].VMClusterName,
387400
}
388401
Expect(names).To(ContainElements("vmcluster-1", "vmcluster-2"))
402+
verifyOwnerReferences(ctx, cr, createdVMClusters, namespace)
389403
}),
390404
Entry("with mixed VMClusters (some with VMAgent, some without)", &vmv1alpha1.VMDistributedCluster{
391405
ObjectMeta: metav1.ObjectMeta{
@@ -458,13 +472,14 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
458472
{URL: "http://localhost:8428/api/v1/write"},
459473
},
460474
},
461-
}}, func(cr *vmv1alpha1.VMDistributedCluster) {
475+
}}, func(cr *vmv1alpha1.VMDistributedCluster, createdVMClusters []vmv1beta1.VMCluster) {
462476
Expect(cr.Status.VMClusterInfo).To(HaveLen(2))
463477
names := []string{
464478
cr.Status.VMClusterInfo[0].VMClusterName,
465479
cr.Status.VMClusterInfo[1].VMClusterName,
466480
}
467481
Expect(names).To(ContainElements("vmcluster-1", "vmcluster-2"))
482+
verifyOwnerReferences(ctx, cr, createdVMClusters, namespace)
468483
}),
469484
)
470485

@@ -540,6 +555,7 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
540555
Eventually(func() error {
541556
return expectObjectStatusOperational(ctx, k8sClient, &vmv1alpha1.VMDistributedCluster{}, namespacedName)
542557
}, eventualStatefulsetAppReadyTimeout).WithContext(ctx).Should(Succeed())
558+
verifyOwnerReferences(ctx, cr, vmclusters, namespace)
543559

544560
// Apply spec update
545561
Expect(k8sClient.Get(ctx, namespacedName, cr)).To(Succeed())
@@ -630,6 +646,7 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
630646
Eventually(func() error {
631647
return expectObjectStatusOperational(ctx, k8sClient, &vmv1alpha1.VMDistributedCluster{}, namespacedName)
632648
}, eventualStatefulsetAppReadyTimeout).Should(Succeed())
649+
verifyOwnerReferences(ctx, cr, vmclusters, namespace)
633650

634651
By("pausing the VMDistributedCluster")
635652
// Re-fetch the latest VMDistributedCluster object to avoid conflict errors
@@ -783,6 +800,7 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
783800
Eventually(func() error {
784801
return expectObjectStatusOperational(ctx, k8sClient, &vmv1alpha1.VMDistributedCluster{}, namespacedName)
785802
}, eventualStatefulsetAppReadyTimeout).WithContext(ctx).Should(Succeed())
803+
verifyOwnerReferences(ctx, cr, vmclusters, namespace)
786804

787805
// Update to update VMAgent
788806
Eventually(func() error {
@@ -843,6 +861,16 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
843861
},
844862
}
845863
Expect(k8sClient.Create(ctx, cr)).To(Succeed())
864+
var inlineVMClusters []vmv1beta1.VMCluster
865+
for _, zone := range cr.Spec.Zones {
866+
inlineVMClusters = append(inlineVMClusters, vmv1beta1.VMCluster{
867+
ObjectMeta: metav1.ObjectMeta{
868+
Name: zone.Name,
869+
Namespace: namespace,
870+
},
871+
})
872+
}
873+
verifyOwnerReferences(ctx, cr, inlineVMClusters, namespace)
846874
DeferCleanup(func() {
847875
Expect(finalize.SafeDeleteWithFinalizer(ctx, k8sClient, cr)).To(Succeed())
848876
})
@@ -978,6 +1006,7 @@ var _ = Describe("e2e vmdistributedcluster", Label("vm", "vmdistributedcluster")
9781006
Eventually(func() error {
9791007
return expectObjectStatusOperational(ctx, k8sClient, &vmv1alpha1.VMDistributedCluster{}, namespacedName)
9801008
}, eventualStatefulsetAppReadyTimeout).WithContext(ctx).Should(Succeed())
1009+
verifyOwnerReferences(ctx, cr, []vmv1beta1.VMCluster{*initialCluster}, namespace)
9811010

9821011
By("verifying that the referenced VMCluster has the override applied")
9831012
var updatedCluster vmv1beta1.VMCluster

0 commit comments

Comments
 (0)