diff --git a/connect_ext_ppr/models/task.py b/connect_ext_ppr/models/task.py index a62caba..de02cce 100644 --- a/connect_ext_ppr/models/task.py +++ b/connect_ext_ppr/models/task.py @@ -12,7 +12,7 @@ class Task(Model): PREFIX = 'TSK' - id = db.Column(db.String(20), primary_key=True) + id = db.Column(db.String(30), primary_key=True) status = db.Column( db.Enum(TasksStatusChoices, validate_strings=True), default=TasksStatusChoices.PENDING, diff --git a/connect_ext_ppr/tasks_manager.py b/connect_ext_ppr/tasks_manager.py index c18a187..ab6242a 100644 --- a/connect_ext_ppr/tasks_manager.py +++ b/connect_ext_ppr/tasks_manager.py @@ -1,3 +1,17 @@ +from datetime import datetime + +from sqlalchemy.orm import joinedload + +from connect_ext_ppr.db import get_db_ctx_manager +from connect_ext_ppr.models.enums import ( + DeploymentRequestStatusChoices, + DeploymentStatusChoices, + TasksStatusChoices, +) +from connect_ext_ppr.models.deployment import DeploymentRequest +from connect_ext_ppr.models.task import Task + + async def validate_ppr(): return True @@ -8,3 +22,62 @@ async def apply_ppr_and_delegate_to_marketplaces(): async def delegate_to_l2(): return True + + +async def execute_task(task, db, task_function): + if task.status == TasksStatusChoices.PENDING: + task.started_at = datetime.utcnow() + finished_ok = await task_function() + task.status = TasksStatusChoices.DONE if finished_ok else TasksStatusChoices.ERROR + task.finished_at = datetime.utcnow() + db.add(task) + db.commit() + return task.status + + +async def main_process(deployment_request_id, config): + deployment_final_status = DeploymentStatusChoices.PENDING + deployment_request_final_status = DeploymentRequestStatusChoices.DONE + + with get_db_ctx_manager(config) as db: + deployment_request = db.query(DeploymentRequest).options( + joinedload(DeploymentRequest.deployment), + ).filter_by(id=deployment_request_id).first() + deployment = deployment_request.deployment + + deployment.status = DeploymentStatusChoices.PROCESSING + deployment_request.status = DeploymentRequestStatusChoices.PROCESSING + db.add(deployment) + db.add(deployment_request) + db.commit() + + tasks = db.query(Task).filter_by( + deployment_request=deployment_request.id, + ).order_by(Task.id).all() + + last_task_result = await execute_task(tasks[0], db, validate_ppr) + if last_task_result == TasksStatusChoices.ERROR: + deployment_request_final_status = DeploymentRequestStatusChoices.ERROR + else: + db.refresh(tasks[1]) + last_task_result = await execute_task( + tasks[1], + db, + apply_ppr_and_delegate_to_marketplaces, + ) + if last_task_result == TasksStatusChoices.ERROR: + deployment_request_final_status = DeploymentRequestStatusChoices.ERROR + elif deployment_request.delegate_l2: + db.refresh(tasks[2]) + last_task_result = await execute_task(tasks[2], db, delegate_to_l2) + if last_task_result == TasksStatusChoices.DONE: + deployment_final_status = DeploymentStatusChoices.SYNCED + else: + deployment_request_final_status = DeploymentRequestStatusChoices.ERROR + + deployment.status = deployment_final_status + deployment_request.status = deployment_request_final_status + db.add(deployment_request) + db.add(deployment) + db.commit() + return deployment_request.status diff --git a/tests/conftest.py b/tests/conftest.py index 8eb75d0..0a28ec6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,9 +24,10 @@ from connect_ext_ppr.models.configuration import Configuration from connect_ext_ppr.models.deployment import Deployment, DeploymentRequest from connect_ext_ppr.models.file import File -from connect_ext_ppr.models.replicas import Product from connect_ext_ppr.models.ppr import PPRVersion from connect_ext_ppr.services.cbc_extension import get_hub_credentials +from connect_ext_ppr.models.replicas import Product +from connect_ext_ppr.models.task import Task from connect_ext_ppr.webapp import ConnectExtensionXvsWebApplication @@ -73,6 +74,11 @@ def mocked_context(config): wraps=mocked_context, ) + mocker.patch( + 'connect_ext_ppr.tasks_manager.get_db_ctx_manager', + wraps=mocked_context, + ) + @pytest.fixture def product_factory(dbsession): @@ -137,6 +143,7 @@ def deployment_request_factory(dbsession): def _build_deployment_request( deployment=None, ppr_id='PPRFL-12345', + delegate_l2=False, ): if not deployment: deployment = deployment_factory(dbsession, id='DPLR-123-123-123') @@ -146,6 +153,7 @@ def _build_deployment_request( deployment_id=deployment.id, ppr_id=ppr_id, created_by=deployment.account_id, + delegate_l2=delegate_l2, ) dbsession.add(ppr) dbsession.set_verbose(dep_req) @@ -154,6 +162,27 @@ def _build_deployment_request( return _build_deployment_request +@pytest.fixture +def task_factory(dbsession, deployment_request_factory): + def _build_task( + deployment_request=None, + task_index='001', + ): + if not deployment_request: + deployment_request = deployment_request_factory() + + task_id = f'TSK-{deployment_request.id[4:]}-{task_index}' + task = Task( + id=task_id, + deployment_request=deployment_request.id, + title=f'Title Task {task_index}', + ) + dbsession.add(task) + dbsession.commit() + return task + return _build_task + + @pytest.fixture def file(dbsession, media_response): file = File( diff --git a/tests/test_tasks_manager.py b/tests/test_tasks_manager.py index 71ab942..e2314ea 100644 --- a/tests/test_tasks_manager.py +++ b/tests/test_tasks_manager.py @@ -1,8 +1,17 @@ import pytest +from sqlalchemy import null +from connect_ext_ppr.models.deployment import Deployment, DeploymentRequest +from connect_ext_ppr.models.enums import ( + DeploymentRequestStatusChoices, + DeploymentStatusChoices, + TasksStatusChoices, +) +from connect_ext_ppr.models.task import Task from connect_ext_ppr.tasks_manager import ( apply_ppr_and_delegate_to_marketplaces, delegate_to_l2, + main_process, validate_ppr, ) @@ -20,3 +29,110 @@ async def test_delegate_to_l2(): @pytest.mark.asyncio async def test_validate_ppr(): assert await validate_ppr() + + +@pytest.mark.asyncio +async def test_main_process( + dbsession, + deployment_factory, + deployment_request_factory, + task_factory, +): + dep = deployment_factory(dbsession) + dr = deployment_request_factory(deployment=dep, delegate_l2=True) + task_factory(deployment_request=dr, task_index='0001') + task_factory(deployment_request=dr, task_index='0002') + task_factory(deployment_request=dr, task_index='0003') + assert await main_process(dr.id, {}) == DeploymentRequestStatusChoices.DONE + + assert dbsession.query(Deployment).filter_by(status=DeploymentStatusChoices.SYNCED).count() == 1 + assert dbsession.query(DeploymentRequest).filter_by( + status=DeploymentRequestStatusChoices.DONE, + ).count() == 1 + assert dbsession.query(Task).filter( + Task.status == TasksStatusChoices.DONE, + Task.started_at.is_not(null()), + Task.finished_at.is_not(null()), + ).count() == 3 + + +@pytest.mark.asyncio +async def test_main_process_wo_l2_delegation( + dbsession, + deployment_factory, + deployment_request_factory, + task_factory, +): + dep = deployment_factory(dbsession) + dr = deployment_request_factory(deployment=dep, delegate_l2=False) + task_factory(deployment_request=dr, task_index='0001') + task_factory(deployment_request=dr, task_index='0002') + assert await main_process(dr.id, {}) == DeploymentRequestStatusChoices.DONE + + assert dbsession.query(Deployment).filter_by( + status=DeploymentStatusChoices.PENDING, + ).count() == 1 + assert dbsession.query(DeploymentRequest).filter_by( + status=DeploymentRequestStatusChoices.DONE, + ).count() == 1 + assert dbsession.query(Task).filter( + Task.status == TasksStatusChoices.DONE, + Task.started_at.is_not(null()), + Task.finished_at.is_not(null()), + ).count() == 2 + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ('function_to_mock', 'done_tasks', 'tasks_w_errors', 'pending_tasks'), + ( + ('validate_ppr', 0, 1, 2), + ('apply_ppr_and_delegate_to_marketplaces', 1, 1, 1), + ('delegate_to_l2', 2, 1, 0), + ), +) +async def test_main_process_ends_w_error( + dbsession, + deployment_factory, + deployment_request_factory, + done_tasks, + tasks_w_errors, + pending_tasks, + function_to_mock, + mocker, + task_factory, +): + dep = deployment_factory(dbsession) + dr = deployment_request_factory(deployment=dep, delegate_l2=True) + task_factory(deployment_request=dr, task_index='0001') + task_factory(deployment_request=dr, task_index='0002') + task_factory(deployment_request=dr, task_index='0003') + + mocker.patch( + f'connect_ext_ppr.tasks_manager.{function_to_mock}', + return_value=False, + ) + assert await main_process(dr.id, {}) == DeploymentRequestStatusChoices.ERROR + + assert dbsession.query(Deployment).filter_by( + status=DeploymentStatusChoices.PENDING, + ).count() == 1 + assert dbsession.query(DeploymentRequest).filter_by( + status=DeploymentRequestStatusChoices.ERROR, + ).count() == 1 + + assert dbsession.query(Task).filter( + Task.status == TasksStatusChoices.DONE, + Task.started_at.is_not(null()), + Task.finished_at.is_not(null()), + ).count() == done_tasks + assert dbsession.query(Task).filter( + Task.status == TasksStatusChoices.ERROR, + Task.started_at.is_not(null()), + Task.finished_at.is_not(null()), + ).count() == tasks_w_errors + assert dbsession.query(Task).filter( + Task.status == TasksStatusChoices.PENDING, + Task.started_at.is_(null()), + Task.finished_at.is_(null()), + ).count() == pending_tasks