
healthchecker env vars to switch it off. small refactoring
solarw committed Aug 20, 2024
1 parent 0930ac2 commit e42ba6e
Showing 2 changed files with 44 additions and 26 deletions.
14 changes: 10 additions & 4 deletions operate/cli.py
@@ -138,22 +138,26 @@ def json(self) -> dict:
"home": str(self._path),
}


def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-statements
home: t.Optional[Path] = None,
) -> FastAPI:
"""Create FastAPI object."""
HEALTH_CHECKER_OFF = os.environ.get("HEALTH_CHECKER_OFF", "0") == "1"
number_of_fails = int(os.environ.get("HEALTH_CHECKER_TRIES", "5"))


logger = setup_logger(name="operate")
if HEALTH_CHECKER_OFF:
logger.warning("healthchecker is off!!!")
operate = OperateApp(home=home, logger=logger)
funding_jobs: t.Dict[str, asyncio.Task] = {}
health_checker = HealthChecker(operate.service_manager())
health_checker = HealthChecker(operate.service_manager(), number_of_fails=number_of_fails)
# Create shutdown endpoint
shutdown_endpoint = uuid.uuid4().hex
(operate._path / "operate.kill").write_text( # pylint: disable=protected-access
shutdown_endpoint
)

thread_pool_executor = ThreadPoolExecutor()

async def run_in_executor(fn: t.Callable, *args: t.Any) -> t.Any:
@@ -188,7 +192,9 @@ def schedule_healthcheck_job(
service: str,
) -> None:
"""Schedule a healthcheck job."""
health_checker.start_for_service(service)
if not HEALTH_CHECKER_OFF:
# dont start health checker if it's switched off
health_checker.start_for_service(service)

def cancel_funding_job(service: str) -> None:
"""Cancel funding job."""
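
A minimal sketch of how the two new switches might be driven from the environment. The variable names (HEALTH_CHECKER_OFF, HEALTH_CHECKER_TRIES) and the create_app() entry point come from the diff above; serving the returned FastAPI app with uvicorn is only an illustrative assumption, not part of this commit.

    # Sketch: configuring the health checker via the new environment variables.
    # HEALTH_CHECKER_OFF / HEALTH_CHECKER_TRIES come from this commit; the
    # uvicorn invocation below is an assumption for illustration only.
    import os

    import uvicorn

    from operate.cli import create_app

    # Switch the health checker off entirely (create_app logs a warning) ...
    os.environ["HEALTH_CHECKER_OFF"] = "1"
    # ... or leave it on and only restart after 10 consecutive failed checks:
    # os.environ["HEALTH_CHECKER_TRIES"] = "10"

    # Both variables are read inside create_app(), so set them before this call.
    app = create_app()
    uvicorn.run(app, host="127.0.0.1", port=8000)
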
56 changes: 34 additions & 22 deletions operate/services/health_checker.py
@@ -34,19 +34,28 @@
class HealthChecker:
"""Health checker manager."""

SLEEP_PERIOD = 30
PORT_UP_TIMEOUT = 120 # seconds
SLEEP_PERIOD_DEFAULT = 30
PORT_UP_TIMEOUT_DEFAULT = 120 # seconds
NUMBER_OF_FAILS_DEFAULT = 5

def __init__(self, service_manager: ServiceManager) -> None:
def __init__(
self,
service_manager: ServiceManager,
port_up_timeout: int | None = None,
sleep_period: int | None = None,
number_of_fails: int | None = None,
) -> None:
"""Init the healtch checker."""
self._jobs: t.Dict[str, asyncio.Task] = {}
self.logger = setup_logger(name="operate.health_checker")
self.logger.info("[HEALTCHECKER]: created")
self._service_manager = service_manager
self.port_up_timeout = port_up_timeout or self.PORT_UP_TIMEOUT_DEFAULT
self.sleep_period = sleep_period or self.SLEEP_PERIOD_DEFAULT
self.number_of_fails = number_of_fails or self.NUMBER_OF_FAILS_DEFAULT

def start_for_service(self, service: str) -> None:
"""Start for a specific service."""
self.logger.info(f"[HEALTCHECKER]: Starting healthcheck job for {service}")
self.logger.info(f"[HEALTH_CHECKER]: Starting healthcheck job for {service}")
if service in self._jobs:
self.stop_for_service(service=service)

@@ -62,12 +71,12 @@ def stop_for_service(self, service: str) -> None:
if service not in self._jobs:
return
self.logger.info(
f"[HEALTCHECKER]: Cancelling existing healthcheck_jobs job for {service}"
f"[HEALTH_CHECKER]: Cancelling existing healthcheck_jobs job for {service}"
)
status = self._jobs[service].cancel()
if not status:
self.logger.info(
f"[HEALTCHECKER]: Healthcheck job cancellation for {service} failed"
f"[HEALTH_CHECKER]: Healthcheck job cancellation for {service} failed"
)

@staticmethod
@@ -90,22 +99,22 @@ async def healthcheck_job(

try:
self.logger.info(
f"[HEALTCHECKER] Start healthcheck job for service: {service}"
f"[HEALTH_CHECKER] Start healthcheck job for service: {service}"
)

async def _wait_for_port(sleep_period: int = 15) -> None:
self.logger.info("[HEALTCHECKER]: wait port is up")
self.logger.info("[HEALTH_CHECKER]: wait port is up")
while True:
try:
await self.check_service_health(service)
self.logger.info("[HEALTCHECKER]: port is UP")
self.logger.info("[HEALTH_CHECKER]: port is UP")
return
except aiohttp.ClientConnectionError:
self.logger.error("[HEALTCHECKER]: error connecting http port")
self.logger.error("[HEALTH_CHECKER]: error connecting http port")
await asyncio.sleep(sleep_period)

async def _check_port_ready(
timeout: int = self.PORT_UP_TIMEOUT, sleep_period: int = 15
timeout: int = self.port_up_timeout, sleep_period: int = 15
) -> bool:
try:
await asyncio.wait_for(
@@ -116,7 +125,7 @@ async def _check_port_ready(
return False

async def _check_health(
number_of_fails: int = 5, sleep_period: int = self.SLEEP_PERIOD
number_of_fails: int = 5, sleep_period: int = self.sleep_period
) -> None:
fails = 0
while True:
@@ -125,24 +134,24 @@ async def _check_health(
healthy = await self.check_service_health(service)
except aiohttp.ClientConnectionError:
self.logger.info(
f"[HEALTCHECKER] {service} port read failed. restart"
f"[HEALTH_CHECKER] {service} port read failed. restart"
)
return

if not healthy:
fails += 1
self.logger.info(
f"[HEALTCHECKER] {service} not healthy for {fails} time in a row"
f"[HEALTH_CHECKER] {service} not healthy for {fails} time in a row"
)
else:
self.logger.info(f"[HEALTCHECKER] {service} is HEALTHY")
self.logger.info(f"[HEALTH_CHECKER] {service} is HEALTHY")
# reset fails if comes healty
fails = 0

if fails >= number_of_fails:
# too much fails, exit
self.logger.error(
f"[HEALTCHECKER] {service} failed {fails} times in a row. restart"
f"[HEALTH_CHECKER] {service} failed {fails} times in a row. restart"
)
return
await asyncio.sleep(sleep_period)
@@ -162,17 +171,20 @@ def _do_restart() -> None:

# upper cycle
while True:
self.logger.info(f"[HEALTCHECKER] {service} wait for port ready")
if await _check_port_ready(timeout=self.PORT_UP_TIMEOUT):
self.logger.info(f"[HEALTH_CHECKER] {service} wait for port ready")
if await _check_port_ready(timeout=self.port_up_timeout):
# blocking till restart needed
self.logger.info(
f"[HEALTCHECKER] {service} port is ready, checking health every {self.SLEEP_PERIOD}"
f"[HEALTH_CHECKER] {service} port is ready, checking health every {self.sleep_period}"
)
await _check_health(
number_of_fails=self.number_of_fails,
sleep_period=self.sleep_period,
)
await _check_health(sleep_period=self.SLEEP_PERIOD)

else:
self.logger.info(
"[HEALTCHECKER] port not ready within timeout. restart deployment"
"[HEALTH_CHECKER] port not ready within timeout. restart deployment"
)

# perform restart
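
The refactored loop above reduces to: wait up to port_up_timeout for the service port to answer, then poll health every sleep_period and trigger a restart after number_of_fails consecutive failures. A stripped-down, self-contained sketch of that pattern follows; check_health and restart are hypothetical callables standing in for HealthChecker.check_service_health() and the ServiceManager restart path, not the real API.

    # Simplified sketch of the health-check loop refactored above: bounded wait
    # for the port, then restart after `number_of_fails` consecutive unhealthy
    # checks. `check_health` and `restart` are hypothetical stand-ins, not the
    # real HealthChecker / ServiceManager API.
    import asyncio
    import typing as t


    async def monitor(
        check_health: t.Callable[[], t.Awaitable[bool]],
        restart: t.Callable[[], t.Awaitable[None]],
        port_up_timeout: int = 120,
        sleep_period: int = 30,
        number_of_fails: int = 5,
    ) -> None:
        """Run the wait-for-port / poll-health / restart cycle forever."""
        while True:

            async def _wait_for_port() -> None:
                # Keep probing until the port answers at all.
                while True:
                    try:
                        await check_health()
                        return
                    except ConnectionError:
                        await asyncio.sleep(sleep_period)

            try:
                await asyncio.wait_for(_wait_for_port(), timeout=port_up_timeout)
            except asyncio.TimeoutError:
                await restart()  # port never came up within the timeout
                continue

            fails = 0
            while fails < number_of_fails:
                try:
                    healthy = await check_health()
                except ConnectionError:
                    break  # connection dropped: fall through to a restart
                fails = 0 if healthy else fails + 1  # reset on a healthy response
                await asyncio.sleep(sleep_period)

            await restart()  # connection lost or too many failures in a row

Usage would be asyncio.run(monitor(check, restart)) with concrete coroutines supplied for the two callables.
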
