diff --git a/DETAILS.md b/DETAILS.md
index 9bc93c3..a1c66c7 100644
--- a/DETAILS.md
+++ b/DETAILS.md
@@ -155,7 +155,7 @@ We highly recommend to use a default configuration file described in the section
 --file-db, -d|File-based metadata DB for Cromwell's built-in HyperSQL database (UNSTABLE)
 --db-timeout|Milliseconds to wait for DB connection (default: 30000)
 --java-heap-server|Java heap memory for caper server (default: 10G)
---disable-auto-update-metadata|Disable auto update/retrieval/writing of `metadata.json` on workflow's output directory.
+--disable-auto-write-metadata|Disable auto update/retrieval/writing of `metadata.json` on workflow's output directory.
 --java-heap-run|Java heap memory for caper run (default: 3G)
 --show-subworkflow|Include subworkflow in `caper list` search query. **WARNING**: If there are too many subworkflows, then you will see HTTP 503 error (service unavailable) or Caper/Cromwell server can crash.
diff --git a/README.md b/README.md
index d2d8077..56bcb84 100644
--- a/README.md
+++ b/README.md
@@ -1,71 +1,5 @@
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 [![CircleCI](https://circleci.com/gh/ENCODE-DCC/caper.svg?style=svg)](https://circleci.com/gh/ENCODE-DCC/caper)
-# Major changes for Caper 1.0.
-
-If you are upgrading Caper from previous versions:
-  - Edit your `~/.caper/default.conf` to remove `cromwell=` and `womtool=` from it. Caper will then automatically download Cromwell/Womtool version 51, which supports the new Google Cloud Life Sciences API (v2beta). You can also use `caper init [YOUR_BACKEND]` to locally install Cromwell/Womtool JARs.
-
-> **CRITICAL**: Due to a change in Caper 1.0 (Cromwell `47` to `51`), a metadata database (`--db`) generated before 1.0 will not work with >= 1.0. See details below.
-
-Upgraded Cromwell from 47 to 51.
-  - Metadata DB generated with Caper<1.0 will not work with Caper>=1.0.
-  - See [this note](https://github.com/broadinstitute/cromwell/releases/tag/49) for DB migration instructions.
-  - We recommend using Cromwell-51 with Caper>=1.0 since it's fully tested with Cromwell-51.
-
-Changed hashing strategy for all local backends (`local`, `slurm`, `sge`, `pbs`).
-  - Default hashing strategy changed from `file` (based on md5sum, which is expensive) to `path+modtime`.
-  - Changing the hashing strategy while using the same metadata DB will result in cache-miss.
-
-Changed duplication strategy for all local backends (`local`, `slurm`, `sge`, `pbs`).
-  - Default file duplication strategy changed from `hard-link` to `soft-link`.
-  - This is for filesystems (e.g. beeGFS) that do not allow hard-linking.
-  - Caper<1.0 hard-linked input files even with `--soft-glob-output`.
-  - For Caper>=1.0, you still need to use `--soft-glob-output` for such filesystems.
-
-Google Cloud Platform backend (`gcp`):
-  - Can use a service account instead of an application default (end user's auth.).
-    - Added `--gcp-service-account-key-json`.
-    - Make sure that such a service account has enough permissions (roles) on resources in the Google Cloud Platform project (`--gcp-prj`). See [details](docs/conf_gcp.md#how-to-run-caper-with-a-service-account).
-  - Can use Google Cloud Life Sciences API (v2beta) instead of the deprecated Google Cloud Genomics API (v2alpha1).
-    - Added `--use-google-cloud-life-sciences`.
-    - For `caper server/run`, you need to specify a region with `--gcp-region` to use the Life Sciences API. Check [supported regions](https://cloud.google.com/life-sciences/docs/concepts/locations). `--gcp-zones` will be ignored.
-    - Make sure to enable `Google Cloud Life Sciences API` on the Google Cloud Platform console (APIs & Services -> `+` button on top).
-    - Also, if you use a service account, add the role `Life Sciences Admin` to it.
-    - We will deprecate old `Genomics API` support. `Life Sciences API` will become the new default after the next 2-3 releases.
-  - Added [`memory-retry`](https://cromwell.readthedocs.io/en/stable/backends/Google/) to Caper. This is for the `gcp` backend only.
-    - Retries (controlled by `--max-retries`) on an instance with increased memory if a workflow fails due to an OOM (out-of-memory) error.
-    - Comma-separated keys to catch OOM: `--gcp-prj-memory-retry-error-keys`.
-    - Multiplier for every retry due to OOM: `--gcp-prj-memory-retry-multiplier`.
-
-Change of parameter names. Backward compatible.
-  - `--out-dir` -> `--local-out-dir`
-  - `--out-gcs-bucket` -> `--gcp-out-dir`
-  - `--out-s3-bucket` -> `--aws-out-dir`
-  - `--tmp-dir` -> `--local-loc-dir`
-  - `--tmp-gcs-bucket` -> `--gcp-loc-dir`
-  - `--tmp-s3-bucket` -> `--aws-loc-dir`
-
-Added parameters
-  - `--use-google-cloud-life-sciences` and `--gcp-region`: Use Life Sciences API (Cromwell's v2beta scheme).
-  - `--gcp-service-account-key-json`: Use a service account for auth on GCP (instead of application default).
-  - `--gcp-prj-memory-retry-error-keys`: Comma-separated keys to catch OOM errors on GCP.
-  - `--gcp-prj-memory-retry-multiplier`: Multiplier for every retry due to OOM errors on GCP.
-  - `--cromwell-stdout`: Redirect Cromwell STDOUT to a file.
-
-Improved Python interface.
-  - Old Caper<1.0 was originally designed for the CLI.
-  - New Caper>=1.0 is designed for the Python interface first; the CLI is built on top of that interface.
-  - Can retrieve `metadata.json` embedded with subworkflows' metadata JSON.
-
-Better logging and troubleshooting.
-  - Defaults to writing Cromwell STDOUT to `cromwell.out` (controlled by `--cromwell-stdout`).
-
-
-> **IMPORTANT**: `--use-gsutil-for-s3` requires `gsutil` installed on your system. This flag allows a direct transfer between `gs://` and `s3://`. This requires `gsutil` >= 4.47. See this [issue](https://github.com/GoogleCloudPlatform/gsutil/issues/935) for details. `gsutil` is based on Python 2.
-```bash
-$ pip install gsutil --upgrade
-```
-
 # Caper
 
 Caper (Cromwell Assisted Pipeline ExecutoR) is a wrapper Python package for [Cromwell](https://github.com/broadinstitute/cromwell/).
diff --git a/caper/__init__.py b/caper/__init__.py
index 4777355..57c8f91 100644
--- a/caper/__init__.py
+++ b/caper/__init__.py
@@ -2,4 +2,4 @@ from .caper_runner import CaperRunner
 
 __all__ = ['CaperClient', 'CaperClientSubmit', 'CaperRunner']
 
-__version__ = '1.4.1'
+__version__ = '1.4.2'
diff --git a/caper/backward_compatibility.py b/caper/backward_compatibility.py
index 5c857fa..753c40f 100644
--- a/caper/backward_compatibility.py
+++ b/caper/backward_compatibility.py
@@ -10,3 +10,10 @@
     'tmp_s3_bucket': 'aws_loc_dir',
     'ip': 'hostname',
 }
+
+CAPER_1_4_2_PARAM_KEY_NAME_CHANGE = {'auto_update_metadata': 'auto_write_metadata'}
+
+PARAM_KEY_NAME_CHANGE = {
+    **CAPER_1_0_0_PARAM_KEY_NAME_CHANGE,
+    **CAPER_1_4_2_PARAM_KEY_NAME_CHANGE,
+}
diff --git a/caper/caper_args.py b/caper/caper_args.py
index f7a8f36..cf2ee89 100644
--- a/caper/caper_args.py
+++ b/caper/caper_args.py
@@ -5,7 +5,7 @@ from autouri import URIBase
 
 from .arg_tool import update_parsers_defaults_with_conf
-from .backward_compatibility import CAPER_1_0_0_PARAM_KEY_NAME_CHANGE
+from .backward_compatibility import PARAM_KEY_NAME_CHANGE
 from .caper_workflow_opts import CaperWorkflowOpts
 from .cromwell import Cromwell
 from .cromwell_backend import (
@@ -533,7 +533,7 @@ def get_parser_and_defaults(conf_file=None):
         help='Cromwell Java heap size for "server" mode (java -Xmx)',
     )
     parent_server.add_argument(
-        '--disable-auto-update-metadata',
+        '--disable-auto-write-metadata',
         action='store_true',
         help='Disable automatic retrieval/update/writing of metadata.json upon workflow/task status change.',
     )
@@ -859,9 +859,7 @@ def get_parser_and_defaults(conf_file=None):
     ]
     if os.path.exists(conf_file):
         conf_dict = update_parsers_defaults_with_conf(
-            parsers=subparsers,
-            conf_file=conf_file,
-            conf_key_map=CAPER_1_0_0_PARAM_KEY_NAME_CHANGE,
+            parsers=subparsers, conf_file=conf_file, conf_key_map=PARAM_KEY_NAME_CHANGE
         )
     else:
         conf_dict = None
diff --git a/caper/caper_runner.py b/caper/caper_runner.py
index ae6b5d6..2397e2e 100644
--- a/caper/caper_runner.py
+++ b/caper/caper_runner.py
@@ -451,7 +451,7 @@ def server(
         fileobj_stdout=None,
         embed_subworkflow=False,
         java_heap_server=Cromwell.DEFAULT_JAVA_HEAP_CROMWELL_SERVER,
-        auto_update_metadata=True,
+        auto_write_metadata=True,
         work_dir=None,
         dry_run=False,
     ):
@@ -486,7 +486,7 @@ def server(
             This is to mimic behavior of Cromwell run mode's -m parameter.
         java_heap_server:
             Java heap (java -Xmx) for Cromwell server mode.
-        auto_update_metadata:
+        auto_write_metadata:
             Automatic retrieval/writing of metadata.json upon workflow/task's status change.
         work_dir:
             Local temporary directory to store all temporary files.
@@ -518,7 +518,7 @@ def server(
             fileobj_stdout=fileobj_stdout,
             embed_subworkflow=embed_subworkflow,
             java_heap_cromwell_server=java_heap_server,
-            auto_update_metadata=auto_update_metadata,
+            auto_write_metadata=auto_write_metadata,
             dry_run=dry_run,
         )
         return th
diff --git a/caper/cli.py b/caper/cli.py
index 4d28db7..8ee0927 100644
--- a/caper/cli.py
+++ b/caper/cli.py
@@ -317,7 +317,7 @@ def subcmd_server(caper_runner, args, nonblocking=False):
         'server_heartbeat': sh,
         'custom_backend_conf': get_abspath(args.backend_file),
         'embed_subworkflow': True,
-        'auto_update_metadata': not args.disable_auto_update_metadata,
+        'auto_write_metadata': not args.disable_auto_write_metadata,
         'java_heap_server': args.java_heap_server,
         'dry_run': args.dry_run,
     }
diff --git a/caper/cromwell.py b/caper/cromwell.py
index d8a5194..4699ce7 100644
--- a/caper/cromwell.py
+++ b/caper/cromwell.py
@@ -326,7 +326,7 @@ def server(
         fileobj_stdout=None,
         embed_subworkflow=False,
         java_heap_cromwell_server=DEFAULT_JAVA_HEAP_CROMWELL_SERVER,
-        auto_update_metadata=True,
+        auto_write_metadata=True,
         on_server_start=None,
         on_status_change=None,
         cwd=None,
@@ -365,7 +365,7 @@ def server(
             This is to mimic behavior of Cromwell run mode's -m parameter.
         java_heap_cromwell_server:
             Java heap (java -Xmx) for Cromwell server mode.
-        auto_update_metadata:
+        auto_write_metadata:
             Automatic retrieval/writing of metadata.json upon workflow/task's status change.
         on_server_start:
             On server start.
@@ -429,7 +429,7 @@ def server(
         server_port=server_port,
         is_server=True,
         embed_subworkflow=embed_subworkflow,
-        auto_update_metadata=auto_update_metadata,
+        auto_write_metadata=auto_write_metadata,
         on_server_start=on_server_start,
         on_status_change=on_status_change,
     )
diff --git a/caper/cromwell_workflow_monitor.py b/caper/cromwell_workflow_monitor.py
index de5fd15..1c1e079 100644
--- a/caper/cromwell_workflow_monitor.py
+++ b/caper/cromwell_workflow_monitor.py
@@ -9,7 +9,7 @@
 
 
 class WorkflowStatusTransition:
-    def __init__(self, regex, status_transitions):
+    def __init__(self, regex, status_transitions, auto_write_metadata=False):
         """
         Args:
             regex:
@@ -27,6 +27,7 @@ def __init__(self, regex, status_transitions):
         """
         self._regex = regex
         self._status_transitions = status_transitions
+        self._auto_write_metadata = auto_write_metadata
 
     def parse(self, line, workflow_status_map):
         """
@@ -42,9 +43,12 @@ def parse(self, line, workflow_status_map):
                 Workflow's string ID.
             status:
                 New status after transition.
+            auto_write_metadata:
+                For this status transition, a metadata JSON file should be written
+                to the workflow's root output directory.
         """
         r = re.findall(self._regex, line)
-        if len(r) > 0:
+        if r:
             wf_id = r[0].strip()
             if wf_id in workflow_status_map:
                 prev_status = workflow_status_map[wf_id]
@@ -58,9 +62,9 @@ def parse(self, line, workflow_status_map):
                         id=wf_id, status=st2
                     )
                 )
-                return wf_id, st2
+                return wf_id, st2, self._auto_write_metadata
                 break
-        return None, None
+        return None, None, False
 
 
 class CromwellWorkflowMonitor:
@@ -92,6 +96,7 @@ class CromwellWorkflowMonitor:
                 ('Aborting', 'Aborted'),
                 (None, 'Succeeded'),
             ),
+            auto_write_metadata=True,
         ),
     )
@@ -103,8 +108,8 @@ class CromwellWorkflowMonitor:
     RE_TASK_CALL_CACHED = r'\[UUID\((\b[0-9a-f]{8})\)\]: Job results retrieved \(CallCached\): \'(.+)\' \(scatter index: (.+), attempt (\d+)\)'
     RE_SUBWORKFLOW_FOUND = r'(\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)-SubWorkflowActor-SubWorkflow'
 
-    MAX_RETRY_UPDATE_METADATA = 3
-    INTERVAL_RETRY_UPDATE_METADATA = 10.0
+    MAX_RETRY_WRITE_METADATA = 3
+    INTERVAL_RETRY_WRITE_METADATA = 10.0
 
     DEFAULT_SERVER_HOSTNAME = 'localhost'
     DEFAULT_SERVER_PORT = 8000
@@ -114,7 +119,7 @@ def __init__(
         server_hostname=DEFAULT_SERVER_HOSTNAME,
         server_port=DEFAULT_SERVER_PORT,
         embed_subworkflow=False,
-        auto_update_metadata=False,
+        auto_write_metadata=False,
         on_status_change=None,
         on_server_start=None,
     ):
@@ -141,7 +146,7 @@ def __init__(
             It tries to write/update metadata JSON file on workflow's root.
             For this metadata JSON file, embed subworkflow's metadata JSON in it.
             If this is turned off, then metadata JSON will just have subworkflow's ID.
-        auto_update_metadata:
+        auto_write_metadata:
             This is a server-only feature.
             For any change of workflow's status, automatically updates
             metadata JSON file on workflow's root directory.
                 metadata JSON is retrieved by communicating with Cromwell server via
@@ -169,7 +174,7 @@ def __init__(
         self._cromwell_rest_api = None
 
         self._embed_subworkflow = embed_subworkflow
-        self._auto_update_metadata = auto_update_metadata
+        self._auto_write_metadata = auto_write_metadata
         self._on_status_change = on_status_change
         self._on_server_start = on_server_start
@@ -191,19 +196,18 @@ def update(self, stderr):
         if self._is_server:
             self._update_server_start(stderr)
 
-        updated_workflows = set()
-        updated_workflows |= self._update_workflows(stderr)
+        updated_workflows, workflows_to_write_metadata = self._update_workflows(stderr)
 
         self._update_subworkflows(stderr)
-        updated_workflows |= self._update_tasks(stderr)
+        self._update_tasks(stderr)
 
-        for w in updated_workflows:
-            self._update_metadata(w)
+        for w in workflows_to_write_metadata:
+            self._write_metadata(w)
 
     def _update_server_start(self, stderr):
         if not self._is_server_started:
             for line in stderr.split('\n'):
                 r1 = re.findall(CromwellWorkflowMonitor.RE_CROMWELL_SERVER_START, line)
-                if len(r1) > 0:
+                if r1:
                     self._is_server_started = True
                     if self._on_server_start:
                         self._on_server_start()
@@ -214,21 +218,24 @@ def _update_workflows(self, stderr):
         """Updates workflow status by parsing Cromwell's stderr lines.
""" updated_workflows = set() + workflows_to_write_metadata = set() for line in stderr.split('\n'): for st_transitions in CromwellWorkflowMonitor.ALL_STATUS_TRANSITIONS: - workflow_id, status = st_transitions.parse( + workflow_id, status, auto_write_metadata = st_transitions.parse( line, self._workflow_status_map ) if workflow_id: self._workflow_status_map[workflow_id] = status updated_workflows.add(workflow_id) + if auto_write_metadata: + workflows_to_write_metadata.add(workflow_id) - return updated_workflows + return updated_workflows, workflows_to_write_metadata def _update_subworkflows(self, stderr): for line in stderr.split('\n'): r_sub = re.findall(CromwellWorkflowMonitor.RE_SUBWORKFLOW_FOUND, line) - if len(r_sub) > 0: + if r_sub: subworkflow_id = r_sub[0] if subworkflow_id not in self._subworkflows: logger.info('Subworkflow found: {id}'.format(id=subworkflow_id)) @@ -237,17 +244,16 @@ def _update_subworkflows(self, stderr): def _update_tasks(self, stderr): """Check if workflow's task status changed by parsing Cromwell's stderr lines. 
""" - updated_workflows = set() for line in stderr.split('\n'): r_common = None r_start = re.findall(CromwellWorkflowMonitor.RE_TASK_START, line) - if len(r_start) > 0: + if r_start: r_common = r_start[0] status = 'Started' job_id = r_common[4] r_callcached = re.findall(CromwellWorkflowMonitor.RE_TASK_CALL_CACHED, line) - if len(r_callcached) > 0: + if r_callcached: r_common = r_callcached[0] status = 'CallCached' job_id = None @@ -255,12 +261,12 @@ def _update_tasks(self, stderr): r_status_change = re.findall( CromwellWorkflowMonitor.RE_TASK_STATUS_CHANGE, line ) - if len(r_status_change) > 0: + if r_status_change: r_common = r_status_change[0] status = r_common[5] job_id = None - if r_common and len(r_common) > 0: + if r_common: short_id = r_common[0] workflow_id = self._find_workflow_id_from_short_id(short_id) task_name = r_common[1] @@ -282,19 +288,15 @@ def _update_tasks(self, stderr): msg += ', job_id={job_id}'.format(job_id=job_id) logger.info(msg) - updated_workflows.add(workflow_id) - - return updated_workflows - def _find_workflow_id_from_short_id(self, short_id): for w in self._subworkflows.union(set(self._workflow_status_map.keys())): if w.startswith(short_id): return w - def _update_metadata(self, workflow_id): + def _write_metadata(self, workflow_id): """Update metadata on Cromwell'e exec root. 
""" - if not self._is_server or not self._auto_update_metadata: + if not self._is_server or not self._auto_write_metadata: return if workflow_id in self._subworkflows and self._embed_subworkflow: logger.debug( @@ -303,9 +305,9 @@ def _update_metadata(self, workflow_id): ) ) return - for trial in range(CromwellWorkflowMonitor.MAX_RETRY_UPDATE_METADATA + 1): + for trial in range(CromwellWorkflowMonitor.MAX_RETRY_WRITE_METADATA + 1): try: - time.sleep(CromwellWorkflowMonitor.INTERVAL_RETRY_UPDATE_METADATA) + time.sleep(CromwellWorkflowMonitor.INTERVAL_RETRY_WRITE_METADATA) metadata = self._cromwell_rest_api.get_metadata( workflow_ids=[workflow_id], embed_subworkflow=self._embed_subworkflow, diff --git a/caper/nb_subproc_thread.py b/caper/nb_subproc_thread.py index cb3fc0f..2103378 100644 --- a/caper/nb_subproc_thread.py +++ b/caper/nb_subproc_thread.py @@ -139,11 +139,12 @@ def stop(self, stop_signal=DEFAULT_STOP_SIGNAL, wait=False): self._stop_it = True self._stop_signal = stop_signal if wait: - logger.info( - '{name} stopped but waiting for graceful shutdown...'.format( - name=self._subprocess_name + if self._returncode is None: + logger.info( + '{name} stopped but waiting for graceful shutdown...'.format( + name=self._subprocess_name + ) ) - ) while True: if self._returncode is not None: return diff --git a/setup.py b/setup.py index 84580cd..2cc08bd 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name='caper', - version='1.4.1', + version='1.4.2', python_requires='>=3.6', scripts=[ 'bin/caper', diff --git a/tests/test_cli_server_client_gcp.py b/tests/test_cli_server_client_gcp.py index 17de86f..16f3b74 100644 --- a/tests/test_cli_server_client_gcp.py +++ b/tests/test_cli_server_client_gcp.py @@ -5,6 +5,7 @@ import time import pytest +from autouri import AutoURI from caper.cli import main as cli_main from caper.cromwell_rest_api import CromwellRestAPI @@ -122,9 +123,19 @@ def test_server_client( while True: time.sleep(5) m = 
cra.get_metadata([workflow_id])[0] - print('polling: ', workflow_id, m['status']) - if m['status'] in ('Done', 'Failed', 'Succeeded'): - break + workflow_root = m.get('workflowRoot') + if workflow_root: + metadata_json_file = os.path.join(workflow_root, 'metadata.json') + else: + metadata_json_file = None + print('polling: ', workflow_id, m['status'], metadata_json_file) + + if m['status'] in ('Failed', 'Succeeded'): + if AutoURI(metadata_json_file).exists: + break + elif metadata_json_file: + assert not AutoURI(metadata_json_file).exists + if time.time() - t_start > TIMEOUT_SERVER_RUN_WORKFLOW: raise TimeoutError('Timed out waiting for workflow being done.')
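
The change to `caper/backward_compatibility.py` above merges the 1.0.0 and 1.4.2 key-rename maps via dict unpacking, so later renames win on key collisions. A minimal sketch of how such a merged map can be applied to an old conf dict (the `CAPER_1_0_0_...` map below is abbreviated, and `rename_deprecated_keys` is a hypothetical helper; in Caper the mapping is consumed by `update_parsers_defaults_with_conf` through its `conf_key_map` argument):

```python
# Abbreviated version of the 1.0.0 rename map; the full map lives in
# caper/backward_compatibility.py.
CAPER_1_0_0_PARAM_KEY_NAME_CHANGE = {
    'out_dir': 'local_out_dir',
    'tmp_s3_bucket': 'aws_loc_dir',
    'ip': 'hostname',
}
CAPER_1_4_2_PARAM_KEY_NAME_CHANGE = {'auto_update_metadata': 'auto_write_metadata'}

# Dict unpacking merges the maps; entries from later releases override
# earlier ones on key collisions (PEP 448 semantics).
PARAM_KEY_NAME_CHANGE = {
    **CAPER_1_0_0_PARAM_KEY_NAME_CHANGE,
    **CAPER_1_4_2_PARAM_KEY_NAME_CHANGE,
}


def rename_deprecated_keys(conf):
    """Map old conf keys to their new names, keeping unknown keys as-is."""
    return {PARAM_KEY_NAME_CHANGE.get(k, k): v for k, v in conf.items()}


# Example: an old-style conf written for Caper<1.4.2 still parses.
old_conf = {'auto_update_metadata': False, 'db_timeout': 30000}
new_conf = rename_deprecated_keys(old_conf)
```
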
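
The monitor change above extends `WorkflowStatusTransition.parse()` to return a third value that tells `CromwellWorkflowMonitor` whether `metadata.json` should be written for that transition (only terminal transitions are constructed with `auto_write_metadata=True`). A simplified, self-contained sketch of the new contract — the regex and log line below are illustrative, not Cromwell's real stderr format:

```python
import re


class WorkflowStatusTransition:
    """Simplified sketch of the class after this diff."""

    def __init__(self, regex, status_transitions, auto_write_metadata=False):
        self._regex = regex
        self._status_transitions = status_transitions
        self._auto_write_metadata = auto_write_metadata

    def parse(self, line, workflow_status_map):
        # Returns (workflow_id, new_status, write_metadata_flag); the third
        # value replaces the old two-tuple return and drives metadata writes.
        r = re.findall(self._regex, line)
        if r:
            wf_id = r[0].strip()
            prev_status = workflow_status_map.get(wf_id)
            for st1, st2 in self._status_transitions:
                if st1 == prev_status:
                    return wf_id, st2, self._auto_write_metadata
        return None, None, False


# Terminal transitions request a metadata.json write (illustrative regex).
terminal = WorkflowStatusTransition(
    regex=r'Workflow ([0-9a-f-]+) reached a terminal state',
    status_transitions=(('Running', 'Succeeded'), (None, 'Succeeded')),
    auto_write_metadata=True,
)
wf_id, status, write_md = terminal.parse(
    'Workflow 1234abcd-00ff reached a terminal state',
    {'1234abcd-00ff': 'Running'},
)
```

The monitor then collects only the workflow IDs whose transition returned `True` into `workflows_to_write_metadata`, instead of writing metadata for every updated workflow as before.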