Skip to content

Commit

Permalink
add enum for pipeline type #360
Browse files Browse the repository at this point in the history
  • Loading branch information
donaldcampbelljr committed Jun 3, 2024
1 parent 4ec9bed commit affd8d4
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 21 deletions.
3 changes: 2 additions & 1 deletion looper/cli_pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from divvy import select_divvy_config

from .const import PipelineLevel
from . import __version__

from .command_models.arguments import ArgumentEnum
Expand Down Expand Up @@ -244,7 +245,7 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None):

# Check at the beginning if user wants to use pipestat and pipestat is configurable
is_pipestat_configured = (
prj._check_if_pipestat_configured(pipeline_type="project")
prj._check_if_pipestat_configured(pipeline_type=PipelineLevel.PROJECT.value)
if getattr(args, "project", None)
else prj._check_if_pipestat_configured()
)
Expand Down
3 changes: 2 additions & 1 deletion looper/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .exceptions import JobSubmissionException, SampleFailedException
from .processed_project import populate_sample_paths
from .utils import fetch_sample_flags, jinja_render_template_strictly
from .const import PipelineLevel


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -276,7 +277,7 @@ def is_project_submittable(self, force=False):
psms = {}
if self.prj.pipestat_configured_project:
for piface in self.prj.project_pipeline_interfaces:
if piface.psm.pipeline_type == "project":
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm
psm = psms[self.pl_name]
status = psm.get_status()
Expand Down
8 changes: 8 additions & 0 deletions looper/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Shared project constants """

import os
from enum import Enum

__author__ = "Databio lab"
__email__ = "[email protected]"
Expand Down Expand Up @@ -268,3 +269,10 @@ def _get_apperance_dict(type, templ=APPEARANCE_BY_FLAG):
"init-piface": "Initialize generic pipeline interface.",
"link": "Create directory of symlinks for reported results.",
}

# Add project/sample enum


class PipelineLevel(Enum):
SAMPLE = "sample"
PROJECT = "project"
24 changes: 13 additions & 11 deletions looper/looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# Need specific sequence of actions for colorama imports?
from colorama import init

from .const import PipelineLevel

init()
from shutil import rmtree

Expand Down Expand Up @@ -93,7 +95,7 @@ def __call__(self, args):
if getattr(args, "project", None):

for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "project":
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm
s = piface.psm.get_status() or "unknown"
status.setdefault(piface.psm.pipeline_name, {})
Expand All @@ -103,7 +105,7 @@ def __call__(self, args):
else:
for sample in self.prj.samples:
for piface in sample.project.pipeline_interfaces:
if piface.psm.pipeline_type == "sample":
if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value:
psms[piface.psm.pipeline_name] = piface.psm
s = piface.psm.get_status(record_identifier=sample.sample_name)
status.setdefault(piface.psm.pipeline_name, {})
Expand Down Expand Up @@ -275,7 +277,7 @@ def __call__(self, args, preview_flag=True):
else:
if use_pipestat:
for piface in sample.project.pipeline_interfaces:
if piface.psm.pipeline_type == "sample":
if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value:
psms[piface.psm.pipeline_name] = piface.psm
for pipeline_name, psm in psms.items():
psm.backend.remove_record(
Expand Down Expand Up @@ -564,7 +566,7 @@ def __call__(self, args):
if project_level:

for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "project":
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm
report_directory = piface.psm.summarize(
looper_samples=self.prj.samples, portable=portable
Expand All @@ -574,7 +576,7 @@ def __call__(self, args):
return self.debug
else:
for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "sample":
if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value:
psms[piface.psm.pipeline_name] = piface.psm
report_directory = piface.psm.summarize(
looper_samples=self.prj.samples, portable=portable
Expand All @@ -597,13 +599,13 @@ def __call__(self, args):

if project_level:
for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "project":
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm
linked_results_path = piface.psm.link(link_dir=link_dir)
print(f"Linked directory: {linked_results_path}")
else:
for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "sample":
if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value:
psms[piface.psm.pipeline_name] = piface.psm
linked_results_path = piface.psm.link(link_dir=link_dir)
print(f"Linked directory: {linked_results_path}")
Expand All @@ -622,12 +624,12 @@ def __call__(self, args):
psms = {}
if project_level:
for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "project":
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm
results = piface.psm.table()
else:
for piface in self.prj.pipeline_interfaces:
if piface.psm.pipeline_type == "sample":
if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value:
psms[piface.psm.pipeline_name] = piface.psm
results = piface.psm.table()
# Results contains paths to stats and object summaries.
Expand Down Expand Up @@ -672,7 +674,7 @@ def destroy_summary(prj, dry_run=False, project_level=False):
psms = {}
if project_level:
for piface in prj.pipeline_interfaces:
if piface.psm.pipeline_type == "project":
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm

for name, psm in psms.items():
Expand All @@ -699,7 +701,7 @@ def destroy_summary(prj, dry_run=False, project_level=False):
)
else:
for piface in prj.pipeline_interfaces:
if piface.psm.pipeline_type == "sample":
if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value:
psms[piface.psm.pipeline_name] = piface.psm
for name, psm in psms.items():
_remove_or_dry_run(
Expand Down
21 changes: 13 additions & 8 deletions looper/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .pipeline_interface import PipelineInterface
from .processed_project import populate_project_paths, populate_sample_paths
from .utils import *
from .const import PipelineLevel

__all__ = ["Project"]

Expand Down Expand Up @@ -308,7 +309,7 @@ def project_pipeline_interfaces(self):
:return list[looper.PipelineInterface]: list of pipeline interfaces
"""
return [
PipelineInterface(pi, pipeline_type="project")
PipelineInterface(pi, pipeline_type=PipelineLevel.PROJECT.value)
for pi in self.project_pipeline_interface_sources
]

Expand Down Expand Up @@ -351,7 +352,9 @@ def pipestat_configured_project(self):
:return bool: whether pipestat configuration is complete
"""
return self._check_if_pipestat_configured(pipeline_type="project")
return self._check_if_pipestat_configured(
pipeline_type=PipelineLevel.PROJECT.value
)

def get_sample_piface(self, sample_name):
"""
Expand Down Expand Up @@ -449,7 +452,7 @@ def get_schemas(pifaces, schema_key=INPUT_SCHEMA_KEY):
schema_set.update([schema_file])
return list(schema_set)

def _check_if_pipestat_configured(self, pipeline_type="sample"):
def _check_if_pipestat_configured(self, pipeline_type=PipelineLevel.SAMPLE.value):

# First check if pipestat key is in looper_config, if not return false

Expand All @@ -463,11 +466,11 @@ def _check_if_pipestat_configured(self, pipeline_type="sample"):
# This should return True OR raise an exception at this point.
return self._get_pipestat_configuration(pipeline_type)

def _get_pipestat_configuration(self, pipeline_type="sample"):
def _get_pipestat_configuration(self, pipeline_type=PipelineLevel.SAMPLE.value):

# First check if it already exists

if pipeline_type == "sample":
if pipeline_type == PipelineLevel.SAMPLE.value:
for piface in self.pipeline_interfaces:

pipestat_config_path = self._check_for_existing_pipestat_config(piface)
Expand All @@ -479,7 +482,7 @@ def _get_pipestat_configuration(self, pipeline_type="sample"):
config_file=pipestat_config_path, multi_pipelines=True
)

elif pipeline_type == "project":
elif pipeline_type == PipelineLevel.PROJECT.value:
for prj_piface in self.project_pipeline_interfaces:
pipestat_config_path = self._check_for_existing_pipestat_config(
prj_piface
Expand Down Expand Up @@ -691,7 +694,7 @@ def _piface_by_samples(self):
pifaces_by_sample = {}
for source, sample_names in self._samples_by_interface.items():
try:
pi = PipelineInterface(source, pipeline_type="sample")
pi = PipelineInterface(source, pipeline_type=PipelineLevel.SAMPLE.value)
except PipelineInterfaceConfigError as e:
_LOGGER.debug(f"Skipping pipeline interface creation: {e}")
else:
Expand Down Expand Up @@ -742,7 +745,9 @@ def _samples_by_piface(self, piface_key):
for source in piface_srcs:
source = self._resolve_path_with_cfg(source)
try:
PipelineInterface(source, pipeline_type="sample")
PipelineInterface(
source, pipeline_type=PipelineLevel.SAMPLE.value
)
except (
ValidationError,
IOError,
Expand Down

0 comments on commit affd8d4

Please sign in to comment.