
Commit

Merge pull request #290 from lmiccini/iha_liveness_probe
Add liveness probe and allow InstanceHa to resume evacuations
openshift-merge-bot[bot] authored Oct 18, 2024
2 parents b9247e5 + 025b946 commit 6534d49
Showing 2 changed files with 91 additions and 17 deletions.
16 changes: 15 additions & 1 deletion pkg/instanceha/funcs.go
@@ -19,6 +19,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/utils/ptr"
 )

@@ -41,6 +42,18 @@ func Deployment(
 	volumes := instancehaPodVolumes(instance)
 	volumeMounts := instancehaPodVolumeMounts()
 
+	livenessProbe := &corev1.Probe{
+		// TODO might need tuning
+		TimeoutSeconds:      30,
+		PeriodSeconds:       30,
+		InitialDelaySeconds: 10,
+	}
+
+	livenessProbe.HTTPGet = &corev1.HTTPGetAction{
+		Path: "/",
+		Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+	}
+
 	// add CA cert if defined
 	if instance.Spec.CaBundleSecretName != "" {
 		volumes = append(volumes, instance.Spec.CreateVolume())
@@ -91,7 +104,8 @@ func Deployment(
 					Protocol: "UDP",
 					Name:     "instanceha",
 				}},
-				VolumeMounts: volumeMounts,
+				VolumeMounts:  volumeMounts,
+				LivenessProbe: livenessProbe,
 			}},
 		},
 	},
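The probe above has the kubelet GET "/" on container port 8080 every 30 seconds (after a 10-second initial delay) and restart the container when the check fails. Not part of the commit, but a quick way to exercise the endpoint by hand — assuming the pod has been made reachable on localhost:8080, e.g. via oc port-forward:

import urllib.error
import urllib.request

# Mirrors the probe settings above; the URL is an assumption about how the
# pod was exposed locally (e.g. `oc port-forward <pod> 8080:8080`).
PROBE_URL = "http://localhost:8080/"
TIMEOUT_SECONDS = 30  # TimeoutSeconds in the probe

def probe_once():
    """One kubelet-style HTTP GET; any 2xx/3xx answer counts as healthy."""
    try:
        with urllib.request.urlopen(PROBE_URL, timeout=TIMEOUT_SECONDS) as resp:
            print("healthy (HTTP %s): hash=%s" % (resp.status, resp.read().decode()))
            return True
    except urllib.error.HTTPError as e:
        print("unhealthy (HTTP %s)" % e.code)  # the handler answers 500 on a stale hash
        return False
    except OSError as e:
        print("unhealthy (no response: %s)" % e)  # refused/timed out also fails the probe
        return False

if __name__ == "__main__":
    probe_once()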
92 changes: 76 additions & 16 deletions templates/instanceha/bin/instanceha.py
@@ -15,6 +15,9 @@
 from yaml.loader import SafeLoader
 import socket
 import struct
+import threading
+import hashlib
+from http import server
 import json
 
 from novaclient import client
@@ -30,6 +33,30 @@
 UDP_IP = ''
 UDP_PORT = os.getenv('UDP_PORT', 7410)
 
+current_hash = ""
+hash_update_successful = True
+
+class HealthCheckServer(server.BaseHTTPRequestHandler):
+    def do_GET(self):
+        logging.debug("Health check request received.")
+
+        if hash_update_successful:
+            self.send_response(200)
+            self.send_header("Content-type", "text/plain")
+            self.end_headers()
+            self.wfile.write(current_hash.encode('utf-8'))
+        else:
+            self.send_response(500)
+            self.send_header("Content-type", "text/plain")
+            self.end_headers()
+            self.wfile.write(b"Error: Hash not updated properly.")
+
+def start_health_check_server():
+    logging.debug("Starting the health check server on port 8080...")
+    server_address = ('', 8080)
+    httpd = server.HTTPServer(server_address, HealthCheckServer)
+    httpd.serve_forever()
+
 with open("/var/lib/instanceha/config.yaml", 'r') as stream:
     try:
         config = yaml.load(stream, Loader=SafeLoader)["config"]
@@ -646,14 +673,15 @@ def _host_fence(host, action):
     return True
 
 
-def process_service(service, reserved_hosts):
+def process_service(service, reserved_hosts, resume):
 
-    try:
-        logging.info('Fencing %s' % service.host)
-        _host_fence(service.host, 'off')
-    except Exception as e:
-        logging.error('Failed to fence %s: %s' % (service.host, e))
-        return False
+    if not resume:
+        try:
+            logging.info('Fencing %s' % service.host)
+            _host_fence(service.host, 'off')
+        except Exception as e:
+            logging.error('Failed to fence %s: %s' % (service.host, e))
+            return False
 
     CLOUD = os.getenv('OS_CLOUD', 'overcloud')
 
@@ -670,12 +698,13 @@ def process_service(service, reserved_hosts):
     except Exception as e:
         logging.error("Failed: Unable to connect to Nova: " + str(e))
 
-    try:
-        logging.info('Disabling %s before evacuation' % service.host)
-        _host_disable(conn, service)
-    except Exception as e:
-        logging.error('Failed to disable %s: %s' % (service.host, e))
-        return False
+    if not resume:
+        try:
+            logging.info('Disabling %s before evacuation' % service.host)
+            _host_disable(conn, service)
+        except Exception as e:
+            logging.error('Failed to disable %s: %s' % (service.host, e))
+            return False
 
     if 'true' in RESERVED_HOSTS.lower():
         try:
@@ -733,6 +762,14 @@ def process_service(service, reserved_hosts):
 
 
 def main():
+    global current_hash
+    global hash_update_successful
+
+    health_check_thread = threading.Thread(target=start_health_check_server)
+    health_check_thread.daemon = True
+    health_check_thread.start()
+
+    previous_hash = ""
 
     CLOUD = os.getenv('OS_CLOUD', 'overcloud')
 
@@ -754,6 +791,19 @@ def main():
         logging.error("Failed: Unable to connect to Nova: " + str(e))
 
     while True:
+        current_time = str(time.time()).encode('utf-8')
+        new_hash = hashlib.sha256(current_time).hexdigest()
+
+        if new_hash == previous_hash:
+            logging.error("Hash has not changed. Something went wrong.")
+            hash_update_successful = False
+        else:
+            logging.debug("Hash updated successfully.")
+            current_hash = new_hash
+            hash_update_successful = True
+
+        previous_hash = current_hash
+
         services = conn.services.list(binary="nova-compute")
 
         # How fast do we want to react / how long do we want to wait before considering a host worthy of our attention
@@ -764,7 +814,11 @@ def main():
         # We filter previously disabled hosts or the ones that are forced_down.
         compute_nodes = [service for service in services if (datetime.fromisoformat(service.updated_at) < target_date and service.state != 'down') or (service.state == 'down') and 'disabled' not in service.status and not service.forced_down]
 
-        if not compute_nodes == []:
+        # Let's check if there are compute nodes that were already being processed; we want to check them again in case the pod was restarted
+        to_resume = [service for service in services if service.forced_down and (service.state == 'down') and 'disabled' in service.status and 'instanceha evacuation' in service.disabled_reason]
+
+
+        if not (compute_nodes + to_resume) == []:
             logging.warning('The following computes are down:' + str([service.host for service in compute_nodes]))
 
         # Filter out computes that have no vms running (we don't want to waste time evacuating those anyway)
@@ -790,7 +844,7 @@ def main():
             else:
                 reserved_hosts = []
 
-            if compute_nodes:
+            if compute_nodes or to_resume:
                 if (len(compute_nodes) / len(services) * 100) > THRESHOLD:
                     logging.error('Number of impacted computes exceeds the defined threshold. There is something wrong.')
                     pass
@@ -802,8 +856,14 @@ def main():
                     to_evacuate = compute_nodes
 
                 if 'false' in DISABLED.lower():
+                    # process computes that are seen as down for the first time
                     with concurrent.futures.ThreadPoolExecutor() as executor:
-                        results = list(executor.map(lambda service: process_service(service, reserved_hosts), to_evacuate))
+                        results = list(executor.map(lambda service: process_service(service, reserved_hosts, False), to_evacuate))
+                    if not all(results):
+                        logging.warning('Some services failed to evacuate. Retrying in 30 seconds.')
+
+                    # process computes that were half-evacuated
+                    with concurrent.futures.ThreadPoolExecutor() as executor:
+                        results = list(executor.map(lambda service: process_service(service, reserved_hosts, True), to_resume))
                     if not all(results):
                         logging.warning('Some services failed to evacuate. Retrying in 30 seconds.')

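To see what the new to_resume filter selects, here is a self-contained sketch (not from the commit) that runs the same two list comprehensions from main() over mocked nova-compute service records; the hostnames, timestamps, and 30-second target_date are made up for illustration:

from datetime import datetime, timedelta
from types import SimpleNamespace

# Mocked service records carrying the attributes the script reads.
now = datetime.now()
services = [
    # Freshly down, still enabled: should be fenced and evacuated.
    SimpleNamespace(host="compute-0", updated_at=(now - timedelta(seconds=60)).isoformat(),
                    state="down", status="enabled", forced_down=False, disabled_reason=None),
    # Already fenced and disabled by a previous pod instance: should be resumed.
    SimpleNamespace(host="compute-1", updated_at=(now - timedelta(seconds=60)).isoformat(),
                    state="down", status="disabled", forced_down=True,
                    disabled_reason="instanceha evacuation"),
]
target_date = now - timedelta(seconds=30)

# Note `and` binds tighter than `or`, so the status/forced_down checks only
# guard the second branch of the comprehension.
compute_nodes = [s for s in services if (datetime.fromisoformat(s.updated_at) < target_date and s.state != 'down') or (s.state == 'down') and 'disabled' not in s.status and not s.forced_down]

to_resume = [s for s in services if s.forced_down and (s.state == 'down') and 'disabled' in s.status and 'instanceha evacuation' in s.disabled_reason]

print([s.host for s in compute_nodes])  # ['compute-0']
print([s.host for s in to_resume])      # ['compute-1']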
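The tail of the loop then evacuates the two groups in sequence. A minimal sketch of that control flow, with a hypothetical process_service_stub standing in for the real process_service — on the resume path the fence and disable steps are skipped, because they already happened before the pod restarted:

import concurrent.futures

def process_service_stub(host, resume):
    # Hypothetical stand-in for process_service(service, reserved_hosts, resume).
    if not resume:
        print("fencing and disabling %s" % host)
    print("evacuating %s (resume=%s)" % (host, resume))
    return True

to_evacuate = ["compute-0"]  # down for the first time
to_resume = ["compute-1"]    # half-evacuated before a pod restart

# Same pattern as the diff: one pool per phase, resume=False then resume=True.
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(lambda h: process_service_stub(h, False), to_evacuate))
with concurrent.futures.ThreadPoolExecutor() as executor:
    results += list(executor.map(lambda h: process_service_stub(h, True), to_resume))

print("all evacuations succeeded:", all(results))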
