Skip to content

Commit

Permalink
reimplemented healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
solarw committed Jul 17, 2024
1 parent 45baebf commit 80cf517
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 59 deletions.
26 changes: 5 additions & 21 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from operate.account.user import UserAccount
from operate.constants import KEY, KEYS, OPERATE, SERVICES
from operate.ledger import get_ledger_type_from_chain_type
from operate.services.health_checker import HealthChecker
from operate.types import ChainType, DeploymentStatus
from operate.wallet.master import MasterWalletManager

Expand Down Expand Up @@ -145,8 +146,7 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st
logger = setup_logger(name="operate")
operate = OperateApp(home=home, logger=logger)
funding_jobs: t.Dict[str, asyncio.Task] = {}
healthcheck_jobs: t.Dict[str, asyncio.Task] = {}

health_checker = HealthChecker(operate.service_manager())
# Create shutdown endpoint
shutdown_endpoint = uuid.uuid4().hex
(operate._path / "operate.kill").write_text( # pylint: disable=protected-access
Expand Down Expand Up @@ -176,17 +176,7 @@ def schedule_healthcheck_job(
service: str,
) -> None:
"""Schedule a healthcheck job."""
logger.info(f"Starting healthcheck job for {service}")
if service in healthcheck_jobs:
logger.info(f"Cancelling existing healthcheck_jobs job for {service}")
cancel_healthcheck_job(service=service)

loop = asyncio.get_running_loop()
healthcheck_jobs[service] = loop.create_task(
operate.service_manager().healthcheck_job(
hash=service,
)
)
health_checker.start_for_service(service)

def cancel_funding_job(service: str) -> None:
"""Cancel funding job."""
Expand All @@ -210,16 +200,9 @@ def pause_all_services_on_startup() -> None:
deployment.stop(force=True)
logger.info(f"Cancelling funding job for {service}")
cancel_funding_job(service=service)
health_checker.stop_for_service(service=service)
logger.info("Stopping services on startup done.")

def cancel_healthcheck_job(service: str) -> None:
    """Cancel the healthcheck task registered for ``service``, if there is one.

    Does nothing when no task is registered under that key. When the task
    refuses the cancellation request (``cancel()`` returns a falsy value),
    the failure is logged and otherwise ignored.
    """
    try:
        job = healthcheck_jobs[service]
    except KeyError:
        # Nothing scheduled for this service.
        return
    if job.cancel():
        return
    logger.info(f"Healthcheck job cancellation for {service} failed")

# On backend app startup, force-pause all services so that no agents started earlier are left running.
pause_all_services_on_startup()

Expand Down Expand Up @@ -695,6 +678,7 @@ async def _stop_service_locally(request: Request) -> JSONResponse:
return service_not_found_error(service=request.path_params["service"])
service = request.path_params["service"]
deployment = operate.service_manager().load_or_create(service).deployment
health_checker.stop_for_service(service=service)
deployment.stop()
logger.info(f"Cancelling funding job for {service}")
cancel_funding_job(service=service)
Expand Down
144 changes: 144 additions & 0 deletions operate/services/health_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import asyncio
import time
import traceback
import typing as t

import aiohttp
from aea.helpers.logging import setup_logger

from operate.services.manage import ServiceManager # type: ignore


HTTP_OK = 200


class HealthChecker:
    """Run one background health-check task per service and restart unhealthy ones.

    Each task polls the service's HTTP healthcheck endpoint and, when the
    service stays unhealthy (or never comes up), restarts it through the
    ``ServiceManager`` passed to the constructor.
    """

    SLEEP_PERIOD = 30  # seconds between two health checks
    PORT_UP_TIMEOUT = 120  # seconds

    def __init__(self, service_manager: "ServiceManager"):
        """Create the checker.

        :param service_manager: manager used to stop and redeploy a service
            when it has to be restarted.
        """
        # service identifier -> running healthcheck task
        self._jobs: t.Dict[str, asyncio.Task] = {}
        self.logger = setup_logger(name="operate.health_checker")
        self.logger.info("[HEALTCHECKER]: created")
        self._service_manager = service_manager

    def start_for_service(self, service: str) -> None:
        """Start (or restart) the healthcheck task for ``service``.

        Must be called while an event loop is running, because the task is
        created on the running loop.

        :param service: identifier of the service to watch.
        """
        self.logger.info(f"[HEALTCHECKER]: Starting healthcheck job for {service}")
        if service in self._jobs:
            self.stop_for_service(service=service)

        loop = asyncio.get_running_loop()
        self._jobs[service] = loop.create_task(
            self.healthcheck_job(
                hash=service,
            )
        )

    def stop_for_service(self, service: str) -> None:
        """Cancel the healthcheck task for ``service`` and forget it.

        Does nothing when no task is registered for the service.

        :param service: identifier of the service to stop watching.
        """
        # Pop instead of look up: a cancelled task must not stay registered,
        # otherwise every later stop would try to cancel it again and report
        # a bogus cancellation failure.
        job = self._jobs.pop(service, None)
        if job is None:
            return
        self.logger.info(
            f"[HEALTCHECKER]: Cancelling existing healthcheck_jobs job for {service}"
        )
        status = job.cancel()
        if not status:
            # cancel() returns False when the task has already finished.
            self.logger.info(
                f"[HEALTCHECKER]: Healthcheck job cancellation for {service} failed"
            )

    async def check_service_health(self, service: str) -> bool:
        """Query the healthcheck endpoint once.

        :param service: currently unused; the endpoint URL is hard-coded.
        :return: True only when the endpoint answers with HTTP 200 and a JSON
            object whose ``is_transitioning_fast`` value is truthy. A body
            that is not a JSON object counts as unhealthy.
        :raises aiohttp.ClientConnectionError: when the endpoint cannot be
            reached; callers use this to tell "port down" from "unhealthy".
        """
        del service
        async with aiohttp.ClientSession() as session:
            async with session.get("http://localhost:8716/healthcheck") as resp:
                status = resp.status
                try:
                    response_json = await resp.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # The port answered, but not with valid JSON: report the
                    # service as unhealthy instead of crashing the job.
                    return False
        if not isinstance(response_json, dict):
            return False
        return status == HTTP_OK and bool(
            response_json.get("is_transitioning_fast", False)
        )

    async def healthcheck_job(
        self,
        hash: str,
    ) -> None:
        """Watch a service forever and restart it whenever it is unhealthy.

        One cycle waits for the healthcheck port to come up, then polls the
        service health until a restart is required, then restarts the service.
        An unexpected error in a cycle is logged and the next cycle starts
        after a pause, so a single failure does not end the monitoring. The
        coroutine only ends when its task is cancelled.

        :param hash: identifier of the service to watch.
        """
        service = hash

        self.logger.info(
            f"[HEALTCHECKER] Start healthcheck job for service: {service}"
        )

        async def _wait_for_port(sleep_period=15):
            """Poll until the healthcheck endpoint accepts a connection."""
            self.logger.info("[HEALTCHECKER]: wait port is up")
            while True:
                try:
                    # Only reachability matters here, not the health verdict.
                    await self.check_service_health(service)
                    self.logger.info("[HEALTCHECKER]: port is UP")
                    return
                except aiohttp.ClientConnectionError:
                    self.logger.error("[HEALTCHECKER]: error connecting http port")
                await asyncio.sleep(sleep_period)

        async def _check_port_ready(timeout=self.PORT_UP_TIMEOUT, sleep_period=15):
            """Return True if the port came up within ``timeout`` seconds."""
            try:
                await asyncio.wait_for(
                    _wait_for_port(sleep_period=sleep_period), timeout=timeout
                )
                return True
            except asyncio.TimeoutError:
                return False

        async def _check_health(number_of_fails=5, sleep_period=self.SLEEP_PERIOD):
            """Poll the service health; return once a restart is required."""
            fails = 0
            while True:
                try:
                    # Check the service health
                    healthy = await self.check_service_health(service)
                except aiohttp.ClientConnectionError:
                    self.logger.info("[HEALTCHECKER] port read failed. restart")
                    return

                if healthy:
                    # Log "healthy" only when the check really passed, and
                    # reset the streak of consecutive failures.
                    self.logger.info("[HEALTCHECKER] is HEALTHY")
                    fails = 0
                else:
                    fails += 1
                    self.logger.info(
                        f"[HEALTCHECKER] not healthy for {fails} time in a row"
                    )

                if fails >= number_of_fails:
                    # Too many consecutive failures: hand over to the restart.
                    self.logger.error(
                        f"[HEALTCHECKER] failed {fails} times in a row. restart"
                    )
                    return
                await asyncio.sleep(sleep_period)

        async def _restart(service_manager, service):
            """Stop and redeploy the service."""
            # TODO: these ServiceManager calls are synchronous and block the
            # event loop while they run. Moving them to a worker thread needs
            # care: cancelling this task would not interrupt the thread, so a
            # redeploy could race with an explicit stop of the service.
            service_manager.stop_service_locally(hash=service)
            service_manager.deploy_service_locally(hash=service)

        # upper cycle
        while True:
            try:
                self.logger.info("[HEALTCHECKER] wait for port ready")
                if not (await _check_port_ready(timeout=self.PORT_UP_TIMEOUT)):
                    self.logger.info(
                        "[HEALTCHECKER] port not ready within timeout. restart deploymen"
                    )
                else:
                    # blocking till restart needed
                    self.logger.info(
                        f"[HEALTCHECKER] port is ready, checking health every {self.SLEEP_PERIOD}"
                    )
                    await _check_health(sleep_period=self.SLEEP_PERIOD)

                # perform restart
                await _restart(self._service_manager, service)
            except asyncio.CancelledError:
                # Cancellation must always end the job (CancelledError is an
                # Exception subclass on Python 3.7).
                raise
            except Exception:  # pylint: disable=broad-except
                # Keep monitoring after an unexpected failure (for example a
                # failing restart); pause first to avoid a tight error loop.
                self.logger.exception(
                    f"[HEALTCHECKER] unexpected error in healthcheck job for {service}"
                )
                await asyncio.sleep(self.SLEEP_PERIOD)
38 changes: 0 additions & 38 deletions operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import aiohttp # type: ignore
from aea.helpers.base import IPFSHash
from aea.helpers.logging import setup_logger
from autonomy.chain.base import registry_contracts
Expand Down Expand Up @@ -62,17 +61,6 @@
HTTP_OK = 200


async def check_service_health() -> bool:
    """Query the local healthcheck endpoint once.

    The JSON body is always decoded. The result is ``False`` when the HTTP
    status is not ``HTTP_OK``; otherwise it is the body's
    ``is_transitioning_fast`` value, defaulting to ``False`` when absent.
    """
    async with aiohttp.ClientSession() as client:
        async with client.get("http://localhost:8716/healthcheck") as response:
            payload = await response.json()
            if response.status != HTTP_OK:
                return False
            return payload.get("is_transitioning_fast", False)


class ServiceManager:
"""Service manager."""

Expand Down Expand Up @@ -922,32 +910,6 @@ async def funding_job(
)
await asyncio.sleep(60)

async def healthcheck_job(
    self,
    hash: str,
) -> None:
    """Check the service health every 30 seconds and restart it when needed.

    The service is stopped and redeployed after 5 failed health checks in a
    row. Errors raised by a check or a restart are logged and the loop goes
    on; the coroutine only ends when its task is cancelled.

    :param hash: identifier of the service to restart.
    """
    failed_health_checks = 0

    while True:
        try:
            # Check the service health
            healthy = await check_service_health()
            if not healthy:
                failed_health_checks += 1
            else:
                failed_health_checks = 0
            # Restart the service if the health failed 5 times in a row
            if failed_health_checks >= 5:
                # Start a fresh streak before restarting; otherwise every
                # later failed check would trigger another restart right
                # away and the service would never get time to come up.
                failed_health_checks = 0
                self.stop_service_locally(hash=hash)
                self.deploy_service_locally(hash=hash)

        except asyncio.CancelledError:
            # Never swallow cancellation (CancelledError is an Exception
            # subclass on Python 3.7).
            raise
        except Exception:  # pylint: disable=broad-except
            logging.info(
                f"Error occured while checking the service health\n{traceback.format_exc()}"
            )
        await asyncio.sleep(30)

def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment:
"""
Deploy service locally
Expand Down

0 comments on commit 80cf517

Please sign in to comment.