Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Visualization API #690

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/encoded/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def main(global_config, **local_config):
config.include('.visualization')
config.include('.ingestion_listener')
config.include('.custom_embed')
config.include('.pipeline_view')

if 'elasticsearch.server' in config.registry.settings:
config.include('snovault.elasticsearch')
Expand Down
15 changes: 11 additions & 4 deletions src/encoded/custom_embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from pyramid.view import view_config
from snovault.util import debug_log


ATID_PATTERN = re.compile("/[a-zA-Z-]+/[a-zA-Z0-9-_:]+/")
EMBED_ALL_FIELDS_MARKER = "*"
PROPERTY_SPLITTER = "."
GENELIST_ATID = re.compile("/gene-lists/[a-zA-Z0-9-]+/")
MINIMAL_EMBEDS = ["projects", "institutions", "users"]
MINIMAL_EMBED_ATID = re.compile("/(" + "|".join(MINIMAL_EMBEDS) + ")/[a-zA-Z0-9-_:]+/")
Expand All @@ -26,6 +29,7 @@
]
FORBIDDEN_MSG = {"error": "no view permissions"}
DATABASE_ITEM_KEY = "@type" # Key specific to JSON objects that are CGAP items
REQUESTED_FIELDS = "requested_fields"


def includeme(config):
Expand All @@ -43,7 +47,7 @@ def __init__(self, request, item, embed_props):
self.ignored_embeds = embed_props.get("ignored_embeds", [])
self.desired_embeds = embed_props.get("desired_embeds", [])
self.embed_depth = embed_props.get("embed_depth", 4)
self.requested_fields = embed_props.get("requested_fields", [])
self.requested_fields = embed_props.get(REQUESTED_FIELDS, [])

self.cache = {}
self.invalid_ids = []
Expand All @@ -55,6 +59,9 @@ def __init__(self, request, item, embed_props):
depth = -1
self.result = self.embed(item, depth)

def get_embedded_fields(self) -> dict:
return self.result

def add_actions(self, item):
"""
Add the "actions" field to an item according to the request's
Expand Down Expand Up @@ -241,7 +248,7 @@ def fields_to_nested_dict(self):
"""
field_dict = {}
for field in self.requested_fields:
field_keys = field.split(".")
field_keys = field.split(PROPERTY_SPLITTER)
field_keys = [x for x in field_keys if x]
field_dict = self.build_nested_dict(field_dict, field_keys)
return field_dict
Expand Down Expand Up @@ -306,7 +313,7 @@ def field_embed(self, item, field_dict, initial_item=False):
"The 'actions' field was requested for a JSON object"
" that is not a database item."
)
if "*" not in fields_to_keep:
if EMBED_ALL_FIELDS_MARKER not in fields_to_keep:
culled_item = {}
for field in fields_to_keep:
try:
Expand Down Expand Up @@ -380,7 +387,7 @@ def embed(context, request):
"ignored_embeds": ignored_embeds,
"desired_embeds": desired_embeds,
"embed_depth": embed_depth,
"requested_fields": requested_fields,
REQUESTED_FIELDS: requested_fields,
}
for item_id in ids:
item_embed = CustomEmbed(request, item_id, embed_props)
Expand Down
274 changes: 274 additions & 0 deletions src/encoded/pipeline_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
from dataclasses import dataclass
from requests import Request
from typing import Any, Dict, List, Mapping, Optional, Sequence

from pyramid.httpexceptions import HTTPMethodNotAllowed
from pyramid.view import view_config
from snovault.util import debug_log

from . import custom_embed
from .types.base import Item


PIPELINE_PROPERTIES = "pipeline_properties"


def includeme(config):
config.scan(__name__)


def validate_item_pipelines_get(context: Item, request: Request) -> None:
pipeline_properties = getattr(context, PIPELINE_PROPERTIES, [])
if not pipeline_properties:
raise HTTPMethodNotAllowed(detail="Item cannot display pipelines")


@view_config(
context=Item,
permission="view",
name="pipelines",
request_method="GET",
validators=[validate_item_pipelines_get],
)
@debug_log
def pipelines(context: Item, request: Request) -> Dict:
pipelines_to_display = PipelineRetriever(
context, request
).get_pipelines_to_display()
return PipelineDisplayer(pipelines_to_display).get_display()


@dataclass(frozen=True)
class PipelineToDisplay:
"""TODO: Finalize display properties once front-end settles."""

ATID = "@id"
COMPLETED = "completed"
DISPLAY_TITLE = "display_title"
FINAL_STATUS = "final_status"
FINAL_STATUS_COMPLETED = "completed"
FINAL_STATUS_STOPPED = "stopped"
FINAL_STATUS_QC_ERROR = "quality metric failed"
NAME = "name"
RUN_STATUS = "run_status"
RUNNING = "running"
STOPPED = "stopped"
STOPPED_FINAL_STATUSES = [FINAL_STATUS_STOPPED, FINAL_STATUS_QC_ERROR]
VERSION = "version"

parent_item: Mapping[str, Any]
pipeline: Mapping[str, Any]

def get_parent_item_display(self) -> Dict[str, str]:
return {
self.ATID: self.get_parent_item_atid(),
self.NAME: self.get_parent_item_name(),
}

def get_parent_item_atid(self) -> str:
return self.parent_item.get(self.ATID, "")

def get_parent_item_name(self) -> str:
return self.parent_item.get(self.DISPLAY_TITLE, "")

def get_pipeline_display(self) -> Dict[str, Any]:
return {
self.ATID: self.get_pipeline_atid(),
self.RUN_STATUS: self.get_pipeline_run_status(),
self.NAME: self.get_pipeline_name(),
self.VERSION: self.get_pipeline_version(),
}

def get_pipeline_run_status(self) -> str:
final_status = self.get_pipeline_final_status()
if final_status == self.FINAL_STATUS_COMPLETED:
return self.COMPLETED
elif final_status in self.STOPPED_FINAL_STATUSES:
return self.STOPPED
else:
return self.RUNNING

def get_pipeline_final_status(self) -> str:
return self.pipeline.get(self.FINAL_STATUS, "")

def get_pipeline_name(self) -> str:
return self.pipeline.get(self.NAME, "")

def get_pipeline_version(self) -> str:
return self.pipeline.get(self.VERSION, "")

def get_pipeline_atid(self) -> str:
return self.pipeline.get(self.ATID, "")


@dataclass(frozen=True)
class PipelineRetriever:

UUID = "uuid"

context: Item
request: Request

def get_pipelines_to_display(self) -> List[PipelineToDisplay]:
item_with_embeds = self.get_item_with_embeds()
return self.get_pipelines(item_with_embeds)

def get_item_with_embeds(self) -> List[dict]:
item_identifier = self.get_item_identifier()
custom_embed_parameters = self.get_custom_embed_parameters()
return custom_embed.CustomEmbed(
self.request, item_identifier, custom_embed_parameters
).get_embedded_fields()

def get_item_identifier(self) -> str:
return str(getattr(self.context, self.UUID, ""))

def get_custom_embed_parameters(self) -> dict:
return {custom_embed.REQUESTED_FIELDS: self.get_properties_to_embed()}

def get_properties_to_embed(self) -> List[str]:
result = [custom_embed.EMBED_ALL_FIELDS_MARKER]
for pipeline_property in self.get_pipeline_properties():
result.extend(
self.get_properties_to_embed_from_pipeline_property(pipeline_property)
)
return result

def get_pipeline_properties(self) -> List[str]:
return getattr(self.context, PIPELINE_PROPERTIES, [])

def get_properties_to_embed_from_pipeline_property(
self, pipeline_property: str
) -> List[str]:
split_properties = self.split_pipeline_property(pipeline_property)
properties_to_embed = self.get_all_possible_embeds(split_properties)
return [
self.make_embed_property(property_to_embed)
for property_to_embed in properties_to_embed
]

def get_all_possible_embeds(self, split_properties: List[str]) -> List[str]:
return [
custom_embed.PROPERTY_SPLITTER.join(split_properties[:idx + 1])
for idx in range(len(split_properties))
]

def make_embed_property(self, property_to_embed: str) -> str:
return (
property_to_embed
+ custom_embed.PROPERTY_SPLITTER
+ custom_embed.EMBED_ALL_FIELDS_MARKER
)

def get_pipelines(self, embedded_properties: Mapping) -> List[PipelineToDisplay]:
result = []
for pipeline_property in self.get_pipeline_properties():
result.extend(
self.get_pipelines_for_pipeline_property(
embedded_properties, pipeline_property
)
)
return result

def get_pipelines_for_pipeline_property(
self, embedded_properties: Mapping, pipeline_property: str
) -> List[PipelineToDisplay]:
properties_to_get = self.split_pipeline_property(pipeline_property)
return RecursivePipelineRetriever(
embedded_properties, embedded_properties, properties_to_get
).get_pipelines()

@staticmethod
def split_pipeline_property(pipeline_property: str) -> List[str]:
return [
term
for term in pipeline_property.split(custom_embed.PROPERTY_SPLITTER)
if term
]


@dataclass(frozen=True)
class RecursivePipelineRetriever:

TYPES = "@type"
META_WORKFLOW_RUN_TYPE = "MetaWorkflowRun"

parent_item: Mapping
item_to_get_from: Any
properties_to_get: List[str]

def get_pipelines(self) -> List[PipelineToDisplay]:
result = []
if isinstance(self.item_to_get_from, dict):
result.extend(self.get_pipelines_from_dict())
elif isinstance(self.item_to_get_from, list):
result.extend(self.recursive_get_pipelines_from_list())
return result

def get_pipelines_from_dict(self) -> List[PipelineToDisplay]:
result = []
if self.properties_to_get:
result.extend(self.recursive_get_pipelines_from_dict())
elif self.is_pipeline_item():
result.append(PipelineToDisplay(self.parent_item, self.item_to_get_from))
return result

def is_pipeline_item(self) -> bool:
return self.META_WORKFLOW_RUN_TYPE in self.item_to_get_from.get(self.TYPES, [])

def recursive_get_pipelines_from_dict(self) -> List[PipelineToDisplay]:
result = []
[property_to_get, *remaining_properties_to_get] = self.properties_to_get
new_item_to_get_from = self.item_to_get_from.get(property_to_get, {})
if new_item_to_get_from:
result.extend(
self.recursive_get_pipelines_from_item(
new_item_to_get_from, remaining_properties_to_get
)
)
return result

def recursive_get_pipelines_from_list(self) -> List[PipelineToDisplay]:
result = []
for item in self.item_to_get_from:
result.extend(self.recursive_get_pipelines_from_item(item))
return result

def recursive_get_pipelines_from_item(
self, item: Any, properties_to_get: Optional[List[str]] = None
) -> List[PipelineToDisplay]:
if properties_to_get is None:
properties_to_get = self.properties_to_get
parent_item_to_pass = self.get_parent_item_to_pass(item, properties_to_get)
return RecursivePipelineRetriever(
parent_item_to_pass, item, properties_to_get
).get_pipelines()

def get_parent_item_to_pass(self, item: Any, properties_to_get: List[str]) -> Dict:
if self.is_item(item) and properties_to_get:
return item
return self.parent_item

def is_item(self, item: Any) -> bool:
if isinstance(item, dict) and item.get(self.TYPES):
return True
return False


@dataclass(frozen=True)
class PipelineDisplayer:
"""TODO: Finalize display once front-end settles."""

pipelines_to_display: Sequence[PipelineToDisplay]

def get_display(self) -> Dict[str, List[Dict]]:
result = {}
for pipeline_to_display in self.pipelines_to_display:
parent_atid = pipeline_to_display.get_parent_item_atid()
pipeline_display = pipeline_to_display.get_pipeline_display()
existing_pipeline_displays = result.get(parent_atid)
if existing_pipeline_displays is None:
result[parent_atid] = [pipeline_display]
else:
existing_pipeline_displays.append(pipeline_display)
return result
Loading