Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Fix/deploy async fixes #239

Merged
merged 8 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion electron/install.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const { paths } = require('./constants');
* - use "" (nothing as a suffix) for latest release candidate, for example "0.1.0rc26"
* - use "alpha" for alpha release, for example "0.1.0rc26-alpha"
*/
const OlasMiddlewareVersion = '0.1.0rc71';
const OlasMiddlewareVersion = '0.1.0rc75';

const Env = {
...process.env,
Expand Down
69 changes: 38 additions & 31 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""Operate app CLI module."""

import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import os
import signal
Expand All @@ -42,6 +43,7 @@
from operate.account.user import UserAccount
from operate.constants import KEY, KEYS, OPERATE, SERVICES
from operate.ledger import get_ledger_type_from_chain_type
from operate.services.health_checker import HealthChecker
from operate.types import ChainType, DeploymentStatus
from operate.wallet.master import MasterWalletManager

Expand Down Expand Up @@ -145,14 +147,24 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st
logger = setup_logger(name="operate")
operate = OperateApp(home=home, logger=logger)
funding_jobs: t.Dict[str, asyncio.Task] = {}
healthcheck_jobs: t.Dict[str, asyncio.Task] = {}

health_checker = HealthChecker(operate.service_manager())
# Create shutdown endpoint
shutdown_endpoint = uuid.uuid4().hex
(operate._path / "operate.kill").write_text( # pylint: disable=protected-access
shutdown_endpoint
)

# Shared worker pool for blocking service-manager / deployment calls, so that
# request handlers do not block the asyncio event loop while they run.
thread_pool_executor = ThreadPoolExecutor()

async def run_in_executor(fn: t.Callable[..., t.Any], *args: t.Any) -> t.Any:
    """Run a blocking callable in the shared thread pool and await its result.

    :param fn: callable to execute in a worker thread.
    :param args: positional arguments forwarded to ``fn``.
    :return: whatever ``fn(*args)`` returns.
    :raises Exception: any exception raised by ``fn`` is re-raised here.
    """
    loop = asyncio.get_running_loop()
    # Awaiting the executor future already re-raises an exception raised by
    # `fn`, so no separate `future.exception()` check is needed afterwards.
    return await loop.run_in_executor(thread_pool_executor, fn, *args)

def schedule_funding_job(
service: str,
from_safe: bool = True,
Expand All @@ -176,17 +188,7 @@ def schedule_healthcheck_job(
service: str,
) -> None:
"""Schedule a healthcheck job."""
logger.info(f"Starting healthcheck job for {service}")
if service in healthcheck_jobs:
logger.info(f"Cancelling existing healthcheck_jobs job for {service}")
cancel_healthcheck_job(service=service)

loop = asyncio.get_running_loop()
healthcheck_jobs[service] = loop.create_task(
operate.service_manager().healthcheck_job(
hash=service,
)
)
health_checker.start_for_service(service)

def cancel_funding_job(service: str) -> None:
"""Cancel funding job."""
Expand All @@ -210,16 +212,9 @@ def pause_all_services_on_startup() -> None:
deployment.stop(force=True)
logger.info(f"Cancelling funding job for {service}")
cancel_funding_job(service=service)
health_checker.stop_for_service(service=service)
logger.info("Stopping services on startup done.")

def cancel_healthcheck_job(service: str) -> None:
"""Cancel healthcheck job."""
if service not in healthcheck_jobs:
return
status = healthcheck_jobs[service].cancel()
if not status:
logger.info(f"Healthcheck job cancellation for {service} failed")

# on backend app started we assume there are now started agents, so we force to pause all
pause_all_services_on_startup()

Expand Down Expand Up @@ -545,10 +540,12 @@ async def _create_services(request: Request) -> JSONResponse:
)

if template.get("deploy", False):
manager.deploy_service_onchain_from_safe(hash=service.hash, update=update)
manager.stake_service_on_chain_from_safe(hash=service.hash)
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)
def _fn():
manager.deploy_service_onchain_from_safe(hash=service.hash, update=update)
manager.stake_service_on_chain_from_safe(hash=service.hash)
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)
await run_in_executor(_fn)
schedule_funding_job(service=service.hash)
schedule_healthcheck_job(service=service.hash)

Expand Down Expand Up @@ -668,7 +665,11 @@ async def _build_service_locally(request: Request) -> JSONResponse:
)
.deployment
)
deployment.build(force=True)

def _fn():
deployment.build(force=True)

await run_in_executor(_fn)
return JSONResponse(content=deployment.json)

@app.post("/api/services/{service}/deployment/start")
Expand All @@ -679,10 +680,14 @@ async def _start_service_locally(request: Request) -> JSONResponse:
return service_not_found_error(service=request.path_params["service"])
service = request.path_params["service"]
manager = operate.service_manager()
manager.deploy_service_onchain(hash=service)
manager.stake_service_on_chain(hash=service)
manager.fund_service(hash=service)
manager.deploy_service_locally(hash=service, force=True)

def _fn():
manager.deploy_service_onchain(hash=service)
manager.stake_service_on_chain(hash=service)
manager.fund_service(hash=service)
manager.deploy_service_locally(hash=service, force=True)

await run_in_executor(_fn)
schedule_funding_job(service=service)
# `service` is the raw path-parameter string in this handler (not a Service
# object), so it is passed as-is; `service.hash` would raise AttributeError.
schedule_healthcheck_job(service=service)
return JSONResponse(content=manager.load_or_create(service).deployment)
Expand All @@ -695,7 +700,9 @@ async def _stop_service_locally(request: Request) -> JSONResponse:
return service_not_found_error(service=request.path_params["service"])
service = request.path_params["service"]
deployment = operate.service_manager().load_or_create(service).deployment
deployment.stop()
health_checker.stop_for_service(service=service)

await run_in_executor(deployment.stop)
logger.info(f"Cancelling funding job for {service}")
cancel_funding_job(service=service)
return JSONResponse(content=deployment.json)
Expand Down
183 changes: 183 additions & 0 deletions operate/services/health_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2024 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
"""Source code for checking aea is alive.."""
import asyncio
import typing as t
from concurrent.futures import ThreadPoolExecutor

import aiohttp # type: ignore
from aea.helpers.logging import setup_logger

from operate.services.manage import ServiceManager # type: ignore


HTTP_OK = 200


class HealthChecker:
"""Health checker manager."""

SLEEP_PERIOD = 30
PORT_UP_TIMEOUT = 120 # seconds

def __init__(self, service_manager: ServiceManager) -> None:
    """Initialise the health checker with an empty job registry.

    :param service_manager: manager stored for later use when a service
        has to be restarted by a healthcheck job.
    """
    self._service_manager = service_manager
    # Running healthcheck tasks, keyed by the service identifier.
    self._jobs: t.Dict[str, asyncio.Task] = {}
    self.logger = setup_logger(name="operate.health_checker")
    self.logger.info("[HEALTCHECKER]: created")

def start_for_service(self, service: str) -> None:
"""Start for a specific service."""
self.logger.info(f"[HEALTCHECKER]: Starting healthcheck job for {service}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny typo, not a big deal

Suggested change
self.logger.info(f"[HEALTCHECKER]: Starting healthcheck job for {service}")
self.logger.info(f"[HEALTHCHECKER]: Starting healthcheck job for {service}")

if service in self._jobs:
self.stop_for_service(service=service)

loop = asyncio.get_running_loop()
self._jobs[service] = loop.create_task(
self.healthcheck_job(
service=service,
)
)

def stop_for_service(self, service: str) -> None:
    """Stop the healthcheck job registered for a specific service.

    Does nothing when no job is registered for ``service``. The job is
    removed from the registry, so a repeated stop request is a no-op and
    does not try to cancel an already finished task again.

    :param service: identifier the job was registered under.
    """
    job = self._jobs.pop(service, None)
    if job is None:
        return
    self.logger.info(
        f"[HEALTCHECKER]: Cancelling existing healthcheck_jobs job for {service}"
    )
    # `cancel()` returns a falsy value when the cancellation request was not
    # accepted (for an asyncio task: the task had already finished).
    if not job.cancel():
        self.logger.info(
            f"[HEALTCHECKER]: Healthcheck job cancellation for {service} failed"
        )

@staticmethod
async def check_service_health(service: str) -> bool:
"""Check the service health"""
del service
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the purpose of del here?

async with aiohttp.ClientSession() as session:
async with session.get("http://localhost:8716/healthcheck") as resp:
status = resp.status
response_json = await resp.json()
return status == HTTP_OK and response_json.get(
"is_transitioning_fast", False
)

async def healthcheck_job(
    self,
    service: str,
) -> None:
    """Run the background health check loop for a service.

    Repeats forever: wait for the health endpoint to become reachable,
    poll it until the service is considered broken, then restart the local
    deployment. The coroutine only ends by being cancelled or by an
    exception, which is logged and re-raised.

    :param service: identifier of the service to watch and restart.
    :raises Exception: any error raised while checking or restarting.
    """

    try:
        self.logger.info(
            f"[HEALTCHECKER] Start healthcheck job for service: {service}"
        )

        async def _wait_for_port(sleep_period: int = 15) -> None:
            # Poll until the health endpoint accepts a connection.
            self.logger.info("[HEALTCHECKER]: wait port is up")
            while True:
                try:
                    await self.check_service_health(service)
                    self.logger.info("[HEALTCHECKER]: port is UP")
                    return
                except aiohttp.ClientConnectionError:
                    self.logger.error("[HEALTCHECKER]: error connecting http port")
                await asyncio.sleep(sleep_period)

        async def _check_port_ready(
            timeout: int = self.PORT_UP_TIMEOUT, sleep_period: int = 15
        ) -> bool:
            # True when the endpoint became reachable within `timeout` seconds.
            try:
                await asyncio.wait_for(
                    _wait_for_port(sleep_period=sleep_period), timeout=timeout
                )
                return True
            except asyncio.TimeoutError:
                return False

        async def _check_health(
            number_of_fails: int = 5, sleep_period: int = self.SLEEP_PERIOD
        ) -> None:
            # Returns only when a restart is needed: the endpoint became
            # unreachable, or it was unhealthy `number_of_fails` times in a row.
            fails = 0
            while True:
                try:
                    # Check the service health
                    healthy = await self.check_service_health(service)
                except aiohttp.ClientConnectionError:
                    self.logger.info(
                        f"[HEALTCHECKER] {service} port read failed. restart"
                    )
                    return

                if not healthy:
                    fails += 1
                    self.logger.info(
                        f"[HEALTCHECKER] {service} not healthy for {fails} time in a row"
                    )
                else:
                    self.logger.info(f"[HEALTCHECKER] {service} is HEALTHY")
                    # a healthy response resets the consecutive-failure counter
                    fails = 0

                if fails >= number_of_fails:
                    # too many consecutive failures: leave the loop to restart
                    self.logger.error(
                        f"[HEALTCHECKER] {service} failed {fails} times in a row. restart"
                    )
                    return
                await asyncio.sleep(sleep_period)

        async def _restart(service_manager: "ServiceManager", service: str) -> None:
            # Stop and redeploy the service in a worker thread.
            def _do_restart() -> None:
                service_manager.stop_service_locally(hash=service)
                service_manager.deploy_service_locally(hash=service)

            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                # Awaiting the executor future re-raises any exception from
                # `_do_restart`, so no explicit `future.exception()` check.
                await loop.run_in_executor(executor, _do_restart)

        # upper cycle
        while True:
            self.logger.info(f"[HEALTCHECKER] {service} wait for port ready")
            if await _check_port_ready(timeout=self.PORT_UP_TIMEOUT):
                # blocking till restart needed
                self.logger.info(
                    f"[HEALTCHECKER] {service} port is ready, checking health every {self.SLEEP_PERIOD}"
                )
                await _check_health(sleep_period=self.SLEEP_PERIOD)

            else:
                self.logger.info(
                    "[HEALTCHECKER] port not ready within timeout. restart deployment"
                )

            # perform restart; the stop/deploy calls run in a worker thread,
            # so the event loop is not blocked while they execute
            await _restart(self._service_manager, service)
    except Exception:
        self.logger.exception(f"problems running healthcheckr for {service}")
        raise
38 changes: 0 additions & 38 deletions operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import aiohttp # type: ignore
from aea.helpers.base import IPFSHash
from aea.helpers.logging import setup_logger
from autonomy.chain.base import registry_contracts
Expand Down Expand Up @@ -62,17 +61,6 @@
HTTP_OK = 200


async def check_service_health() -> bool:
"""Check the service health"""
async with aiohttp.ClientSession() as session:
async with session.get("http://localhost:8716/healthcheck") as resp:
status = resp.status
response_json = await resp.json()
return status == HTTP_OK and response_json.get(
"is_transitioning_fast", False
)


class ServiceManager:
"""Service manager."""

Expand Down Expand Up @@ -922,32 +910,6 @@ async def funding_job(
)
await asyncio.sleep(60)

async def healthcheck_job(
self,
hash: str,
) -> None:
"""Start a background funding job."""
failed_health_checks = 0

while True:
try:
# Check the service health
healthy = await check_service_health()
# Restart the service if the health failed 5 times in a row
if not healthy:
failed_health_checks += 1
else:
failed_health_checks = 0
if failed_health_checks >= 4:
self.stop_service_locally(hash=hash)
self.deploy_service_locally(hash=hash)

except Exception: # pylint: disable=broad-except
logging.info(
f"Error occured while checking the service health\n{traceback.format_exc()}"
)
await asyncio.sleep(30)

def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment:
"""
Deploy service locally
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@
"start:frontend": "cd frontend && yarn start",
"test:frontend": "cd frontend && yarn test"
},
"version": "0.1.0-rc71"
"version": "0.1.0-rc75"
}
Loading
Loading