From 025b946f1b46b3edff07274f4e8dd8bb495d31f5 Mon Sep 17 00:00:00 2001
From: Luca Miccini
Date: Thu, 17 Oct 2024 08:50:17 +0200
Subject: [PATCH] Add liveness probe and allow InstanceHa to resume
 evacuations

This commit adds a liveness probe to the InstanceHa pod so that it can
eventually recover when the pod stops responding. Since a restart could
interrupt the recovery workflow, we also add a way to resume evacuations
if a host has previously been disabled and marked by InstanceHa.

Co-authored-by: Antonio Romito
---
 pkg/instanceha/funcs.go                | 16 ++++-
 templates/instanceha/bin/instanceha.py | 92 +++++++++++++++++++++-----
 2 files changed, 91 insertions(+), 17 deletions(-)

diff --git a/pkg/instanceha/funcs.go b/pkg/instanceha/funcs.go
index d02de909..6df397b0 100644
--- a/pkg/instanceha/funcs.go
+++ b/pkg/instanceha/funcs.go
@@ -19,6 +19,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/utils/ptr"
 )
 
@@ -41,6 +42,18 @@ func Deployment(
 	volumes := instancehaPodVolumes(instance)
 	volumeMounts := instancehaPodVolumeMounts()
 
+	livenessProbe := &corev1.Probe{
+		// TODO might need tuning
+		TimeoutSeconds:      30,
+		PeriodSeconds:       30,
+		InitialDelaySeconds: 10,
+	}
+
+	livenessProbe.HTTPGet = &corev1.HTTPGetAction{
+		Path: "/",
+		Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+	}
+
 	// add CA cert if defined
 	if instance.Spec.CaBundleSecretName != "" {
 		volumes = append(volumes, instance.Spec.CreateVolume())
@@ -91,7 +104,8 @@ func Deployment(
 					Protocol: "UDP",
 					Name:     "instanceha",
 				}},
-				VolumeMounts: volumeMounts,
+				VolumeMounts:  volumeMounts,
+				LivenessProbe: livenessProbe,
 			}},
 		},
 	},
diff --git a/templates/instanceha/bin/instanceha.py b/templates/instanceha/bin/instanceha.py
index df4c5d12..ddd23010 100755
--- a/templates/instanceha/bin/instanceha.py
+++ b/templates/instanceha/bin/instanceha.py
@@ -15,6 +15,9 @@
 from yaml.loader import SafeLoader
 import socket
 import struct
+import threading
+import hashlib
+from http import server
 import json
 
 from novaclient import client
@@ -30,6 +33,30 @@
 UDP_IP = ''
 UDP_PORT = os.getenv('UDP_PORT', 7410)
 
+current_hash = ""
+hash_update_successful = True
+
+class HealthCheckServer(server.BaseHTTPRequestHandler):
+    def do_GET(self):
+        logging.debug("Health check request received.")
+
+        if hash_update_successful:
+            self.send_response(200)
+            self.send_header("Content-type", "text/plain")
+            self.end_headers()
+            self.wfile.write(current_hash.encode('utf-8'))
+        else:
+            self.send_response(500)
+            self.send_header("Content-type", "text/plain")
+            self.end_headers()
+            self.wfile.write(b"Error: Hash not updated properly.")
+
+def start_health_check_server():
+    logging.debug("Starting health check server on port 8080...")
+    server_address = ('', 8080)
+    httpd = server.HTTPServer(server_address, HealthCheckServer)
+    httpd.serve_forever()
+
 with open("/var/lib/instanceha/config.yaml", 'r') as stream:
     try:
         config = yaml.load(stream, Loader=SafeLoader)["config"]
@@ -646,14 +673,15 @@ def _host_fence(host, action):
 
     return True
 
 
-def process_service(service, reserved_hosts):
+def process_service(service, reserved_hosts, resume):
 
-    try:
-        logging.info('Fencing %s' % service.host)
-        _host_fence(service.host, 'off')
-    except Exception as e:
-        logging.error('Failed to fence %s: %s' % (service.host, e))
-        return False
+    if not resume:
+        try:
+            logging.info('Fencing %s' % service.host)
+            _host_fence(service.host, 'off')
+        except Exception as e:
+            logging.error('Failed to fence %s: %s' % (service.host, e))
+            return False
 
     CLOUD = os.getenv('OS_CLOUD', 'overcloud')
@@ -670,12 +698,13 @@ def process_service(service, reserved_hosts):
     except Exception as e:
         logging.error("Failed: Unable to connect to Nova: " + str(e))
 
-    try:
-        logging.info('Disabling %s before evacuation' % service.host)
-        _host_disable(conn, service)
-    except Exception as e:
-        logging.error('Failed to disable %s: %s' % (service.host, e))
-        return False
+    if not resume:
+        try:
+            logging.info('Disabling %s before evacuation' % service.host)
+            _host_disable(conn, service)
+        except Exception as e:
+            logging.error('Failed to disable %s: %s' % (service.host, e))
+            return False
 
     if 'true' in RESERVED_HOSTS.lower():
         try:
@@ -733,6 +762,14 @@ def process_service(service, reserved_hosts):
 
 
 def main():
 
+    global current_hash
+    global hash_update_successful
+
+    health_check_thread = threading.Thread(target=start_health_check_server)
+    health_check_thread.daemon = True
+    health_check_thread.start()
+
+    previous_hash = ""
 
     CLOUD = os.getenv('OS_CLOUD', 'overcloud')
@@ -754,6 +791,19 @@ def main():
         logging.error("Failed: Unable to connect to Nova: " + str(e))
 
     while True:
+        current_time = str(time.time()).encode('utf-8')
+        new_hash = hashlib.sha256(current_time).hexdigest()
+
+        if new_hash == previous_hash:
+            logging.error("Hash has not changed. Something went wrong.")
+            hash_update_successful = False
+        else:
+            logging.debug("Hash updated successfully.")
+            current_hash = new_hash
+            hash_update_successful = True
+
+        previous_hash = current_hash
+
         services = conn.services.list(binary="nova-compute")
 
         # How fast do we want to react / how much do we want to wait before considering a host worth of our attention
@@ -764,7 +814,11 @@ def main():
 
         # We filter previously disabled hosts or the ones that are forced_down.
         compute_nodes = [service for service in services if (datetime.fromisoformat(service.updated_at) < target_date and service.state != 'down') or (service.state == 'down') and 'disabled' not in service.status and not service.forced_down]
 
-        if not compute_nodes == []:
+        # Let's check if there are compute nodes that were already being processed; we want to check them again in case the pod was restarted
+        to_resume = [service for service in services if service.forced_down and (service.state == 'down') and 'disabled' in service.status and 'instanceha evacuation' in service.disabled_reason]
+
+
+        if not (compute_nodes + to_resume) == []:
             logging.warning('The following computes are down:' + str([service.host for service in compute_nodes]))
 
@@ -790,7 +844,7 @@ def main():
             else:
                 reserved_hosts = []
 
-            if compute_nodes:
+            if compute_nodes or to_resume:
                 if (len(compute_nodes) / len(services) * 100) > THRESHOLD:
                     logging.error('Number of impacted computes exceeds the defined threshold. There is something wrong.')
                     pass
@@ -802,8 +856,14 @@ def main():
 
                 to_evacuate = compute_nodes
 
                 if 'false' in DISABLED.lower():
+                    # process computes that are seen as down for the first time
+                    with concurrent.futures.ThreadPoolExecutor() as executor:
+                        results = list(executor.map(lambda service: process_service(service, reserved_hosts, False), to_evacuate))
+                    if not all(results):
+                        logging.warning('Some services failed to evacuate. Retrying in 30 seconds.')
+
+                    # process computes that were half-evacuated
                     with concurrent.futures.ThreadPoolExecutor() as executor:
-                        results = list(executor.map(lambda service: process_service(service, reserved_hosts), to_evacuate))
+                        results = list(executor.map(lambda service: process_service(service, reserved_hosts, True), to_resume))
                     if not all(results):
                         logging.warning('Some services failed to evacuate. Retrying in 30 seconds.')
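
Reviewer note, not part of the patch: the kubelet counts this probe as
healthy whenever an HTTP GET on "/" of port 8080 answers with a 2xx/3xx
status, which the handler above only does while the main loop keeps
refreshing the hash. Below is a minimal sketch for exercising the endpoint
by hand, assuming the pod's port has been forwarded locally (e.g. with
"kubectl port-forward <pod> 8080:8080"); the probe() helper and the default
URL are illustrative, not part of the patch.

    # probe_check.py - mimic the kubelet's HTTP liveness probe (sketch)
    import urllib.error
    import urllib.request

    def probe(url="http://localhost:8080/", timeout=30):
        """Return (healthy, detail), mirroring the kubelet's pass/fail view."""
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                # a 200 response carries the current hash in the body
                return True, resp.read().decode("utf-8")
        except urllib.error.HTTPError as e:
            # 500 means the hash stopped updating; the kubelet would restart the pod
            return False, e.read().decode("utf-8")
        except Exception as e:
            # connection refused / timeout would also fail the probe
            return False, str(e)

    if __name__ == "__main__":
        healthy, detail = probe()
        print("healthy" if healthy else "unhealthy", "-", detail)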
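
A second note on the resume path: a host qualifies for resumption when it is
forced_down, in state 'down', disabled, and its disabled_reason carries the
'instanceha evacuation' marker (presumably set when InstanceHa first disabled
the host via _host_disable). The sketch below isolates that predicate;
FakeService is a hypothetical stand-in for the novaclient Service objects
returned by conn.services.list().

    # resume_filter.py - the to_resume selection logic in isolation (sketch)
    from dataclasses import dataclass

    @dataclass
    class FakeService:           # stand-in for a novaclient Service object
        host: str
        state: str               # 'up' / 'down'
        status: str              # 'enabled' / 'disabled'
        forced_down: bool
        disabled_reason: str

    def needs_resume(svc):
        """True when a previous InstanceHa run already disabled this host."""
        return (svc.forced_down and svc.state == 'down'
                and 'disabled' in svc.status
                and 'instanceha evacuation' in (svc.disabled_reason or ''))

    services = [
        FakeService('compute-0', 'down', 'disabled', True, 'instanceha evacuation'),
        FakeService('compute-1', 'down', 'enabled', False, ''),
    ]
    print([s.host for s in services if needs_resume(s)])  # -> ['compute-0']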