Skip to content

Commit

Permalink
fix: catch missing node analysis id, store node analysis ids in dict
Browse files Browse the repository at this point in the history
Co-authored-by: Nightknight3000 <[email protected]>
  • Loading branch information
antidodo and Nightknight3000 committed Nov 13, 2024
1 parent 65202c6 commit 2bc702b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 24 deletions.
5 changes: 3 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from threading import Thread

from dotenv import load_dotenv, find_dotenv
Expand All @@ -21,8 +22,8 @@ def main():
api_thread = Thread(target=start_po_api, kwargs={'database': database})
api_thread.start()

# start the status loop
status_loop(database)
# start status loop
status_loop(database, os.getenv('STATUS_LOOP_INTERVAL', 10))


def start_po_api(database: Database):
Expand Down
73 changes: 51 additions & 22 deletions src/status/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,48 @@
from src.status.constants import AnalysisHubStatus


def status_loop(database: Database):
def status_loop(database: Database, status_loop_interval: int) -> None:
"""
Send the status of the analysis to the HUB, kill deployment if analysis finished
:return:
"""
node_id = None
node_analysis_ids = {}

while True:
if database.get_analysis_ids():
if node_id is None:
node_id = _get_node_id()
else:
for analysis_id in set(database.get_analysis_ids()):
if analysis_id not in node_analysis_ids.keys():
node_analysis_id = _get_node_analysis_id(node_id, analysis_id)
if node_analysis_id is not None:
node_analysis_ids[analysis_id] = node_analysis_id
else:
node_analysis_id = node_analysis_ids[analysis_id]

for analysis_id in set(database.get_analysis_ids()):
node_analysis_id = _get_node_analysis_id(node_id, analysis_id)
if node_analysis_id is not None:
deployments = [read_db_analysis(deployment)
for deployment in database.get_deployments(analysis_id)]

deployments = [read_db_analysis(deployment) for deployment in database.get_deployments(analysis_id)]
db_status, int_status = (_get_status(deployments), {'status': {}})

db_status, int_status = (_get_status(deployments), {'status': {}})
# update created to running status if deployment responsive
db_status, int_status = _update_running_status(deployments,
database,
db_status,
int_status)

# update created to running status if deployment responsive
db_status, int_status = _update_running_status(deployments, database, db_status, int_status)
db_status, int_status = update_finished_status(deployments,
analysis_id,
database,
db_status,
int_status)

db_status, int_status = update_finished_status(deployments, analysis_id, database, db_status, int_status)

_set_analysis_hub_status(node_analysis_id, db_status, int_status)
time.sleep(10)
_set_analysis_hub_status(node_analysis_id, db_status, int_status)
time.sleep(status_loop_interval)


def update_finished_status(deployments: list[Analysis],
Expand Down Expand Up @@ -130,19 +145,20 @@ def _submit_analysis_status_update(node_analysis_id: str, status: AnalysisHubSta
:return:
"""
if status is not None:

response = asyncio.run(AsyncClient(base_url=os.getenv('HUB_URL_CORE'),
headers={"accept": "application/json",
"Authorization":f"Bearer {get_hub_token()['hub_token']}"})
.post(f'/analysis-nodes/{node_analysis_id}',
json={"run_status": status},
headers=[('Connection', 'close')]))
#print(f"resposne status update: {response.json()}")
try:
response = asyncio.run(AsyncClient(base_url=os.getenv('HUB_URL_CORE'),
headers={"accept": "application/json",
"Authorization":f"Bearer {get_hub_token()['hub_token']}"})
.post(f'/analysis-nodes/{node_analysis_id}',
json={"run_status": status},
headers=[('Connection', 'close')]))
#print(f"resposne status update: {response.json()}")
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"Error updating analysis status: {e}")

def _get_node_analysis_id(node_id: str, analysis_id: str) -> str:
def _get_node_analysis_id(node_id: str, analysis_id: str) -> Optional[str]:
"""
get node-analysis id from hub
analysis-id: 893761b5-d8ac-42be-ad71-a2d3e70b3990
Expand All @@ -160,7 +176,12 @@ def _get_node_analysis_id(node_id: str, analysis_id: str) -> str:
"Authorization": f"Bearer {get_hub_token()['hub_token']}"})
.get(f'/analysis-nodes?filter[node_id]={node_id}&filter[analysis_id]={analysis_id}',
headers=[('Connection', 'close')]))
response.raise_for_status()
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"Error getting node-analysis id: {e}")
return None

return response.json()['data'][0]['id']


Expand All @@ -180,7 +201,11 @@ def _get_node_id() -> str:
"Authorization": f"Bearer {get_hub_token()['hub_token']}"})
.get(f'/nodes?filter[robot_id]={robot_id}', headers=[('Connection', 'close')]))
print(response.json())
response.raise_for_status()
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"Error getting node id: {e}")
return None
return response.json()['data'][0]['id']


Expand All @@ -205,7 +230,11 @@ async def _get_internal_deployment_status(deployment_name: str) -> Literal['fini
base_url=f'http://nginx-{deployment_name}:80').get('/analysis/healthz',
headers=[('Connection', 'close')])
print(f"response nginx-{deployment_name}/analysis/healthz: {response}")
response.raise_for_status()
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"Error getting internal deployment status: {e}")
return 'failed'
try:
print(f"analyse status: {response.json()}")
except json.decoder.JSONDecodeError:
Expand Down

0 comments on commit 2bc702b

Please sign in to comment.