diff --git a/README.md b/README.md
index c195b8f..aa6c8f6 100644
--- a/README.md
+++ b/README.md
@@ -177,6 +177,7 @@ A terraform example on how to deploy `eks-rolling-update` as a Kubernetes CronJo
 | Environment Variable | Description | Default |
 |----------------------------|----------------------------------------------------------------------------------------------------------------------------|------------------------------------------|
 | K8S_AUTOSCALER_ENABLED | If True Kubernetes Autoscaler will be paused before running update | False |
+| K8S_AUTOSCALER_RESUME_ON_ERROR | If True, the Kubernetes Autoscaler will be resumed automatically if the rolling update fails | True |
 | K8S_AUTOSCALER_NAMESPACE | Namespace where Kubernetes Autoscaler is deployed | default |
 | K8S_AUTOSCALER_DEPLOYMENT | Deployment name of Kubernetes Autoscaler | cluster-autoscaler |
 | K8S_AUTOSCALER_REPLICAS | Number of replicas to scale back up to after Kubernentes Autoscaler paused | 2 |
@@ -185,6 +186,7 @@ A terraform example on how to deploy `eks-rolling-update` as a Kubernetes CronJo
 | TAINT_NODES | Replace the default **cordon**-before-drain strategy with `NoSchedule` **taint**ing, as a workaround for K8S < `1.19` [prematurely removing cordoned nodes](https://github.com/kubernetes/kubernetes/issues/65013) from `Service`-managed `LoadBalancer`s | False |
 | EXTRA_DRAIN_ARGS | Additional space-delimited args to supply to the `kubectl drain` function, e.g `--force=true`. See `kubectl drain -h` | "" |
 | ENFORCED_DRAINING | If draining fails for a node due to corrupted `PodDisruptionBudget`s or failing pods, retry draining with `--disable-eviction=true` and `--force=true` for this node to prevent aborting the script. This is useful to get the rolling update done in development and testing environments and **should not be used in productive environments** since this will bypass checking `PodDisruptionBudget`s | False |
+| UNCORDON_ON_DRAIN_ERROR | If True, a node that could not be drained is marked as schedulable again (uncordoned) afterwards to minimize workload interruption | True |
 
 ## Run Modes
 
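Note (not part of the patch): both new options follow the same environment-variable pattern as the existing settings and default to True. A minimal sketch of how they resolve, using a simplified stand-in for the project's str_to_bool helper:

    import os

    def str_to_bool(val):
        # simplified stand-in for the helper defined in eksrollup/config.py
        return str(val).strip().lower() in ('true', '1', 'yes')

    resume_on_error = str_to_bool(os.getenv('K8S_AUTOSCALER_RESUME_ON_ERROR', True))
    uncordon_on_error = str_to_bool(os.getenv('UNCORDON_ON_DRAIN_ERROR', True))
    print(resume_on_error, uncordon_on_error)  # True True unless overridden in the environment
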
diff --git a/eksrollup/cli.py b/eksrollup/cli.py
index 77ff55d..cfade87 100755
--- a/eksrollup/cli.py
+++ b/eksrollup/cli.py
@@ -7,8 +7,8 @@
 from .lib.aws import is_asg_scaled, is_asg_healthy, instance_terminated, get_asg_tag, modify_aws_autoscaling, \
     count_all_cluster_instances, save_asg_tags, get_asgs, scale_asg, plan_asgs, terminate_instance_in_asg, delete_asg_tags, plan_asgs_older_nodes
 from .lib.k8s import k8s_nodes_count, k8s_nodes_ready, get_k8s_nodes, modify_k8s_autoscaler, get_node_by_instance_id, \
-    drain_node, delete_node, cordon_node, taint_node
-from .lib.exceptions import RollingUpdateException
+    drain_node, delete_node, cordon_node, taint_node, uncordon_node
+from .lib.exceptions import RollingUpdateException, NodeNotDrainedException
 
 
 def validate_cluster_health(asg_name, new_desired_asg_capacity, cluster_name, predictive, health_check_type="regular",):
@@ -220,25 +220,32 @@ def update_asgs(asgs, cluster_name):
         # start draining and terminating
         desired_asg_capacity = asg_state_dict[asg_name][0]
+        exceptions = []
         for outdated in outdated_instances:
             # catch any failures so we can resume aws autoscaling
             try:
                 # get the k8s node name instead of instance id
                 node_name = get_node_by_instance_id(k8s_nodes, outdated['InstanceId'])
                 desired_asg_capacity -= 1
-                drain_node(node_name)
-                delete_node(node_name)
-                save_asg_tags(asg_name, app_config["ASG_DESIRED_STATE_TAG"], desired_asg_capacity)
-                # terminate/detach outdated instances only if ASG termination policy is ignored
-                if not use_asg_termination_policy:
-                    terminate_instance_in_asg(outdated['InstanceId'])
-                    if not instance_terminated(outdated['InstanceId']):
-                        raise Exception('Instance is failing to terminate. Cancelling out.')
-
-                between_nodes_wait = app_config['BETWEEN_NODES_WAIT']
-                if between_nodes_wait != 0:
-                    logger.info(f'Waiting for {between_nodes_wait} seconds before continuing...')
-                    time.sleep(between_nodes_wait)
+                try:
+                    drain_node(node_name)
+                    delete_node(node_name)
+                    save_asg_tags(asg_name, app_config["ASG_DESIRED_STATE_TAG"], desired_asg_capacity)
+                    # terminate/detach outdated instances only if ASG termination policy is ignored
+                    if not use_asg_termination_policy:
+                        terminate_instance_in_asg(outdated['InstanceId'])
+                        if not instance_terminated(outdated['InstanceId']):
+                            raise Exception('Instance is failing to terminate. Cancelling out.')
+                except NodeNotDrainedException as e:
+                    logger.warning(f"Node {e.node_name} could not be drained (forced={e.forced}): {e.message}")
+                    if app_config['UNCORDON_ON_DRAIN_ERROR']:
+                        cordon_node(node_name, False)
+                    exceptions.append(e)
+
+                between_nodes_wait = app_config['BETWEEN_NODES_WAIT']
+                if between_nodes_wait != 0:
+                    logger.info(f'Waiting for {between_nodes_wait} seconds before continuing...')
+                    time.sleep(between_nodes_wait)
             except Exception as drain_exception:
                 try:
                     node_name = get_node_by_instance_id(k8s_excluded_nodes, outdated['InstanceId'])
@@ -247,6 +254,9 @@
             except:
                 raise RollingUpdateException("Rolling update on ASG failed", asg_name)
 
+        if len(exceptions) > 0:
+            raise RollingUpdateException("Rolling update on ASG failed", asg_name)
+
         # scaling cluster back down
         logger.info("Scaling asg back down to original state")
         asg_desired_capacity, asg_orig_desired_capacity, asg_orig_max_capacity = asg_state_dict[asg_name]
@@ -298,5 +308,8 @@ def main(args=None):
         logger.error('*** Rolling update of ASG has failed. Exiting ***')
         logger.error('AWS Auto Scaling Group processes will need resuming manually')
         if app_config['K8S_AUTOSCALER_ENABLED']:
-            logger.error('Kubernetes Cluster Autoscaler will need resuming manually')
+            if app_config['K8S_AUTOSCALER_RESUME_ON_ERROR']:
+                modify_k8s_autoscaler("resume")
+            else:
+                logger.error('Kubernetes Cluster Autoscaler will need resuming manually')
         sys.exit(1)
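For reviewers, a condensed sketch of the control-flow change above (not the actual update_asgs() code; drain_batch and node_names are made up for illustration): drain failures are collected instead of aborting immediately, the affected node is optionally uncordoned, and the ASG update is only marked as failed once the whole batch has been processed.

    from eksrollup.config import app_config
    from eksrollup.lib.exceptions import NodeNotDrainedException, RollingUpdateException
    from eksrollup.lib.k8s import drain_node, cordon_node

    def drain_batch(node_names, asg_name):
        exceptions = []
        for node_name in node_names:
            try:
                drain_node(node_name)
            except NodeNotDrainedException as e:
                # make the node schedulable again so its workloads are not left stranded
                if app_config['UNCORDON_ON_DRAIN_ERROR']:
                    cordon_node(node_name, False)
                exceptions.append(e)
        # fail the ASG update only after every node in the batch has been attempted
        if len(exceptions) > 0:
            raise RollingUpdateException("Rolling update on ASG failed", asg_name)
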
diff --git a/eksrollup/config.py b/eksrollup/config.py
index 0e8029a..a25fb2b 100644
--- a/eksrollup/config.py
+++ b/eksrollup/config.py
@@ -13,6 +13,7 @@ def str_to_bool(val):
     'K8S_AUTOSCALER_NAMESPACE': os.getenv('K8S_AUTOSCALER_NAMESPACE', 'default'),
     'K8S_AUTOSCALER_DEPLOYMENT': os.getenv('K8S_AUTOSCALER_DEPLOYMENT', 'cluster-autoscaler'),
     'K8S_AUTOSCALER_REPLICAS': int(os.getenv('K8S_AUTOSCALER_REPLICAS', 2)),
+    'K8S_AUTOSCALER_RESUME_ON_ERROR': str_to_bool(os.getenv('K8S_AUTOSCALER_RESUME_ON_ERROR', True)),
     'K8S_CONTEXT': os.getenv('K8S_CONTEXT', None),
     'K8S_PROXY_BYPASS': str_to_bool(os.getenv('K8S_PROXY_BYPASS', False)),
     'ASG_DESIRED_STATE_TAG': os.getenv('ASG_DESIRED_STATE_TAG', 'eks-rolling-update:desired_capacity'),
@@ -33,5 +34,6 @@ def str_to_bool(val):
     'TAINT_NODES': str_to_bool(os.getenv('TAINT_NODES', False)),
     'BATCH_SIZE': int(os.getenv('BATCH_SIZE', 0)),
     'ENFORCED_DRAINING': str_to_bool(os.getenv('ENFORCED_DRAINING', False)),
+    'UNCORDON_ON_DRAIN_ERROR': str_to_bool(os.getenv('UNCORDON_ON_DRAIN_ERROR', True)),
     'ASG_NAMES': os.getenv('ASG_NAMES', '').split()
 }
diff --git a/eksrollup/lib/exceptions.py b/eksrollup/lib/exceptions.py
index 622f49e..be164eb 100644
--- a/eksrollup/lib/exceptions.py
+++ b/eksrollup/lib/exceptions.py
@@ -2,3 +2,9 @@ class RollingUpdateException(Exception):
     def __init__(self, message, asg_name):
         self.message = message
         self.asg_name = asg_name
+
+class NodeNotDrainedException(Exception):
+    def __init__(self, message, node_name, forced):
+        self.message = message
+        self.node_name = node_name
+        self.forced = forced
\ No newline at end of file
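A stand-alone illustration of how the new exception is meant to be raised (the try_drain helper is hypothetical; the real logic lives in drain_node() below): the captured kubectl output travels with it, together with the node name and whether enforced draining was attempted.

    import subprocess
    from eksrollup.lib.exceptions import NodeNotDrainedException

    def try_drain(node_name):
        # capture kubectl's output so it can be attached to the exception
        result = subprocess.run(['kubectl', 'drain', node_name, '--ignore-daemonsets'],
                                capture_output=True)
        if result.returncode != 0:
            raise NodeNotDrainedException(result.stdout, node_name, False)
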
diff --git a/eksrollup/lib/k8s.py b/eksrollup/lib/k8s.py
index 05bc999..65bb726 100644
--- a/eksrollup/lib/k8s.py
+++ b/eksrollup/lib/k8s.py
@@ -5,6 +5,7 @@
 import time
 import sys
 
 from .logger import logger
+from .exceptions import NodeNotDrainedException
 from eksrollup.config import app_config
 
@@ -122,7 +123,7 @@ def delete_node(node_name):
         logger.info("Exception when calling CoreV1Api->delete_node: {}".format(e))
 
 
-def cordon_node(node_name):
+def cordon_node(node_name, cordon=True):
     """
     Cordon a kubernetes node to avoid new pods being scheduled on it
     """
@@ -131,14 +132,14 @@
 
     # create an instance of the API class
     k8s_api = client.CoreV1Api()
-    logger.info("Cordoning k8s node {}...".format(node_name))
+    logger.info("Setting unschedulable={} on k8s node {}...".format(cordon, node_name))
     try:
-        api_call_body = client.V1Node(spec=client.V1NodeSpec(unschedulable=True))
+        api_call_body = client.V1Node(spec=client.V1NodeSpec(unschedulable=cordon))
         if not app_config['DRY_RUN']:
             k8s_api.patch_node(node_name, api_call_body)
         else:
             k8s_api.patch_node(node_name, api_call_body, dry_run=True)
-        logger.info("Node cordoned")
+        logger.info("Node cordoned" if cordon else "Node uncordoned")
     except ApiException as e:
         logger.info("Exception when calling CoreV1Api->patch_node: {}".format(e))
 
@@ -181,7 +182,7 @@
         kubectl_args += ['--dry-run']
 
     logger.info('Draining worker node with {}...'.format(' '.join(kubectl_args)))
-    result = subprocess.run(kubectl_args)
+    result = subprocess.run(kubectl_args, capture_output=True)
 
     # If returncode is non-zero run enforced draining of the node or raise a CalledProcessError.
     if result.returncode != 0:
@@ -191,11 +192,11 @@
             '--force=true'
         ]
         logger.info('There was an error draining the worker node, proceed with enforced draining ({})...'.format(' '.join(kubectl_args)))
-        enforced_result = subprocess.run(kubectl_args)
+        enforced_result = subprocess.run(kubectl_args, capture_output=True)
         if enforced_result.returncode != 0:
-            raise Exception("Node not drained properly with enforced draining enabled. Exiting")
+            raise NodeNotDrainedException(enforced_result.stdout, node_name, True)
     else:
-        raise Exception("Node not drained properly. Exiting")
+        raise NodeNotDrainedException(result.stdout, node_name, False)
 
 
 def k8s_nodes_ready(max_retry=app_config['GLOBAL_MAX_RETRY'], wait=app_config['GLOBAL_HEALTH_WAIT']):
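Finally, a trimmed-down sketch of the new failure path in main() (handle_failure is hypothetical; the real change is in the cli.py hunk above): when the rolling update fails and the Cluster Autoscaler was paused, it is now resumed automatically unless K8S_AUTOSCALER_RESUME_ON_ERROR is set to False.

    import sys
    from eksrollup.config import app_config
    from eksrollup.lib.k8s import modify_k8s_autoscaler

    def handle_failure():
        if app_config['K8S_AUTOSCALER_ENABLED']:
            if app_config['K8S_AUTOSCALER_RESUME_ON_ERROR']:
                # scale the cluster-autoscaler deployment back up
                modify_k8s_autoscaler("resume")
            else:
                print('Kubernetes Cluster Autoscaler will need resuming manually')
        sys.exit(1)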