Skip to content

Commit

Permalink
Add workflow extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
bebatut committed Jul 17, 2024
1 parent 2c5cde7 commit 00b5599
Show file tree
Hide file tree
Showing 3 changed files with 342 additions and 11 deletions.
314 changes: 314 additions & 0 deletions bin/extract_galaxy_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
#!/usr/bin/env python

import argparse
from datetime import (
    date,
    datetime,
)
from pathlib import Path
from typing import (
    Dict,
    List,
    Optional,
)

import pandas as pd
import requests
import shared


class Workflow:
    """
    A Galaxy workflow loaded from one of three sources: a previously
    exported JSON dictionary, WorkflowHub, or a public Galaxy server.
    """

    def __init__(self, wf: dict, source: str, tools: Optional[Dict] = None):
        """
        :param wf: dictionary with workflow details (shape depends on source)
        :param source: one of "json", "WorkflowHub" or "server"
        :param tools: dictionary with information about all tools, used to
            look up EDAM operations (ignored for source="json")
        :raises ValueError: if source is not one of the supported values
        """
        # BUG fix: the original used a mutable default argument (tools={})
        tools = {} if tools is None else tools
        self.source = source
        if source == "json":
            # Reload a workflow previously exported by
            # Workflows.export_workflows_to_dict — all fields are present
            self.id = wf["id"]
            self.server = wf["server"]
            self.link = wf["link"]
            self.name = wf["name"]
            self.creators = wf["creators"]
            self.tags = wf["tags"]
            self.update_time = wf["update_time"]
            self.latest_version = wf["latest_version"]
            self.versions = wf["versions"]
            self.number_of_steps = wf["number_of_steps"]
            self.tools = wf["tools"]
            self.edam_operation = wf["edam_operation"]
        else:
            if self.source == "WorkflowHub":
                attributes = wf["data"]["attributes"]
                self.id = wf["data"]["id"]
                self.server = "https://workflowhub.eu"
                # Single quotes inside the f-string: nested double quotes
                # are a syntax error before Python 3.12 (PEP 701)
                self.link = f"https://workflowhub.eu{wf['data']['links']['self']}"
                self.name = attributes["title"]
                self.tags = attributes["tags"]
                self.update_time = attributes["updated_at"]
                self.latest_version = attributes["latest_version"]
                self.versions = len(attributes["versions"])
                self.number_of_steps = len(attributes["internals"]["steps"])
            elif self.source == "server":
                self.id = wf["id"]
                self.server = wf["server"]
                self.link = f"{wf['server']}/published/workflow?id={wf['id']}"
                self.name = wf["name"]
                self.number_of_steps = wf["number_of_steps"] if "number_of_steps" in wf else len(wf["steps"].keys())
                self.tags = wf["tags"]
                self.update_time = wf["update_time"]
                # Servers expose a single version number
                self.latest_version = wf["version"]
                self.versions = wf["version"]
            else:
                raise ValueError(f"Incorrect source ({ self.source }) for workflow")

            # BUG fix: the server branch previously called add_creators twice
            self.add_creators(wf)
            self.add_tools(wf)
            self.edam_operation = shared.get_edam_operation_from_tools(self.tools, tools)

    def add_creators(self, wf: dict) -> None:
        """
        Get workflow creators from the source-specific metadata
        """
        self.creators = []
        if self.source == "WorkflowHub":
            creators = wf["data"]["attributes"]["creators"]
            if len(creators) == 0:
                # Fall back to the free-text, comma-separated field
                other = wf["data"]["attributes"]["other_creators"]
                if other and len(other) > 0:
                    self.creators.extend(other.split(","))
            else:
                # BUG fix: the original referenced an undefined `creator0` and
                # called extend() on a string, which would have added single
                # characters instead of "given family" names
                for creator in creators:
                    self.creators.append(f"{creator['given_name']} {creator['family_name']}")
        else:
            if "creator" in wf and wf["creator"] is not None:
                for c in wf["creator"]:
                    self.creators.append(c["name"])

    def add_tools(self, wf: dict) -> None:
        """
        Extract list of tool ids from workflow steps

        (Annotation fix: wf is a dict, not a list, in both branches.)
        """
        tool_ids = set()
        if self.source == "WorkflowHub":
            for tool in wf["data"]["attributes"]["internals"]["steps"]:
                tool_ids.add(shared.shorten_tool_id(tool["description"]))
        else:
            for step in wf["steps"].values():
                if "tool_id" in step and step["tool_id"] is not None:
                    tool_ids.add(shared.shorten_tool_id(step["tool_id"]))
        self.tools = list(tool_ids)

    def __getitem__(self, item):
        # Allow dict-style access to attributes (used by export helpers)
        return self.__dict__[item]


class Workflows:
    """
    Collection of Galaxy workflows, either freshly extracted from
    WorkflowHub and public Galaxy servers or reloaded from a JSON export.
    """

    def __init__(self, tool_fp: str = "", test: bool = False, wfs: Optional[List] = None) -> None:
        """
        :param tool_fp: filepath to JSON with all extracted tools; when given,
            workflows are fetched from WorkflowHub and public servers
        :param test: restrict the extraction to a small subset of sources
        :param wfs: list of workflow dictionaries from a previous JSON export
            (used when tool_fp is empty)
        """
        self.workflows = []
        self.tools = []
        self.test = test
        if tool_fp != "":
            self.tools = shared.read_suite_per_tool_id(tool_fp)
            self.add_workflows_from_workflowhub()
            self.add_workflows_from_public_servers()
        elif wfs:
            # BUG fixes: the original required len(wfs) > 1 (silently dropping
            # one-element lists) and assigned a single Workflow to
            # self.workflows instead of appending each one
            for wf in wfs:
                self.workflows.append(Workflow(wf=wf, source="json"))

    def add_workflows_from_workflowhub(self) -> None:
        """
        Add workflows from WorkflowHub
        """
        header = {"Accept": "application/json"}
        wfhub_wfs = shared.get_request_json(
            "https://workflowhub.eu/workflows?filter[workflow_type]=galaxy",
            header,
        )
        # Single quotes inside the f-string: nested double quotes are a
        # syntax error before Python 3.12 (PEP 701)
        print(f"Workflows from WorkflowHub: {len(wfhub_wfs['data'])}")
        data = wfhub_wfs["data"]
        if self.test:
            data = data[:1]
        for wf in data:
            wfhub_wf = shared.get_request_json(
                f"https://workflowhub.eu{wf['links']['self']}",
                header,
            )
            self.workflows.append(Workflow(wf=wfhub_wf, source="WorkflowHub", tools=self.tools))

    def add_workflows_from_a_server(self, server: str) -> None:
        """
        Extract public workflows from a server
        """
        header = {"Accept": "application/json"}
        server_wfs = shared.get_request_json(
            f"{server}/api/workflows/",
            header,
        )
        count = 0
        for wf in server_wfs:
            # Only keep workflows that are really public and usable
            if wf["published"] and wf["importable"] and not wf["deleted"] and not wf["hidden"]:
                count += 1
                server_wf = shared.get_request_json(
                    f"{server}/api/workflows/{wf['id']}",
                    header,
                )
                server_wf["server"] = server
                self.workflows.append(Workflow(wf=server_wf, source="server", tools=self.tools))
        print(f"Workflows from {server}: {count}")

    def add_workflows_from_public_servers(self) -> None:
        """
        Extract workflows from the public servers listed in
        data/available_public_servers.csv (tab-separated)
        """
        server_df = pd.read_csv(Path("data", "available_public_servers.csv"), sep="\t", index_col=0)
        server_urls = server_df["url"]
        if self.test:
            server_urls = server_urls[:2]
        for url in server_urls:
            print(url)
            self.add_workflows_from_a_server(url)

    def export_workflows_to_dict(self) -> List:
        """
        Export workflows as a list of dictionaries
        """
        return [w.__dict__ for w in self.workflows]

    def filter_workflows_by_tags(self, tags: list) -> None:
        """
        Keep only workflows that have at least one tag in `tags`.

        BUG fix: the original removed elements from self.workflows while
        iterating over it, which skips the element following each removal.
        """
        tag_set = set(tags)
        self.workflows = [w for w in self.workflows if set(w.tags) & tag_set]

    def export_workflows_to_tsv(self, output_fp: str) -> None:
        """
        Export workflows to a TSV file
        """
        # Removed a no-op .assign() whose keyword arguments were all
        # commented out (dead code copied from the tutorial exporter)
        df = pd.DataFrame(self.export_workflows_to_dict())

        for col in ["tools", "edam_operation"]:
            df[col] = shared.format_list_column(df[col])

        # NOTE(review): most of these source column names (title, hands_on,
        # pub_date, ...) come from the tutorial exporter and do not exist in
        # workflow dictionaries — renaming them is a no-op and the reindex
        # fills the missing target columns with NaN. Verify the intended
        # column set for workflows.
        df = (
            df.rename(
                columns={
                    "title": "Title",
                    "hands_on": "Tutorial",
                    "url": "Link",
                    "slides": "Slides",
                    "mod_date": "Last modification",
                    "pub_date": "Creation",
                    "version": "Version",
                    "short_tools": "Tools",
                    "exact_supported_servers": "Servers with precise tool versions",
                    "inexact_supported_servers": "Servers with tool but different versions",
                    "topic_name_human": "Topic",
                    "video": "Video",
                    "edam_topic": "EDAM topic",
                    "edam_operation": "EDAM operation",
                    "feedback_number": "Feedback number",
                    "feedback_mean_note": "Feedback mean note",
                    "visitors": "Visitors",
                    "pageviews": "Page views",
                    "visit_duration": "Visit duration",
                    "video_versions": "Video versions",
                    "video_view": "Video views",
                }
            )
            .fillna("")
            .reindex(
                columns=[
                    "Topic",
                    "Title",
                    "Link",
                    "EDAM topic",
                    "EDAM operation",
                    "Creation",
                    "Last modification",
                    "Version",
                    "Tutorial",
                    "Slides",
                    "Video",
                    "Workflows",
                    "Tools",
                    "Servers with precise tool versions",
                    "Servers with tool but different versions",
                    "Feedback number",
                    "Feedback mean note",
                    "Visitors",
                    "Page views",
                    "Visit duration",
                    "Video views",
                ]
            )
        )
        df.to_csv(output_fp, sep="\t", index=False)

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Extract Galaxy Workflows from WorkflowHub and public servers"
)
subparser = parser.add_subparsers(dest="command")

# Extract Workflows
extract = subparser.add_parser("extract", help="Extract all workflows")
extract.add_argument(
"--all", "-o", required=True, help="Filepath to JSON with all extracted workflows"
)
extract.add_argument(
"--tools",
"-t",
required=True,
help="Filepath to JSON with all extracted tools, generated by extractools command",
)
extract.add_argument(
"--test",
action="store_true",
default=False,
required=False,
help="Run a small test case only on one topic",
)

# Filter workflows
filterwf = subparser.add_parser("filter", help="Filter workflows based on their tags")
filterwf.add_argument(
"--all",
"-a",
required=True,
help="Filepath to JSON with all extracted workflows, generated by extract command",
)
filterwf.add_argument(
"--filtered",
"-f",
required=True,
help="Filepath to TSV with filtered tutorials",
)
filterwf.add_argument(
"--tags",
"-c",
help="Path to a file with tags to keep in the extraction (one per line)",
)

args = parser.parse_args()

if args.command == "extract":
wfs = Workflows(tool_fp=args.tools, test=args.test)
shared.export_to_json(wfs.export_workflows_to_dict(), args.all)

elif args.command == "filter":
wfs = Workflows(wfs=shared.load_json(args.all))
tags = shared.read_file(args.tags)
wfs.filter_workflows_by_tags(tags)
wfs.export_workflows_to_tsv(args.filtered_tutorials)
13 changes: 2 additions & 11 deletions bin/extract_gtn_tutorials.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ def get_short_tool_ids(tuto: dict) -> None:
tuto["short_tools"] = set()
if "tools" in tuto:
for tool in tuto["tools"]:
if "toolshed" in tool:
tuto["short_tools"].add(tool.split("/")[-2])
else:
tuto["short_tools"].add(tool)
tuto["short_tools"].add(shared.shorten_tool_id(tool))
tuto["short_tools"] = list(tuto["short_tools"])


Expand All @@ -55,13 +52,7 @@ def get_edam_operations(tuto: dict, tools: dict) -> None:
"""
tuto["edam_operation"] = []
if "short_tools" in tuto:
edam_operation = set()
for t in tuto["short_tools"]:
if t in tools:
edam_operation.update(set(tools[t]["EDAM operation"]))
else:
print(f"{t} not found in all tools")
tuto["edam_operation"] = list(edam_operation)
tuto["edam_operation"] = shared.get_edam_operation_from_tools(tuto["short_tools"], tools)


def get_feedback(tuto: dict, feedback: dict) -> None:
Expand Down
26 changes: 26 additions & 0 deletions bin/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,29 @@ def get_request_json(url: str, headers: dict) -> dict:

def format_date(date: str) -> str:
    """Format an ISO 8601 date/datetime string as YYYY-MM-DD."""
    parsed = datetime.fromisoformat(date)
    return parsed.strftime("%Y-%m-%d")


def shorten_tool_id(tool: str) -> str:
    """
    Shorten tool id

    For ToolShed-style ids (any id containing "toolshed"), keep only the
    second-to-last "/"-separated component; other ids are returned unchanged.
    """
    return tool.split("/")[-2] if "toolshed" in tool else tool


def get_edam_operation_from_tools(selected_tools: list, all_tools: dict) -> List:
    """
    Get the list of EDAM operations covered by the selected tools.

    :param selected_tools: list of tool suite ids
    :param all_tools: dictionary with information about all tools
    """
    operations = set()
    for t in selected_tools:
        if t not in all_tools:
            print(f"{t} not found in all tools")
            continue
        operations.update(all_tools[t]["EDAM operation"])
    return list(operations)

0 comments on commit 00b5599

Please sign in to comment.