Commit

Merge pull request #32 from ENCODE-DCC/dev14
Dev14
leepc12 authored Nov 2, 2019
2 parents 486fc33 + 85a42cb commit 641db11
Showing 3 changed files with 286 additions and 23 deletions.
68 changes: 46 additions & 22 deletions caper/caper.py
@@ -17,6 +17,7 @@

from pyhocon import ConfigFactory, HOCONConverter
import os
import sys
import pwd
import json
import re
@@ -27,6 +28,7 @@
from subprocess import Popen, check_call, PIPE, CalledProcessError
from datetime import datetime

from .dict_tool import merge_dict
from .caper_args import parse_caper_arguments
from .caper_check import check_caper_conf
from .cromwell_rest_api import CromwellRestAPI
@@ -38,32 +40,12 @@
CaperBackendSGE, CaperBackendPBS


def merge_dict(a, b, path=None):
"""Merge b into a recursively. This mutates a and overwrites
items in b on a for conflicts.
Ref: https://stackoverflow.com/questions/7204805/dictionaries
-of-dictionaries-merge/7205107#7205107
"""
if path is None:
path = []
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
merge_dict(a[key], b[key], path + [str(key)])
elif a[key] == b[key]:
pass
else:
a[key] = b[key]
else:
a[key] = b[key]


class Caper(object):
"""Cromwell/WDL wrapper
"""

CROMWELL_JAR_DIR = '~/.caper/cromwell_jar'
WOMTOOL_JAR_DIR = '~/.caper/womtool_jar'
BACKEND_CONF_HEADER = 'include required(classpath("application"))\n'
DEFAULT_BACKEND = BACKEND_LOCAL
RE_PATTERN_BACKEND_CONF_HEADER = r'^\s*include\s'
@@ -152,6 +134,8 @@ def __init__(self, args):
self._imports = args.get('imports')
self._metadata_output = args.get('metadata_output')
self._singularity_cachedir = args.get('singularity_cachedir')
self._ignore_womtool = args.get('ignore_womtool')
self._womtool = args.get('womtool')

# file DB
self._file_db = args.get('file_db')
@@ -228,8 +212,22 @@ def run(self):
'-m', metadata_file]
if imports_file is not None:
cmd += ['-p', imports_file]
print('[Caper] cmd: ', cmd)

if not self._ignore_womtool:
# run womtool first to validate WDL and input JSON
cmd_womtool = ['java', '-Xmx512M', '-jar',
self.__download_womtool_jar(),
'validate', CaperURI(self._wdl).get_local_file(),
'-i', input_file]
try:
print("[Caper] Validating WDL/input JSON with womtool...")
check_call(cmd_womtool)
except CalledProcessError as e:
print("[Caper] Error (womtool): WDL or input JSON is invalid.")
rc = e.returncode
sys.exit(rc)

print('[Caper] cmd: ', cmd)
if self._dry_run:
return -1
try:
@@ -383,6 +381,20 @@ def submit(self):
labels_file = self.__create_labels_json_file(tmp_dir)
on_hold = self._hold if self._hold is not None else False

# run womtool first to validate WDL and input JSON
if not self._ignore_womtool:
cmd_womtool = ['java', '-Xmx512M', '-jar',
self.__download_womtool_jar(),
'validate', CaperURI(self._wdl).get_local_file(),
'-i', input_file]
try:
print("[Caper] Validating WDL/input JSON with womtool...")
check_call(cmd_womtool)
except CalledProcessError as e:
print("[Caper] Error (womtool): WDL or input JSON is invalid.")
rc = e.returncode
sys.exit(rc)

if self._dry_run:
return -1
r = self._cromwell_rest_api.submit(
@@ -570,6 +582,18 @@ def __download_cromwell_jar(self):
os.path.basename(self._cromwell))
return cu.copy(target_uri=path)

def __download_womtool_jar(self):
"""Download womtool-X.jar
"""
cu = CaperURI(self._womtool)
if cu.uri_type == URI_LOCAL:
return cu.get_uri()

path = os.path.join(
os.path.expanduser(Caper.WOMTOOL_JAR_DIR),
os.path.basename(self._womtool))
return cu.copy(target_uri=path)

def __write_metadata_jsons(self, workflow_ids):
try:
for wf_id in workflow_ids:
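The validation step above appears in both run() and submit(): before handing the workflow to Cromwell, Caper runs womtool's validator and aborts with womtool's return code if the WDL or input JSON is invalid. Below is a minimal standalone sketch of that step, assuming a locally downloaded womtool-42.jar; the function name and paths are illustrative, not part of caper.

import sys
from subprocess import check_call, CalledProcessError


def validate_with_womtool(womtool_jar, wdl, input_json):
    """Runs womtool's 'validate' command and exits with its
    return code on failure, mirroring the step added above."""
    cmd = ['java', '-Xmx512M', '-jar', womtool_jar,
           'validate', wdl, '-i', input_json]
    try:
        check_call(cmd)
    except CalledProcessError as e:
        print('[Caper] Error (womtool): WDL or input JSON is invalid.')
        sys.exit(e.returncode)


# validate_with_womtool('/path/to/womtool-42.jar', 'my.wdl', 'my_inputs.json')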
10 changes: 9 additions & 1 deletion caper/caper_args.py
@@ -19,13 +19,14 @@
from .caper_backend import BACKEND_ALIAS_SHERLOCK, BACKEND_ALIAS_SCG


__version__ = '0.5.3'
__version__ = '0.5.4'

DEFAULT_JAVA_HEAP_SERVER = '10G'
DEFAULT_JAVA_HEAP_RUN = '2G'
DEFAULT_CAPER_CONF = '~/.caper/default.conf'
DEFAULT_SINGULARITY_CACHEDIR = '~/.caper/singularity_cachedir'
DEFAULT_CROMWELL_JAR = 'https://github.com/broadinstitute/cromwell/releases/download/42/cromwell-42.jar'
DEFAULT_WOMTOOL_JAR = 'https://github.com/broadinstitute/cromwell/releases/download/42/womtool-42.jar'
DEFAULT_MYSQL_DB_IP = 'localhost'
DEFAULT_MYSQL_DB_PORT = 3306
DEFAULT_DB_TIMEOUT_MS = 30000
@@ -384,6 +385,12 @@ def parse_caper_arguments():
parent_submit.add_argument(
'--deepcopy-ext', default=DEFAULT_DEEPCOPY_EXT,
help='Comma-separated list of file extensions to be deepcopied')
parent_submit.add_argument(
'--ignore-womtool', action='store_true',
    help='Skip WDL/input JSON validation with womtool.jar.')
parent_submit.add_argument(
'--womtool', default=DEFAULT_WOMTOOL_JAR,
help='Path or URL for Cromwell\'s womtool JAR file')

group_dep = parent_submit.add_argument_group(
title='dependency resolver for all backends',
@@ -550,6 +557,7 @@ def parse_caper_arguments():
'use_gsutil_over_aws_s3',
'hold',
'no_deepcopy',
'ignore_womtool',
'no_build_singularity',
'no_file_db',
'use_netrc',
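The two new options plug into the existing submit/run argument group: --ignore-womtool is a plain store_true flag, and --womtool falls back to the womtool-42.jar release URL when not given. A self-contained sketch of that behavior, assuming a bare argparse parser (the parser below is illustrative, not caper's real one):

import argparse

DEFAULT_WOMTOOL_JAR = ('https://github.com/broadinstitute/cromwell/'
                       'releases/download/42/womtool-42.jar')

parser = argparse.ArgumentParser()
parser.add_argument('--ignore-womtool', action='store_true',
                    help='Skip WDL/input JSON validation with womtool.jar.')
parser.add_argument('--womtool', default=DEFAULT_WOMTOOL_JAR,
                    help='Path or URL for Cromwell\'s womtool JAR file')

args = parser.parse_args(['--ignore-womtool'])
# The flag is set and the default URL is kept.
assert args.ignore_womtool and args.womtool == DEFAULT_WOMTOOL_JAR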
231 changes: 231 additions & 0 deletions caper/dict_tool.py
@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""dictTool: merge/split/flatten/unflatten dict
Author:
Jin Lee ([email protected]) at ENCODE-DCC
"""

import re
from collections import defaultdict
try:
    from collections.abc import MutableMapping
except ImportError:
    from collections import MutableMapping


def merge_dict(a, b):
"""Merges b into a recursively. This mutates a and overwrites
items in b on a for conflicts.
Ref: https://stackoverflow.com/questions/7204805/dictionaries
-of-dictionaries-merge/7205107#7205107
"""
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
merge_dict(a[key], b[key])
elif a[key] == b[key]:
pass
else:
a[key] = b[key]
else:
a[key] = b[key]

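# Illustrative usage (not part of the original file): nested dicts are merged
# recursively, and values from b win on leaf conflicts.
#     >>> a = {'x': {'y': 1, 'z': 2}}
#     >>> merge_dict(a, {'x': {'z': 3}, 'w': 4})
#     >>> a
#     {'x': {'y': 1, 'z': 3}, 'w': 4}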

def flatten_dict(d, parent_key=()):
"""Flattens dict into single-level-tuple-keyed dict with
{(tuple of keys of parents and self): value}
Returns:
dict of {
(key_lvl1, key_lvl2, key_lvl3, ...): value
}
"""
items = []
for k, v in d.items():
new_key = parent_key + (k if isinstance(k, tuple) else (k,))
if isinstance(v, MutableMapping):
items.extend(flatten_dict(v, parent_key=new_key).items())
else:
items.append((new_key, v))
return type(d)(items)

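# Illustrative usage (not part of the original file):
#     >>> flatten_dict({'a': {'b': 1, 'c': {'d': 2}}})
#     {('a', 'b'): 1, ('a', 'c', 'd'): 2}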

def unflatten_dict(d_flat):
"""Unflattens single-level-tuple-keyed dict into dict
"""
result = type(d_flat)()
for k_tuple, v in d_flat.items():
d_curr = result
for i, k in enumerate(k_tuple):
if i == len(k_tuple) - 1:
d_curr[k] = v
elif k not in d_curr:
d_curr[k] = type(d_flat)()
d_curr = d_curr[k]
return result

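# Illustrative usage (not part of the original file): unflatten_dict inverts
# flatten_dict, so a flatten/unflatten round trip reproduces the nested dict.
#     >>> unflatten_dict({('a', 'b'): 1, ('a', 'c', 'd'): 2})
#     {'a': {'b': 1, 'c': {'d': 2}}}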

def split_dict(d, rules=None):
    """Splits a dict according to "rules".
    Returns:
        List of split dicts
    Args:
        rules:
            A list of tuples (RULE_NAME, REGEX).
            If a key name in a JSON object matches REGEX,
            then ALL objects with the same key will be separated from
            the original root JSON object while keeping their hierarchy.
            RULE_NAME will be added to the root of each new JSON object.
            For example, given a JSON object like the following
[
{
"flagstat_qc": {
"rep1": {
"read1": 100,
"read2": 200
},
"rep2": {
"read1": 300,
"read2": 400
}
},
"etc": {
"samstat_qc": {
"rep1": {
"unmapped": 500,
"mapped": 600
},
"rep2": {
"unmapped": 700,
"mapped": 800
}
}
},
"idr_qc": {
"qc_test1" : 900
}
}
]
with "new_row_rule" = "replicate:^rep\d+$", this JSON object
will be splitted into three (original, rep1, rep2) JSON object.
[
# original
{
"idr_qc": {
"qc_test1" : 900
}
},
# rep1
{
"replicate": "rep1",
"flagstat_qc": {
"read1": 100,
"read2": 200
},
"etc": {
"samstat_qc": {
"unmapped": 500,
"mapped": 600
}
}
},
# rep2
{
"replicate": "rep2",
"flagstat_qc": {
"read1": 300,
"read2": 400
},
"etc": {
"samstat_qc": {
"unmapped": 700,
"mapped": 800
}
}
},
]
"""
if rules is None:
return [d]
if isinstance(rules, tuple):
rules = [rules]

d_flat = flatten_dict(d)
result = []
keys_matched_regex = set()
d_each_rule = defaultdict(type(d))
for rule_name, rule_regex in rules:
for k_tuple, v in d_flat.items():
new_k_tuple = ()
pattern_matched_k = None
for k in k_tuple:
if re.findall(rule_regex, k):
pattern_matched_k = (rule_name, k)
else:
new_k_tuple += (k,)
if pattern_matched_k is not None:
d_each_rule[pattern_matched_k][new_k_tuple] = v
keys_matched_regex.add(k_tuple)

for (rule_name, k), d_each_matched in d_each_rule.items():
d_ = unflatten_dict(d_each_matched)
d_[rule_name] = k
result.append(d_)

d_others = type(d)()
for k_tuple, v in d_flat.items():
if k_tuple not in keys_matched_regex:
d_others[k_tuple] = v
if d_others:
d_ = unflatten_dict(d_others)
result = [d_] + result
return result


def test():
import json
from collections import OrderedDict
d = OrderedDict({
"flagstat_qc": {
"rep1": {
"read1": 100,
"read2": 200
},
"rep2": {
"read1": 300,
"read2": 400
}
},
"etc": {
"samstat_qc": {
"rep1": {
"unmapped": 500,
"mapped": 600
},
"rep2": {
"unmapped": 700,
"mapped": 800
}
}
},
"idr_qc": {
"qc_test1" : 900
}
})
j = json.dumps(d, indent=4)
print(j)
# j_flat = flatten_dict(d)
# print(j_flat)
    jsons_split = split_dict(d, [('replicate', r'^rep\d+$')])
    print(json.dumps(jsons_split, indent=4))
return 0


if __name__ == '__main__':
test()
