From a129868a2d5f90f8d74c94c155e429d5e1008380 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Sat, 2 Nov 2019 13:33:10 -0700
Subject: [PATCH 1/3] refactor: add a dict_tool.py (remove merge_dict from
 caper.py)

---
 caper/caper.py      |  22 +----
 caper/caper_args.py |   2 +-
 caper/dict_tool.py  | 231 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 233 insertions(+), 22 deletions(-)
 create mode 100644 caper/dict_tool.py

diff --git a/caper/caper.py b/caper/caper.py
index 04dbb926..1f839687 100644
--- a/caper/caper.py
+++ b/caper/caper.py
@@ -27,6 +27,7 @@
 from subprocess import Popen, check_call, PIPE, CalledProcessError
 from datetime import datetime
 
+from .dict_tool import merge_dict
 from .caper_args import parse_caper_arguments
 from .caper_check import check_caper_conf
 from .cromwell_rest_api import CromwellRestAPI
@@ -38,27 +39,6 @@
     CaperBackendSGE, CaperBackendPBS
 
 
-def merge_dict(a, b, path=None):
-    """Merge b into a recursively. This mutates a and overwrites
-    items in b on a for conflicts.
-
-    Ref: https://stackoverflow.com/questions/7204805/dictionaries
-    -of-dictionaries-merge/7205107#7205107
-    """
-    if path is None:
-        path = []
-    for key in b:
-        if key in a:
-            if isinstance(a[key], dict) and isinstance(b[key], dict):
-                merge_dict(a[key], b[key], path + [str(key)])
-            elif a[key] == b[key]:
-                pass
-            else:
-                a[key] = b[key]
-        else:
-            a[key] = b[key]
-
-
 class Caper(object):
     """Cromwell/WDL wrapper
     """
diff --git a/caper/caper_args.py b/caper/caper_args.py
index 7911cb02..f8e758c2 100644
--- a/caper/caper_args.py
+++ b/caper/caper_args.py
@@ -19,7 +19,7 @@
 from .caper_backend import BACKEND_ALIAS_SHERLOCK, BACKEND_ALIAS_SCG
 
 
-__version__ = '0.5.3'
+__version__ = '0.5.4'
 
 DEFAULT_JAVA_HEAP_SERVER = '10G'
 DEFAULT_JAVA_HEAP_RUN = '2G'
diff --git a/caper/dict_tool.py b/caper/dict_tool.py
new file mode 100644
index 00000000..3e366ae8
--- /dev/null
+++ b/caper/dict_tool.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python3
+"""dictTool: merge/split/flatten/unflatten dict
+
+Author:
+    Jin Lee (leepc12@gmail.com) at ENCODE-DCC
+"""
+
+import re
+from collections import defaultdict
+try:
+    from collections.abc import MutableMapping
+except ImportError:
+    from collections import MutableMapping
+
+
+def merge_dict(a, b):
+    """Merges b into a recursively. This mutates a; on conflicting
+    keys, values from b overwrite those in a.
+
+    Ref: https://stackoverflow.com/questions/7204805/dictionaries
+    -of-dictionaries-merge/7205107#7205107
+    """
+    for key in b:
+        if key in a:
+            if isinstance(a[key], dict) and isinstance(b[key], dict):
+                merge_dict(a[key], b[key])
+            elif a[key] == b[key]:
+                pass
+            else:
+                a[key] = b[key]
+        else:
+            a[key] = b[key]
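+
+# A minimal illustration of the merge semantics (a sketch for readers,
+# not called by Caper itself): the merge walks b per-key and recurses
+# into nested dicts.
+#   a = {'x': {'p': 1}, 'y': 1}
+#   merge_dict(a, {'x': {'q': 2}, 'y': 2})
+#   assert a == {'x': {'p': 1, 'q': 2}, 'y': 2}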
+
+
+def flatten_dict(d, parent_key=()):
+    """Flattens dict into single-level-tuple-keyed dict with
+    {(tuple of keys of parents and self): value}
+
+    Returns:
+        dict of {
+            (key_lvl1, key_lvl2, key_lvl3, ...): value
+        }
+    """
+    items = []
+    for k, v in d.items():
+        new_key = parent_key + (k if isinstance(k, tuple) else (k,))
+        if isinstance(v, MutableMapping):
+            items.extend(flatten_dict(v, parent_key=new_key).items())
+        else:
+            items.append((new_key, v))
+    return type(d)(items)
+
+
+def unflatten_dict(d_flat):
+    """Unflattens single-level-tuple-keyed dict into dict
+    """
+    result = type(d_flat)()
+    for k_tuple, v in d_flat.items():
+        d_curr = result
+        for i, k in enumerate(k_tuple):
+            if i == len(k_tuple) - 1:
+                d_curr[k] = v
+            elif k not in d_curr:
+                d_curr[k] = type(d_flat)()
+            d_curr = d_curr[k]
+    return result
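+
+# A minimal illustration (a sketch): flatten_dict and unflatten_dict
+# are inverses of each other for nested dicts.
+#   assert flatten_dict({'a': {'x': 1}, 'b': 2}) == \
+#       {('a', 'x'): 1, ('b',): 2}
+#   assert unflatten_dict({('a', 'x'): 1, ('b',): 2}) == \
+#       {'a': {'x': 1}, 'b': 2}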
+
+
+def split_dict(d, rules=None):
+    """Splits dict according to "rules"
+
+    Returns:
+        List of split dicts
+
+    Args:
+        rules:
+            A list of tuples (RULE_NAME, REGEX)
+
+            If a key name in a JSON object matches this REGEX,
+            then ALL objects with the same key will be separated from
+            the original root JSON object while keeping their hierarchy.
+            RULE_NAME will be added to the root of each new JSON object.
+
+            For example, given a JSON object like the following
+            [
+                {
+                    "flagstat_qc": {
+                        "rep1": {
+                            "read1": 100,
+                            "read2": 200
+                        },
+                        "rep2": {
+                            "read1": 300,
+                            "read2": 400
+                        }
+                    },
+                    "etc": {
+                        "samstat_qc": {
+                            "rep1": {
+                                "unmapped": 500,
+                                "mapped": 600
+                            },
+                            "rep2": {
+                                "unmapped": 700,
+                                "mapped": 800
+                            }
+                        }
+                    },
+                    "idr_qc": {
+                        "qc_test1": 900
+                    }
+                }
+            ]
+            and rules = [("replicate", r"^rep\d+$")], this JSON object
+            will be split into three JSON objects (original, rep1, rep2).
+            [
+                # original
+                {
+                    "idr_qc": {
+                        "qc_test1": 900
+                    }
+                },
+                # rep1
+                {
+                    "replicate": "rep1",
+                    "flagstat_qc": {
+                        "read1": 100,
+                        "read2": 200
+                    },
+                    "etc": {
+                        "samstat_qc": {
+                            "unmapped": 500,
+                            "mapped": 600
+                        }
+                    }
+                },
+                # rep2
+                {
+                    "replicate": "rep2",
+                    "flagstat_qc": {
+                        "read1": 300,
+                        "read2": 400
+                    },
+                    "etc": {
+                        "samstat_qc": {
+                            "unmapped": 700,
+                            "mapped": 800
+                        }
+                    }
+                }
+            ]
+    """
+    if rules is None:
+        return [d]
+    if isinstance(rules, tuple):
+        rules = [rules]
+
+    d_flat = flatten_dict(d)
+    result = []
+    keys_matched_regex = set()
+    d_each_rule = defaultdict(type(d))
+    for rule_name, rule_regex in rules:
+        for k_tuple, v in d_flat.items():
+            new_k_tuple = ()
+            pattern_matched_k = None
+            for k in k_tuple:
+                if re.findall(rule_regex, k):
+                    pattern_matched_k = (rule_name, k)
+                else:
+                    new_k_tuple += (k,)
+            if pattern_matched_k is not None:
+                d_each_rule[pattern_matched_k][new_k_tuple] = v
+                keys_matched_regex.add(k_tuple)
+
+    for (rule_name, k), d_each_matched in d_each_rule.items():
+        d_ = unflatten_dict(d_each_matched)
+        d_[rule_name] = k
+        result.append(d_)
+
+    d_others = type(d)()
+    for k_tuple, v in d_flat.items():
+        if k_tuple not in keys_matched_regex:
+            d_others[k_tuple] = v
+    if d_others:
+        d_ = unflatten_dict(d_others)
+        result = [d_] + result
+    return result
+
+
+def test():
+    import json
+    from collections import OrderedDict
+    d = OrderedDict({
+        "flagstat_qc": {
+            "rep1": {
+                "read1": 100,
+                "read2": 200
+            },
+            "rep2": {
+                "read1": 300,
+                "read2": 400
+            }
+        },
+        "etc": {
+            "samstat_qc": {
+                "rep1": {
+                    "unmapped": 500,
+                    "mapped": 600
+                },
+                "rep2": {
+                    "unmapped": 700,
+                    "mapped": 800
+                }
+            }
+        },
+        "idr_qc": {
+            "qc_test1": 900
+        }
+    })
+    j = json.dumps(d, indent=4)
+    print(j)
+    # j_flat = flatten_dict(d)
+    # print(j_flat)
+    jsons_split = split_dict(d, [('replicate', r'^rep\d+$')])
+    print(json.dumps(jsons_split, indent=4))
+    # print(split_dict(d, [('replicate', r'^rep\d+$')]))
+    return 0
+
+
+if __name__ == '__main__':
+    test()
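A quick smoke test of the new module as a whole (a sketch; assumes the caper package is importable, e.g. when run from the repo root):

    from caper.dict_tool import flatten_dict, unflatten_dict, split_dict

    d = {'flagstat_qc': {'rep1': {'read1': 100}, 'rep2': {'read1': 300}}}

    # flatten_dict/unflatten_dict round-trip back to the original nesting
    assert unflatten_dict(flatten_dict(d)) == d

    # split_dict yields one dict per matched key, tagged with the rule name
    print(split_dict(d, [('replicate', r'^rep\d+$')]))
    # -> [{'flagstat_qc': {'read1': 100}, 'replicate': 'rep1'},
    #     {'flagstat_qc': {'read1': 300}, 'replicate': 'rep2'}]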
From 94dadd8d27e1b600a5f1e2d382c97ec305471625 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Sat, 2 Nov 2019 14:29:38 -0700
Subject: [PATCH 2/3] feature: womtool validation before running/submitting
 workflow

---
 caper/caper.py      | 44 +++++++++++++++++++++++++++++++++++++++++++-
 caper/caper_args.py |  8 ++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/caper/caper.py b/caper/caper.py
index 1f839687..157e0bad 100644
--- a/caper/caper.py
+++ b/caper/caper.py
@@ -17,6 +17,7 @@
 from pyhocon import ConfigFactory, HOCONConverter
 import os
+import sys
 import pwd
 import json
 import re
@@ -44,6 +45,7 @@
     """
 
     CROMWELL_JAR_DIR = '~/.caper/cromwell_jar'
+    WOMTOOL_JAR_DIR = '~/.caper/womtool_jar'
     BACKEND_CONF_HEADER = 'include required(classpath("application"))\n'
     DEFAULT_BACKEND = BACKEND_LOCAL
     RE_PATTERN_BACKEND_CONF_HEADER = r'^\s*include\s'
@@ -132,6 +134,8 @@
         self._imports = args.get('imports')
         self._metadata_output = args.get('metadata_output')
         self._singularity_cachedir = args.get('singularity_cachedir')
+        self._ignore_womtool = args.get('ignore_womtool')
+        self._womtool = args.get('womtool')
 
         # file DB
         self._file_db = args.get('file_db')
@@ -208,8 +212,21 @@
                '-m', metadata_file]
         if imports_file is not None:
             cmd += ['-p', imports_file]
-        print('[Caper] cmd: ', cmd)
 
+        if not self._ignore_womtool:
+            # run womtool first to validate WDL and input JSON
+            cmd_womtool = ['java', '-jar', self.__download_womtool_jar(),
+                           'validate', CaperURI(self._wdl).get_local_file(),
+                           '-i', input_file]
+            try:
+                print("[Caper] Validating WDL/input JSON with womtool...")
+                check_call(cmd_womtool)
+            except CalledProcessError as e:
+                print("[Caper] Error (womtool): WDL or input JSON is invalid.")
+                rc = e.returncode
+                sys.exit(rc)
+
+        print('[Caper] cmd: ', cmd)
         if self._dry_run:
             return -1
         try:
@@ -363,6 +380,19 @@
         labels_file = self.__create_labels_json_file(tmp_dir)
         on_hold = self._hold if self._hold is not None else False
 
+        # run womtool first to validate WDL and input JSON
+        if not self._ignore_womtool:
+            cmd_womtool = ['java', '-jar', self.__download_womtool_jar(),
+                           'validate', CaperURI(self._wdl).get_local_file(),
+                           '-i', input_file]
+            try:
+                print("[Caper] Validating WDL/input JSON with womtool...")
+                check_call(cmd_womtool)
+            except CalledProcessError as e:
+                print("[Caper] Error (womtool): WDL or input JSON is invalid.")
+                rc = e.returncode
+                sys.exit(rc)
+
         if self._dry_run:
             return -1
         r = self._cromwell_rest_api.submit(
@@ -550,6 +580,18 @@
                 os.path.basename(self._cromwell))
         return cu.copy(target_uri=path)
 
+    def __download_womtool_jar(self):
+        """Download womtool-X.jar
+        """
+        cu = CaperURI(self._womtool)
+        if cu.uri_type == URI_LOCAL:
+            return cu.get_uri()
+
+        path = os.path.join(
+            os.path.expanduser(Caper.WOMTOOL_JAR_DIR),
+            os.path.basename(self._womtool))
+        return cu.copy(target_uri=path)
+
     def __write_metadata_jsons(self, workflow_ids):
         try:
             for wf_id in workflow_ids:
diff --git a/caper/caper_args.py b/caper/caper_args.py
index f8e758c2..e6663dc1 100644
--- a/caper/caper_args.py
+++ b/caper/caper_args.py
@@ -26,6 +26,7 @@
 DEFAULT_CAPER_CONF = '~/.caper/default.conf'
 DEFAULT_SINGULARITY_CACHEDIR = '~/.caper/singularity_cachedir'
 DEFAULT_CROMWELL_JAR = 'https://github.com/broadinstitute/cromwell/releases/download/42/cromwell-42.jar'
+DEFAULT_WOMTOOL_JAR = 'https://github.com/broadinstitute/cromwell/releases/download/42/womtool-42.jar'
 DEFAULT_MYSQL_DB_IP = 'localhost'
 DEFAULT_MYSQL_DB_PORT = 3306
 DEFAULT_DB_TIMEOUT_MS = 30000
@@ -384,6 +385,12 @@
     parent_submit.add_argument(
         '--deepcopy-ext', default=DEFAULT_DEEPCOPY_EXT,
         help='Comma-separated list of file extensions to be deepcopied')
+    parent_submit.add_argument(
+        '--ignore-womtool', action='store_true',
+        help='Skip WDL/input JSON validation with womtool.jar.')
+    parent_submit.add_argument(
+        '--womtool', default=DEFAULT_WOMTOOL_JAR,
+        help='Path or URL for Cromwell\'s womtool JAR file')
 
     group_dep = parent_submit.add_argument_group(
         title='dependency resolver for all backends',
@@ -550,6 +557,7 @@
         'use_gsutil_over_aws_s3',
         'hold',
         'no_deepcopy',
+        'ignore_womtool',
         'no_build_singularity',
         'no_file_db',
         'use_netrc',
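With the defaults above, the validation step assembles a command roughly like the following (a sketch; the WDL/JSON paths are placeholders, and the jar is cached under WOMTOOL_JAR_DIR after the first download):

    cmd_womtool = ['java', '-jar',
                   '/home/user/.caper/womtool_jar/womtool-42.jar',
                   'validate', '/path/to/my.wdl',
                   '-i', '/path/to/inputs.json']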
From 85a42cb897040fb2a787bceb1ad707d9167dc055 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Sat, 2 Nov 2019 14:39:53 -0700
Subject: [PATCH 3/3] fix: java -Xmx512M for womtool

---
 caper/caper.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/caper/caper.py b/caper/caper.py
index 157e0bad..eca137ac 100644
--- a/caper/caper.py
+++ b/caper/caper.py
@@ -215,7 +215,8 @@
 
         if not self._ignore_womtool:
             # run womtool first to validate WDL and input JSON
-            cmd_womtool = ['java', '-jar', self.__download_womtool_jar(),
+            cmd_womtool = ['java', '-Xmx512M', '-jar',
+                           self.__download_womtool_jar(),
                            'validate', CaperURI(self._wdl).get_local_file(),
                            '-i', input_file]
             try:
@@ -382,7 +383,8 @@
 
         # run womtool first to validate WDL and input JSON
         if not self._ignore_womtool:
-            cmd_womtool = ['java', '-jar', self.__download_womtool_jar(),
+            cmd_womtool = ['java', '-Xmx512M', '-jar',
+                           self.__download_womtool_jar(),
                            'validate', CaperURI(self._wdl).get_local_file(),
                            '-i', input_file]
             try:
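End to end, the new behavior from the command line looks like this (a sketch; my.wdl and inputs.json are placeholders for a user's workflow):

    caper run my.wdl -i inputs.json                    # womtool validates first (java -Xmx512M)
    caper run my.wdl -i inputs.json --ignore-womtool   # skip the validation step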