diff --git a/electron/install.js b/electron/install.js index ba13ac9f8..c6356214e 100644 --- a/electron/install.js +++ b/electron/install.js @@ -13,7 +13,7 @@ const { paths } = require('./constants'); * - use "" (nothing as a suffix) for latest release candidate, for example "0.1.0rc26" * - use "alpha" for alpha release, for example "0.1.0rc26-alpha" */ -const OlasMiddlewareVersion = '0.1.0rc71'; +const OlasMiddlewareVersion = '0.1.0rc75'; const Env = { ...process.env, diff --git a/operate/cli.py b/operate/cli.py index 80dfa36bf..818b497af 100644 --- a/operate/cli.py +++ b/operate/cli.py @@ -20,6 +20,7 @@ """Operate app CLI module.""" import asyncio +from concurrent.futures import ThreadPoolExecutor import logging import os import signal @@ -42,6 +43,7 @@ from operate.account.user import UserAccount from operate.constants import KEY, KEYS, OPERATE, SERVICES from operate.ledger import get_ledger_type_from_chain_type +from operate.services.health_checker import HealthChecker from operate.types import ChainType, DeploymentStatus from operate.wallet.master import MasterWalletManager @@ -145,14 +147,24 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st logger = setup_logger(name="operate") operate = OperateApp(home=home, logger=logger) funding_jobs: t.Dict[str, asyncio.Task] = {} - healthcheck_jobs: t.Dict[str, asyncio.Task] = {} - + health_checker = HealthChecker(operate.service_manager()) # Create shutdown endpoint shutdown_endpoint = uuid.uuid4().hex (operate._path / "operate.kill").write_text( # pylint: disable=protected-access shutdown_endpoint ) + thread_pool_executor = ThreadPoolExecutor() + + async def run_in_executor(fn, *args): + loop = asyncio.get_event_loop() + future = loop.run_in_executor(thread_pool_executor, fn, *args) + res = await future + exception = future.exception() + if exception is not None: + raise exception + return res + def schedule_funding_job( service: str, from_safe: bool = True, @@ -176,17 +188,7 @@ def schedule_healthcheck_job( service: str, ) -> None: """Schedule a healthcheck job.""" - logger.info(f"Starting healthcheck job for {service}") - if service in healthcheck_jobs: - logger.info(f"Cancelling existing healthcheck_jobs job for {service}") - cancel_healthcheck_job(service=service) - - loop = asyncio.get_running_loop() - healthcheck_jobs[service] = loop.create_task( - operate.service_manager().healthcheck_job( - hash=service, - ) - ) + health_checker.start_for_service(service) def cancel_funding_job(service: str) -> None: """Cancel funding job.""" @@ -210,16 +212,9 @@ def pause_all_services_on_startup() -> None: deployment.stop(force=True) logger.info(f"Cancelling funding job for {service}") cancel_funding_job(service=service) + health_checker.stop_for_service(service=service) logger.info("Stopping services on startup done.") - def cancel_healthcheck_job(service: str) -> None: - """Cancel healthcheck job.""" - if service not in healthcheck_jobs: - return - status = healthcheck_jobs[service].cancel() - if not status: - logger.info(f"Healthcheck job cancellation for {service} failed") - # on backend app started we assume there are now started agents, so we force to pause all pause_all_services_on_startup() @@ -545,10 +540,12 @@ async def _create_services(request: Request) -> JSONResponse: ) if template.get("deploy", False): - manager.deploy_service_onchain_from_safe(hash=service.hash, update=update) - manager.stake_service_on_chain_from_safe(hash=service.hash) - manager.fund_service(hash=service.hash) - manager.deploy_service_locally(hash=service.hash) + def _fn(): + manager.deploy_service_onchain_from_safe(hash=service.hash, update=update) + manager.stake_service_on_chain_from_safe(hash=service.hash) + manager.fund_service(hash=service.hash) + manager.deploy_service_locally(hash=service.hash) + await run_in_executor(_fn) schedule_funding_job(service=service.hash) schedule_healthcheck_job(service=service.hash) @@ -668,7 +665,11 @@ async def _build_service_locally(request: Request) -> JSONResponse: ) .deployment ) - deployment.build(force=True) + + def _fn(): + deployment.build(force=True) + + await run_in_executor(_fn) return JSONResponse(content=deployment.json) @app.post("/api/services/{service}/deployment/start") @@ -679,10 +680,14 @@ async def _start_service_locally(request: Request) -> JSONResponse: return service_not_found_error(service=request.path_params["service"]) service = request.path_params["service"] manager = operate.service_manager() - manager.deploy_service_onchain(hash=service) - manager.stake_service_on_chain(hash=service) - manager.fund_service(hash=service) - manager.deploy_service_locally(hash=service, force=True) + + def _fn(): + manager.deploy_service_onchain(hash=service) + manager.stake_service_on_chain(hash=service) + manager.fund_service(hash=service) + manager.deploy_service_locally(hash=service, force=True) + + await run_in_executor(_fn) schedule_funding_job(service=service) schedule_healthcheck_job(service=service.hash) return JSONResponse(content=manager.load_or_create(service).deployment) @@ -695,7 +700,9 @@ async def _stop_service_locally(request: Request) -> JSONResponse: return service_not_found_error(service=request.path_params["service"]) service = request.path_params["service"] deployment = operate.service_manager().load_or_create(service).deployment - deployment.stop() + health_checker.stop_for_service(service=service) + + await run_in_executor(deployment.stop) logger.info(f"Cancelling funding job for {service}") cancel_funding_job(service=service) return JSONResponse(content=deployment.json) diff --git a/operate/services/health_checker.py b/operate/services/health_checker.py new file mode 100644 index 000000000..e1979be13 --- /dev/null +++ b/operate/services/health_checker.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------ +# +# Copyright 2024 Valory AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------------ +"""Source code for checking aea is alive..""" +import asyncio +import typing as t +from concurrent.futures import ThreadPoolExecutor + +import aiohttp # type: ignore +from aea.helpers.logging import setup_logger + +from operate.services.manage import ServiceManager # type: ignore + + +HTTP_OK = 200 + + +class HealthChecker: + """Health checker manager.""" + + SLEEP_PERIOD = 30 + PORT_UP_TIMEOUT = 120 # seconds + + def __init__(self, service_manager: ServiceManager) -> None: + """Init the healtch checker.""" + self._jobs: t.Dict[str, asyncio.Task] = {} + self.logger = setup_logger(name="operate.health_checker") + self.logger.info("[HEALTCHECKER]: created") + self._service_manager = service_manager + + def start_for_service(self, service: str) -> None: + """Start for a specific service.""" + self.logger.info(f"[HEALTCHECKER]: Starting healthcheck job for {service}") + if service in self._jobs: + self.stop_for_service(service=service) + + loop = asyncio.get_running_loop() + self._jobs[service] = loop.create_task( + self.healthcheck_job( + service=service, + ) + ) + + def stop_for_service(self, service: str) -> None: + """Stop for a specific service.""" + if service not in self._jobs: + return + self.logger.info( + f"[HEALTCHECKER]: Cancelling existing healthcheck_jobs job for {service}" + ) + status = self._jobs[service].cancel() + if not status: + self.logger.info( + f"[HEALTCHECKER]: Healthcheck job cancellation for {service} failed" + ) + + @staticmethod + async def check_service_health(service: str) -> bool: + """Check the service health""" + del service + async with aiohttp.ClientSession() as session: + async with session.get("http://localhost:8716/healthcheck") as resp: + status = resp.status + response_json = await resp.json() + return status == HTTP_OK and response_json.get( + "is_transitioning_fast", False + ) + + async def healthcheck_job( + self, + service: str, + ) -> None: + """Start a background health check job.""" + + try: + self.logger.info( + f"[HEALTCHECKER] Start healthcheck job for service: {service}" + ) + + async def _wait_for_port(sleep_period: int = 15) -> None: + self.logger.info("[HEALTCHECKER]: wait port is up") + while True: + try: + await self.check_service_health(service) + self.logger.info("[HEALTCHECKER]: port is UP") + return + except aiohttp.ClientConnectionError: + self.logger.error("[HEALTCHECKER]: error connecting http port") + await asyncio.sleep(sleep_period) + + async def _check_port_ready( + timeout: int = self.PORT_UP_TIMEOUT, sleep_period: int = 15 + ) -> bool: + try: + await asyncio.wait_for( + _wait_for_port(sleep_period=sleep_period), timeout=timeout + ) + return True + except asyncio.TimeoutError: + return False + + async def _check_health( + number_of_fails: int = 5, sleep_period: int = self.SLEEP_PERIOD + ) -> None: + fails = 0 + while True: + try: + # Check the service health + healthy = await self.check_service_health(service) + except aiohttp.ClientConnectionError: + self.logger.info( + f"[HEALTCHECKER] {service} port read failed. restart" + ) + return + + if not healthy: + fails += 1 + self.logger.info( + f"[HEALTCHECKER] {service} not healthy for {fails} time in a row" + ) + else: + self.logger.info(f"[HEALTCHECKER] {service} is HEALTHY") + # reset fails if comes healty + fails = 0 + + if fails >= number_of_fails: + # too much fails, exit + self.logger.error( + f"[HEALTCHECKER] {service} failed {fails} times in a row. restart" + ) + return + await asyncio.sleep(sleep_period) + + async def _restart(service_manager: ServiceManager, service: str) -> None: + def _do_restart() -> None: + service_manager.stop_service_locally(hash=service) + service_manager.deploy_service_locally(hash=service) + + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as executor: + future = loop.run_in_executor(executor, _do_restart) + await future + exception = future.exception() + if exception is not None: + raise exception + + # upper cycle + while True: + self.logger.info(f"[HEALTCHECKER] {service} wait for port ready") + if await _check_port_ready(timeout=self.PORT_UP_TIMEOUT): + # blocking till restart needed + self.logger.info( + f"[HEALTCHECKER] {service} port is ready, checking health every {self.SLEEP_PERIOD}" + ) + await _check_health(sleep_period=self.SLEEP_PERIOD) + + else: + self.logger.info( + "[HEALTCHECKER] port not ready within timeout. restart deployment" + ) + + # perform restart + # TODO: blocking!!!!!!! + await _restart(self._service_manager, service) + except Exception: + self.logger.exception(f"problems running healthcheckr for {service}") + raise diff --git a/operate/services/manage.py b/operate/services/manage.py index 927fff7cc..f1cb481a0 100644 --- a/operate/services/manage.py +++ b/operate/services/manage.py @@ -27,7 +27,6 @@ from concurrent.futures import ThreadPoolExecutor from pathlib import Path -import aiohttp # type: ignore from aea.helpers.base import IPFSHash from aea.helpers.logging import setup_logger from autonomy.chain.base import registry_contracts @@ -62,17 +61,6 @@ HTTP_OK = 200 -async def check_service_health() -> bool: - """Check the service health""" - async with aiohttp.ClientSession() as session: - async with session.get("http://localhost:8716/healthcheck") as resp: - status = resp.status - response_json = await resp.json() - return status == HTTP_OK and response_json.get( - "is_transitioning_fast", False - ) - - class ServiceManager: """Service manager.""" @@ -922,32 +910,6 @@ async def funding_job( ) await asyncio.sleep(60) - async def healthcheck_job( - self, - hash: str, - ) -> None: - """Start a background funding job.""" - failed_health_checks = 0 - - while True: - try: - # Check the service health - healthy = await check_service_health() - # Restart the service if the health failed 5 times in a row - if not healthy: - failed_health_checks += 1 - else: - failed_health_checks = 0 - if failed_health_checks >= 4: - self.stop_service_locally(hash=hash) - self.deploy_service_locally(hash=hash) - - except Exception: # pylint: disable=broad-except - logging.info( - f"Error occured while checking the service health\n{traceback.format_exc()}" - ) - await asyncio.sleep(30) - def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment: """ Deploy service locally diff --git a/package.json b/package.json index e41167849..8f2d1deef 100644 --- a/package.json +++ b/package.json @@ -56,5 +56,5 @@ "start:frontend": "cd frontend && yarn start", "test:frontend": "cd frontend && yarn test" }, - "version": "0.1.0-rc71" + "version": "0.1.0-rc75" } diff --git a/pyproject.toml b/pyproject.toml index 8ac78d250..746f8b62d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "olas-operate-middleware" -version = "0.1.0-rc71" +version = "0.1.0-rc75" description = "" authors = ["David Vilela ", "Viraj Patel "] readme = "README.md"