Skip to content

Commit

Permalink
feat: add healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
dvilelaf committed Jun 4, 2024
1 parent c588be8 commit e6dac81
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
29 changes: 29 additions & 0 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st
logger = setup_logger(name="operate")
operate = OperateApp(home=home, logger=logger)
funding_jobs: t.Dict[str, asyncio.Task] = {}
healthcheck_jobs: t.Dict[str, asyncio.Task] = {}

# Create shutdown endpoint
shutdown_endpoint = uuid.uuid4().hex
Expand All @@ -171,6 +172,23 @@ def schedule_funding_job(
)
)

def schedule_healthcheck_job(
    service: str,
) -> None:
    """Schedule a healthcheck job for a service, replacing any job already registered for it."""
    logger.info(f"Starting healthcheck job for {service}")

    # Only one healthcheck job per service: cancel the previous one first.
    already_scheduled = service in healthcheck_jobs
    if already_scheduled:
        logger.info(f"Cancelling existing healthcheck_jobs job for {service}")
        cancel_healthcheck_job(service=service)

    running_loop = asyncio.get_running_loop()
    job = operate.service_manager().healthcheck_job(
        hash=service,
        loop=running_loop,
    )
    # Keep a handle on the task so it can be cancelled later.
    healthcheck_jobs[service] = running_loop.create_task(job)

def cancel_funding_job(service: str) -> None:
"""Cancel funding job."""
if service not in funding_jobs:
Expand All @@ -179,6 +197,14 @@ def cancel_funding_job(service: str) -> None:
if not status:
logger.info(f"Funding job cancellation for {service} failed")

def cancel_healthcheck_job(service: str) -> None:
    """Cancel the healthcheck job registered for a service, if there is one."""
    job = healthcheck_jobs.get(service)
    if job is None:
        # Nothing was ever scheduled for this service.
        return

    # Task.cancel() returns False when the task is already done or cancelled.
    cancelled = job.cancel()
    if cancelled:
        return
    logger.info(f"Healthcheck job cancellation for {service} failed")

app = FastAPI()

app.add_middleware(
Expand Down Expand Up @@ -506,6 +532,7 @@ async def _create_services(request: Request) -> JSONResponse:
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)
schedule_funding_job(service=service.hash)
schedule_healthcheck_job(service=service.hash)

return JSONResponse(
content=operate.service_manager().create_or_load(hash=service.hash).json
Expand All @@ -529,6 +556,7 @@ async def _update_services(request: Request) -> JSONResponse:
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)
schedule_funding_job(service=service.hash)
schedule_healthcheck_job(service=service.hash)

return JSONResponse(content=service.json)

Expand Down Expand Up @@ -638,6 +666,7 @@ async def _start_service_locally(request: Request) -> JSONResponse:
manager.fund_service(hash=service)
manager.deploy_service_locally(hash=service, force=True)
schedule_funding_job(service=service)
schedule_healthcheck_job(service=service.hash)
return JSONResponse(content=manager.create_or_load(service).deployment)

@app.post("/api/services/{service}/deployment/stop")
Expand Down
44 changes: 43 additions & 1 deletion operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import typing as t
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import aiohttp
from aea.helpers.base import IPFSHash
from aea.helpers.logging import setup_logger
from autonomy.chain.base import registry_contracts
Expand Down Expand Up @@ -56,6 +56,7 @@
KEYS_JSON = "keys.json"
DOCKER_COMPOSE_YAML = "docker-compose.yaml"
SERVICE_YAML = "service.yaml"
HTTP_OK = 200


class ServiceManager:
Expand Down Expand Up @@ -824,6 +825,15 @@ def fund_service( # pylint: disable=too-many-arguments
chain_type=service.ledger_config.chain,
)

async def check_service_health(
    self,
) -> bool:
    """
    Check the health of the locally deployed service.

    Sends a GET request to http://localhost:8000/healthcheck.

    :return: True only when the endpoint answers with HTTP 200 and its JSON
        payload is an object with a truthy `is_transitioning_fast` field;
        False for any other status or payload.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get("http://localhost:8000/healthcheck") as resp:
            # Look at the status before decoding the body: an error response
            # is not guaranteed to carry JSON, and decoding it would raise
            # instead of reporting the service as unhealthy.
            if resp.status != HTTP_OK:
                return False
            response_json = await resp.json()

    # Valid JSON is not necessarily an object; anything else has no health flag.
    if not isinstance(response_json, dict):
        return False
    # Coerce so the declared `bool` return type holds whatever the payload contains.
    return bool(response_json.get("is_transitioning_fast", False))

async def funding_job(
self,
hash: str,
Expand Down Expand Up @@ -853,6 +863,38 @@ async def funding_job(
)
await asyncio.sleep(60)

async def healthcheck_job(
    self,
    hash: str,
    loop: t.Optional[asyncio.AbstractEventLoop] = None,
    check_interval: float = 60,
    max_failed_checks: int = 5,
) -> None:
    """
    Start a background healthcheck job.

    Runs forever: awaits `check_service_health`, and once the check has
    failed `max_failed_checks` times in a row, stops and redeploys the local
    service. Errors raised by the check or by the restart are logged and do
    not stop the job.

    :param hash: Hash of the service to stop and redeploy when unhealthy.
    :param loop: Event loop used to offload the blocking restart calls;
        defaults to the current event loop.
    :param check_interval: Seconds to sleep between two consecutive checks.
    :param max_failed_checks: Number of consecutive failed checks that
        triggers a restart.
    """
    loop = loop or asyncio.get_event_loop()
    failed_health_checks = 0

    with ThreadPoolExecutor() as executor:
        while True:
            try:
                # check_service_health is a coroutine function and must be
                # awaited. Passing it to run_in_executor only produces an
                # un-awaited coroutine object, which is always truthy, so a
                # failing service would never be detected.
                healthy = await self.check_service_health()
                if healthy:
                    failed_health_checks = 0
                else:
                    failed_health_checks += 1

                # Restart the service if the health check failed too many
                # times in a row
                if failed_health_checks >= max_failed_checks:
                    # Reset first so the restarted service gets a fresh
                    # window instead of being restarted on every iteration.
                    failed_health_checks = 0
                    # Stop/deploy are synchronous; run them in the thread
                    # pool so the event loop stays responsive meanwhile.
                    await loop.run_in_executor(
                        executor,
                        lambda: self.stop_service_locally(hash=hash),
                    )
                    await loop.run_in_executor(
                        executor,
                        lambda: self.deploy_service_locally(hash=hash),
                    )

            except Exception:  # pylint: disable=broad-except
                # Best effort: a failing check or restart must not kill the job.
                logging.info(
                    f"Error occured while checking the service health\n{traceback.format_exc()}"
                )
            await asyncio.sleep(check_interval)

def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment:
"""
Deploy service locally
Expand Down

0 comments on commit e6dac81

Please sign in to comment.