Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PAASTA-17941: add topology spread constraints option #3641

Merged
18 changes: 18 additions & 0 deletions docs/source/yelpsoa_configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,24 @@ Kubernetes

These options are only applicable to tasks scheduled on Kubernetes.

* ``topology_spread_constraints``: A set of rules to spread Pods across topologies, for example to try spreading Pods evenly across both nodes and availability zones::

topology_spread_constraints:
- max_skew: 1
topology_key: "topology.kubernetes.io/zone"
when_unsatisfiable: "ScheduleAnyway"
- max_skew: 1
topology_key: "kubernetes.io/hostname"
when_unsatisfiable: "ScheduleAnyway"

These can be configured per cluster (or globally) and will be added to every Pod Spec template, using ``paasta.yelp.com/service`` and ``paasta.yelp.com/instance`` as selectors.

To avoid conflicts with the ``deploy_whitelist`` and ``deploy_blacklist``, please only use ``when_unsatisfiable: "ScheduleAnyway"`` (at least until PAASTA-17951 is resolved).

For more information, see the official Kubernetes
documentation on `topology spread constraints
<https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>`_.

* ``node_selectors``: A map of labels a node is required to have for a task
to be launched on said node. There are several ways to define a selector.
The simplest is a key-value pair. For example, this selector restricts a
Expand Down
49 changes: 49 additions & 0 deletions paasta_tools/kubernetes_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
from kubernetes.client import V1StatefulSetSpec
from kubernetes.client import V1Subject
from kubernetes.client import V1TCPSocketAction
from kubernetes.client import V1TopologySpreadConstraint
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount
from kubernetes.client import V1WeightedPodAffinityTerm
Expand Down Expand Up @@ -1953,6 +1954,17 @@ def get_pod_template_spec(
affinity.pod_anti_affinity = pod_anti_affinity
pod_spec_kwargs["affinity"] = affinity

# PAASTA-17941: Allow configuring topology spread constraints per cluster
pod_topology_spread_constraints = create_pod_topology_spread_constraints(
service=self.get_service(),
instance=self.get_instance(),
topology_spread_constraints=system_paasta_config.get_topology_spread_constraints(),
)
if pod_topology_spread_constraints:
constraints = pod_spec_kwargs.get("topologySpreadConstraints", [])
constraints += pod_topology_spread_constraints
pod_spec_kwargs["topologySpreadConstraints"] = constraints

termination_grace_period = self.get_termination_grace_period()
if termination_grace_period is not None:
pod_spec_kwargs[
Expand Down Expand Up @@ -3542,6 +3554,43 @@ def load_custom_resource_definitions(
return custom_resources


def create_pod_topology_spread_constraints(
    service: str,
    instance: str,
    topology_spread_constraints: List[Dict[str, Any]],
) -> List[V1TopologySpreadConstraint]:
    """Build Kubernetes topology spread constraints for a service instance's Pods.

    Applies cluster-level topology spread constraints to every Pod template.
    This allows us to configure default topology spread constraints on EKS
    where we cannot configure the scheduler.

    :param service: PaaSTA service name, used in the Pod label selector.
    :param instance: PaaSTA instance name, used in the Pod label selector.
    :param topology_spread_constraints: configured constraint dicts; each must
        set "topology_key" and may set "max_skew" (default 1) and
        "when_unsatisfiable" (default "ScheduleAnyway").
    :returns: one V1TopologySpreadConstraint per configured constraint, or an
        empty list when none are configured.
    :raises ValueError: if a configured constraint is missing "topology_key".
    """
    if not topology_spread_constraints:
        return []

    # Every constraint targets only the Pods of this service/instance.
    selector = V1LabelSelector(
        match_labels={
            "paasta.yelp.com/service": service,
            "paasta.yelp.com/instance": instance,
        }
    )

    pod_topology_spread_constraints = []
    for constraint in topology_spread_constraints:
        topology_key = constraint.get("topology_key")
        if topology_key is None:
            # Fail fast with a descriptive error instead of relying on the
            # kubernetes client model to raise a generic ValueError later.
            raise ValueError(
                "topology_spread_constraints entries must set 'topology_key'"
            )
        pod_topology_spread_constraints.append(
            V1TopologySpreadConstraint(
                label_selector=selector,
                topology_key=topology_key,
                max_skew=constraint.get("max_skew", 1),
                when_unsatisfiable=constraint.get(
                    "when_unsatisfiable", "ScheduleAnyway"
                ),
            )
        )

    return pod_topology_spread_constraints


def sanitised_cr_name(service: str, instance: str) -> str:
sanitised_service = sanitise_kubernetes_name(service)
sanitised_instance = sanitise_kubernetes_name(instance)
Expand Down
5 changes: 5 additions & 0 deletions paasta_tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,7 @@ class SystemPaastaConfigDict(TypedDict, total=False):
pdb_max_unavailable: Union[str, int]
pki_backend: str
pod_defaults: Dict[str, Any]
topology_spread_constraints: List[Dict[str, Any]]
previous_marathon_servers: List[MarathonConfigDict]
readiness_check_prefix_template: List[str]
register_k8s_pods: bool
Expand Down Expand Up @@ -2556,6 +2557,10 @@ def get_taskproc(self) -> Dict:
def get_disabled_watchers(self) -> List:
    """Return the watchers disabled via system PaaSTA config (empty list by default)."""
    disabled = self.config_dict.get("disabled_watchers", [])
    return disabled

def get_topology_spread_constraints(self) -> List[Dict[str, Any]]:
    """Return the TopologySpreadConstraints applied to all Pods in the cluster.

    Defaults to an empty list when none are configured.
    """
    configured_constraints = self.config_dict.get(
        "topology_spread_constraints", []
    )
    return configured_constraints

def get_vault_environment(self) -> Optional[str]:
"""Get the environment name for the vault cluster
This must match the environment keys in the secret json files
Expand Down
63 changes: 60 additions & 3 deletions tests/test_kubernetes_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from kubernetes.client import V1StatefulSetSpec
from kubernetes.client import V1Subject
from kubernetes.client import V1TCPSocketAction
from kubernetes.client import V1TopologySpreadConstraint
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount
from kubernetes.client import V2beta2CrossVersionObjectReference
Expand Down Expand Up @@ -1621,11 +1622,15 @@ def test_format_kubernetes_app_dict(self, _):
"paasta_tools.kubernetes_tools.KubernetesDeploymentConfig.get_pod_anti_affinity",
autospec=True,
)
@mock.patch(
"paasta_tools.kubernetes_tools.create_pod_topology_spread_constraints",
autospec=True,
)
@pytest.mark.parametrize(
"in_smtstk,routable_ip,node_affinity,anti_affinity,spec_affinity,termination_grace_period",
"in_smtstk,routable_ip,node_affinity,anti_affinity,spec_affinity,termination_grace_period,pod_topology",
[
(True, "true", None, None, {}, None),
(False, "false", None, None, {}, 10),
(True, "true", None, None, {}, None, []),
(False, "false", None, None, {}, 10, []),
# an node affinity absent but pod anti affinity present
(
False,
Expand All @@ -1634,6 +1639,7 @@ def test_format_kubernetes_app_dict(self, _):
"pod_anti_affinity",
{"affinity": V1Affinity(pod_anti_affinity="pod_anti_affinity")},
None,
[],
),
# an affinity obj is only added if there is a node affinity
(
Expand All @@ -1648,11 +1654,13 @@ def test_format_kubernetes_app_dict(self, _):
)
},
None,
[],
),
],
)
def test_get_pod_template_spec(
self,
mock_create_pod_topology_spread_constraints,
mock_get_pod_anti_affinity,
mock_get_termination_grace_period,
mock_load_service_namespace_config,
Expand All @@ -1664,6 +1672,7 @@ def test_get_pod_template_spec(
mock_get_volumes,
in_smtstk,
routable_ip,
pod_topology,
node_affinity,
anti_affinity,
spec_affinity,
Expand All @@ -1674,10 +1683,12 @@ def test_get_pod_template_spec(
mock_service_namespace_config.is_in_smartstack.return_value = in_smtstk
mock_get_node_affinity.return_value = node_affinity
mock_get_pod_anti_affinity.return_value = anti_affinity
mock_create_pod_topology_spread_constraints.return_value = pod_topology
mock_system_paasta_config = mock.Mock()
mock_system_paasta_config.get_kubernetes_add_registration_labels.return_value = (
True
)
mock_system_paasta_config.get_topology_spread_constraints.return_value = []
mock_system_paasta_config.get_pod_defaults.return_value = dict(dns_policy="foo")
mock_get_termination_grace_period.return_value = termination_grace_period

Expand Down Expand Up @@ -1773,6 +1784,52 @@ def test_routable_ip(

assert ret == expected

def test_create_pod_topology_spread_constraints(self):
    """Configured constraint dicts are translated into
    V1TopologySpreadConstraint objects scoped to the service/instance."""
    configured = [
        {
            "topology_key": "kubernetes.io/hostname",
            "max_skew": 1,
            "when_unsatisfiable": "ScheduleAnyway",
        },
        {
            "topology_key": "topology.kubernetes.io/zone",
            "max_skew": 3,
            "when_unsatisfiable": "DoNotSchedule",
        },
    ]

    # Every resulting constraint should select Pods by the same
    # service/instance labels.
    expected_selector = V1LabelSelector(
        match_labels={
            "paasta.yelp.com/service": "schematizer",
            "paasta.yelp.com/instance": "main",
        }
    )
    expected = [
        V1TopologySpreadConstraint(
            label_selector=expected_selector,
            max_skew=1,
            topology_key="kubernetes.io/hostname",
            when_unsatisfiable="ScheduleAnyway",
        ),
        V1TopologySpreadConstraint(
            label_selector=expected_selector,
            max_skew=3,
            topology_key="topology.kubernetes.io/zone",
            when_unsatisfiable="DoNotSchedule",
        ),
    ]

    actual = kubernetes_tools.create_pod_topology_spread_constraints(
        "schematizer", "main", configured
    )
    assert actual == expected

@pytest.mark.parametrize(
"raw_selectors,expected",
[
Expand Down