-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MetaWorkflow Handler and MetaWorkflow Run Handler: Pipeline Automation #11
base: master
Are you sure you want to change the base?
Changes from all commits
15615cd
165a612
ad1cbec
469804d
6cccc8e
f62fabd
4d50191
81c826d
32a576f
fa02b5b
3e9348d
dcb737e
6bf3809
0ddf3d1
c2231c2
5752c83
dae1229
8795286
ffefa6a
5fd8916
2d16ab0
c6a0e7f
6cb41ac
993361d
6321fbf
f4a47ba
ae249bb
58add81
e411ceb
dcbbec0
2db5bff
c1d2a0b
3c54a4c
111a0c5
ffdabb6
d5deff5
4908f1b
c684121
503cd00
93c1715
82c8d55
1d5e77b
4f17490
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
#!/usr/bin/env python3 | ||
|
||
################################################################# | ||
# Vars | ||
################################################################# | ||
TITLE = "title" | ||
|
||
# MetaWorkflow Handler attributes | ||
PROJECT = "project" | ||
INSTITUTION = "institution" | ||
UUID = "uuid" | ||
META_WORKFLOWS = "meta_workflows" | ||
ORDERED_META_WORKFLOWS = "ordered_meta_workflows" | ||
META_WORKFLOW = "meta_workflow" | ||
NAME = "name" | ||
DEPENDENCIES = "dependencies" | ||
ITEMS_FOR_CREATION_PROP_TRACE = "items_for_creation_property_trace" | ||
ITEMS_FOR_CREATION_UUID = "items_for_creation_uuid" | ||
|
||
# MetaWorkflow Run Handler attributes | ||
COST = "cost" | ||
STATUS = "status" | ||
FINAL_STATUS = "final_status" | ||
ASSOCIATED_META_WORKFLOW_HANDLER = "meta_workflow_handler" | ||
ASSOCIATED_ITEM = "associated_item" | ||
META_WORKFLOW_RUN = "meta_workflow_run" | ||
META_WORKFLOW_RUNS = "meta_workflow_runs" | ||
ITEMS_FOR_CREATION = "items_for_creation" | ||
ERROR = "error" | ||
# statuses | ||
PENDING = "pending" | ||
RUNNING = "running" | ||
COMPLETED = "completed" | ||
FAILED = "failed" | ||
STOPPED = "stopped" | ||
|
||
INACTIVE = "inactive" | ||
QC_FAIL = "quality metric failed" | ||
|
||
|
||
#TODO: the following is here in case dup flag is added in the future | ||
#TODO: add back in | ||
# MWFR_TO_HANDLER_STEP_STATUS_DICT = { | ||
# "pending": "pending", | ||
# "running": "running", | ||
# "completed": "completed", | ||
# "failed": "failed", | ||
# "inactive": "pending", | ||
# "stopped": "stopped", | ||
# "quality metric failed": "failed" | ||
# } |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,182 @@ | ||||||||||||||||||||||||||||||||||||||||
#!/usr/bin/env python3 | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
# Libraries | ||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
from copy import deepcopy | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
from magma.validated_dictionary import ValidatedDictionary | ||||||||||||||||||||||||||||||||||||||||
from magma.topological_sort import TopologicalSortHandler | ||||||||||||||||||||||||||||||||||||||||
from magma.magma_constants import * | ||||||||||||||||||||||||||||||||||||||||
from dcicutils.misc_utils import CycleError | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
# Custom Exception classes | ||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
class MetaWorkflowStepCycleError(CycleError): | ||||||||||||||||||||||||||||||||||||||||
"""Custom exception for cycle error tracking.""" | ||||||||||||||||||||||||||||||||||||||||
pass | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
class MetaWorkflowStepDuplicateError(ValueError): | ||||||||||||||||||||||||||||||||||||||||
"""Custom ValueError when MetaWorkflows don't have unique name attributes.""" | ||||||||||||||||||||||||||||||||||||||||
pass | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
class MetaWorkflowStepSelfDependencyError(ValueError): | ||||||||||||||||||||||||||||||||||||||||
"""Custom ValueError when MetaWorkflow Step has a dependency on itself.""" | ||||||||||||||||||||||||||||||||||||||||
pass | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
# MetaWorkflowStep | ||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
class MetaWorkflowStep(ValidatedDictionary): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Class to represent a MetaWorkflow, | ||||||||||||||||||||||||||||||||||||||||
as a step within a MetaWorkflow Handler object | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
def __init__(self, input_dict): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Constructor method, initialize object and attributes. | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
:param input_dict: a dictionary of MetaWorkflow step metadata | ||||||||||||||||||||||||||||||||||||||||
:type input_dict: dict | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
super().__init__(input_dict) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
# Validate presence of basic attributes of this MetaWorkflow step | ||||||||||||||||||||||||||||||||||||||||
self._validate_basic_attributes(META_WORKFLOW, NAME) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
self._check_self_dependency() | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
def _validate_basic_attributes(self, *list_of_attributes): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Validation of the input dictionary for the MetaWorkflow step. | ||||||||||||||||||||||||||||||||||||||||
Checks that necessary MetaWorkflow attributes are present for this MetaWorkflow step. | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
:param list_of_attributes: attributes that are checked | ||||||||||||||||||||||||||||||||||||||||
:type list_of_attributes: str(s) | ||||||||||||||||||||||||||||||||||||||||
:return: None, if all specified attributes are present | ||||||||||||||||||||||||||||||||||||||||
:raises ValueError: if this object doesn't have a specified attribute | ||||||||||||||||||||||||||||||||||||||||
:raises AttributeError: if not one (and only one) of items_for_creation attributes is present | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
super()._validate_basic_attributes(*list_of_attributes) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
## Check that one (and only one) of the following attributes is defined on this step: | ||||||||||||||||||||||||||||||||||||||||
## ITEMS_FOR_CREATION_UUID or ITEMS_FOR_CREATION_PROP_TRACE | ||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||
# set None for [default] arg to not throw AttributeError | ||||||||||||||||||||||||||||||||||||||||
#TODO: handle this within ff instead? It is CGAP portal-specific | ||||||||||||||||||||||||||||||||||||||||
if not getattr(self, ITEMS_FOR_CREATION_UUID, None): | ||||||||||||||||||||||||||||||||||||||||
getattr(self, ITEMS_FOR_CREATION_PROP_TRACE) | ||||||||||||||||||||||||||||||||||||||||
except AttributeError as e: | ||||||||||||||||||||||||||||||||||||||||
raise AttributeError("Object validation error, {0}\n" | ||||||||||||||||||||||||||||||||||||||||
.format(e.args[0])) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
# for items for creation, this object can only have | ||||||||||||||||||||||||||||||||||||||||
# either the UUID or property trace, but not both | ||||||||||||||||||||||||||||||||||||||||
if hasattr(self, ITEMS_FOR_CREATION_PROP_TRACE) and hasattr(self, ITEMS_FOR_CREATION_UUID): | ||||||||||||||||||||||||||||||||||||||||
raise AttributeError("Object validation error, 'MetaWorkflowStep' object cannot have both of the following attributes: 'items_for_creation_property_trace' and 'items_for_creation_uuid'") | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+66
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider condensing all the
Suggested change
As stated elsewhere, strongly consider moving away from setting all these properties as attributes on the classes to escape the |
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
def _check_self_dependency(self): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Check that this MetaWorkflow Step object doesn't have a self-dependency. | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
:return: None, if no self-dependencies present | ||||||||||||||||||||||||||||||||||||||||
:raises MetaWorkflowStepSelfDependencyError: if there is a self-dependency | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
if hasattr(self, DEPENDENCIES): | ||||||||||||||||||||||||||||||||||||||||
dependencies = getattr(self, DEPENDENCIES) | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+87
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider condensing to provide a default of an empty list:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
for dependency in dependencies: | ||||||||||||||||||||||||||||||||||||||||
if dependency == getattr(self, NAME): | ||||||||||||||||||||||||||||||||||||||||
raise MetaWorkflowStepSelfDependencyError(f'"{dependency}" has a self dependency.') | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
# MetaWorkflowHandler | ||||||||||||||||||||||||||||||||||||||||
################################################ | ||||||||||||||||||||||||||||||||||||||||
class MetaWorkflowHandler(ValidatedDictionary): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Class representing a MetaWorkflow Handler object, | ||||||||||||||||||||||||||||||||||||||||
including a list of MetaWorkflows with specified dependencies & other metadata | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
def __init__(self, input_dict): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Constructor method, initialize object and attributes. | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
:param input_dict: MetaWorkflow Handler dict, defined by json file from CGAP portal | ||||||||||||||||||||||||||||||||||||||||
:type input_dict: dict | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
### Basic attributes ### | ||||||||||||||||||||||||||||||||||||||||
super().__init__(input_dict) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
super()._validate_basic_attributes(UUID) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
### Calculated attributes ### | ||||||||||||||||||||||||||||||||||||||||
# set meta_workflows attribute | ||||||||||||||||||||||||||||||||||||||||
# Using meta_workflows array of dicts from CGAP MetaWorkflow Handler | ||||||||||||||||||||||||||||||||||||||||
# create dict of the form {meta_workflow_name: MetaWorkflow Step object} | ||||||||||||||||||||||||||||||||||||||||
self._set_meta_workflows_dict() | ||||||||||||||||||||||||||||||||||||||||
# TODO: NOTE: nowhere in magma is there a check that meta_workflows | ||||||||||||||||||||||||||||||||||||||||
# is an empty list. I am putting the burden of that on the user | ||||||||||||||||||||||||||||||||||||||||
# would y'all like me to add a check for an empty list? or NoneType? | ||||||||||||||||||||||||||||||||||||||||
# right now I only catch instances where meta_workflows doesn't exist, | ||||||||||||||||||||||||||||||||||||||||
# and I create an empty dict | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
# Create ordered MetaWorkflows name list based on dependencies | ||||||||||||||||||||||||||||||||||||||||
# This ordered list is what's used to create the array of MetaWorkflow Runs in Run handler | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+110
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider cutting down on the commenting here and updating the method names to reflect their mission better. Having too many comments usually reflects difficult-to-follow code in need of refactoring, and I find this class confusing even though the logic isn't terrible, mostly because of how the attributes are handled. |
||||||||||||||||||||||||||||||||||||||||
setattr(self, ORDERED_META_WORKFLOWS, self._create_ordered_meta_workflows_list()) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
def _set_meta_workflows_dict(self): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Checks for meta_workflows attribute (an array of MetaWorkflows and their metadata) from CGAP portal. | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
If nonexistent, set handler's meta_workflows attribute as an empty dictionary | ||||||||||||||||||||||||||||||||||||||||
If present, copy that list temporarily and redefine meta_workflows attribute | ||||||||||||||||||||||||||||||||||||||||
as a dictionary of the form {meta_workflow_name: MetaWorkflow Step object,....} | ||||||||||||||||||||||||||||||||||||||||
checking for duplicate steps in the process (i.e. non-unique MetaWorkflow names) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
:return: None, if all MetaWorkflowSteps are created successfully | ||||||||||||||||||||||||||||||||||||||||
:raises MetaWorkflowStepDuplicateError: if there are duplicate MetaWorkflows, by name | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
if not hasattr(self, META_WORKFLOWS): | ||||||||||||||||||||||||||||||||||||||||
# if not present, set attribute as empty dictionary | ||||||||||||||||||||||||||||||||||||||||
setattr(self, META_WORKFLOWS, {}) | ||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||
orig_meta_workflow_list_copy = deepcopy(getattr(self, META_WORKFLOWS)) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
temp_meta_workflow_step_dict = {} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
for meta_workflow in orig_meta_workflow_list_copy: | ||||||||||||||||||||||||||||||||||||||||
# create MetaWorkflowStep object for this MetaWorkflow | ||||||||||||||||||||||||||||||||||||||||
meta_workflow_step = MetaWorkflowStep(meta_workflow) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
# then add to the meta_workflows dictionary | ||||||||||||||||||||||||||||||||||||||||
# of the form {meta_workflow["name"]: MetaWorkflowStep(meta_workflow)} | ||||||||||||||||||||||||||||||||||||||||
if temp_meta_workflow_step_dict.setdefault(meta_workflow["name"], meta_workflow_step) != meta_workflow_step: | ||||||||||||||||||||||||||||||||||||||||
raise MetaWorkflowStepDuplicateError(f'"{meta_workflow["name"]}" is a duplicate MetaWorkflow, \ | ||||||||||||||||||||||||||||||||||||||||
all MetaWorkflow names must be unique.') | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+156
to
+158
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider refactoring away from the Also, probably want to use a constant for |
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
# redefine the "meta_workflows" attribute to this generated dictionary of MetaWorkflowStep objects | ||||||||||||||||||||||||||||||||||||||||
setattr(self, META_WORKFLOWS, temp_meta_workflow_step_dict) | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider refactoring away from redefining the attribute as information is lost in the process, and there's no way to look at both easily when debugging. |
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
def _create_ordered_meta_workflows_list(self): | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
Using dictionary of MetaWorkflow name and their corresponding MetaWorkflowStep objects, | ||||||||||||||||||||||||||||||||||||||||
generate ordered list of MetaWorkflows, by name. | ||||||||||||||||||||||||||||||||||||||||
Uses TopologicalSorter to order these steps based on their defined dependencies. | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
:return: list of valid topological sorting of MetaWorkflows (by name) | ||||||||||||||||||||||||||||||||||||||||
:rtype: list[str] | ||||||||||||||||||||||||||||||||||||||||
:raises MetaWorkflowStepCycleError: if there are cyclic dependencies among MetaWorkflow steps | ||||||||||||||||||||||||||||||||||||||||
i.e. no valid topological sorting of steps | ||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||
meta_workflows_dict = getattr(self, META_WORKFLOWS) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||
# create "graph" that will be passed into the topological sorter | ||||||||||||||||||||||||||||||||||||||||
sorter = TopologicalSortHandler(meta_workflows_dict) | ||||||||||||||||||||||||||||||||||||||||
# now topologically sort the steps | ||||||||||||||||||||||||||||||||||||||||
return sorter.sorted_graph_list() | ||||||||||||||||||||||||||||||||||||||||
except CycleError: | ||||||||||||||||||||||||||||||||||||||||
raise MetaWorkflowStepCycleError() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for the shebang or the headers here and elsewhere. I know Michele does this, but I find it unnecessary clutter.