Skip to content

Commit

Permalink
feat: implement automatic analysis status updates to hub
Browse files Browse the repository at this point in the history
Co-authored-by: Nightknight3000 <[email protected]>
  • Loading branch information
antidodo and Nightknight3000 committed Oct 29, 2024
1 parent 32250f9 commit 6b78567
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/k8s/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def create_analysis_deployment(name: str,
container1 = client.V1Container(name=name, image=image, image_pull_policy="IfNotPresent",
ports=[client.V1ContainerPort(port) for port in ports],
env=[client.V1EnvVar(name=key, value=val) for key, val in env.items()],
)#liveness_probe=liveness_probe)
liveness_probe=liveness_probe)
containers.append(container1)

depl_metadata = client.V1ObjectMeta(name=name, namespace=namespace)
Expand Down
2 changes: 2 additions & 0 deletions src/resources/analysis/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


class AnalysisStatus(Enum):
    """Lifecycle states of an analysis deployment as stored in the node's local database."""
    PENDING = 'pending'    # analysis object created but not yet started (default in Analysis entity)
    CREATED = 'created'    # kubernetes deployment created; not yet confirmed responsive
    RUNNING = 'running'    # deployment answered its internal health endpoint
    STOPPED = 'stopped'    # deployment was stopped (see Database.stop_analysis)
    FINISHED = 'finished'  # deployment's health endpoint reported completion
8 changes: 4 additions & 4 deletions src/resources/analysis/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@

class Analysis(BaseModel):
analysis_id: str
deployment_name: Optional[str] = None
deployment_name: str = ''
project_id: str
image_registry_address: str
ports: list[int]
tokens: Optional[dict[str, str]] = None
analysis_config: Optional[dict[str, str]] = None
status: Optional[str] = None
status: str = AnalysisStatus.PENDING.value
log: Optional[str] = None
pod_ids: Optional[list[str]] = None

def start(self, database: Database) -> None:
self.status = AnalysisStatus.CREATED.value
self.deployment_name = self.analysis_id + str(random.randint(0, 10000))
self.deployment_name = "analysis-" + self.analysis_id + str(random.randint(0, 10000))
# TODO: solution for some analyzes that have to be started multiple times
self.tokens = create_analysis_tokens(self.deployment_name, self.analysis_id, self.project_id)
self.analysis_config = self.tokens
Expand All @@ -36,7 +36,7 @@ def start(self, database: Database) -> None:
image=self.image_registry_address,
ports=self.ports,
env=self.analysis_config)
self.status = AnalysisStatus.RUNNING.value

database.create_analysis(analysis_id=self.analysis_id,
deployment_name=self.deployment_name,
project_id=self.project_id,
Expand Down
18 changes: 15 additions & 3 deletions src/resources/database/entity.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import os
from typing import Optional, List, Type
from typing import Optional, List, Type, Literal
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Expand Down Expand Up @@ -58,6 +58,13 @@ def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]:
self.session.commit()
return analysis

def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB:
    """
    Overwrite attributes of the deployment row identified by *deployment_name* and commit.

    :param deployment_name: unique name of the deployment row to modify
    :param kwargs: attribute-name/value pairs written onto the row via setattr
    :return: the updated deployment record
    """
    record = self.get_deployment(deployment_name)
    for attribute, new_value in kwargs.items():
        setattr(record, attribute, new_value)
    self.session.commit()
    return record

def delete_analysis(self, analysis_id: str) -> None:
analysis = self.get_deployments(analysis_id)
for deployment in analysis:
Expand All @@ -80,6 +87,11 @@ def get_deployment_pod_ids(self, deployment_name: str) -> list[str]:
def get_analysis_pod_ids(self, analysis_id: str) -> list[list[str]]:
    """Collect the pod-id collections of every non-None deployment belonging to *analysis_id*."""
    # NOTE(review): each element is a deployment's whole ``pod_ids`` value (itself a
    # list per the Analysis entity), so this returns a list of lists — the previous
    # ``list[str]`` annotation understated the nesting. Confirm against AnalysisDB's
    # actual column type before relying on this.
    return [deployment.pod_ids for deployment in self.get_deployments(analysis_id) if deployment is not None]

def stop_analysis(self, analysis_id: str) -> None:
self.update_analysis(analysis_id, status=AnalysisStatus.STOPPED.value)
def update_analysis_status(self, analysis_id: str, status: str) -> None:
    """Persist a new status string on every deployment row of *analysis_id*."""
    # Callers pass AnalysisStatus.<MEMBER>.value (a plain string, e.g. in
    # stop_analysis), so the parameter is annotated str rather than AnalysisStatus.
    self.update_analysis(analysis_id, status=status)

def update_deployment_status(self, deployment_name: str, status: str) -> None:
    """Persist a new status string on a single deployment row."""
    # Callers in the status loop pass AnalysisStatus.<MEMBER>.value strings,
    # so the parameter is annotated str rather than AnalysisStatus.
    self.update_deployment(deployment_name, status=status)

def stop_analysis(self, analysis_id: str) -> None:
    """Mark every deployment of *analysis_id* as stopped in the database."""
    self.update_analysis_status(analysis_id, status=AnalysisStatus.STOPPED.value)
14 changes: 14 additions & 0 deletions src/status/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from enum import Enum


class AnalysisHubStatus(Enum):
    """Run-status values submitted to the hub's analysis-nodes endpoint ("run_status" payload field)."""
    STARTING = 'starting'  # at least one deployment still in 'created' state
    STARTED = 'started'

    STOPPING = 'stopping'
    STOPPED = 'stopped'

    RUNNING = 'running'
    FINISHED = 'finished'

    FAILED = 'failed'
    # NOTE(review): _set_analysis_hub_status references AnalysisHubStatus.ONGOING,
    # which is not defined here and would raise AttributeError — confirm whether an
    # ONGOING member is missing or the caller should map internal 'ongoing' to RUNNING.
185 changes: 155 additions & 30 deletions src/status/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import asyncio
from typing import Literal
from httpx import AsyncClient
import httpx


from src.resources.database.entity import Database
from src.resources.analysis.entity import Analysis, AnalysisStatus, read_db_analysis
from src.utils.token import delete_keycloak_client, get_hub_token
from src.status.constants import AnalysisHubStatus


def status_loop(database: Database):
Expand All @@ -21,26 +24,147 @@ def status_loop(database: Database):
if database.get_analysis_ids():
if node_id is None:
node_id = _get_node_id()
print(f"node_id: {node_id}")

for analysis_id in database.get_analysis_ids():
node_analysis_id = _get_node_analysis_id(node_id, analysis_id)

deployments = [read_db_analysis(deployment) for deployment in database.get_deployments(analysis_id)]
database_status = _get_status(deployments)
if database_status == 'running':
print(f"analysis_id: {analysis_id}")
internal_status = asyncio.run(_get_internal_analysis_status(deployments))
if internal_status == 'finished': # if analysis successful
# TODO: final local log save (minio?) # archive logs
_delete_analysis(analysis_id, database, deployments) # delete analysis

# TODO: submit database_status to HUB

db_status, int_status = (_get_status(deployments), {'status': {}})

# update created to running status if deployment responsive
db_status, int_status = _update_running_status(deployments, database, db_status, int_status)

db_status, int_status = update_finished_status(deployments, analysis_id, database, db_status, int_status)

_set_analysis_hub_status(node_analysis_id, db_status, int_status)
time.sleep(10)

def _get_node_id():

def update_finished_status(deployments: list[Analysis],
                           analysis_id: str,
                           database: Database,
                           database_status: dict[str, dict[str, str]],
                           internal_status: dict[str, dict[str, str]]) \
        -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]:
    """
    Transition deployments from 'running' to 'finished' based on their internal status.

    A deployment is "newly finished" when its database status is 'running' but its
    internally reported status (already gathered into *internal_status*) is 'finished'.

    :param deployments: analysis deployments read from the database this cycle
    :param analysis_id: id of the analysis the deployments belong to
    :param database: local node database handle
    :param database_status: {'status': {deployment_name: db_status}} snapshot
    :param internal_status: {'status': {deployment_name: internal_status}} snapshot
    :return: the (database_status, internal_status) pair, passed through unchanged
    """
    running_deployment_names = [deployment.deployment_name for deployment in deployments if deployment.status == 'running']

    newly_finished_deployment_names = [deployment_name
                                       for deployment_name in internal_status['status'].keys()
                                       if (deployment_name in running_deployment_names) and
                                       (internal_status['status'][deployment_name] == 'finished')]
    for deployment_name in newly_finished_deployment_names:
        database.update_deployment_status(deployment_name, AnalysisStatus.FINISHED.value)  # change database status to finished
        # TODO: final local log save (minio?)  # archive logs
        # NOTE(review): _delete_analysis is called inside the per-deployment loop and
        # removes the WHOLE analysis as soon as any one deployment finishes (and would
        # run repeatedly if several finish in the same cycle) — confirm this is intended.
        _delete_analysis(analysis_id, database, deployments)  # delete analysis from database

    # NOTE(review): database_status is returned unchanged even though deployment
    # statuses were just updated in the database — confirm the caller expects the
    # pre-update snapshot here.
    return database_status, internal_status


def _update_running_status(deployments: list[Analysis],
                           database: Database,
                           database_status: dict[str, dict[str, str]],
                           internal_status: dict[str, dict[str, str]]) \
        -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]:
    """
    Promote 'created' deployments to 'running' once their health endpoint responds.

    For every deployment still in 'created' state, query its internal health status;
    a successful response means the deployment is reachable, so its database status
    is set to RUNNING. An HTTP error is treated as "not ready yet" and deliberately
    ignored — the deployment simply stays 'created' until a later cycle.

    :param deployments: analysis deployments read from the database this cycle
    :param database: local node database handle
    :param database_status: {'status': {deployment_name: db_status}} snapshot
    :param internal_status: {'status': {deployment_name: internal_status}}; filled in
        here for each deployment that responded
    :return: the (possibly refreshed) database_status and the updated internal_status
    """
    newly_created_deployments = [deployment for deployment in deployments if deployment.status == 'created']
    for deployment in newly_created_deployments:
        deployment_name = deployment.deployment_name
        try:
            # one asyncio.run per deployment — each call blocks until the health probe returns
            internal_status['status'][deployment_name] = asyncio.run(_get_internal_deployment_status(deployment_name))
            database.update_deployment_status(deployment_name, AnalysisStatus.RUNNING.value)
            # NOTE(review): _get_status reads the in-memory ``deployments`` objects, whose
            # .status attributes were NOT updated by the database write above — confirm
            # whether this refresh actually reflects the new RUNNING state.
            database_status = _get_status(deployments)
        except httpx.HTTPError:
            # best-effort: deployment not reachable yet; retry on the next status cycle
            pass
    return database_status, internal_status



def _set_analysis_hub_status(node_analysis_id: str,
                             database_status: dict[str, dict[str, str]],
                             internal_status: dict[str, dict[str, str]]) -> None:
    """
    Derive a single hub-facing status for an analysis and submit it to the hub.

    Walks the per-deployment statuses and stops at the first deployment whose
    internal (preferred) or database status maps to a hub status; internal
    statuses take precedence over database ones. If nothing maps, no update
    is submitted (analysis_hub_status stays None).

    :param node_analysis_id: id of the hub's node-analysis relation object
    :param database_status: {'status': {deployment_name: db_status}} snapshot
    :param internal_status: {'status': {deployment_name: internal_status}} snapshot
    """
    analysis_hub_status = None
    # get keys from database_status
    for deployment_name in database_status['status'].keys():
        db_depl_status = database_status['status'][deployment_name]
        try:
            intern_depl_status = internal_status['status'][deployment_name]
        except KeyError:
            # deployment was not probed this cycle; fall back to database status
            intern_depl_status = None

        if intern_depl_status == 'failed':
            analysis_hub_status = AnalysisHubStatus.FAILED.value
            break
        elif intern_depl_status == 'finished':
            analysis_hub_status = AnalysisHubStatus.FINISHED.value
            break
        elif intern_depl_status == 'ongoing':
            # BUGFIX: AnalysisHubStatus defines no ONGOING member, so the previous
            # ``AnalysisHubStatus.ONGOING.value`` raised AttributeError here; an
            # internally 'ongoing' deployment is reported to the hub as RUNNING.
            analysis_hub_status = AnalysisHubStatus.RUNNING.value
            break
        elif db_depl_status == 'stopped':
            analysis_hub_status = AnalysisHubStatus.STOPPED.value
            break
        elif db_depl_status == 'finished':
            analysis_hub_status = AnalysisHubStatus.FINISHED.value
            break
        elif db_depl_status == 'running':
            analysis_hub_status = AnalysisHubStatus.RUNNING.value
            break
        elif db_depl_status == 'created':
            analysis_hub_status = AnalysisHubStatus.STARTING.value
            break
    print(f"Analysis hub status: {analysis_hub_status}")

    _submit_analysis_status_update(node_analysis_id, analysis_hub_status)


def _submit_analysis_status_update(node_analysis_id: str, status: str) -> None:
    """
    Update the run status of an analysis at the hub.

    POST https://core.privateaim.dev/analysis-nodes/<node-analysis-id>
    Payload: { "run_status": "started" }

    :param node_analysis_id: id of the hub's node-analysis relation object
    :param status: hub status string (an AnalysisHubStatus member's .value) or
        None, in which case no request is sent
    :raises httpx.HTTPStatusError: if the hub responds with an error status
    """
    if status is not None:
        # Authorization header added for consistency with _get_node_analysis_id /
        # _get_node_id — the hub's other endpoints require a bearer token.
        response = asyncio.run(AsyncClient(base_url=os.getenv('HUB_URL_CORE'),
                                           headers={"accept": "application/json",
                                                    "Authorization": f"Bearer {get_hub_token()['hub_token']}"})
                               .post(f'/analysis-nodes/{node_analysis_id}',
                                     json={"run_status": status},
                                     headers=[('Connection', 'close')]))
        # raise before decoding: error responses may not carry a JSON body
        response.raise_for_status()
        print(f"response status update: {response.json()}")


def _get_node_analysis_id(node_id: str, analysis_id: str) -> str:
    """
    Get the node-analysis relation id from the hub.

    Example:
        analysis-id: 893761b5-d8ac-42be-ad71-a2d3e70b3990
        node-id: e64a1551-4007-4754-a7b9-57c9cb56a7c5
        endpoint: GET https://core.privateaim.dev/analysis-nodes?filter[node_id]=e64a1551-4007-4754-a7b9-57c9cb56a7c5&filter[analysis_id]=893761b5-d8ac-42be-ad71-a2d3e70b3990
        retrieve relation id from object: 1b1b1b1b-1b1b-1b1b-1b1b-1b1b1b1b1b1b

    :param node_id: hub id of this node
    :param analysis_id: hub id of the analysis
    :return: id of the first matching analysis-node relation object
    :raises httpx.HTTPStatusError: if the hub responds with an error status
    """
    response = asyncio.run(AsyncClient(base_url=os.getenv('HUB_URL_CORE'),
                                       headers={"accept": "application/json",
                                                "Authorization": f"Bearer {get_hub_token()['hub_token']}"})
                           .get(f'/analysis-nodes?filter[node_id]={node_id}&filter[analysis_id]={analysis_id}',
                                headers=[('Connection', 'close')]))
    response.raise_for_status()
    return response.json()['data'][0]['id']


def _get_node_id() -> str:
"""
robot-id: 170c1cd8-d468-41c3-9bee-8e3cb1813210
endpoint: GET https://core.privateaim.dev/nodes?filter[robot_id]=170c1cd8-d468-41c3-9bee-8e3cb1813210
node id aus objekt auslesen: e64a1551-4007-4754-a7b9-57c9cb56a7c5
node id read from object: e64a1551-4007-4754-a7b9-57c9cb56a7c5
:return: node ID
"""
robot_id, robot_secret, hub_url_core = (os.getenv('HUB_ROBOT_USER'),
Expand All @@ -56,11 +180,13 @@ def _get_node_id():
return response.json()['data'][0]['id']


def _get_status(deployments: list[Analysis]):
def _get_status(deployments: list[Analysis]) -> dict[Literal['status'],
dict[str, Literal['created', 'running', 'stopped', 'finished']]]:
return {"status": {deployment.deployment_name: deployment.status for deployment in deployments}}


def _delete_analysis(analysis_id: str, database: Database, deployments: list[Analysis]):
def _delete_analysis(analysis_id: str, database: Database, deployments: list[Analysis]) -> dict[Literal['status'],
dict[str, Literal['created', 'running', 'stopped', 'finished']]]:
for deployment in deployments:
if deployment.status != AnalysisStatus.STOPPED.value:
deployment.stop(database)
Expand All @@ -70,19 +196,18 @@ def _delete_analysis(analysis_id: str, database: Database, deployments: list[Ana
return {"status": {deployment.deployment_name: deployment.status for deployment in deployments}}


async def _get_internal_analysis_status(deployments: list[Analysis]) \
-> dict[str, Literal['finished', 'ongoing', 'failed']]:
health_status_list = {}
for deployment in deployments:
if deployment.deployment_name is not None:
analysis_health_status = await AsyncClient(
base_url=f'http://analysis-nginx-{deployment.deployment_name}:80').get('/analysis/healthz',
headers=[('Connection', 'close')]).json()['status']
if analysis_health_status == 'finished':
health_status_list[deployment.deployment_name] = 'finished'
elif analysis_health_status == 'ongoing':
health_status_list[deployment.deployment_name] = 'ongoing'
else:
health_status_list[deployment.deployment_name] = 'failed'

return health_status_list
async def _get_internal_deployment_status(deployment_name: str) -> Literal['finished', 'ongoing', 'failed']:
    """
    Query a deployment's nginx health endpoint and normalize the reported status.

    Any status other than 'finished' or 'ongoing' is reported as 'failed'.

    :param deployment_name: name of the deployment whose health endpoint is queried
    :return: one of 'finished', 'ongoing', 'failed'
    :raises httpx.HTTPStatusError: if the health endpoint responds with an error status
    """
    # BUGFIX: the client was previously never closed, leaking a connection pool on
    # every poll cycle — the async context manager guarantees it is released.
    async with AsyncClient(base_url=f'http://analysis-nginx-{deployment_name}:80') as client:
        response = await client.get('/analysis/healthz', headers=[('Connection', 'close')])
    response.raise_for_status()

    analysis_health_status = response.json()['status']
    if analysis_health_status in ('finished', 'ongoing'):
        return analysis_health_status
    return 'failed'

0 comments on commit 6b78567

Please sign in to comment.