diff --git a/operate/cli.py b/operate/cli.py index 821fa88ac..aed5c90b1 100644 --- a/operate/cli.py +++ b/operate/cli.py @@ -145,6 +145,7 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st logger = setup_logger(name="operate") operate = OperateApp(home=home, logger=logger) funding_jobs: t.Dict[str, asyncio.Task] = {} + healthcheck_jobs: t.Dict[str, asyncio.Task] = {} # Create shutdown endpoint shutdown_endpoint = uuid.uuid4().hex @@ -171,6 +172,23 @@ def schedule_funding_job( ) ) + def schedule_healthcheck_job( + service: str, + ) -> None: + """Schedule a healthcheck job.""" + logger.info(f"Starting healthcheck job for {service}") + if service in healthcheck_jobs: + logger.info(f"Cancelling existing healthcheck_jobs job for {service}") + cancel_healthcheck_job(service=service) + + loop = asyncio.get_running_loop() + healthcheck_jobs[service] = loop.create_task( + operate.service_manager().healthcheck_job( + hash=service, + loop=loop, + ) + ) + def cancel_funding_job(service: str) -> None: """Cancel funding job.""" if service not in funding_jobs: @@ -179,6 +197,14 @@ def cancel_funding_job(service: str) -> None: if not status: logger.info(f"Funding job cancellation for {service} failed") + def cancel_healthcheck_job(service: str) -> None: + """Cancel healthcheck job.""" + if service not in healthcheck_jobs: + return + status = healthcheck_jobs[service].cancel() + if not status: + logger.info(f"Healthcheck job cancellation for {service} failed") + app = FastAPI() app.add_middleware( @@ -506,6 +532,7 @@ async def _create_services(request: Request) -> JSONResponse: manager.fund_service(hash=service.hash) manager.deploy_service_locally(hash=service.hash) schedule_funding_job(service=service.hash) + schedule_healthcheck_job(service=service.hash) return JSONResponse( content=operate.service_manager().create_or_load(hash=service.hash).json @@ -529,6 +556,7 @@ async def _update_services(request: Request) -> JSONResponse: manager.fund_service(hash=service.hash) manager.deploy_service_locally(hash=service.hash) schedule_funding_job(service=service.hash) + schedule_healthcheck_job(service=service.hash) return JSONResponse(content=service.json) @@ -638,6 +666,7 @@ async def _start_service_locally(request: Request) -> JSONResponse: manager.fund_service(hash=service) manager.deploy_service_locally(hash=service, force=True) schedule_funding_job(service=service) + schedule_healthcheck_job(service=service.hash) return JSONResponse(content=manager.create_or_load(service).deployment) @app.post("/api/services/{service}/deployment/stop") diff --git a/operate/services/manage.py b/operate/services/manage.py index 803ee8527..0de2b497d 100644 --- a/operate/services/manage.py +++ b/operate/services/manage.py @@ -25,7 +25,7 @@ import typing as t from concurrent.futures import ThreadPoolExecutor from pathlib import Path - +import aiohttp from aea.helpers.base import IPFSHash from aea.helpers.logging import setup_logger from autonomy.chain.base import registry_contracts @@ -56,6 +56,7 @@ KEYS_JSON = "keys.json" DOCKER_COMPOSE_YAML = "docker-compose.yaml" SERVICE_YAML = "service.yaml" +HTTP_OK = 200 class ServiceManager: @@ -824,6 +825,15 @@ def fund_service( # pylint: disable=too-many-arguments chain_type=service.ledger_config.chain, ) + async def check_service_health( + self, + ) -> bool: + async with aiohttp.ClientSession() as session: + async with session.get("http://localhost:8000/healthcheck") as resp: + status = resp.status + response_json = await resp.json() + return status == HTTP_OK and response_json.get("is_transitioning_fast", False) + async def funding_job( self, hash: str, @@ -853,6 +863,38 @@ async def funding_job( ) await asyncio.sleep(60) + async def healthcheck_job( + self, + hash: str, + loop: t.Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + """Start a background funding job.""" + loop = loop or asyncio.get_event_loop() + failed_health_checks = 0 + + with ThreadPoolExecutor() as executor: + while True: + try: + # Check the service health + healthy = await loop.run_in_executor( + executor, + self.check_service_health, + ) + # Restart the service if the health failed 5 times in a row + if not healthy: + failed_health_checks += 1 + else: + failed_health_checks = 0 + if failed_health_checks >= 5: + self.stop_service_locally(hash=hash) + self.deploy_service_locally(hash=hash) + + except Exception: # pylint: disable=broad-except + logging.info( + f"Error occured while checking the service health\n{traceback.format_exc()}" + ) + await asyncio.sleep(60) + def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment: """ Deploy service locally