diff --git a/operate/cli.py b/operate/cli.py index 7d3292760..081010101 100644 --- a/operate/cli.py +++ b/operate/cli.py @@ -138,22 +138,26 @@ def json(self) -> dict: "home": str(self._path), } - + def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-statements home: t.Optional[Path] = None, ) -> FastAPI: """Create FastAPI object.""" + HEALTH_CHECKER_OFF = os.environ.get("HEALTH_CHECKER_OFF", "0") == "1" + number_of_fails = int(os.environ.get("HEALTH_CHECKER_TRIES", "5")) + logger = setup_logger(name="operate") + if HEALTH_CHECKER_OFF: + logger.warning("healthchecker is off!!!") operate = OperateApp(home=home, logger=logger) funding_jobs: t.Dict[str, asyncio.Task] = {} - health_checker = HealthChecker(operate.service_manager()) + health_checker = HealthChecker(operate.service_manager(), number_of_fails=number_of_fails) # Create shutdown endpoint shutdown_endpoint = uuid.uuid4().hex (operate._path / "operate.kill").write_text( # pylint: disable=protected-access shutdown_endpoint ) - thread_pool_executor = ThreadPoolExecutor() async def run_in_executor(fn: t.Callable, *args: t.Any) -> t.Any: @@ -188,7 +192,9 @@ def schedule_healthcheck_job( service: str, ) -> None: """Schedule a healthcheck job.""" - health_checker.start_for_service(service) + if not HEALTH_CHECKER_OFF: + # dont start health checker if it's switched off + health_checker.start_for_service(service) def cancel_funding_job(service: str) -> None: """Cancel funding job.""" diff --git a/operate/services/health_checker.py b/operate/services/health_checker.py index e1979be13..ba1c5d57c 100644 --- a/operate/services/health_checker.py +++ b/operate/services/health_checker.py @@ -34,19 +34,28 @@ class HealthChecker: """Health checker manager.""" - SLEEP_PERIOD = 30 - PORT_UP_TIMEOUT = 120 # seconds + SLEEP_PERIOD_DEFAULT = 30 + PORT_UP_TIMEOUT_DEFAULT = 120 # seconds + NUMBER_OF_FAILS_DEFAULT = 5 - def __init__(self, service_manager: ServiceManager) -> None: + def __init__( + self, + service_manager: ServiceManager, + port_up_timeout: int | None = None, + sleep_period: int | None = None, + number_of_fails: int | None = None, + ) -> None: """Init the healtch checker.""" self._jobs: t.Dict[str, asyncio.Task] = {} self.logger = setup_logger(name="operate.health_checker") - self.logger.info("[HEALTCHECKER]: created") self._service_manager = service_manager + self.port_up_timeout = port_up_timeout or self.PORT_UP_TIMEOUT_DEFAULT + self.sleep_period = sleep_period or self.SLEEP_PERIOD_DEFAULT + self.number_of_fails = number_of_fails or self.NUMBER_OF_FAILS_DEFAULT def start_for_service(self, service: str) -> None: """Start for a specific service.""" - self.logger.info(f"[HEALTCHECKER]: Starting healthcheck job for {service}") + self.logger.info(f"[HEALTH_CHECKER]: Starting healthcheck job for {service}") if service in self._jobs: self.stop_for_service(service=service) @@ -62,12 +71,12 @@ def stop_for_service(self, service: str) -> None: if service not in self._jobs: return self.logger.info( - f"[HEALTCHECKER]: Cancelling existing healthcheck_jobs job for {service}" + f"[HEALTH_CHECKER]: Cancelling existing healthcheck_jobs job for {service}" ) status = self._jobs[service].cancel() if not status: self.logger.info( - f"[HEALTCHECKER]: Healthcheck job cancellation for {service} failed" + f"[HEALTH_CHECKER]: Healthcheck job cancellation for {service} failed" ) @staticmethod @@ -90,22 +99,22 @@ async def healthcheck_job( try: self.logger.info( - f"[HEALTCHECKER] Start healthcheck job for service: {service}" + f"[HEALTH_CHECKER] Start healthcheck job for service: {service}" ) async def _wait_for_port(sleep_period: int = 15) -> None: - self.logger.info("[HEALTCHECKER]: wait port is up") + self.logger.info("[HEALTH_CHECKER]: wait port is up") while True: try: await self.check_service_health(service) - self.logger.info("[HEALTCHECKER]: port is UP") + self.logger.info("[HEALTH_CHECKER]: port is UP") return except aiohttp.ClientConnectionError: - self.logger.error("[HEALTCHECKER]: error connecting http port") + self.logger.error("[HEALTH_CHECKER]: error connecting http port") await asyncio.sleep(sleep_period) async def _check_port_ready( - timeout: int = self.PORT_UP_TIMEOUT, sleep_period: int = 15 + timeout: int = self.port_up_timeout, sleep_period: int = 15 ) -> bool: try: await asyncio.wait_for( @@ -116,7 +125,7 @@ async def _check_port_ready( return False async def _check_health( - number_of_fails: int = 5, sleep_period: int = self.SLEEP_PERIOD + number_of_fails: int = 5, sleep_period: int = self.sleep_period ) -> None: fails = 0 while True: @@ -125,24 +134,24 @@ async def _check_health( healthy = await self.check_service_health(service) except aiohttp.ClientConnectionError: self.logger.info( - f"[HEALTCHECKER] {service} port read failed. restart" + f"[HEALTH_CHECKER] {service} port read failed. restart" ) return if not healthy: fails += 1 self.logger.info( - f"[HEALTCHECKER] {service} not healthy for {fails} time in a row" + f"[HEALTH_CHECKER] {service} not healthy for {fails} time in a row" ) else: - self.logger.info(f"[HEALTCHECKER] {service} is HEALTHY") + self.logger.info(f"[HEALTH_CHECKER] {service} is HEALTHY") # reset fails if comes healty fails = 0 if fails >= number_of_fails: # too much fails, exit self.logger.error( - f"[HEALTCHECKER] {service} failed {fails} times in a row. restart" + f"[HEALTH_CHECKER] {service} failed {fails} times in a row. restart" ) return await asyncio.sleep(sleep_period) @@ -162,17 +171,20 @@ def _do_restart() -> None: # upper cycle while True: - self.logger.info(f"[HEALTCHECKER] {service} wait for port ready") - if await _check_port_ready(timeout=self.PORT_UP_TIMEOUT): + self.logger.info(f"[HEALTH_CHECKER] {service} wait for port ready") + if await _check_port_ready(timeout=self.port_up_timeout): # blocking till restart needed self.logger.info( - f"[HEALTCHECKER] {service} port is ready, checking health every {self.SLEEP_PERIOD}" + f"[HEALTH_CHECKER] {service} port is ready, checking health every {self.sleep_period}" + ) + await _check_health( + number_of_fails=self.number_of_fails, + sleep_period=self.sleep_period, ) - await _check_health(sleep_period=self.SLEEP_PERIOD) else: self.logger.info( - "[HEALTCHECKER] port not ready within timeout. restart deployment" + "[HEALTH_CHECKER] port not ready within timeout. restart deployment" ) # perform restart