Skip to content

Commit

Permalink
Merge pull request #513 from umccr/feature/workflow-library-link
Browse files Browse the repository at this point in the history
Feature/workflow library link
  • Loading branch information
reisingerf authored Aug 23, 2024
2 parents 304758c + 1ccd835 commit 8bbd692
Show file tree
Hide file tree
Showing 26 changed files with 849 additions and 223 deletions.
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/.yarn/releases/** binary
/.yarn/plugins/** binary
5 changes: 5 additions & 0 deletions lib/workload/stateless/stacks/workflow-manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ migrate:
start: migrate
@python manage.py runserver_plus 0.0.0.0:8000

mock:
@python manage.py generate_mock_workflow_run

run-mock: reset-db migrate mock start

openapi:
@python manage.py generateschema > orcabus.hlo.openapi.yaml

Expand Down
13 changes: 7 additions & 6 deletions lib/workload/stateless/stacks/workflow-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ make ps
```
python manage.py help
python manage.py showmigrations
python manage.py makemigrations
python manage.py migrate
```

Expand All @@ -50,7 +51,7 @@ _^^^ please make sure to run `python manage.py migrate` first! ^^^_
#### Generate Workflow Record

```
python manage.py help generate_mock_data
python manage.py help generate_mock_workflow_run
> Generate mock Workflow data into database for local development and testing
```

Expand All @@ -73,17 +74,17 @@ python manage.py runserver_plus
```

```
curl -s http://localhost:8000/wfm/v1/workflow | jq
curl -s http://localhost:8000/api/v1/workflow | jq
```

```
curl -s http://localhost:8000/wfm/v1/workflow/1 | jq
curl -s http://localhost:8000/api/v1/workflow/1 | jq
```

Or visit in browser:
- http://localhost:8000/wfm/v1
- http://localhost:8000/wfm/v1/workflow
- http://localhost:8000/wfm/v1/workflow/1
- http://localhost:8000/api/v1
- http://localhost:8000/api/v1/workflow
- http://localhost:8000/api/v1/workflow/1

### API Doc

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,161 @@
from django.core.management import BaseCommand
from django.db.models import QuerySet
from django.utils.timezone import make_aware

import json
from datetime import datetime
from libumccr import libjson
from workflow_manager.models import WorkflowRun
from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory
from workflow_manager.models import Workflow, WorkflowRun, LibraryAssociation, State
from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory, LibraryFactory, \
StateFactory

WORKFLOW_NAME = "TestWorkflow"

STATUS_START = "READY"
STATUS_RUNNING = "RUNNING"
STATUS_END = "SUCCEEDED"
STATUS_FAIL = "FAILED"

# https://docs.djangoproject.com/en/5.0/howto/custom-management-commands/
class Command(BaseCommand):
help = "Generate mock Workflow data into database for local development and testing"
help = """
Generate mock data and populate DB for local testing.
"""

def handle(self, *args, **options):
wf_payload = PayloadFactory()
wf_workflow = WorkflowFactory()
# don't do anything if there is already mock data
if Workflow.objects.filter(workflow_name__startswith=WORKFLOW_NAME).exists():
print("Mock data found, Skipping creation.")
return

wf:WorkflowRun = WorkflowRunFactory(
workflow_run_name = "MockWorkflowRun",
payload = wf_payload,
workflow = wf_workflow
)
# Common components: payload and libraries
generic_payload = PayloadFactory() # Payload content is not important for now
libraries = [
LibraryFactory(orcabus_id="lib.01J5M2JFE1JPYV62RYQEG99CP1", library_id="L000001"),
LibraryFactory(orcabus_id="lib.02J5M2JFE1JPYV62RYQEG99CP2", library_id="L000002"),
LibraryFactory(orcabus_id="lib.03J5M2JFE1JPYV62RYQEG99CP3", library_id="L000003"),
LibraryFactory(orcabus_id="lib.04J5M2JFE1JPYV62RYQEG99CP4", library_id="L000004")
]

# First case: a primary workflow with two executions linked to 4 libraries
# The first execution failed and led to a repetition that succeeded
self.create_primary(generic_payload, libraries)
self.create_secondary(generic_payload, libraries)

print(libjson.dumps(wf.to_dict()))
print("Done")

@staticmethod
def create_primary(generic_payload, libraries):
"""
Case: a primary workflow with two executions linked to 4 libraries
The first execution failed and led to a repetition that succeeded
"""

wf = WorkflowFactory(workflow_name=WORKFLOW_NAME + "Primary")

# The first execution (workflow run 1)
wfr_1: WorkflowRun = WorkflowRunFactory(
workflow_run_name=WORKFLOW_NAME + "PrimaryRun1",
portal_run_id="1234",
workflow=wf
)
for state in [STATUS_START, STATUS_RUNNING, STATUS_FAIL]:
StateFactory(workflow_run=wfr_1, status=state, payload=generic_payload)
for i in [0, 1, 2, 3]:
LibraryAssociation.objects.create(
workflow_run=wfr_1,
library=libraries[i],
association_date=make_aware(datetime.now()),
status="ACTIVE",
)

# The second execution (workflow run 2)
wfr_2: WorkflowRun = WorkflowRunFactory(
workflow_run_name=WORKFLOW_NAME + "PrimaryRun2",
portal_run_id="1235",
workflow=wf
)
for state in [STATUS_START, STATUS_RUNNING, STATUS_END]:
StateFactory(workflow_run=wfr_2, status=state, payload=generic_payload)
for i in [0, 1, 2, 3]:
LibraryAssociation.objects.create(
workflow_run=wfr_2,
library=libraries[i],
association_date=make_aware(datetime.now()),
status="ACTIVE",
)

@staticmethod
def create_secondary(generic_payload, libraries):
"""
Case: a secondary pipeline comprising 3 workflows with corresponding executions
First workflow: QC (2 runs for 2 libraries)
Second workflow: Alignment (1 run for 2 libraries)
Third workflow: VariantCalling (1 run for 2 libraries)
"""

wf_qc = WorkflowFactory(workflow_name=WORKFLOW_NAME + "QC")

# QC of Library 1
wfr_qc_1: WorkflowRun = WorkflowRunFactory(
workflow_run_name=WORKFLOW_NAME + "QCRunLib1",
portal_run_id="2345",
workflow=wf_qc
)
for state in [STATUS_START, STATUS_RUNNING, STATUS_END]:
StateFactory(workflow_run=wfr_qc_1, status=state, payload=generic_payload)
LibraryAssociation.objects.create(
workflow_run=wfr_qc_1,
library=libraries[0],
association_date=make_aware(datetime.now()),
status="ACTIVE",
)

# QC of Library 2
wfr_qc_2: WorkflowRun = WorkflowRunFactory(
workflow_run_name=WORKFLOW_NAME + "QCRunLib2",
portal_run_id="2346",
workflow=wf_qc
)
for state in [STATUS_START, STATUS_RUNNING, STATUS_END]:
StateFactory(workflow_run=wfr_qc_2, status=state, payload=generic_payload)
LibraryAssociation.objects.create(
workflow_run=wfr_qc_2,
library=libraries[1],
association_date=make_aware(datetime.now()),
status="ACTIVE",
)

# Alignment
wf_align = WorkflowFactory(workflow_name=WORKFLOW_NAME + "Alignment")
wfr_a: WorkflowRun = WorkflowRunFactory(
workflow_run_name=WORKFLOW_NAME + "AlignmentRun",
portal_run_id="3456",
workflow=wf_align
)
for state in [STATUS_START, STATUS_RUNNING, STATUS_END]:
StateFactory(workflow_run=wfr_a, status=state, payload=generic_payload)
for i in [0, 1]:
LibraryAssociation.objects.create(
workflow_run=wfr_a,
library=libraries[i],
association_date=make_aware(datetime.now()),
status="ACTIVE",
)

# Variant Calling
wf_vc = WorkflowFactory(workflow_name=WORKFLOW_NAME + "VariantCalling")
wfr_vc: WorkflowRun = WorkflowRunFactory(
workflow_run_name=WORKFLOW_NAME + "VariantCallingRun",
portal_run_id="4567",
workflow=wf_vc
)
for state in [STATUS_START, STATUS_RUNNING, STATUS_END]:
StateFactory(workflow_run=wfr_vc, status=state, payload=generic_payload)
for i in [0, 1]:
LibraryAssociation.objects.create(
workflow_run=wfr_vc,
library=libraries[i],
association_date=make_aware(datetime.now()),
status="ACTIVE",
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Generated by Django 5.0.6 on 2024-05-17 20:10
# Generated by Django 5.1 on 2024-08-21 05:48

import django.core.serializers.json
import django.db.models.deletion
Expand All @@ -9,77 +9,90 @@ class Migration(migrations.Migration):

initial = True

dependencies = []
dependencies = [
]

operations = [
migrations.CreateModel(
name="Payload",
name='Library',
fields=[
('orcabus_id', models.CharField(max_length=255, primary_key=True, serialize=False)),
('library_id', models.CharField(max_length=255)),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='Payload',
fields=[
('id', models.BigAutoField(primary_key=True, serialize=False)),
('payload_ref_id', models.CharField(max_length=255, unique=True)),
('version', models.CharField(max_length=255)),
('data', models.JSONField(encoder=django.core.serializers.json.DjangoJSONEncoder)),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='LibraryAssociation',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('association_date', models.DateTimeField()),
('status', models.CharField(max_length=255)),
('library', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.library')),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='Workflow',
fields=[
("id", models.BigAutoField(primary_key=True, serialize=False)),
("payload_ref_id", models.CharField(max_length=255, unique=True)),
("version", models.CharField(max_length=255)),
(
"data",
models.JSONField(
encoder=django.core.serializers.json.DjangoJSONEncoder
),
),
('id', models.BigAutoField(primary_key=True, serialize=False)),
('workflow_name', models.CharField(max_length=255)),
('workflow_version', models.CharField(max_length=255)),
('execution_engine', models.CharField(max_length=255)),
('execution_engine_pipeline_id', models.CharField(max_length=255)),
('approval_state', models.CharField(max_length=255)),
],
options={
"abstract": False,
'unique_together': {('workflow_name', 'workflow_version')},
},
),
migrations.CreateModel(
name="Workflow",
name='WorkflowRun',
fields=[
("id", models.BigAutoField(primary_key=True, serialize=False)),
("workflow_name", models.CharField(max_length=255)),
("workflow_version", models.CharField(max_length=255)),
("execution_engine", models.CharField(max_length=255)),
("execution_engine_pipeline_id", models.CharField(max_length=255)),
("approval_state", models.CharField(max_length=255)),
('id', models.BigAutoField(primary_key=True, serialize=False)),
('portal_run_id', models.CharField(max_length=255, unique=True)),
('execution_id', models.CharField(blank=True, max_length=255, null=True)),
('workflow_run_name', models.CharField(blank=True, max_length=255, null=True)),
('comment', models.CharField(blank=True, max_length=255, null=True)),
('libraries', models.ManyToManyField(through='workflow_manager.LibraryAssociation', to='workflow_manager.library')),
('workflow', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='workflow_manager.workflow')),
],
options={
"unique_together": {("workflow_name", "workflow_version")},
'abstract': False,
},
),
migrations.AddField(
model_name='libraryassociation',
name='workflow_run',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun'),
),
migrations.CreateModel(
name="WorkflowRun",
name='State',
fields=[
("id", models.BigAutoField(primary_key=True, serialize=False)),
("portal_run_id", models.CharField(max_length=255)),
("status", models.CharField(max_length=255)),
("timestamp", models.DateTimeField()),
(
"execution_id",
models.CharField(blank=True, max_length=255, null=True),
),
(
"workflow_run_name",
models.CharField(blank=True, max_length=255, null=True),
),
("comment", models.CharField(blank=True, max_length=255, null=True)),
(
"payload",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="workflow_manager.payload",
),
),
(
"workflow",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="workflow_manager.workflow",
),
),
('id', models.BigAutoField(primary_key=True, serialize=False)),
('status', models.CharField(max_length=255)),
('timestamp', models.DateTimeField()),
('comment', models.CharField(blank=True, max_length=255, null=True)),
('payload', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='workflow_manager.payload')),
('workflow_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun')),
],
options={
"unique_together": {("portal_run_id", "status", "timestamp")},
'unique_together': {('workflow_run', 'status', 'timestamp')},
},
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

from .workflow import Workflow
from .payload import Payload
from .workflow_run import WorkflowRun
from .workflow_run import WorkflowRun, LibraryAssociation
from .library import Library
from .state import State
Loading

0 comments on commit 8bbd692

Please sign in to comment.