diff --git a/cubids/cli.py b/cubids/cli.py index 59682a2b..d7f15d63 100644 --- a/cubids/cli.py +++ b/cubids/cli.py @@ -8,12 +8,14 @@ import tempfile import tqdm import shutil +import drmaa import pandas as pd from cubids import CuBIDS +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from .validator import (build_validator_call, run_validator, parse_validator_output, - build_subject_paths) + build_subject_paths, build_drmaa_batch) from .metadata_merge import merge_json_into_json logging.basicConfig(level=logging.INFO) @@ -68,6 +70,17 @@ def cubids_validate(): 'sub-01 sub-02 sub-03', nargs='+', required=False) + parser.add_argument('--drmaa', + action='store_true', + default=False, + help='When running the validator sequentially, submit jobs to scheduler instead of running subprocess', + required=False) + parser.add_argument('--n_cpus', + action='store', + type=int, + default=1, + help='Number of cores to utilize', + required=False) opts = parser.parse_args() # Run directly from python using subprocess @@ -111,6 +124,8 @@ def cubids_validate(): # iterate over the dictionary parsed = [] + # it's easier to parallelize a queue + queue = [] if opts.drmaa is True or opts.n_cpus > 1 else False if opts.sequential_subjects: subjects_dict = {k: v for k, v in subjects_dict.items() @@ -121,37 +136,40 @@ def cubids_validate(): logger.info(" ".join(["Processing subject:", subject])) # create a temporary directory and symlink the data - with tempfile.TemporaryDirectory() as tmpdirname: - for fi in files_list: - - # cut the path down to the subject label - bids_start = fi.find(subject) - - # maybe it's a single file - if bids_start < 1: - bids_folder = tmpdirname - fi_tmpdir = tmpdirname - - else: - bids_folder = Path(fi[bids_start:]).parent - fi_tmpdir = tmpdirname + '/' + str(bids_folder) - - if not os.path.exists(fi_tmpdir): - os.makedirs(fi_tmpdir) - output = fi_tmpdir + '/' + str(Path(fi).name) - shutil.copy2(fi, output) - - # run the validator - nifti_head = opts.ignore_nifti_headers - subj_consist = opts.ignore_subject_consistency - call = build_validator_call(tmpdirname, - nifti_head, - subj_consist) + # TMPDIR isn't networked (available on login + exec nodes), so use bids_dir + tmpdir = tempfile.TemporaryDirectory(dir=opts.bids_dir, prefix=".") + for fi in files_list: + + # cut the path down to the subject label + bids_start = fi.find(subject) + + # maybe it's a single file + if bids_start < 1: + bids_folder = tmpdir.name + fi_tmpdir = tmpdir.name + + else: + bids_folder = Path(fi[bids_start:]).parent + fi_tmpdir = tmpdir.name + '/' + str(bids_folder) + + if not os.path.exists(fi_tmpdir): + os.makedirs(fi_tmpdir) + output = fi_tmpdir + '/' + str(Path(fi).name) + shutil.copy2(fi, output) + + # run the validator + nifti_head = opts.ignore_nifti_headers + subj_consist = opts.ignore_subject_consistency + call = build_validator_call(tmpdir.name, + nifti_head, + subj_consist) + + if not queue: ret = run_validator(call) - # parse output + # execute and parse output immediately if ret.returncode != 0: logger.error("Errors returned " - "from validator run, parsing now") + "from validator run, parsing now") # parse the output and add to list if it returns a df decoded = ret.stdout.decode('UTF-8') @@ -159,6 +177,74 @@ def cubids_validate(): if tmp_parse.shape[1] > 1: tmp_parse['subject'] = subject parsed.append(tmp_parse) + else: + queue.append({ + 'call': call, + 'tmpdir': tmpdir + }) + + + if opts.drmaa: + try: + drmaa_ses = drmaa.Session() + drmaa_ses.initialize() + tmpfiles = [] + jids = [] + + for batch in build_drmaa_batch(queue): + tmp = tempfile.NamedTemporaryFile(delete=False, dir=opts.bids_dir, prefix=".", suffix=".sh") + tmp.write(batch['script'].encode()) + tmp.close() # this is very important + os.chmod(tmp.name, 0o755) # make executable + tmpfiles.append(tmp.name) + jt = drmaa_ses.createJobTemplate() + jt.remoteCommand = tmp.name + # jt.args = call[1:] + jt.blockEmail = False + trash = ':' + os.devnull + jt.outputPath = trash + jt.errorPath = trash + jids.append(drmaa_ses.runJob(jt)) + + # wait for all jobs to finish to parse results + logger.info("Waiting for jobs to complete") + drmaa_ses.synchronize(jids, drmaa.Session.TIMEOUT_WAIT_FOREVER, True) + for q in queue: + # parse output + tmpdir = q['tmpdir'] + with open(os.path.join(tmpdir.name, ".cubids"), 'r') as file: + decoded = file.read() + tmp_parse = parse_validator_output(decoded) + if tmp_parse.shape[1] > 1: + tmp_parse['subject'] = subject + parsed.append(tmp_parse) + + tmpdir.cleanup() + drmaa_ses.exit() + except Exception as e: + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + print(repr(e), fname, exc_tb.tb_lineno) + finally: + for tmp in tmpfiles: + os.remove(tmp) + elif opts.n_cpus > 1: + # run in parallel on multiple cores + logger.info("Running validator in parallel") + with ThreadPoolExecutor(max_workers = opts.n_cpus) as p: + ret = p.map(run_validator, [q['call'] for q in queue]) + + for q in queue: + q['tmpdir'].cleanup() + + # parse output + for r in ret: + decoded = r.stdout.decode('UTF-8') + tmp_parse = parse_validator_output(decoded) + if tmp_parse.shape[1] > 1: + tmp_parse['subject'] = subject + parsed.append(tmp_parse) + # concatenate the parsed data and exit, we're goin home fellas if len(parsed) < 1: diff --git a/cubids/validator.py b/cubids/validator.py index cf1fee7c..56287e79 100644 --- a/cubids/validator.py +++ b/cubids/validator.py @@ -68,6 +68,22 @@ def run_validator(call, verbose=True): stderr=subprocess.PIPE) return(ret) +def build_drmaa_batch(queue): + batch_size = 10 if len(queue) < 1000 else len(queue) // 10 + batch = [] + for batch_start in range(0, len(queue), batch_size): + script = [] + tmpdirs = [] + for idx in range(batch_start, min(batch_start + batch_size, len(queue))): + # bids validator ignore files starting with . https://github.com/bids-standard/bids-validator/issues/348 + line = " ".join(queue[idx]['call']) + ' > ' + os.path.join(queue[idx]['tmpdir'].name, ".cubids") + script.append(line) + tmpdirs.append(queue[idx]['tmpdir']) + batch.append({ + 'script': "\n".join(script), + 'tmpdirs': tmpdirs + }) + return batch def parse_validator_output(output): """Parse the JSON output of the BIDS validator into a pandas dataframe diff --git a/requirements.txt b/requirements.txt index e69de29b..91ffcad8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +drmaa