Skip to content

Commit

Permalink
Merge pull request #36 from cloudblue/feature/LITE-28073-Tasks-executor
Browse files Browse the repository at this point in the history
LITE-28073: Adding async main function to execute the full process.
  • Loading branch information
d3rky authored Jul 20, 2023
2 parents 4a99bd4 + 6668b19 commit 9a7347a
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 2 deletions.
2 changes: 1 addition & 1 deletion connect_ext_ppr/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Task(Model):

PREFIX = 'TSK'

id = db.Column(db.String(20), primary_key=True)
id = db.Column(db.String(30), primary_key=True)
status = db.Column(
db.Enum(TasksStatusChoices, validate_strings=True),
default=TasksStatusChoices.PENDING,
Expand Down
73 changes: 73 additions & 0 deletions connect_ext_ppr/tasks_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
from datetime import datetime

from sqlalchemy.orm import joinedload

from connect_ext_ppr.db import get_db_ctx_manager
from connect_ext_ppr.models.enums import (
DeploymentRequestStatusChoices,
DeploymentStatusChoices,
TasksStatusChoices,
)
from connect_ext_ppr.models.deployment import DeploymentRequest
from connect_ext_ppr.models.task import Task


async def validate_ppr():
return True

Expand All @@ -8,3 +22,62 @@ async def apply_ppr_and_delegate_to_marketplaces():

async def delegate_to_l2():
return True


async def execute_task(task, db, task_function):
if task.status == TasksStatusChoices.PENDING:
task.started_at = datetime.utcnow()
finished_ok = await task_function()
task.status = TasksStatusChoices.DONE if finished_ok else TasksStatusChoices.ERROR
task.finished_at = datetime.utcnow()
db.add(task)
db.commit()
return task.status


async def main_process(deployment_request_id, config):
deployment_final_status = DeploymentStatusChoices.PENDING
deployment_request_final_status = DeploymentRequestStatusChoices.DONE

with get_db_ctx_manager(config) as db:
deployment_request = db.query(DeploymentRequest).options(
joinedload(DeploymentRequest.deployment),
).filter_by(id=deployment_request_id).first()
deployment = deployment_request.deployment

deployment.status = DeploymentStatusChoices.PROCESSING
deployment_request.status = DeploymentRequestStatusChoices.PROCESSING
db.add(deployment)
db.add(deployment_request)
db.commit()

tasks = db.query(Task).filter_by(
deployment_request=deployment_request.id,
).order_by(Task.id).all()

last_task_result = await execute_task(tasks[0], db, validate_ppr)
if last_task_result == TasksStatusChoices.ERROR:
deployment_request_final_status = DeploymentRequestStatusChoices.ERROR
else:
db.refresh(tasks[1])
last_task_result = await execute_task(
tasks[1],
db,
apply_ppr_and_delegate_to_marketplaces,
)
if last_task_result == TasksStatusChoices.ERROR:
deployment_request_final_status = DeploymentRequestStatusChoices.ERROR
elif deployment_request.delegate_l2:
db.refresh(tasks[2])
last_task_result = await execute_task(tasks[2], db, delegate_to_l2)
if last_task_result == TasksStatusChoices.DONE:
deployment_final_status = DeploymentStatusChoices.SYNCED
else:
deployment_request_final_status = DeploymentRequestStatusChoices.ERROR

deployment.status = deployment_final_status
deployment_request.status = deployment_request_final_status
db.add(deployment_request)
db.add(deployment)
db.commit()
return deployment_request.status
31 changes: 30 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
from connect_ext_ppr.models.configuration import Configuration
from connect_ext_ppr.models.deployment import Deployment, DeploymentRequest
from connect_ext_ppr.models.file import File
from connect_ext_ppr.models.replicas import Product
from connect_ext_ppr.models.ppr import PPRVersion
from connect_ext_ppr.services.cbc_extension import get_hub_credentials
from connect_ext_ppr.models.replicas import Product
from connect_ext_ppr.models.task import Task
from connect_ext_ppr.webapp import ConnectExtensionXvsWebApplication


Expand Down Expand Up @@ -73,6 +74,11 @@ def mocked_context(config):
wraps=mocked_context,
)

mocker.patch(
'connect_ext_ppr.tasks_manager.get_db_ctx_manager',
wraps=mocked_context,
)


@pytest.fixture
def product_factory(dbsession):
Expand Down Expand Up @@ -137,6 +143,7 @@ def deployment_request_factory(dbsession):
def _build_deployment_request(
deployment=None,
ppr_id='PPRFL-12345',
delegate_l2=False,
):
if not deployment:
deployment = deployment_factory(dbsession, id='DPLR-123-123-123')
Expand All @@ -146,6 +153,7 @@ def _build_deployment_request(
deployment_id=deployment.id,
ppr_id=ppr_id,
created_by=deployment.account_id,
delegate_l2=delegate_l2,
)
dbsession.add(ppr)
dbsession.set_verbose(dep_req)
Expand All @@ -154,6 +162,27 @@ def _build_deployment_request(
return _build_deployment_request


@pytest.fixture
def task_factory(dbsession, deployment_request_factory):
def _build_task(
deployment_request=None,
task_index='001',
):
if not deployment_request:
deployment_request = deployment_request_factory()

task_id = f'TSK-{deployment_request.id[4:]}-{task_index}'
task = Task(
id=task_id,
deployment_request=deployment_request.id,
title=f'Title Task {task_index}',
)
dbsession.add(task)
dbsession.commit()
return task
return _build_task


@pytest.fixture
def file(dbsession, media_response):
file = File(
Expand Down
116 changes: 116 additions & 0 deletions tests/test_tasks_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import pytest
from sqlalchemy import null

from connect_ext_ppr.models.deployment import Deployment, DeploymentRequest
from connect_ext_ppr.models.enums import (
DeploymentRequestStatusChoices,
DeploymentStatusChoices,
TasksStatusChoices,
)
from connect_ext_ppr.models.task import Task
from connect_ext_ppr.tasks_manager import (
apply_ppr_and_delegate_to_marketplaces,
delegate_to_l2,
main_process,
validate_ppr,
)

Expand All @@ -20,3 +29,110 @@ async def test_delegate_to_l2():
@pytest.mark.asyncio
async def test_validate_ppr():
assert await validate_ppr()


@pytest.mark.asyncio
async def test_main_process(
dbsession,
deployment_factory,
deployment_request_factory,
task_factory,
):
dep = deployment_factory(dbsession)
dr = deployment_request_factory(deployment=dep, delegate_l2=True)
task_factory(deployment_request=dr, task_index='0001')
task_factory(deployment_request=dr, task_index='0002')
task_factory(deployment_request=dr, task_index='0003')
assert await main_process(dr.id, {}) == DeploymentRequestStatusChoices.DONE

assert dbsession.query(Deployment).filter_by(status=DeploymentStatusChoices.SYNCED).count() == 1
assert dbsession.query(DeploymentRequest).filter_by(
status=DeploymentRequestStatusChoices.DONE,
).count() == 1
assert dbsession.query(Task).filter(
Task.status == TasksStatusChoices.DONE,
Task.started_at.is_not(null()),
Task.finished_at.is_not(null()),
).count() == 3


@pytest.mark.asyncio
async def test_main_process_wo_l2_delegation(
dbsession,
deployment_factory,
deployment_request_factory,
task_factory,
):
dep = deployment_factory(dbsession)
dr = deployment_request_factory(deployment=dep, delegate_l2=False)
task_factory(deployment_request=dr, task_index='0001')
task_factory(deployment_request=dr, task_index='0002')
assert await main_process(dr.id, {}) == DeploymentRequestStatusChoices.DONE

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.PENDING,
).count() == 1
assert dbsession.query(DeploymentRequest).filter_by(
status=DeploymentRequestStatusChoices.DONE,
).count() == 1
assert dbsession.query(Task).filter(
Task.status == TasksStatusChoices.DONE,
Task.started_at.is_not(null()),
Task.finished_at.is_not(null()),
).count() == 2


@pytest.mark.asyncio
@pytest.mark.parametrize(
('function_to_mock', 'done_tasks', 'tasks_w_errors', 'pending_tasks'),
(
('validate_ppr', 0, 1, 2),
('apply_ppr_and_delegate_to_marketplaces', 1, 1, 1),
('delegate_to_l2', 2, 1, 0),
),
)
async def test_main_process_ends_w_error(
dbsession,
deployment_factory,
deployment_request_factory,
done_tasks,
tasks_w_errors,
pending_tasks,
function_to_mock,
mocker,
task_factory,
):
dep = deployment_factory(dbsession)
dr = deployment_request_factory(deployment=dep, delegate_l2=True)
task_factory(deployment_request=dr, task_index='0001')
task_factory(deployment_request=dr, task_index='0002')
task_factory(deployment_request=dr, task_index='0003')

mocker.patch(
f'connect_ext_ppr.tasks_manager.{function_to_mock}',
return_value=False,
)
assert await main_process(dr.id, {}) == DeploymentRequestStatusChoices.ERROR

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.PENDING,
).count() == 1
assert dbsession.query(DeploymentRequest).filter_by(
status=DeploymentRequestStatusChoices.ERROR,
).count() == 1

assert dbsession.query(Task).filter(
Task.status == TasksStatusChoices.DONE,
Task.started_at.is_not(null()),
Task.finished_at.is_not(null()),
).count() == done_tasks
assert dbsession.query(Task).filter(
Task.status == TasksStatusChoices.ERROR,
Task.started_at.is_not(null()),
Task.finished_at.is_not(null()),
).count() == tasks_w_errors
assert dbsession.query(Task).filter(
Task.status == TasksStatusChoices.PENDING,
Task.started_at.is_(null()),
Task.finished_at.is_(null()),
).count() == pending_tasks

0 comments on commit 9a7347a

Please sign in to comment.