diff --git a/src/main.py b/src/main.py index 7542861..0a9b260 100644 --- a/src/main.py +++ b/src/main.py @@ -1,3 +1,4 @@ +import os from threading import Thread from dotenv import load_dotenv, find_dotenv @@ -21,8 +22,8 @@ def main(): api_thread = Thread(target=start_po_api, kwargs={'database': database}) api_thread.start() - # start the status loop - status_loop(database) + # start status loop + status_loop(database, os.getenv('STATUS_LOOP_INTERVAL', 10)) def start_po_api(database: Database): diff --git a/src/status/status.py b/src/status/status.py index 5df4c88..7f9dfad 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -13,33 +13,48 @@ from src.status.constants import AnalysisHubStatus -def status_loop(database: Database): +def status_loop(database: Database, status_loop_interval: int) -> None: """ Send the status of the analysis to the HUB, kill deployment if analysis finished :return: """ node_id = None + node_analysis_ids = {} while True: if database.get_analysis_ids(): if node_id is None: node_id = _get_node_id() + else: + for analysis_id in set(database.get_analysis_ids()): + if analysis_id not in node_analysis_ids.keys(): + node_analysis_id = _get_node_analysis_id(node_id, analysis_id) + if node_analysis_id is not None: + node_analysis_ids[analysis_id] = node_analysis_id + else: + node_analysis_id = node_analysis_ids[analysis_id] - for analysis_id in set(database.get_analysis_ids()): - node_analysis_id = _get_node_analysis_id(node_id, analysis_id) + if node_analysis_id is not None: + deployments = [read_db_analysis(deployment) + for deployment in database.get_deployments(analysis_id)] - deployments = [read_db_analysis(deployment) for deployment in database.get_deployments(analysis_id)] + db_status, int_status = (_get_status(deployments), {'status': {}}) - db_status, int_status = (_get_status(deployments), {'status': {}}) + # update created to running status if deployment responsive + db_status, int_status = _update_running_status(deployments, + database, + db_status, + int_status) - # update created to running status if deployment responsive - db_status, int_status = _update_running_status(deployments, database, db_status, int_status) + db_status, int_status = update_finished_status(deployments, + analysis_id, + database, + db_status, + int_status) - db_status, int_status = update_finished_status(deployments, analysis_id, database, db_status, int_status) - - _set_analysis_hub_status(node_analysis_id, db_status, int_status) - time.sleep(10) + _set_analysis_hub_status(node_analysis_id, db_status, int_status) + time.sleep(status_loop_interval) def update_finished_status(deployments: list[Analysis], @@ -130,19 +145,20 @@ def _submit_analysis_status_update(node_analysis_id: str, status: AnalysisHubSta :return: """ if status is not None: + + response = asyncio.run(AsyncClient(base_url=os.getenv('HUB_URL_CORE'), + headers={"accept": "application/json", + "Authorization":f"Bearer {get_hub_token()['hub_token']}"}) + .post(f'/analysis-nodes/{node_analysis_id}', + json={"run_status": status}, + headers=[('Connection', 'close')])) + #print(f"resposne status update: {response.json()}") try: - response = asyncio.run(AsyncClient(base_url=os.getenv('HUB_URL_CORE'), - headers={"accept": "application/json", - "Authorization":f"Bearer {get_hub_token()['hub_token']}"}) - .post(f'/analysis-nodes/{node_analysis_id}', - json={"run_status": status}, - headers=[('Connection', 'close')])) - #print(f"resposne status update: {response.json()}") response.raise_for_status() except httpx.HTTPStatusError as e: print(f"Error updating analysis status: {e}") -def _get_node_analysis_id(node_id: str, analysis_id: str) -> str: +def _get_node_analysis_id(node_id: str, analysis_id: str) -> Optional[str]: """ get node-analysis id from hub analysis-id: 893761b5-d8ac-42be-ad71-a2d3e70b3990 @@ -160,7 +176,12 @@ def _get_node_analysis_id(node_id: str, analysis_id: str) -> str: "Authorization": f"Bearer {get_hub_token()['hub_token']}"}) .get(f'/analysis-nodes?filter[node_id]={node_id}&filter[analysis_id]={analysis_id}', headers=[('Connection', 'close')])) - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError as e: + print(f"Error getting node-analysis id: {e}") + return None + return response.json()['data'][0]['id'] @@ -180,7 +201,11 @@ def _get_node_id() -> str: "Authorization": f"Bearer {get_hub_token()['hub_token']}"}) .get(f'/nodes?filter[robot_id]={robot_id}', headers=[('Connection', 'close')])) print(response.json()) - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError as e: + print(f"Error getting node id: {e}") + return None return response.json()['data'][0]['id'] @@ -205,7 +230,11 @@ async def _get_internal_deployment_status(deployment_name: str) -> Literal['fini base_url=f'http://nginx-{deployment_name}:80').get('/analysis/healthz', headers=[('Connection', 'close')]) print(f"response nginx-{deployment_name}/analysis/healthz: {response}") - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError as e: + print(f"Error getting internal deployment status: {e}") + return 'failed' try: print(f"analyse status: {response.json()}") except json.decoder.JSONDecodeError: