Skip to content

Commit 15d1b83

Browse files
vmagent and vmanomaly: create a PDB per shard to guarantee proper application protection. Previously there was a single PDB for all shards, which could lead to unpredictable application disruptions. Fixes #1548
1 parent 51b2a5d commit 15d1b83

File tree

10 files changed

+466
-559
lines changed

10 files changed

+466
-559
lines changed

api/operator/v1beta1/vmagent_types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,14 @@ func (cr *VMAgent) UnmarshalJSON(src []byte) error {
435435
return nil
436436
}
437437

438+
// GetShardCount returns shard count for vmagent
439+
func (cr *VMAgent) GetShardCount() int {
440+
if cr == nil || cr.Spec.ShardCount == nil {
441+
return 0
442+
}
443+
return *cr.Spec.ShardCount
444+
}
445+
438446
// UnmarshalJSON implements json.Unmarshaler interface
439447
func (cr *VMAgentSpec) UnmarshalJSON(src []byte) error {
440448
type pcr VMAgentSpec

docs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ This change could be reverted by providing env variable `VM_USECUSTOMCONFIGRELOA
6666
* BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): fix an issue where the return value from a couple of controllers was always `nil`. See [#1532](https://github.com/VictoriaMetrics/operator/pull/1532) for details.
6767
* BUGFIX: [VMCluster](https://docs.victoriametrics.com/operator/resources/vmcluster/): emit warning if `vmcluster.spec.vmselect.persistentVolume` is set, previously it was emitted for `vmcluster.spec.vmselect.storage`.
6868
* BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): prevent endless Service reconcile loop by correctly tracking changes to Service.spec.LoadBalancerClass. See [#1550](https://github.com/VictoriaMetrics/operator/issues/1550) for details.
69+
* BUGFIX: [vmagent](https://docs.victoriametrics.com/operator/resources/vmagent/) and [vmanomaly](https://docs.victoriametrics.com/operator/resources/vmanomaly/): create PDB per shard to guarantee proper application protection. See [#1548](https://github.com/VictoriaMetrics/operator/issues/1548).
6970

7071

7172
## [v0.63.0](https://github.com/VictoriaMetrics/operator/releases/tag/v0.63.0)

internal/controller/operator/factory/build/pdb.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package build
22

33
import (
4+
"fmt"
5+
"strconv"
6+
47
policyv1 "k8s.io/api/policy/v1"
58
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
69

@@ -9,6 +12,11 @@ import (
912

1013
// PodDisruptionBudget creates object for given CRD
1114
func PodDisruptionBudget(cr builderOpts, spec *vmv1beta1.EmbeddedPodDisruptionBudgetSpec) *policyv1.PodDisruptionBudget {
15+
return PodDisruptionBudgetSharded(cr, spec, nil)
16+
}
17+
18+
// PodDisruptionBudgetSharded creates object for given CRD and shard num
19+
func PodDisruptionBudgetSharded(cr builderOpts, spec *vmv1beta1.EmbeddedPodDisruptionBudgetSpec, num *int) *policyv1.PodDisruptionBudget {
1220
pdb := policyv1.PodDisruptionBudget{
1321
ObjectMeta: metav1.ObjectMeta{
1422
Name: cr.PrefixedName(),
@@ -25,6 +33,10 @@ func PodDisruptionBudget(cr builderOpts, spec *vmv1beta1.EmbeddedPodDisruptionBu
2533
},
2634
},
2735
}
36+
if num != nil {
37+
pdb.Name = fmt.Sprintf("%s-%d", pdb.Name, *num)
38+
pdb.Spec.Selector.MatchLabels[shardLabelName] = strconv.Itoa(*num)
39+
}
2840
if len(spec.UnhealthyPodEvictionPolicy) > 0 {
2941
p := policyv1.UnhealthyPodEvictionPolicyType(spec.UnhealthyPodEvictionPolicy)
3042
pdb.Spec.UnhealthyPodEvictionPolicy = &p
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package build
2+
3+
import (
4+
"fmt"
5+
"iter"
6+
"strconv"
7+
8+
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/k8stools"
9+
)
10+
11+
const (
12+
ShardNumPlaceholder = "%SHARD_NUM%"
13+
shardLabelName = "shard-num"
14+
)
15+
16+
// ShardNumIter iterates over shardCount in order defined in backward
17+
func ShardNumIter(backward bool, shardCount int) iter.Seq[*int] {
18+
if shardCount <= 1 {
19+
return func(yield func(*int) bool) {
20+
yield(nil)
21+
}
22+
}
23+
if backward {
24+
return func(yield func(*int) bool) {
25+
for shardCount > 0 {
26+
shardCount--
27+
num := shardCount
28+
if !yield(&num) {
29+
return
30+
}
31+
}
32+
}
33+
}
34+
return func(yield func(*int) bool) {
35+
for i := 0; i < shardCount; i++ {
36+
if !yield(&i) {
37+
return
38+
}
39+
}
40+
}
41+
}
42+
43+
func isSharded(shardCount int) bool {
44+
return shardCount > 1
45+
}
46+
47+
// ShardName appends the shard number placeholder suffix to a name if shards count is > 1
48+
func ShardName(name string, shardCount int) string {
49+
if isSharded(shardCount) {
50+
name = fmt.Sprintf("%s-%s", name, ShardNumPlaceholder)
51+
}
52+
return name
53+
}
54+
55+
// ShardLabels adds shard label pair if shards count is > 1
56+
func ShardLabels(labels map[string]string, shardCount int) map[string]string {
57+
if isSharded(shardCount) {
58+
labels[shardLabelName] = "%SHARD_NUM%"
59+
}
60+
return labels
61+
}
62+
63+
// RenderShard renders resource's placeholders, including the shard number placeholder if num is defined
64+
func RenderShard[T any](resource *T, placeholders map[string]string, num *int) (*T, error) {
65+
if placeholders == nil {
66+
placeholders = make(map[string]string)
67+
}
68+
if num != nil {
69+
placeholders[ShardNumPlaceholder] = strconv.Itoa(*num)
70+
}
71+
return k8stools.RenderPlaceholders(resource, placeholders)
72+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package build
2+
3+
import (
4+
"slices"
5+
"testing"
6+
)
7+
8+
func TestShardNumIter(t *testing.T) {
9+
f := func(backward bool, upperBound int) {
10+
output := slices.Collect(ShardNumIter(backward, upperBound))
11+
if upperBound <= 1 {
12+
if len(output) != 1 || output[0] != nil {
13+
t.Errorf("invalid ShardNumIter() values, want: [nil]; got: %v", output)
14+
}
15+
return
16+
}
17+
if upperBound > 1 && len(output) != upperBound {
18+
t.Errorf("invalid ShardNumIter() items count, want: %d, got: %d", upperBound, len(output))
19+
}
20+
var lowerBound int
21+
if backward {
22+
lowerBound = upperBound - 1
23+
upperBound = 0
24+
} else {
25+
upperBound--
26+
}
27+
if *output[0] != lowerBound || *output[len(output)-1] != upperBound {
28+
t.Errorf("invalid ShardNumIter() bounds, want: [%d, %d], got: [%d, %d]", lowerBound, upperBound, *output[0], *output[len(output)-1])
29+
}
30+
}
31+
f(true, 9)
32+
f(false, 5)
33+
f(false, 1)
34+
f(true, 0)
35+
}

internal/controller/operator/factory/finalize/orphaned.go

Lines changed: 42 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package finalize
33
import (
44
"context"
55

6-
appsv1 "k8s.io/api/apps/v1"
6+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
77
"k8s.io/apimachinery/pkg/labels"
8+
"k8s.io/apimachinery/pkg/runtime/schema"
89
"sigs.k8s.io/controller-runtime/pkg/client"
910
)
1011

@@ -14,84 +15,71 @@ type orphanedCRD interface {
1415
}
1516

1617
// RemoveOrphanedDeployments removes deployments detached from given object
17-
func RemoveOrphanedDeployments(ctx context.Context, rclient client.Client, cr orphanedCRD, keepDeployments map[string]struct{}) error {
18-
deployToRemove, err := discoverDeploymentsByLabels(ctx, rclient, cr.GetNamespace(), cr.SelectorLabels())
19-
if err != nil {
20-
return err
21-
}
22-
for i := range deployToRemove {
23-
dep := deployToRemove[i]
24-
if _, ok := keepDeployments[dep.Name]; !ok {
25-
// need to remove
26-
if err := RemoveFinalizer(ctx, rclient, dep); err != nil {
27-
return err
28-
}
29-
if err := SafeDelete(ctx, rclient, dep); err != nil {
30-
return err
31-
}
32-
}
18+
func RemoveOrphanedDeployments(ctx context.Context, rclient client.Client, cr orphanedCRD, keepNames map[string]struct{}) error {
19+
gvk := schema.GroupVersionKind{
20+
Group: "apps",
21+
Version: "v1",
22+
Kind: "Deployment",
3323
}
34-
return nil
24+
return removeOrphaned(ctx, rclient, cr, gvk, keepNames)
3525
}
3626

37-
// discoverDeploymentsByLabels - returns deployments with given args.
38-
func discoverDeploymentsByLabels(ctx context.Context, rclient client.Client, ns string, selector map[string]string) ([]*appsv1.Deployment, error) {
39-
var deps appsv1.DeploymentList
40-
opts := client.ListOptions{
41-
Namespace: ns,
42-
LabelSelector: labels.SelectorFromSet(selector),
43-
}
44-
if err := rclient.List(ctx, &deps, &opts); err != nil {
45-
return nil, err
46-
}
47-
resp := make([]*appsv1.Deployment, 0, len(deps.Items))
48-
for i := range deps.Items {
49-
resp = append(resp, &deps.Items[i])
27+
// RemoveOrphanedSTSs removes statefulsets detached from given object
28+
func RemoveOrphanedSTSs(ctx context.Context, rclient client.Client, cr orphanedCRD, keepNames map[string]struct{}) error {
29+
gvk := schema.GroupVersionKind{
30+
Group: "apps",
31+
Version: "v1",
32+
Kind: "StatefulSet",
5033
}
51-
return resp, nil
34+
return removeOrphaned(ctx, rclient, cr, gvk, keepNames)
5235
}
5336

54-
// RemoveSvcArgs defines interface for service deletion
55-
type RemoveSvcArgs struct {
56-
PrefixedName func() string
57-
SelectorLabels func() map[string]string
58-
GetNameSpace func() string
37+
// RemoveOrphanedPDBs removes PDBs detached from given object
38+
func RemoveOrphanedPDBs(ctx context.Context, rclient client.Client, cr orphanedCRD, keepNames map[string]struct{}) error {
39+
gvk := schema.GroupVersionKind{
40+
Group: "policy",
41+
Version: "v1",
42+
Kind: "PodDisruptionBudget",
43+
}
44+
return removeOrphaned(ctx, rclient, cr, gvk, keepNames)
5945
}
6046

61-
// RemoveOrphanedSTSs removes deployments detached from given object
62-
func RemoveOrphanedSTSs(ctx context.Context, rclient client.Client, cr orphanedCRD, keepSTSNames map[string]struct{}) error {
63-
deployToRemove, err := discoverSTSsByLabels(ctx, rclient, cr.GetNamespace(), cr.SelectorLabels())
47+
// removeOrphaned removes orphaned resources
48+
func removeOrphaned(ctx context.Context, rclient client.Client, cr orphanedCRD, gvk schema.GroupVersionKind, keepNames map[string]struct{}) error {
49+
toRemove, err := getOrphaned(ctx, rclient, gvk, cr)
6450
if err != nil {
6551
return err
6652
}
67-
for i := range deployToRemove {
68-
dep := deployToRemove[i]
69-
if _, ok := keepSTSNames[dep.Name]; !ok {
53+
for i := range toRemove {
54+
item := toRemove[i]
55+
if _, ok := keepNames[item.GetName()]; !ok {
7056
// need to remove
71-
if err := RemoveFinalizer(ctx, rclient, dep); err != nil {
57+
if err := RemoveFinalizer(ctx, rclient, item); err != nil {
7258
return err
7359
}
74-
if err := SafeDelete(ctx, rclient, dep); err != nil {
60+
if err := SafeDelete(ctx, rclient, item); err != nil {
7561
return err
7662
}
7763
}
7864
}
7965
return nil
8066
}
8167

82-
// discoverDeploymentsByLabels - returns deployments with given args.
83-
func discoverSTSsByLabels(ctx context.Context, rclient client.Client, ns string, selector map[string]string) ([]*appsv1.StatefulSet, error) {
84-
var deps appsv1.StatefulSetList
68+
// getOrphaned - returns resources by orphaned CR selector.
69+
func getOrphaned(ctx context.Context, rclient client.Client, gvk schema.GroupVersionKind, cr orphanedCRD) ([]client.Object, error) {
70+
var l unstructured.UnstructuredList
71+
l.SetGroupVersionKind(gvk)
8572
opts := client.ListOptions{
86-
Namespace: ns,
87-
LabelSelector: labels.SelectorFromSet(selector),
73+
Namespace: cr.GetNamespace(),
74+
LabelSelector: labels.SelectorFromSet(cr.SelectorLabels()),
8875
}
89-
if err := rclient.List(ctx, &deps, &opts); err != nil {
76+
if err := rclient.List(ctx, &l, &opts); err != nil {
9077
return nil, err
9178
}
92-
resp := make([]*appsv1.StatefulSet, 0, len(deps.Items))
93-
for i := range deps.Items {
94-
resp = append(resp, &deps.Items[i])
79+
resp := make([]client.Object, 0, len(l.Items))
80+
for i := range l.Items {
81+
item := &l.Items[i]
82+
resp = append(resp, item)
9583
}
9684
return resp, nil
9785
}

0 commit comments

Comments
 (0)