diff --git a/DETAILS.md b/DETAILS.md index 0a724e5c..54d62113 100644 --- a/DETAILS.md +++ b/DETAILS.md @@ -41,6 +41,19 @@ list | WF_ID or STR_LABEL |List submitted workflows on a Cromwell server metadata | WF_ID or STR_LABEL |Retrieve metadata JSONs for workflows debug, troubleshoot | WF_ID, STR_LABEL or
METADATA_JSON_FILE |Analyze reason for errors
+* `init`: To initialize Caper on a given platform. This command also downloads Cromwell/Womtool JARs so that Caper can work completely offline with local data files.
+
+ **Platform**|**Description**
+ :--------|:-----
+ sherlock | Stanford Sherlock cluster (SLURM)
+ scg | Stanford SCG cluster (SLURM)
+ gcp | Google Cloud Platform
+ aws | Amazon Web Services
+ local | General local computer
+ sge | HPC with Sun GridEngine cluster engine
+ pbs | HPC with PBS cluster engine
+ slurm | HPC with SLURM cluster engine
+
* `run`: To run a single workflow. A string label `-s` is optional and useful for other subcommands to indentify a workflow.

```bash
@@ -217,11 +230,16 @@ We highly recommend to use a default configuration file described in the section
:-----|:-----|:-----|:-----
ip|--ip|localhost|Cromwell server IP address or hostname
port|--port|8000|Cromwell server port
+ no-server-heartbeat|--no-server-heartbeat||Flag to disable server heartbeat file.
+ server-heartbeat-file|--server-heartbeat-file|`~/.caper/default_server_heartbeat`|Heartbeat file for Caper clients to get IP and port of a server.
+ server-heartbeat-timeout|--server-heartbeat-timeout|120000|Timeout for a heartbeat file in milliseconds.
+
cromwell|--cromwell|[cromwell-40.jar](https://github.com/broadinstitute/cromwell/releases/download/40/cromwell-40.jar)|Path or URL for Cromwell JAR file
max-concurrent-tasks|--max-concurrent-tasks|1000|Maximum number of concurrent tasks
max-concurrent-workflows|--max-concurrent-workflows|40|Maximum number of concurrent workflows
max-retries|--max-retries|1|Maximum number of retries for failing tasks
disable-call-caching|--disable-call-caching| |Disable Cromwell's call-caching (re-using outputs)
+ soft-glob-output|--soft-glob-output||Use soft-linking when globbing outputs on a filesystem that does not allow hard-linking, e.g. BeeGFS.
backend-file|--backend-file| |Custom Cromwell backend conf file. This will override Caper's built-in backends

* Troubleshoot parameters for `caper troubleshoot` subcommand.
diff --git a/README.md b/README.md
index 44b5b880..c391f3a5 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,5 @@
+**IMPORTANT**: A new flag `--soft-glob-output` is added to use soft-linking for globbing outputs. Use it for `caper server/run` (not for `caper submit`) on a filesystem that does not allow hard-linking, e.g. BeeGFS.
+
**IMPORATNT**: Caper defaults back to **NOT** use a file-based metadata DB, which means no call-caching (re-using outputs from previous workflows) by default.

**IMPORATNT**: Even if you still want to use a file-based DB (`--db file` and `--file-db [DB_PATH]`), metadata DB generated from Caper<0.6 (with Cromwell-42) is not compatible with metadata DB generated from Caper>=0.6 (with Cromwell-47). Refer to [this doc](https://github.com/broadinstitute/cromwell/releases/tag/43) for such migration.
@@ -45,7 +47,8 @@ Caper is based on Unix and cloud platform CLIs (`curl`, `gsutil` and `aws`) and
export PATH=$PATH:~/.local/bin
```

-5) Choose a platform from the following table and initialize Caper. This will create a default Caper configuration file `~/.caper/default.conf`, which have only required parameters for each platform. There are special platforms for Stanford Sherlock/SCG users.
+5) Choose a platform from the following table and initialize Caper. This will create a default Caper configuration file `~/.caper/default.conf`, which has only the required parameters for each platform. There are special platforms for Stanford Sherlock/SCG users. This will also install Cromwell/Womtool JARs under `~/.caper`. Downloading those files can take up to 10 minutes. Once they are installed, Caper can work completely offline with local data files.
+
```bash
$ caper init [PLATFORM]
```
@@ -99,11 +102,12 @@ $ sbatch ... --wrap "caper run ..."

## Running pipelines on Stanford Sherlock

-> **WARINING**: DO NOT INSTALL CAPER, CONDA AND PIPELINE'S WDL ON `$SCRATCH` OR `$OAK` STORAGES. You will see `Segmentation Fault` errors. Install these executables (Caper, Conda, WDL, ...) on `$HOME` OR `$PI_HOME`. You can still use `$SCRATCH` or `$OAK` for data and Caper's outputs.
+> **IMPORTANT**: DO NOT INSTALL CAPER, CONDA AND PIPELINE'S WDL ON `$SCRATCH` OR `$OAK` STORAGES. You will see `Segmentation Fault` errors. Install these executables (Caper, Conda, WDL, ...) on `$HOME` OR `$PI_HOME`. You can still use `$OAK` for input data (e.g. FASTQs defined in your input JSON file) but not for outputs, which means that you should not run Caper on `$OAK`. `$SCRATCH` and `$PI_SCRATCH` are okay for both input and output data, so run Caper on them. Running Croo to organize outputs into `$OAK` is okay.

Submit a Caper leader job (`caper run`) to SLURM. For a partition `-p [SLURM_PARTITON]`, make sure that you use the same SLURM partition (`slurm-partition` in `~/.caper/default.conf`) as defined in Caper's configuration file. `-J [JOB_NAME]` is to identify Caper's leader job for each workflow. Make a separate directory for each workflow output will be written to each directory.

```bash
+$ # DO NOT RUN THIS ON OAK STORAGE!
$ # conda activate here if required
$ cd [OUTPUT_DIR] # make a separate directory for each workflow.
$ sbatch -p [SLURM_PARTITON] -J [JOB_NAME] --export=ALL --mem 3G -t 4-0 --wrap "caper run [WDL] -i [INPUT_JSON]"
@@ -200,6 +204,144 @@ $ cd [OUTPUT_DIR] # make a separate directory for each workflow
$ caper run [WDL] -i [INPUT_JSON]
```
+## Running pipelines on a custom backend
+
+If Caper's built-in backends don't work as expected on your clusters (e.g. due to different resource settings), then you can override them with your own configuration file (e.g. `your.backend.conf`). Caper generates a `backend.conf` for built-in backends in a temporary directory.
+
+Find this `backend.conf` first by dry-running `caper run [WDL] --dry-run ...`. For example, for a `slurm` backend:
+```
+$ caper run toy.wdl --dry-run --backend slurm
+[Caper] Validating WDL/input JSON with womtool...
+Picked up _JAVA_OPTIONS: -Xms256M -Xmx4024M -XX:ParallelGCThreads=1
+Success!
+[Caper] cmd: ['java', '-Xmx3G', '-XX:ParallelGCThreads=1', '-DLOG_LEVEL=INFO', '-DLOG_MODE=standard', '-jar', '-Dconfig.file=/mnt/data/scratch/leepc12/caper_out/.caper_tmp/toy/20200309_151256_331283/backend.conf', '/users/leepc12/.caper/cromwell_jar/cromwell-47.jar', 'run', '/mnt/data2/scratch/leepc12/test_caper_refac/toy.wdl', '-i', '/mnt/data/scratch/leepc12/caper_out/.caper_tmp/toy/20200309_151256_331283/inputs.json', '-o', '/mnt/data/scratch/leepc12/caper_out/.caper_tmp/toy/20200309_151256_331283/workflow_opts.json', '-l', '/mnt/data/scratch/leepc12/caper_out/.caper_tmp/toy/20200309_151256_331283/labels.json', '-m', '/mnt/data/scratch/leepc12/caper_out/.caper_tmp/toy/20200309_151256_331283/metadata.json']
+```
+Look for the file defined with the Java parameter `-Dconfig.file` and find the backend of interest (`slurm` in this example) in that file.
+```
+include required(classpath("application"))
+backend {
+  default = "slurm"
+  providers {
+
+    ...
+
+    slurm {
+      actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
+      config {
+        default-runtime-attributes {
+          time = 24
+        }
+        concurrent-job-limit = 1000
+        script-epilogue = "sleep 10 && sync"
+        root = "/mnt/data/scratch/leepc12/caper_out"
+        runtime-attributes = """
+          String? docker
+          String? docker_user
+          Int cpu = 1
+          Int? gpu
+          Int? time
+          Int? memory_mb
+          String? slurm_partition
+          String? slurm_account
+          String? slurm_extra_param
+          String? singularity
+          String? singularity_bindpath
+          String? singularity_cachedir
+        """
+        submit = """ITER=0; until [ $ITER -ge 3 ]; do
+          sbatch --export=ALL -J ${job_name} -D ${cwd} -o ${out} -e ${err} ${"-t " + time*60} -n 1 --ntasks-per-node=1 ${true="--cpus-per-task=" false="" defined(cpu)}${cpu} ${true="--mem=" false="" defined(memory_mb)}${memory_mb} ${"-p " + slurm_partition} ${"--account " + slurm_account} ${true="--gres gpu:" false="" defined(gpu)}${gpu} ${slurm_extra_param} --wrap "${if defined(singularity) then '' else '/bin/bash ${script} #'} if [ -z \"$SINGULARITY_BINDPATH\" ]; then export SINGULARITY_BINDPATH=${singularity_bindpath}; fi; if [ -z \"$SINGULARITY_CACHEDIR\" ]; then export SINGULARITY_CACHEDIR=${singularity_cachedir}; fi; singularity exec --cleanenv --home ${cwd} ${if defined(gpu) then '--nv' else ''} ${singularity} /bin/bash ${script}" && break
+          ITER=$[$ITER+1]; sleep 30; done
+        """
+        kill = "scancel ${job_id}"
+        exit-code-timeout-seconds = 360
+        check-alive = "for ITER in 1 2 3; do CHK_ALIVE=$(squeue --noheader -j ${job_id} --format=%i | grep ${job_id}); if [ -z \"$CHK_ALIVE\" ]; then if [ \"$ITER\" == 3 ]; then /bin/bash -c 'exit 1'; else sleep 30; fi; else echo $CHK_ALIVE; break; fi; done"
+        job-id-regex = "Submitted batch job (\\d+).*"
+      }
+    }
+
+    ...
+
+}
+
+...
+
+```
+Parts of the script (wrapped in `${}`) are written in WDL. For example, in `${true="--mem=" false="" defined(memory_mb)}`, if `memory_mb` is defined then it prints `--mem=`. In such WDL expressions, you can use any variables defined in `runtime-attributes`.
+
+For example, if your cluster does not allow importing all environment variables (`sbatch --export=ALL ...`), then you can remove `--export=ALL` from the above script.
+
+There is retry logic implemented in this SLURM backend. It retries submission up to 3 times, since job submission can fail intermittently on some SLURM clusters.
+```
+ITER=0; until [ $ITER -ge 3 ]; do
+...
+ITER=$[$ITER+1]; sleep 30; done
+```
+
+There is also logic for running tasks with Singularity. If `singularity` is not given, then Cromwell simply runs `/bin/bash ${script}`; otherwise this backend sets some Singularity-specific environment variables and finally runs `singularity exec --cleanenv --home ${cwd} ${singularity} /bin/bash ${script}`. `${singularity}` is a variable that holds the Singularity image location and is defined in the `runtime-attributes` mentioned above.
+```
+sbatch ... --wrap "${if defined(singularity) then '' else '/bin/bash ${script} #'} ..."
+```
+
+There are some built-in variables (`out`, `err`, `cwd`, `script`, `cpu`, `memory_mb` and `time`) in Cromwell, which are important for keeping Cromwell's tasks running. For example, if you remove `-o ${out}` from the script, Cromwell will fail to find `stdout` in the output directory, which will lead to a pipeline failure.
+
+See [Cromwell's documentation](https://cromwell.readthedocs.io/en/stable/Configuring/) for more details about backend configuration files.
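+
+As a quick reference when editing the `submit` line above, here is a minimal Python sketch of how a `${true=... false=... defined(var)}` placeholder behaves (an illustration only, not Cromwell's actual interpolation engine; `expand_true_false` and `mem_arg` are hypothetical helpers):
+```python
+def expand_true_false(true_str, false_str, var_value):
+    """Emulate ${true="..." false="..." defined(var)}: return true_str if the
+    runtime attribute is defined (not None), otherwise false_str."""
+    return true_str if var_value is not None else false_str
+
+
+def mem_arg(memory_mb):
+    """Build the sbatch memory argument the same way the submit line does."""
+    prefix = expand_true_false('--mem=', '', memory_mb)
+    suffix = str(memory_mb) if memory_mb is not None else ''
+    return prefix + suffix
+
+
+print(mem_arg(4000))  # '--mem=4000' -> added to the sbatch command line
+print(mem_arg(None))  # ''           -> nothing is added
+```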
+
+Your custom `your.backend.conf` file will override Caper's existing built-in backend, so keep only the modified parts (the `submit` command line in this example) in your `your.backend.conf` file.
+```
+backend {
+  default = "slurm"
+  providers {
+    slurm {
+      submit = """sbatch --export=ALL -J ${job_name} -D ${cwd} -o ${out} -e ${err} ${"-t " + time*60} -n 1 --ntasks-per-node=1 ${true="--cpus-per-task=" false="" defined(cpu)}${cpu} ${true="--mem=" false="" defined(memory_mb)}${memory_mb} ${"-p " + slurm_partition} ${"--account " + slurm_account} ${true="--gres gpu:" false="" defined(gpu)}${gpu} ${slurm_extra_param} --wrap "${if defined(singularity) then '' else '/bin/bash ${script} #'} if [ -z \"$SINGULARITY_BINDPATH\" ]; then export SINGULARITY_BINDPATH=${singularity_bindpath}; fi; if [ -z \"$SINGULARITY_CACHEDIR\" ]; then export SINGULARITY_CACHEDIR=${singularity_cachedir}; fi; singularity exec --cleanenv --home ${cwd} ${if defined(gpu) then '--nv' else ''} ${singularity} /bin/bash ${script}" && break
+      ITER=$[$ITER+1]; sleep 30; done
+      """
+    }
+  }
+}
+```
+
+Then run `caper run` with your `your.backend.conf`:
+```
+$ caper run ... --backend-file your.backend.conf
+```
+
+
+## Caper server heartbeat (running multiple servers)
+
+Caper server periodically writes a heartbeat file (specified by `--server-heartbeat-file`). This file contains the IP (hostname) and port of the running `caper server`. Caper clients consider the heartbeat file valid for `--server-heartbeat-timeout` milliseconds (120000 by default).
+
+Example heartbeat file:
+```bash
+$ cat ~/.caper/default_server_heartbeat
+kadru.stanford.edu:8000
+```
+
+This heartbeat file saves users from having to find the IP (hostname) and port of a running `caper server`, especially when they `qsub`/`sbatch` `caper server` on their clusters. In such cases, the IP (hostname of the node/instance) of the server is only determined after the cluster engine starts the submitted `caper server` job, and it is inconvenient to look it up manually with `qstat` or `squeue` and add it back to Caper's configuration file `~/.caper/default.conf`.
+
+Therefore, Caper uses this heartbeat file by default (this can be disabled with the flag `--no-server-heartbeat`). Client-side subcommands like `caper list` and `caper metadata` find this heartbeat file and automatically parse it to get the IP/port pair of the server.
+
+However, there can be a conflict if users want to run multiple `caper server`s on the same machine (or on multiple machines sharing the same Caper configuration directory `~/.caper/` and hence the same default heartbeat file). In such cases, users can disable the heartbeat feature by adding the following line to their configuration file, e.g. `~/.caper/default.conf`:
+```bash
+no-server-heartbeat=True
+```
+
+Then start multiple servers, each with a different port and DB. Make sure that each server uses a different DB (a different file DB or MySQL server port, etc.) since there is no point in running multiple Caper servers with the same DB. In the case of MySQL, do not forget to spin up multiple MySQL servers on different ports.
+
+```bash
+$ caper server --port 8000 --mysql-db-port 3306 ... &
+$ caper server --port 8001 --mysql-db-port 3307 ... &
+$ caper server --port 8002 --mysql-db-port 3308 ... &
+```
+
+Send queries to a specific server:
+```bash
+$ caper list --port 8000
+$ caper list --port 8001
+$ caper list --port 8002
+```
+
## Metadata database

If you are not interested in resuming failed workflows skip this section.
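The heartbeat file described above is plain `hostname:port` text, so client-side tools can locate a running server without any extra configuration. Below is a minimal sketch of that lookup (it assumes the `hostname:port` format shown above; `read_server_heartbeat` is a hypothetical helper, not Caper's actual API, and real Caper clients additionally ignore heartbeat files older than `--server-heartbeat-timeout`):
```python
import os


def read_server_heartbeat(path='~/.caper/default_server_heartbeat'):
    """Parse a Caper server heartbeat file into an (ip, port) pair.

    The file is expected to contain a single 'hostname:port' line,
    e.g. 'kadru.stanford.edu:8000'.
    """
    heartbeat_file = os.path.expanduser(path)
    if not os.path.exists(heartbeat_file):
        return None
    with open(heartbeat_file) as fp:
        ip, port = fp.read().strip().split(':')
    return ip, int(port)


# Example: point a client at whatever server wrote the heartbeat file.
server = read_server_heartbeat()
if server is not None:
    ip, port = server
    print('Found Caper server at {ip}:{port}'.format(ip=ip, port=port))
```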
diff --git a/caper/caper.py b/caper/caper.py index 1e5d8ce3..20ae84b6 100644 --- a/caper/caper.py +++ b/caper/caper.py @@ -30,6 +30,8 @@ from .dict_tool import merge_dict from .caper_args import parse_caper_arguments +from .caper_init import init_caper_conf, install_cromwell_jar, install_womtool_jar + from .caper_check import check_caper_conf from .cromwell_rest_api import CromwellRestAPI from .caper_uri import URI_S3, URI_GCS, URI_LOCAL, \ @@ -38,14 +40,13 @@ CaperBackendCommon, CaperBackendDatabase, CaperBackendGCP, \ CaperBackendAWS, CaperBackendLocal, CaperBackendSLURM, \ CaperBackendSGE, CaperBackendPBS +from .caper_backend import CaperBackendBase, CaperBackendBaseLocal class Caper(object): """Cromwell/WDL wrapper """ - CROMWELL_JAR_DIR = '~/.caper/cromwell_jar' - WOMTOOL_JAR_DIR = '~/.caper/womtool_jar' BACKEND_CONF_HEADER = 'include required(classpath("application"))\n' DEFAULT_BACKEND = BACKEND_LOCAL RE_PATTERN_BACKEND_CONF_HEADER = r'^\s*include\s' @@ -64,8 +65,11 @@ class Caper(object): RE_PATTERN_DELIMITER_GCP_ZONES = r',| ' USER_INTERRUPT_WARNING = '\n********** DO NOT CTRL+C MULTIPLE TIMES **********\n' + MAX_RETRY_UPDATING_METADATA = 3 SEC_INTERVAL_UPDATE_METADATA = 1200.0 SEC_INTERVAL_UPDATE_SERVER_HEARTBEAT = 60.0 + SEC_INTERVAL_RETRY_UPDATING_METADATA = 10.0 + # added to cromwell labels file KEY_CAPER_STR_LABEL = 'caper-str-label' KEY_CAPER_USER = 'caper-user' @@ -87,6 +91,7 @@ def __init__(self, args): action=args.get('action'), ip=args.get('ip'), port=args.get('port'), + no_server_hearbeat=args.get('no_server_heartbeat'), server_hearbeat_file=args.get('server_heartbeat_file'), server_hearbeat_timeout=args.get('server_heartbeat_timeout')) @@ -111,6 +116,7 @@ def __init__(self, args): self._tmp_dir = os.path.abspath(self._tmp_dir) self._gcp_prj = args.get('gcp_prj') self._gcp_zones = args.get('gcp_zones') + self._gcp_call_caching_dup_strat = args.get('gcp_call_caching_dup_strat') self._out_gcs_bucket = args.get('out_gcs_bucket') self._out_s3_bucket = args.get('out_s3_bucket') self._aws_batch_arn = args.get('aws_batch_arn') @@ -125,6 +131,7 @@ def __init__(self, args): self._pbs_extra_param = args.get('pbs_extra_param') self._backend_file = args.get('backend_file') + self._soft_glob_output = args.get('soft_glob_output') self._wdl = args.get('wdl') self._inputs = args.get('inputs') self._cromwell = args.get('cromwell') @@ -210,7 +217,7 @@ def run(self): cmd = ['java', java_heap, '-XX:ParallelGCThreads=1', '-DLOG_LEVEL=INFO', '-DLOG_MODE=standard', '-jar', '-Dconfig.file={}'.format(backend_file), - self.__download_cromwell_jar(), 'run', + install_cromwell_jar(self._cromwell), 'run', CaperURI(self._wdl).get_local_file(), '-i', input_file, '-o', workflow_opts_file, @@ -222,7 +229,7 @@ def run(self): if not self._ignore_womtool: # run womtool first to validate WDL and input JSON cmd_womtool = ['java', '-Xmx512M', '-jar', - self.__download_womtool_jar(), + install_womtool_jar(self._womtool), 'validate', CaperURI(self._wdl).get_local_file(), '-i', input_file] try: @@ -305,7 +312,7 @@ def server(self): cmd = ['java', java_heap, '-XX:ParallelGCThreads=1', '-DLOG_LEVEL=INFO', '-DLOG_MODE=standard', '-jar', '-Dconfig.file={}'.format(backend_file), - self.__download_cromwell_jar(), 'server'] + install_cromwell_jar(self._cromwell), 'server'] print('[Caper] cmd: ', cmd) # pending/running workflows @@ -393,7 +400,7 @@ def submit(self): # run womtool first to validate WDL and input JSON if not self._ignore_womtool: cmd_womtool = ['java', '-Xmx512M', '-jar', - self.__download_womtool_jar(), 
+ install_womtool_jar(self._womtool), 'validate', CaperURI(self._wdl).get_local_file(), '-i', input_file] try: @@ -489,14 +496,16 @@ def list(self): if f == 'workflow_id': row.append(str(workflow_id)) elif f == 'str_label': - lbl = self._cromwell_rest_api.get_label( - workflow_id, - Caper.KEY_CAPER_STR_LABEL) + if 'labels' in w and Caper.KEY_CAPER_STR_LABEL in w['labels']: + lbl = w['labels'][Caper.KEY_CAPER_STR_LABEL] + else: + lbl = None row.append(str(lbl)) elif f == 'user': - lbl = self._cromwell_rest_api.get_label( - workflow_id, - Caper.KEY_CAPER_USER) + if 'labels' in w and Caper.KEY_CAPER_USER in w['labels']: + lbl = w['labels'][Caper.KEY_CAPER_USER] + else: + lbl = None row.append(str(lbl)) else: row.append(str(w[f] if f in w else None)) @@ -528,8 +537,10 @@ def troubleshoot(self): Caper.__troubleshoot(metadata, self._show_completed_task) def __init_cromwell_rest_api(self, action, ip, port, + no_server_hearbeat, server_hearbeat_file, server_hearbeat_timeout): + self._no_server_hearbeat = no_server_hearbeat self._server_hearbeat_file = server_hearbeat_file self._ip, self._port = \ self.__read_heartbeat_file(action, ip, port, server_hearbeat_timeout) @@ -538,7 +549,7 @@ def __init_cromwell_rest_api(self, action, ip, port, ip=self._ip, port=self._port, verbose=False) def __read_heartbeat_file(self, action, ip, port, server_hearbeat_timeout): - if self._server_hearbeat_file is not None: + if not self._no_server_hearbeat and self._server_hearbeat_file is not None: self._server_hearbeat_file = os.path.expanduser( self._server_hearbeat_file) if action != 'server': @@ -556,7 +567,7 @@ def __read_heartbeat_file(self, action, ip, port, server_hearbeat_timeout): return ip, port def __write_heartbeat_file(self): - if self._server_hearbeat_file is not None: + if not self._no_server_hearbeat and self._server_hearbeat_file is not None: while True: try: print('[Caper] Writing heartbeat', @@ -578,55 +589,34 @@ def __write_heartbeat_file(self): if self._stop_heartbeat_thread: break - def __download_cromwell_jar(self): - """Download cromwell-X.jar - """ - cu = CaperURI(self._cromwell) - if cu.uri_type == URI_LOCAL: - return cu.get_uri() - - path = os.path.join( - os.path.expanduser(Caper.CROMWELL_JAR_DIR), - os.path.basename(self._cromwell)) - return cu.copy(target_uri=path) - - def __download_womtool_jar(self): - """Download womtool-X.jar - """ - cu = CaperURI(self._womtool) - if cu.uri_type == URI_LOCAL: - return cu.get_uri() - - path = os.path.join( - os.path.expanduser(Caper.WOMTOOL_JAR_DIR), - os.path.basename(self._womtool)) - return cu.copy(target_uri=path) - def __write_metadata_jsons(self, workflow_ids): - try: - for wf_id in workflow_ids: - # get metadata for wf_id - m = self._cromwell_rest_api.get_metadata([wf_id]) - assert(len(m) == 1) - metadata = m[0] - if 'labels' in metadata and \ - 'caper-backend' in metadata['labels']: - backend = \ - metadata['labels']['caper-backend'] - else: - backend = None - - if backend is not None: - self.__write_metadata_json( - wf_id, metadata, - backend=backend, - wdl=metadata['workflowName']) - return True - except Exception as e: - print('[Caper] Exception caught while retrieving ' - 'metadata from Cromwell server. Keeping running... 
', - str(e), workflow_ids) - return False + for wf_id in workflow_ids.copy(): + for trial in range(Caper.MAX_RETRY_UPDATING_METADATA + 1): + try: + time.sleep(Caper.SEC_INTERVAL_RETRY_UPDATING_METADATA) + # get metadata for wf_id + m = self._cromwell_rest_api.get_metadata([wf_id]) + assert(len(m) == 1) + metadata = m[0] + if 'labels' in metadata and \ + 'caper-backend' in metadata['labels']: + backend = \ + metadata['labels']['caper-backend'] + else: + backend = None + + if backend is not None: + self.__write_metadata_json( + wf_id, metadata, + backend=backend, + wdl=metadata['workflowName']) + except Exception as e: + print('[Caper] Exception caught while retrieving ' + 'metadata from Cromwell server. ' + 'trial: {t}, wf_id: {wf_id}'.format( + t=trial, wf_id=wf_id), + str(e)) + break def __write_metadata_json(self, workflow_id, metadata_json, backend=None, wdl=None): @@ -910,12 +900,21 @@ def __get_backend_conf_str(self): disable_call_caching=self._disable_call_caching, max_concurrent_workflows=self._max_concurrent_workflows)) + # common settings for all backends + if self._max_concurrent_tasks is not None: + CaperBackendBase.CONCURRENT_JOB_LIMIT = self._max_concurrent_tasks + + # common settings for local-based backends + if self._soft_glob_output is not None: + CaperBackendBaseLocal.USE_SOFT_GLOB_OUTPUT = self._soft_glob_output + if self._out_dir is not None: + CaperBackendBaseLocal.OUT_DIR = self._out_dir + # local backend merge_dict( backend_dict, - CaperBackendLocal( - out_dir=self._out_dir, - concurrent_job_limit=self._max_concurrent_tasks)) + CaperBackendLocal()) + # GC if self._gcp_prj is not None and self._out_gcs_bucket is not None: merge_dict( @@ -923,7 +922,8 @@ def __get_backend_conf_str(self): CaperBackendGCP( gcp_prj=self._gcp_prj, out_gcs_bucket=self._out_gcs_bucket, - concurrent_job_limit=self._max_concurrent_tasks)) + call_caching_dup_strat=self._gcp_call_caching_dup_strat)) + # AWS if self._aws_batch_arn is not None and self._aws_region is not None \ and self._out_s3_bucket is not None: @@ -932,35 +932,30 @@ def __get_backend_conf_str(self): CaperBackendAWS( aws_batch_arn=self._aws_batch_arn, aws_region=self._aws_region, - out_s3_bucket=self._out_s3_bucket, - concurrent_job_limit=self._max_concurrent_tasks)) + out_s3_bucket=self._out_s3_bucket)) + # SLURM merge_dict( backend_dict, CaperBackendSLURM( - out_dir=self._out_dir, partition=self._slurm_partition, account=self._slurm_account, - extra_param=self._slurm_extra_param, - concurrent_job_limit=self._max_concurrent_tasks)) + extra_param=self._slurm_extra_param)) + # SGE merge_dict( backend_dict, CaperBackendSGE( - out_dir=self._out_dir, pe=self._sge_pe, queue=self._sge_queue, - extra_param=self._sge_extra_param, - concurrent_job_limit=self._max_concurrent_tasks)) + extra_param=self._sge_extra_param)) # PBS merge_dict( backend_dict, CaperBackendPBS( - out_dir=self._out_dir, queue=self._pbs_queue, - extra_param=self._pbs_extra_param, - concurrent_job_limit=self._max_concurrent_tasks)) + extra_param=self._pbs_extra_param)) # Database merge_dict( @@ -1245,6 +1240,10 @@ def main(): # parse arguments # note that args is a dict args = parse_caper_arguments() + action = args['action'] + if action == 'init': + init_caper_conf(args) + sys.exit(0) args = check_caper_conf(args) # init caper uri to transfer files across various storages @@ -1259,10 +1258,9 @@ def main(): use_gsutil_over_aws_s3=args.get('use_gsutil_over_aws_s3'), verbose=True) - # init caper: taking all args at init step + # initialize caper: taking all args at init 
step c = Caper(args) - action = args['action'] if action == 'run': c.run() elif action == 'server': diff --git a/caper/caper_args.py b/caper/caper_args.py index 40ebb4bf..b156e2f8 100644 --- a/caper/caper_args.py +++ b/caper/caper_args.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """CaperArgs: Command line arguments parser for Caper Author: @@ -11,16 +10,14 @@ import os from distutils.util import strtobool from collections import OrderedDict -from .caper_backend import BACKENDS, BACKENDS_WITH_ALIASES -from .caper_backend import BACKEND_GCP, BACKEND_AWS, BACKEND_LOCAL -from .caper_backend import BACKEND_SLURM, BACKEND_SGE, BACKEND_PBS +from .caper_backend import CaperBackendDatabase +from .caper_backend import CaperBackendGCP +from .caper_backend import BACKENDS, BACKEND_LOCAL from .caper_backend import BACKEND_ALIAS_LOCAL -from .caper_backend import BACKEND_ALIAS_GOOGLE, BACKEND_ALIAS_AMAZON from .caper_backend import BACKEND_ALIAS_SHERLOCK, BACKEND_ALIAS_SCG -from .caper_backend import CaperBackendDatabase -__version__ = '0.6.5' +__version__ = '0.7.0' DEFAULT_JAVA_HEAP_SERVER = '10G' DEFAULT_JAVA_HEAP_RUN = '3G' @@ -50,69 +47,10 @@ DEFAULT_SERVER_HEARTBEAT_FILE = '~/.caper/default_server_heartbeat' DEFAULT_SERVER_HEARTBEAT_TIMEOUT_MS = 120000 DEFAULT_CONF_CONTENTS = '\n\n' +DEFAULT_GCP_CALL_CACHING_DUP_STRAT = CaperBackendGCP.CALL_CACHING_DUP_STRAT_REFERENCE + DYN_FLAGS = ['--singularity', '--docker'] INVALID_EXT_FOR_DYN_FLAG = '.wdl' -DEFAULT_CONF_CONTENTS_LOCAL = """backend=local - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_SHERLOCK = """backend=slurm -slurm-partition= - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_SCG = """backend=slurm -slurm-account= - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_SLURM = """backend=slurm - -# define one of the followings (or both) according to your -# cluster's SLURM configuration. 
-slurm-partition= -slurm-account= - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_SGE = """backend=sge -sge-pe= - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_PBS = """backend=pbs - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_AWS = """backend=aws -aws-batch-arn= -aws-region= -out-s3-bucket= - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" -DEFAULT_CONF_CONTENTS_GCP = """backend=gcp -gcp-prj= -out-gcs-bucket= - -# DO NOT use /tmp here -# Caper stores all important temp files and cached big data files here -tmp-dir= -""" def process_dyn_flags(remaining_args, dyn_flags, @@ -140,33 +78,6 @@ def process_dyn_flags(remaining_args, dyn_flags, return remaining_args -def init_caper_conf(args): - backend = args.get('platform') - assert(backend in BACKENDS_WITH_ALIASES) - if backend in (BACKEND_LOCAL, BACKEND_ALIAS_LOCAL): - contents = DEFAULT_CONF_CONTENTS_LOCAL - elif backend == BACKEND_ALIAS_SHERLOCK: - contents = DEFAULT_CONF_CONTENTS_SHERLOCK - elif backend == BACKEND_ALIAS_SCG: - contents = DEFAULT_CONF_CONTENTS_SCG - elif backend == BACKEND_SLURM: - contents = DEFAULT_CONF_CONTENTS_SLURM - elif backend == BACKEND_SGE: - contents = DEFAULT_CONF_CONTENTS_SGE - elif backend == BACKEND_PBS: - contents = DEFAULT_CONF_CONTENTS_PBS - elif backend in (BACKEND_GCP, BACKEND_ALIAS_GOOGLE): - contents = DEFAULT_CONF_CONTENTS_GCP - elif backend in (BACKEND_AWS, BACKEND_ALIAS_AMAZON): - contents = DEFAULT_CONF_CONTENTS_AWS - else: - raise Exception('Unsupported platform/backend/alias.') - - conf_file = os.path.expanduser(args.get('conf')) - with open(conf_file, 'w') as fp: - fp.write(contents + '\n') - - def parse_caper_arguments(): """Argument parser for Caper """ @@ -321,6 +232,14 @@ def parse_caper_arguments(): group_cromwell.add_argument( '--backend-file', help='Custom Cromwell backend configuration file to override all') + group_cromwell.add_argument( + '--soft-glob-output', action='store_true', + help='Use soft-linking when globbing outputs for a filesystem that ' + 'does not allow hard-linking. e.g. beeGFS. ' + 'This flag does not work with backends based on a Docker container. ' + 'i.e. gcp and aws. Also, ' + 'it does not work with local backends (local/slurm/sge/pbs) ' + 'with --docker. However, it works fine with --singularity.') group_local = parent_host.add_argument_group( title='local backend arguments') @@ -330,14 +249,22 @@ def parse_caper_arguments(): '--tmp-dir', help='Temporary directory for local backend') group_gc = parent_host.add_argument_group( - title='GC backend arguments') + title='GCP backend arguments') group_gc.add_argument('--gcp-prj', help='GC project') group_gc.add_argument('--gcp-zones', help='GCP zones (e.g. 
us-west1-b,' 'us-central1-b)') group_gc.add_argument( - '--out-gcs-bucket', help='Output GCS bucket for GC backend') + '--gcp-call-caching-dup-strat', default=DEFAULT_GCP_CALL_CACHING_DUP_STRAT, + choices=[ + CaperBackendGCP.CALL_CACHING_DUP_STRAT_REFERENCE, + CaperBackendGCP.CALL_CACHING_DUP_STRAT_COPY + ], + help='Duplication strategy for call-cached outputs for GCP backend: ' + 'copy: make a copy, reference: refer to old output in metadata.json.') + group_gc.add_argument( + '--out-gcs-bucket', help='Output GCS bucket for GCP backend') group_gc.add_argument( - '--tmp-gcs-bucket', help='Temporary GCS bucket for GC backend') + '--tmp-gcs-bucket', help='Temporary GCS bucket for GCP backend') group_aws = parent_host.add_argument_group( title='AWS backend arguments') @@ -513,6 +440,9 @@ def parse_caper_arguments(): parent_server_client.add_argument( '--ip', default=DEFAULT_IP, help='IP address for Caper server') + parent_server_client.add_argument( + '--no-server-heartbeat', action='store_true', + help='Disable server heartbeat file.') parent_server_client.add_argument( '--server-heartbeat-file', default=DEFAULT_SERVER_HEARTBEAT_FILE, @@ -606,7 +536,9 @@ def parse_caper_arguments(): # string to boolean for k in [ 'dry_run', + 'no_server_heartbeat', 'disable_call_caching', + 'soft_glob_output', 'use_gsutil_over_aws_s3', 'hold', 'no_deepcopy', @@ -632,10 +564,4 @@ def parse_caper_arguments(): if v is not None and isinstance(v, str): args_d[k] = int(v) - # if action is 'init' then initialize Conf and exit - action = args_d['action'] - if action == 'init': - init_caper_conf(args_d) - sys.exit(0) - return args_d diff --git a/caper/caper_backend.py b/caper/caper_backend.py index 9ea5f274..d6c49e0b 100644 --- a/caper/caper_backend.py +++ b/caper/caper_backend.py @@ -1,6 +1,8 @@ -#!/usr/bin/env python3 """Caper backend """ +from collections import UserDict +from copy import deepcopy +from .dict_tool import merge_dict BACKEND_GCP = 'gcp' BACKEND_AWS = 'aws' @@ -34,11 +36,11 @@ def get_backend(backend): elif backend == BACKEND_ALIAS_LOCAL: backend = BACKEND_LOCAL if backend not in BACKENDS: - raise Exception('Unsupported backend: {}'.format(backend)) + raise ValueError('Unsupported backend: {}'.format(backend)) return backend -class CaperBackendCommon(dict): +class CaperBackendCommon(UserDict): """Common stanzas for all Caper backends """ TEMPLATE = { @@ -75,8 +77,7 @@ class CaperBackendCommon(dict): def __init__(self, port=None, disable_call_caching=None, max_concurrent_workflows=None): - super(CaperBackendCommon, self).__init__( - CaperBackendCommon.TEMPLATE) + super().__init__(CaperBackendCommon.TEMPLATE) if port is not None: self['webservice']['port'] = port if disable_call_caching is not None: @@ -86,7 +87,7 @@ def __init__(self, port=None, disable_call_caching=None, max_concurrent_workflows -class CaperBackendDatabase(dict): +class CaperBackendDatabase(UserDict): """Common stanzas for database """ DB_TYPE_IN_MEMORY = 'in-memory' @@ -141,8 +142,7 @@ def __init__(self, db_type=None, db_timeout=None, postgresql_ip=None, postgresql_port=None, postgresql_user=None, postgresql_password=None, postgresql_name=None): - super(CaperBackendDatabase, self).__init__( - CaperBackendDatabase.TEMPLATE) + super().__init__(CaperBackendDatabase.TEMPLATE) if db_type == CaperBackendDatabase.DB_TYPE_IN_MEMORY: self['database'] = CaperBackendDatabase.TEMPLATE_DB_IN_MEMORY @@ -171,27 +171,118 @@ def __init__(self, db_type=None, db_timeout=None, db['password'] = postgresql_password else: - raise Exception('Unsupported DB 
type {}'.format(db_type)) + raise ValueError('Unsupported DB type {}'.format(db_type)) if db_timeout is not None and 'db' in self['database']: self['database']['db']['connectionTimeout'] = db_timeout -class CaperBackendGCP(dict): +class CaperBackendBase(UserDict): + """Base skeleton backend for all backends + """ + CONCURRENT_JOB_LIMIT = None + TEMPLATE = { + "backend": { + "providers": { + } + } + } + TEMPLATE_BACKEND = { + "actor-factory": None, + "config": { + "default-runtime-attributes": { + }, + "concurrent-job-limit": None + } + } + + def __init__(self, dict_to_override_self=None, backend_name=None): + """ + Args: + dict_to_override_self: dict to override self + backend_name: backend name + """ + if backend_name is None: + raise ValueError('backend_name must be provided.') + self._backend_name = backend_name + + super().__init__(deepcopy(CaperBackendBase.TEMPLATE)) + + config = self.get_backend_config() + + if CaperBackendBase.CONCURRENT_JOB_LIMIT is None: + raise ValueError('You must define CaperBackendBase.CONCURRENT_JOB_LIMIT.') + config['concurrent-job-limit'] = CaperBackendBase.CONCURRENT_JOB_LIMIT + + if dict_to_override_self is not None: + merge_dict(self, deepcopy(dict_to_override_self)) + + @property + def backend_name(self): + return self._backend_name + + def get_backend(self): + if self._backend_name not in self['backend']['providers']: + self['backend']['providers'][self._backend_name] = \ + deepcopy(CaperBackendBase.TEMPLATE_BACKEND) + return self['backend']['providers'][self._backend_name] + + def get_backend_config(self): + return self.get_backend()['config'] + + +class CaperBackendBaseLocal(CaperBackendBase): + """Base backend for all local backends (including HPCs with cluster engine) + """ + USE_SOFT_GLOB_OUTPUT = None + OUT_DIR = None + + SOFT_GLOB_OUTPUT_CMD = 'ln -sL GLOB_PATTERN GLOB_DIRECTORY 2> /dev/null' + + TEMPLATE_BACKEND = { + "actor-factory": "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory", + "config": { + "script-epilogue": "sleep 10 && sync", + "root": None + } + } + def __init__(self, dict_to_override_self=None, backend_name=None): + super().__init__(backend_name=backend_name) + + merge_dict( + self.get_backend(), + deepcopy(CaperBackendBaseLocal.TEMPLATE_BACKEND)) + config = self.get_backend_config() + + if CaperBackendBaseLocal.USE_SOFT_GLOB_OUTPUT is None: + raise ValueError('You must define CaperBackendBase.USE_SOFT_GLOB_OUTPUT.') + if CaperBackendBaseLocal.USE_SOFT_GLOB_OUTPUT: + config['glob-link-command'] = CaperBackendBaseLocal.SOFT_GLOB_OUTPUT_CMD + + if CaperBackendBaseLocal.OUT_DIR is None: + raise ValueError('You must define CaperBackendBase.OUT_DIR.') + config['root'] = CaperBackendBaseLocal.OUT_DIR + + if dict_to_override_self is not None: + merge_dict(self, deepcopy(dict_to_override_self)) + + +class CaperBackendGCP(CaperBackendBase): """Google Cloud backend """ + CALL_CACHING_DUP_STRAT_REFERENCE = 'reference' + CALL_CACHING_DUP_STRAT_COPY = 'copy' + TEMPLATE = { "backend": { "providers": { BACKEND_GCP: { - "actor-factory": "cromwell.backend.google.pipelines." 
- "v2alpha1.PipelinesApiLifecycleActorFactory", + "actor-factory": "cromwell.backend.google.pipelines.v2alpha1.PipelinesApiLifecycleActorFactory", "config": { "default-runtime-attributes": { }, - "project": "YOUR_GCP_PROJECT", - "root": "gs://YOUR_GCS_BUCKET", - "concurrent-job-limit": 1000, + "project": None, + "root": None, "genomics-api-queries-per-100-seconds": 1000, "maximum-polling-interval": 600, "genomics": { @@ -202,7 +293,10 @@ class CaperBackendGCP(dict): }, "filesystems": { "gcs": { - "auth": "application-default" + "auth": "application-default", + "caching": { + "duplication-strategy": None + } } } } @@ -220,35 +314,47 @@ class CaperBackendGCP(dict): } } - def __init__(self, gcp_prj, out_gcs_bucket, concurrent_job_limit=None): - super(CaperBackendGCP, self).__init__( - CaperBackendGCP.TEMPLATE) - config = self['backend']['providers'][BACKEND_GCP]['config'] + def __init__(self, gcp_prj, out_gcs_bucket, + call_caching_dup_strat=None): + super().__init__( + CaperBackendGCP.TEMPLATE, + backend_name=BACKEND_GCP) + config = self.get_backend_config() + config['project'] = gcp_prj config['root'] = out_gcs_bucket - assert(out_gcs_bucket.startswith('gs://')) - if concurrent_job_limit is not None: - config['concurrent-job-limit'] = concurrent_job_limit + if not out_gcs_bucket.startswith('gs://'): + raise ValueError('Wrong GCS bucket URI for out_gcs_bucket: {v}'.format( + v=out_gcs_bucket)) + + if call_caching_dup_strat is None: + dup_strat = CaperBackendGCP.CALL_CACHING_DUP_STRAT_REFERENCE + else: + dup_strat = call_caching_dup_strat + if dup_strat not in ( + CaperBackendGCP.CALL_CACHING_DUP_STRAT_REFERENCE, + CaperBackendGCP.CALL_CACHING_DUP_STRAT_COPY): + raise ValueError('Wrong call_caching_dup_strat: {v}'.format( + v=call_caching_dup_strat)) + config['filesystems']['gcs']['caching']['duplication-strategy'] = dup_strat -class CaperBackendAWS(dict): +class CaperBackendAWS(CaperBackendBase): """AWS backend """ TEMPLATE = { "backend": { "providers": { BACKEND_AWS: { - "actor-factory": "cromwell.backend.impl.aws." 
- "AwsBatchBackendLifecycleActorFactory", + "actor-factory": "cromwell.backend.impl.aws.AwsBatchBackendLifecycleActorFactory", "config": { "default-runtime-attributes": { - "queueArn": "YOUR_AWS_BATCH_ARN" + "queueArn": None }, "numSubmitAttempts": 6, "numCreateDefinitionAttempts": 6, - "root": "s3://YOUR_S3_BUCKET", - "concurrent-job-limit": 1000, + "root": None, "auth": "default", "filesystems": { "s3": { @@ -267,7 +373,7 @@ class CaperBackendAWS(dict): "scheme": "default" } ], - "region": "YOUR_AWS_REGION" + "region": None }, "engine": { "filesystems": { @@ -278,21 +384,20 @@ class CaperBackendAWS(dict): } } - def __init__(self, aws_batch_arn, aws_region, out_s3_bucket, - concurrent_job_limit=None): - super(CaperBackendAWS, self).__init__( - CaperBackendAWS.TEMPLATE) + def __init__(self, aws_batch_arn, aws_region, out_s3_bucket): + super().__init__( + CaperBackendAWS.TEMPLATE, + backend_name=BACKEND_AWS) self[BACKEND_AWS]['region'] = aws_region - config = self['backend']['providers'][BACKEND_AWS]['config'] + config = self.get_backend_config() config['default-runtime-attributes']['queueArn'] = aws_batch_arn config['root'] = out_s3_bucket - assert(out_s3_bucket.startswith('s3://')) + if not out_s3_bucket.startswith('s3://'): + raise ValueError('Wrong S3 bucket URI for out_s3_bucket: {v}'.format( + v=out_s3_bucket)) - if concurrent_job_limit is not None: - config['concurrent-job-limit'] = concurrent_job_limit - -class CaperBackendLocal(dict): +class CaperBackendLocal(CaperBackendBaseLocal): """Local backend """ RUNTIME_ATTRIBUTES = """ @@ -329,14 +434,8 @@ class CaperBackendLocal(dict): "backend": { "providers": { BACKEND_LOCAL: { - "actor-factory": "cromwell.backend.impl.sfs.config." - "ConfigBackendLifecycleActorFactory", "config": { - "default-runtime-attributes": { - }, "run-in-background": True, - "script-epilogue": "sleep 10 && sync", - "concurrent-job-limit": 1000, "runtime-attributes": RUNTIME_ATTRIBUTES, "submit": SUBMIT, "submit-docker" : SUBMIT_DOCKER @@ -346,17 +445,13 @@ class CaperBackendLocal(dict): } } - def __init__(self, out_dir, concurrent_job_limit=None): - super(CaperBackendLocal, self).__init__( - CaperBackendLocal.TEMPLATE) - config = self['backend']['providers'][BACKEND_LOCAL]['config'] - config['root'] = out_dir - - if concurrent_job_limit is not None: - config['concurrent-job-limit'] = concurrent_job_limit + def __init__(self): + super().__init__( + CaperBackendLocal.TEMPLATE, + backend_name=BACKEND_LOCAL) -class CaperBackendSLURM(dict): +class CaperBackendSLURM(CaperBackendBaseLocal): """SLURM backend """ RUNTIME_ATTRIBUTES = """ @@ -412,14 +507,10 @@ class CaperBackendSLURM(dict): "backend": { "providers": { BACKEND_SLURM: { - "actor-factory": "cromwell.backend.impl.sfs.config." 
- "ConfigBackendLifecycleActorFactory", "config": { "default-runtime-attributes": { "time": 24 }, - "script-epilogue": "sleep 10 && sync", - "concurrent-job-limit": 1000, "runtime-attributes": RUNTIME_ATTRIBUTES, "submit": SUBMIT, "kill": "scancel ${job_id}", @@ -432,25 +523,21 @@ class CaperBackendSLURM(dict): } } - def __init__(self, out_dir, partition=None, account=None, extra_param=None, - concurrent_job_limit=None): - super(CaperBackendSLURM, self).__init__( - CaperBackendSLURM.TEMPLATE) - config = self['backend']['providers'][BACKEND_SLURM]['config'] - key = 'default-runtime-attributes' - config['root'] = out_dir + def __init__(self, partition=None, account=None, extra_param=None): + super().__init__( + CaperBackendSLURM.TEMPLATE, + backend_name=BACKEND_SLURM) + config = self.get_backend_config() if partition is not None and partition != '': - config[key]['slurm_partition'] = partition + config['default-runtime-attributes']['slurm_partition'] = partition if account is not None and account != '': - config[key]['slurm_account'] = account + config['default-runtime-attributes']['slurm_account'] = account if extra_param is not None and extra_param != '': - config[key]['slurm_extra_param'] = extra_param - if concurrent_job_limit is not None: - config['concurrent-job-limit'] = concurrent_job_limit + config['default-runtime-attributes']['slurm_extra_param'] = extra_param -class CaperBackendSGE(dict): +class CaperBackendSGE(CaperBackendBaseLocal): """SGE backend """ RUNTIME_ATTRIBUTES = """ @@ -506,14 +593,10 @@ class CaperBackendSGE(dict): "backend": { "providers": { BACKEND_SGE: { - "actor-factory": "cromwell.backend.impl.sfs.config." - "ConfigBackendLifecycleActorFactory", "config": { "default-runtime-attributes": { "time": 24 }, - "script-epilogue": "sleep 10 && sync", - "concurrent-job-limit": 1000, "runtime-attributes": RUNTIME_ATTRIBUTES, "submit": SUBMIT, "exit-code-timeout-seconds": 180, @@ -526,25 +609,21 @@ class CaperBackendSGE(dict): } } - def __init__(self, out_dir, pe=None, queue=None, extra_param=None, - concurrent_job_limit=None): - super(CaperBackendSGE, self).__init__( - CaperBackendSGE.TEMPLATE) - config = self['backend']['providers'][BACKEND_SGE]['config'] - key = 'default-runtime-attributes' - config['root'] = out_dir + def __init__(self, pe=None, queue=None, extra_param=None): + super().__init__( + CaperBackendSGE.TEMPLATE, + backend_name=BACKEND_SGE) + config = self.get_backend_config() if pe is not None and pe != '': - config[key]['sge_pe'] = pe + config['default-runtime-attributes']['sge_pe'] = pe if queue is not None and queue != '': - config[key]['sge_queue'] = queue + config['default-runtime-attributes']['sge_queue'] = queue if extra_param is not None and extra_param != '': - config[key]['sge_extra_param'] = extra_param - if concurrent_job_limit is not None: - config['concurrent-job-limit'] = concurrent_job_limit + config['default-runtime-attributes']['sge_extra_param'] = extra_param -class CaperBackendPBS(dict): +class CaperBackendPBS(CaperBackendBaseLocal): """PBS backend """ RUNTIME_ATTRIBUTES = """ @@ -586,14 +665,11 @@ class CaperBackendPBS(dict): "backend": { "providers": { BACKEND_PBS: { - "actor-factory": "cromwell.backend.impl.sfs.config." 
- "ConfigBackendLifecycleActorFactory", "config": { "default-runtime-attributes": { "time": 24 }, "script-epilogue": "sleep 30 && sync", - "concurrent-job-limit": 1000, "runtime-attributes": RUNTIME_ATTRIBUTES, "submit": SUBMIT, "exit-code-timeout-seconds": 180, @@ -606,20 +682,16 @@ class CaperBackendPBS(dict): } } - def __init__(self, out_dir, queue=None, extra_param=None, - concurrent_job_limit=None): - super(CaperBackendPBS, self).__init__( - CaperBackendPBS.TEMPLATE) - config = self['backend']['providers'][BACKEND_PBS]['config'] - key = 'default-runtime-attributes' - config['root'] = out_dir + def __init__(self, queue=None, extra_param=None): + super().__init__( + CaperBackendPBS.TEMPLATE, + backend_name=BACKEND_PBS) + config = self.get_backend_config() if queue is not None and queue != '': - config[key]['pbs_queue'] = queue + config['default-runtime-attributes']['pbs_queue'] = queue if extra_param is not None and extra_param != '': - config[key]['pbs_extra_param'] = extra_param - if concurrent_job_limit is not None: - config['concurrent-job-limit'] = concurrent_job_limit + config['default-runtime-attributes']['pbs_extra_param'] = extra_param def main(): diff --git a/caper/caper_check.py b/caper/caper_check.py index 578366ae..f231267e 100644 --- a/caper/caper_check.py +++ b/caper/caper_check.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """CaperCheck: Caper arguments/configuration checker Author: @@ -9,6 +8,7 @@ from .caper_backend import BACKENDS, BACKEND_SLURM, get_backend DEFAULT_FILE_DB_PREFIX = 'caper_file_db' +DEFAULT_CAPER_TMP_DIR_SUFFIX = '.caper_tmp' def check_caper_conf(args_d): @@ -57,16 +57,16 @@ def check_caper_conf(args_d): args_d['out_dir'] = os.getcwd() if args_d.get('tmp_dir') is None: - args_d['tmp_dir'] = os.path.join(args_d['out_dir'], '.caper_tmp') + args_d['tmp_dir'] = os.path.join(args_d['out_dir'], DEFAULT_CAPER_TMP_DIR_SUFFIX) if args_d.get('tmp_s3_bucket') is None: if args_d.get('out_s3_bucket'): args_d['tmp_s3_bucket'] = os.path.join(args_d['out_s3_bucket'], - '.caper_tmp') + DEFAULT_CAPER_TMP_DIR_SUFFIX) if args_d.get('tmp_gcs_bucket') is None: if args_d.get('out_gcs_bucket'): args_d['tmp_gcs_bucket'] = os.path.join(args_d['out_gcs_bucket'], - '.caper_tmp') + DEFAULT_CAPER_TMP_DIR_SUFFIX) file_db = args_d.get('file_db') if file_db is not None: file_db = os.path.abspath(os.path.expanduser(file_db)) @@ -121,4 +121,10 @@ def check_caper_conf(args_d): '--out-s3-bucket (s3:// output bucket path) ' 'is required for backend aws.') + if args_d.get('soft_glob_output') and args_d.get('use_docker'): + raise ValueError( + '--soft-glob-output and --docker are mutually exclusive. 
' + 'Delocalization from docker container will fail ' + 'for soft-linked globbed outputs.') + return args_d diff --git a/caper/caper_init.py b/caper/caper_init.py new file mode 100644 index 00000000..b833e106 --- /dev/null +++ b/caper/caper_init.py @@ -0,0 +1,182 @@ +"""Functions for caper init subcommand + +Author: + Jin Lee (leepc12@gmail.com) at ENCODE-DCC +""" + +import os +import sys +from .caper_backend import BACKENDS, BACKENDS_WITH_ALIASES +from .caper_backend import BACKEND_GCP, BACKEND_AWS, BACKEND_LOCAL +from .caper_backend import BACKEND_SLURM, BACKEND_SGE, BACKEND_PBS +from .caper_backend import BACKEND_ALIAS_LOCAL +from .caper_backend import BACKEND_ALIAS_GOOGLE, BACKEND_ALIAS_AMAZON +from .caper_backend import BACKEND_ALIAS_SHERLOCK, BACKEND_ALIAS_SCG +from .caper_uri import CaperURI, URI_LOCAL +from .caper_args import DEFAULT_CROMWELL_JAR, DEFAULT_WOMTOOL_JAR + + +DEFAULT_CROMWELL_JAR_INSTALL_DIR = '~/.caper/cromwell_jar' +DEFAULT_WOMTOOL_JAR_INSTALL_DIR = '~/.caper/womtool_jar' +DEFAULT_CONF_CONTENTS_LOCAL = """backend=local + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" +DEFAULT_CONF_CONTENTS_SHERLOCK = """backend=slurm +slurm-partition= + +# DO NOT use /tmp here +# You can use $OAK or $SCRATCH storages here. +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= + +# IMPORTANT warning for Stanford Sherlock cluster +# ==================================================================== +# DO NOT install any codes/executables +# (java, conda, python, caper, pipeline's WDL, pipeline's Conda env, ...) on $SCRATCH or $OAK. +# You will see Segmentation Fault errors. +# Install all executables on $HOME or $PI_HOME instead. +# It's STILL OKAY to read input data from and write outputs to $SCRATCH or $OAK. +# ==================================================================== +""" +DEFAULT_CONF_CONTENTS_SCG = """backend=slurm +slurm-account= + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" +DEFAULT_CONF_CONTENTS_SLURM = """backend=slurm + +# define one of the followings (or both) according to your +# cluster's SLURM configuration. 
+slurm-partition= +slurm-account= + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" +DEFAULT_CONF_CONTENTS_SGE = """backend=sge +sge-pe= + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" +DEFAULT_CONF_CONTENTS_PBS = """backend=pbs + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" +DEFAULT_CONF_CONTENTS_AWS = """backend=aws +aws-batch-arn= +aws-region= +out-s3-bucket= + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" +DEFAULT_CONF_CONTENTS_GCP = """backend=gcp +gcp-prj= +out-gcs-bucket= + +# call-cached outputs will be duplicated by making a copy or reference +# reference: refer to old output file in metadata.json file. +# copy: make a copy +gcp-call-caching-dup-strat= + +# DO NOT use /tmp here +# Caper stores all important temp files and cached big data files here +# If not defined, Caper will make .caper_tmp/ on your local output directory +# which is defined by out-dir, --out-dir or $CWD +# Use a local absolute path here +tmp-dir= +""" + + +def install_cromwell_jar(uri): + """Download cromwell-X.jar + """ + cu = CaperURI(uri) + if cu.uri_type == URI_LOCAL: + return cu.get_uri() + print('Downloading Cromwell JAR... {f}'.format(f=uri), file=sys.stderr) + path = os.path.join( + os.path.expanduser(DEFAULT_CROMWELL_JAR_INSTALL_DIR), + os.path.basename(uri)) + return cu.copy(target_uri=path) + + +def install_womtool_jar(uri): + """Download womtool-X.jar + """ + cu = CaperURI(uri) + if cu.uri_type == URI_LOCAL: + return cu.get_uri() + print('Downloading Womtool JAR... {f}'.format(f=uri), file=sys.stderr) + path = os.path.join( + os.path.expanduser(DEFAULT_WOMTOOL_JAR_INSTALL_DIR), + os.path.basename(uri)) + return cu.copy(target_uri=path) + + +def init_caper_conf(args): + """Initialize conf file for a given platform. + Also, download/install Cromwell/Womtool JARs. 
+ """ + backend = args.get('platform') + assert(backend in BACKENDS_WITH_ALIASES) + if backend in (BACKEND_LOCAL, BACKEND_ALIAS_LOCAL): + contents = DEFAULT_CONF_CONTENTS_LOCAL + elif backend == BACKEND_ALIAS_SHERLOCK: + contents = DEFAULT_CONF_CONTENTS_SHERLOCK + elif backend == BACKEND_ALIAS_SCG: + contents = DEFAULT_CONF_CONTENTS_SCG + elif backend == BACKEND_SLURM: + contents = DEFAULT_CONF_CONTENTS_SLURM + elif backend == BACKEND_SGE: + contents = DEFAULT_CONF_CONTENTS_SGE + elif backend == BACKEND_PBS: + contents = DEFAULT_CONF_CONTENTS_PBS + elif backend in (BACKEND_GCP, BACKEND_ALIAS_GOOGLE): + contents = DEFAULT_CONF_CONTENTS_GCP + elif backend in (BACKEND_AWS, BACKEND_ALIAS_AMAZON): + contents = DEFAULT_CONF_CONTENTS_AWS + else: + raise Exception('Unsupported platform/backend/alias.') + + conf_file = os.path.expanduser(args.get('conf')) + with open(conf_file, 'w') as fp: + fp.write(contents + '\n') + fp.write('{key}={val}\n'.format( + key='cromwell', + val=install_cromwell_jar(DEFAULT_CROMWELL_JAR))) + fp.write('{key}={val}\n'.format( + key='womtool', + val=install_womtool_jar(DEFAULT_WOMTOOL_JAR))) diff --git a/caper/caper_uri.py b/caper/caper_uri.py index 193cc8e4..64f7963d 100644 --- a/caper/caper_uri.py +++ b/caper/caper_uri.py @@ -203,24 +203,20 @@ class CaperURI(object): LOCK_MAX_ITER = 100 def __init__(self, uri_or_path): - if CaperURI.TMP_DIR is None: + if CaperURI.TMP_DIR is not None and \ + not CaperURI.TMP_DIR.endswith('/'): raise Exception( - 'Call init_caper_uri() first ' - 'to initialize CaperURI. TMP_DIR must be ' - 'specified.') - elif not CaperURI.TMP_DIR.endswith('/'): - raise Exception( - 'CaperURI.TMP_DIR must ends ' + 'CaperURI.TMP_DIR must end ' 'with a slash (/).') if CaperURI.TMP_S3_BUCKET is not None and \ not CaperURI.TMP_S3_BUCKET.endswith('/'): raise Exception( - 'CaperURI.TMP_S3_BUCKET must ends ' + 'CaperURI.TMP_S3_BUCKET must end ' 'with a slash (/).') if CaperURI.TMP_GCS_BUCKET is not None and \ not CaperURI.TMP_GCS_BUCKET.endswith('/'): raise Exception( - 'CaperURI.TMP_GCS_BUCKET must ends ' + 'CaperURI.TMP_GCS_BUCKET must end ' 'with a slash (/).') self._uri = uri_or_path diff --git a/caper/cromwell_rest_api.py b/caper/cromwell_rest_api.py index 0f4b7f76..b95f3c79 100644 --- a/caper/cromwell_rest_api.py +++ b/caper/cromwell_rest_api.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """CromwellRestAPI """ @@ -20,6 +19,9 @@ class CromwellRestAPI(object): ENDPOINT_ABORT = '/api/workflows/v1/{wf_id}/abort' ENDPOINT_RELEASE_HOLD = '/api/workflows/v1/{wf_id}/releaseHold' KEY_LABEL = 'cromwell_rest_api_label' + PARAMS_WORKFLOWS = { + 'additionalQueryResultFields': 'labels' + } def __init__(self, ip='localhost', port=8000, user=None, password=None, verbose=False): @@ -198,7 +200,8 @@ def find(self, workflow_ids=None, labels=None): List of matched workflow JSONs """ r = self.__request_get( - CromwellRestAPI.ENDPOINT_WORKFLOWS) + CromwellRestAPI.ENDPOINT_WORKFLOWS, + params=CromwellRestAPI.PARAMS_WORKFLOWS) if r is None: return None workflows = r['results'] @@ -215,8 +218,8 @@ def find(self, workflow_ids=None, labels=None): break if w['id'] in matched: continue - if labels is not None: - labels_ = self.get_labels(w['id']) + if labels is not None and 'labels' in w: + labels_ = w['labels'] for k, v in labels: if k in labels_: v_ = labels_[k] @@ -246,7 +249,7 @@ def __init_auth(self): else: self._auth = None - def __request_get(self, endpoint): + def __request_get(self, endpoint, params=None): """GET request Returns: @@ -257,7 +260,7 @@ def __request_get(self, 
endpoint): port=self._port) + endpoint try: resp = requests.get( - url, auth=self._auth, + url, auth=self._auth, params=params, headers={'accept': 'application/json'}) except Exception as e: # traceback.print_exc() diff --git a/caper/dict_tool.py b/caper/dict_tool.py index 46f989ad..0fd95c4c 100644 --- a/caper/dict_tool.py +++ b/caper/dict_tool.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """dictTool: merge/split/flatten/unflatten dict Author: