diff --git a/paasta_tools/setup_kubernetes_cr.py b/paasta_tools/setup_kubernetes_cr.py
index 6b8da97bb6..3708d15d54 100644
--- a/paasta_tools/setup_kubernetes_cr.py
+++ b/paasta_tools/setup_kubernetes_cr.py
@@ -50,6 +50,9 @@
 from paasta_tools.utils import load_all_configs
 from paasta_tools.utils import load_system_paasta_config
 from paasta_tools.vitesscell_tools import load_vitess_cell_instance_configs
+from paasta_tools.vitesscell_tools import (
+    update_related_api_objects as update_vitess_cell_related_api_objects,
+)
 from paasta_tools.vitesscell_tools import VITESSCELL_KUBERNETES_NAMESPACE
 from paasta_tools.vitesscluster_tools import load_vitess_cluster_instance_configs
 from paasta_tools.vitesscluster_tools import VITESSCLUSTER_KUBERNETES_NAMESPACE
@@ -67,6 +70,11 @@
 }
 
 
+INSTANCE_TYPE_TO_RELATED_OBJECTS_UPDATER = {
+    "vitesscell": update_vitess_cell_related_api_objects,
+}
+
+
 INSTANCE_TYPE_TO_NAMESPACE_LOADER = {
     "vitesscluster": VITESSCLUSTER_KUBERNETES_NAMESPACE,
     "vitesscell": VITESSCELL_KUBERNETES_NAMESPACE,
@@ -444,6 +452,15 @@ def reconcile_kubernetes_resource(
                 )
             else:
                 log.info(f"{desired_resource} is up to date, no action taken")
+
+            if crd.file_prefix in INSTANCE_TYPE_TO_RELATED_OBJECTS_UPDATER:
+                INSTANCE_TYPE_TO_RELATED_OBJECTS_UPDATER[crd.file_prefix](
+                    service=service,
+                    instance=inst,
+                    cluster=cluster,
+                    kube_client=kube_client,
+                    soa_dir=DEFAULT_SOA_DIR,
+                )
         except Exception as e:
             log.error(str(e))
             succeeded = False
diff --git a/paasta_tools/vitesscell_tools.py b/paasta_tools/vitesscell_tools.py
index 2b4e2fc2a2..80b8f05391 100644
--- a/paasta_tools/vitesscell_tools.py
+++ b/paasta_tools/vitesscell_tools.py
@@ -8,8 +8,14 @@
 from typing import Union
 
 import service_configuration_lib
+from kubernetes.client import V1ObjectMeta
+from kubernetes.client import V2beta2CrossVersionObjectReference
+from kubernetes.client import V2beta2HorizontalPodAutoscaler
+from kubernetes.client import V2beta2HorizontalPodAutoscalerSpec
 
+from paasta_tools.kubernetes_tools import KubeClient
 from paasta_tools.kubernetes_tools import KubernetesDeploymentConfigDict
+from paasta_tools.kubernetes_tools import paasta_prefixed
 from paasta_tools.kubernetes_tools import sanitised_cr_name
 from paasta_tools.utils import BranchDictV2
 from paasta_tools.utils import deep_merge_dictionaries
@@ -59,6 +65,7 @@ class GatewayConfigDict(TypedDict, total=False):
     extraFlags: Dict[str, str]
     extraLabels: Dict[str, str]
     replicas: int
+    yelp_selector: str
     resources: Dict[str, Any]
     annotations: Mapping[str, Any]
 
@@ -115,6 +122,7 @@ def get_cell_config(
             },
             extraLabels=labels,
             replicas=replicas,
+            yelp_selector=",".join([f"{k}={v}" for k, v in labels.items()]),
             resources={
                 "requests": requests,
                 "limits": requests,
@@ -175,6 +183,129 @@ def get_global_lock_server(self) -> Dict[str, str]:
             "rootPath": TOPO_GLOBAL_ROOT,
         }
 
+    def get_autoscaling_target(self, name: str) -> V2beta2CrossVersionObjectReference:
+        return V2beta2CrossVersionObjectReference(
+            api_version="planetscale.com/v2", kind="VitessCell", name=name
+        )
+
+    def get_autoscaling_metric_spec(
+        self,
+        name: str,
+        cluster: str,
+        kube_client: KubeClient,
+        namespace: str,
+    ) -> Optional[V2beta2HorizontalPodAutoscaler]:
+        # Returns None if an HPA should not be attached based on the config,
+        # or the config is invalid.
+
+        if self.get_desired_state() == "stop":
+            return None
+
+        if not self.is_autoscaling_enabled():
+            return None
+
+        autoscaling_params = self.get_autoscaling_params()
+        if autoscaling_params["metrics_providers"][0]["decision_policy"] == "bespoke":
+            return None
+
+        min_replicas = self.get_min_instances()
+        max_replicas = self.get_max_instances()
+        if min_replicas == 0 or max_replicas == 0:
+            log.error(
+                f"Invalid value for min or max_instances on {name}: {min_replicas}, {max_replicas}"
+            )
+            return None
+
+        metrics = []
+        for provider in autoscaling_params["metrics_providers"]:
+            spec = self.get_autoscaling_provider_spec(name, namespace, provider)
+            if spec is not None:
+                metrics.append(spec)
+        scaling_policy = self.get_autoscaling_scaling_policy(
+            max_replicas,
+            autoscaling_params,
+        )
+
+        labels = {
+            paasta_prefixed("service"): self.service,
+            paasta_prefixed("instance"): self.instance,
+            paasta_prefixed("pool"): self.get_pool(),
+            paasta_prefixed("managed"): "true",
+        }
+
+        hpa = V2beta2HorizontalPodAutoscaler(
+            kind="HorizontalPodAutoscaler",
+            metadata=V1ObjectMeta(
+                name=name, namespace=namespace, annotations=dict(), labels=labels
+            ),
+            spec=V2beta2HorizontalPodAutoscalerSpec(
+                behavior=scaling_policy,
+                max_replicas=max_replicas,
+                min_replicas=min_replicas,
+                metrics=metrics,
+                scale_target_ref=self.get_autoscaling_target(name),
+            ),
+        )
+
+        return hpa
+
+    def get_min_instances(self) -> Optional[int]:
+        vtgate_resources = self.config_dict.get("vtgate_resources")
+        return vtgate_resources.get("min_instances", 1)
+
+    def get_max_instances(self) -> Optional[int]:
+        vtgate_resources = self.config_dict.get("vtgate_resources")
+        return vtgate_resources.get("max_instances")
+
+    def update_related_api_objects(
+        self,
+        kube_client: KubeClient,
+    ):
+        name = sanitised_cr_name(self.service, self.instance)
+
+        min_instances = self.get_min_instances()
+        max_instances = self.get_max_instances()
+        should_exist = min_instances and max_instances
+
+        exists = (
+            len(
+                kube_client.autoscaling.list_namespaced_horizontal_pod_autoscaler(
+                    field_selector=f"metadata.name={name}",
+                    namespace=self.get_namespace(),
+                    limit=1,
+                ).items
+            )
+            > 0
+        )
+
+        if should_exist:
+            hpa = self.get_autoscaling_metric_spec(
+                name=sanitised_cr_name(self.service, self.instance),
+                cluster=self.get_cluster(),
+                kube_client=kube_client,
+                namespace=self.get_namespace(),
+            )
+            if not hpa:
+                return
+
+            if exists:
+                kube_client.autoscaling.replace_namespaced_horizontal_pod_autoscaler(
+                    name=name,
+                    namespace=self.get_namespace(),
+                    body=hpa,
+                )
+            else:
+                log.info(f"Creating HPA for {name} in {self.get_namespace()}")
+                kube_client.autoscaling.create_namespaced_horizontal_pod_autoscaler(
+                    namespace=self.get_namespace(),
+                    body=hpa,
+                )
+        elif exists:
+            kube_client.autoscaling.delete_namespaced_horizontal_pod_autoscaler(
+                name=name,
+                namespace=self.get_namespace(),
+            )
+
     def get_vitess_cell_config(self) -> VitessCellConfigDict:
         cell = self.config_dict.get("cell")
         all_cells = self.config_dict.get("cells")
@@ -278,6 +409,18 @@ def load_vitess_cell_instance_configs(
     return vitess_cell_instance_configs
 
 
+def update_related_api_objects(
+    service: str,
+    instance: str,
+    cluster: str,
+    kube_client: KubeClient,
+    soa_dir: str = DEFAULT_SOA_DIR,
+) -> None:
+    load_vitess_cell_instance_config(
+        service, instance, cluster, soa_dir=soa_dir
+    ).update_related_api_objects(kube_client)
+
+
 # TODO: read this from CRD in service configs
 def cr_id(service: str, instance: str) -> Mapping[str, str]:
     return dict(
diff --git a/paasta_tools/vitesscluster_tools.py b/paasta_tools/vitesscluster_tools.py
index 0f1e578956..12c9cf4d8e 100644
--- a/paasta_tools/vitesscluster_tools.py
+++ b/paasta_tools/vitesscluster_tools.py
@@ -73,6 +73,8 @@ class RequestsDict(TypedDict, total=False):
 
 
 class ResourceConfigDict(TypedDict, total=False):
     replicas: int
+    min_instances: Optional[int]
+    max_instances: Optional[int]
     requests: Dict[str, RequestsDict]
     limits: Dict[str, RequestsDict]
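
A minimal usage sketch of the new module-level entry point added in vitesscell_tools.py; the service, instance, and cluster names below are placeholders, not values from this change.

# Illustrative only: invoking the new related-objects hook directly.
# KubeClient() is the standard paasta client; the names are hypothetical.
from paasta_tools.kubernetes_tools import KubeClient
from paasta_tools.vitesscell_tools import update_related_api_objects

kube_client = KubeClient()

# Loads the VitessCell instance config, then creates, replaces, or deletes the
# vtgate HorizontalPodAutoscaler depending on min_instances/max_instances.
update_related_api_objects(
    service="example-service",    # hypothetical
    instance="example-instance",  # hypothetical
    cluster="example-cluster",    # hypothetical
    kube_client=kube_client,
)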