From 530f84749f4fdf592755f1e74f06375ed712301b Mon Sep 17 00:00:00 2001 From: Trung Minh Lai Date: Sun, 4 Feb 2024 20:32:44 +0700 Subject: [PATCH] Add topologySpreadConstraints configuration to pod spec. --- e2e/tests/test_e2e.py | 89 ++++++++++++++++--- manifests/postgresql.crd.yaml | 6 ++ pkg/apis/acid.zalan.do/v1/crds.go | 10 +++ .../v1/operator_configuration_type.go | 1 + pkg/apis/acid.zalan.do/v1/postgresql_type.go | 2 + pkg/cluster/cluster.go | 5 ++ pkg/cluster/k8sres.go | 30 ++++++- pkg/cluster/k8sres_test.go | 44 +++++++++ pkg/util/config/config.go | 1 + 9 files changed, 176 insertions(+), 12 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 43dd467b5c..ce575f05b2 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -556,7 +556,7 @@ def compare_config(): pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar" del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove] - + k8s.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch) @@ -573,7 +573,7 @@ def compare_config(): self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar", "The replication slot cannot be updated", 10, 5) - + # make sure slot from Patroni didn't get deleted self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1, "The replication slot from Patroni gets deleted", 10, 5) @@ -929,7 +929,7 @@ def test_ignored_annotations(self): }, } } - + old_sts_creation_timestamp = sts.metadata.creation_timestamp k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch) old_svc_creation_timestamp = svc.metadata.creation_timestamp @@ -1254,7 +1254,7 @@ def test_persistent_volume_claim_retention_policy(self): } 
k8s.update_config(patch_scaled_policy_retain) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - + # decrease the number of instances k8s.api.custom_objects_api.patch_namespaced_custom_object( 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances) @@ -1622,7 +1622,7 @@ def test_password_rotation(self): }, } k8s.api.core_v1.patch_namespaced_secret( - name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do", + name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do", namespace="default", body=secret_fake_rotation) @@ -1638,7 +1638,7 @@ def test_password_rotation(self): "data": { "enable_password_rotation": "true", "password_rotation_interval": "30", - "password_rotation_user_retention": "30", # should be set to 60 + "password_rotation_user_retention": "30", # should be set to 60 }, } k8s.update_config(enable_password_rotation) @@ -1691,7 +1691,7 @@ def test_password_rotation(self): "Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username)) # disable password rotation for all other users (foo_user) - # and pick smaller intervals to see if the third fake rotation user is dropped + # and pick smaller intervals to see if the third fake rotation user is dropped enable_password_rotation = { "data": { "enable_password_rotation": "false", @@ -2158,7 +2158,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci # if nodes are different we can quit here if master_nodes[0] not in replica_nodes: - return True + return True # enable pod anti affintiy in config map which should trigger movement of replica patch_enable_antiaffinity = { @@ -2182,7 +2182,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci } k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity") self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": 
"idle"}, "Operator does not get in sync") - + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels) k8s.wait_for_running_pods(cluster_labels, 2) @@ -2193,7 +2193,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci # if nodes are different we can quit here for target_node in target_nodes: if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes: - print('Pods run on the same node') + print('Pods run on the same node') return False except timeout_decorator.TimeoutError: @@ -2272,5 +2272,74 @@ def query_database_with_user(self, pod_name, db_name, query, user_name): return result_set + def test_topology_spread_constraints(self): + ''' + Enable topologySpreadConstraints for pods + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + + # Verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + + patch_node_label = { + "metadata": { + "labels": { + "topology.kubernetes.io/zone": "zalando" + } + } + } + + nodes = k8s.api.core_v1.list_node() + for node in nodes.items: + k8s.api.core_v1.patch_node(node.metadata.name, patch_node_label) + + podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) + k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + patch_cordon_node = { + "spec": { + "unschedulable": True + } + } + + master_nodes, replica_nodes = k8s.get_cluster_nodes() + self.assertNotEqual(master_nodes, []) + self.assertNotEqual(replica_nodes, []) + + # Cordon replicas node + k8s.api.core_v1.patch_node(replica_nodes[0], patch_cordon_node) + # Delete replicas pod so it can be re-scheduled to master node + replicas_pod = k8s.get_cluster_replica_pod() + k8s.api.core_v1.delete_namespaced_pod(replicas_pod.metadata.name, 'default') + # Wait for replicas 
pod re-scheduled to master node + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + # Assert master pod and replicas pod are in the same node + master_nodes, replica_nodes = k8s.get_cluster_nodes() + self.assertEqual(master_nodes[0].metadata.name, replica_nodes[0].metadata.name) + + patch_uncordon_node = { + "spec": { + "unschedulable": False + } + } + + # Uncordon replicas node + k8s.api.core_v1.patch_node(replica_nodes[0], patch_uncordon_node) + + patch_enable_topology_spread_constraints = { + "data": { + "enable_postgres_topology_spread_constraints": "true" + } + } + + k8s.update_config(patch_enable_topology_spread_constraints, "enable topologySpreadConstraints") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # Assert master pod and replicas pod are spread in two different nodes + master_nodes, replica_nodes = k8s.get_cluster_nodes() + self.assertNotEqual(master_nodes[0].metadata.name, replica_nodes[0].metadata.name) + if __name__ == '__main__': unittest.main() diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 4bd757f383..9ce1393cce 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -575,6 +575,12 @@ spec: - PreferNoSchedule tolerationSeconds: type: integer + topologySpreadConstraints: + type: array + nullable: true + items: + type: object + x-kubernetes-preserve-unknown-fields: true useLoadBalancer: type: boolean description: deprecated diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 9e65869e7d..1514f0fe03 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -898,6 +898,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "topologySpreadConstraints": { + Type: "array", + Nullable: true, + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + XPreserveUnknownFields: 
util.True(), + }, + }, + }, "useLoadBalancer": { Type: "boolean", Description: "deprecated", diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index 48fd0a13c7..9a078d456a 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -107,6 +107,7 @@ type KubernetesMetaConfiguration struct { EnableReadinessProbe bool `json:"enable_readiness_probe,omitempty"` EnableCrossNamespaceSecret bool `json:"enable_cross_namespace_secret,omitempty"` EnableFinalizers *bool `json:"enable_finalizers,omitempty"` + EnablePostgresTopologySpreadConstraints bool `json:"enable_postgres_topology_spread_constraints,omitempty"` } // PostgresPodResourcesDefaults defines the spec of default resources diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 612cf7041b..1782be8a47 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -93,6 +93,8 @@ type PostgresSpec struct { // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` PodPriorityClassNameOld string `json:"pod_priority_class_name,omitempty"` + + AdditionalTopologySpreadConstraints []v1.TopologySpreadConstraint `json:"additionalTopologySpreadConstraints,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index e3acdb8355..53264b0793 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -472,6 +472,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa needsRollUpdate = true reasons = append(reasons, "new statefulset's pod affinity does not match the current one") } + if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, 
statefulSet.Spec.Template.Spec.TopologySpreadConstraints) { + needsReplace = true + needsRollUpdate = true + reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one") + } if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) { needsReplace = true needsRollUpdate = true diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 5a2ce6600a..62d51f75a4 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -610,6 +610,22 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring return podAntiAffinity } +func generateTopologySpreadConstraints(labels labels.Set, additionalTopologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint { + topologySpreadConstraint := v1.TopologySpreadConstraint{ + MaxSkew: int32(1), + TopologyKey: "topology.kubernetes.io/zone", + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + } + topologySpreadConstraints := []v1.TopologySpreadConstraint{topologySpreadConstraint} + if len(additionalTopologySpreadConstraints) > 0 { + topologySpreadConstraints = append(topologySpreadConstraints, additionalTopologySpreadConstraints...) 
+ } + return topologySpreadConstraints +} + func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration { // allow to override tolerations by postgresql manifest if len(*tolerationsSpec) > 0 { @@ -832,6 +848,8 @@ func (c *Cluster) generatePodTemplate( additionalSecretMount string, additionalSecretMountPath string, additionalVolumes []acidv1.AdditionalVolume, + topologySpreadConstraints bool, + additionalTopologySpreadConstraints []v1.TopologySpreadConstraint, ) (*v1.PodTemplateSpec, error) { terminateGracePeriodSeconds := terminateGracePeriod @@ -884,6 +902,10 @@ func (c *Cluster) generatePodTemplate( podSpec.PriorityClassName = priorityClassName } + if topologySpreadConstraints { + podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, additionalTopologySpreadConstraints) + } + if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars { addVarRunVolume(&podSpec) } @@ -1487,7 +1509,9 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef c.OpConfig.PodAntiAffinityPreferredDuringScheduling, c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMountPath, - additionalVolumes) + additionalVolumes, + c.OpConfig.EnablePostgresTopologySpreadConstraints, + spec.AdditionalTopologySpreadConstraints) if err != nil { return nil, fmt.Errorf("could not generate pod template: %v", err) @@ -2334,7 +2358,9 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) { false, c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMountPath, - []acidv1.AdditionalVolume{}); err != nil { + []acidv1.AdditionalVolume{}, + true, + []v1.TopologySpreadConstraint{}); err != nil { return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err) } diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 2eeefb2189..beff1e67c1 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -3795,3 +3795,47 @@ func 
TestGenerateCapabilities(t *testing.T) { } } } + +func TestTopologySpreadConstraints(t *testing.T) { + clusterName := "acid-test-cluster" + namespace := "default" + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 1, + Resources: &acidv1.Resources{ + ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")}, + ResourceLimits: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")}, + }, + Volume: acidv1.Volume{ + Size: "1G", + }, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + EnablePostgresTopologySpreadConstraints: true, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + cluster.Name = clusterName + cluster.Namespace = namespace + cluster.labelsSet(true) + + s, err := cluster.generateStatefulSet(&pg.Spec) + assert.NoError(t, err) + assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{ + MaxSkew: int32(1), + TopologyKey: "topology.kubernetes.io/zone", + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: cluster.labelsSet(true), + }, + }) +} diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 829c1d19e9..dea1b74fcd 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -253,6 +253,7 @@ type Config struct { EnableSecretsDeletion *bool `name:"enable_secrets_deletion" default:"true"` EnablePersistentVolumeClaimDeletion *bool `name:"enable_persistent_volume_claim_deletion" default:"true"` PersistentVolumeClaimRetentionPolicy map[string]string `name:"persistent_volume_claim_retention_policy" default:"when_deleted:retain,when_scaled:retain"` + EnablePostgresTopologySpreadConstraints bool 
`json:"enable_postgres_topology_spread_constraints,omitempty"` } // MustMarshal marshals the config or panics