diff --git a/README.md b/README.md index 2a399e4..4adbdf8 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Deploy workflows on laptops, servers, or HPC/HTC clusters. - [Input file](#input-file) - [Input files](#input-files) - [Input directory](#input-directory) + - [Input patterns](#input-patterns) - [Overwriting existing input files](#overwriting-existing-input-files) - [Outputs](#outputs) - [Bind mounts](#bind-mounts) @@ -108,6 +109,34 @@ input: from: /iplant/home/username/directory ``` +#### Input patterns + +To pull only certain files `from` an input directory, use attribute `patterns`. Each pattern is matched as a plain substring of the file path (not a glob or regular expression), and a file is pulled if its path contains any of the listed patterns: + +```yaml +input: + kind: directory + from: /iplant/home/username/directory + patterns: + - jpg + - png +``` + + + #### Overwriting existing input files Note that by default, the CLI will check whether files already exist on the local filesystem, and will not re-download them if they do. To force a download and overwrite, add the flag `overwrite: True` to the `input` section, for instance: @@ -116,7 +145,7 @@ Note that by default, the CLI will check whether files already exist on the loca input: kind: directory from: /iplant/home/username/directory - overwrite: true + overwrite: True ``` ### Outputs diff --git a/plantit_cli/plan.py b/plantit_cli/plan.py index ef414a6..ae89a3a 100644 --- a/plantit_cli/plan.py +++ b/plantit_cli/plan.py @@ -12,7 +12,8 @@ def __init__(self, params: list = None, input: dict = None, output: dict = None, - logging: dict = None): + logging: dict = None, + checksums: list = None): self.identifier = identifier self.workdir = workdir self.image = image @@ -26,3 +27,4 @@ def __init__(self, self.input = input self.output = output self.logging = logging + self.checksums = checksums diff --git a/plantit_cli/runner/runner.py b/plantit_cli/runner/runner.py index 4040da0..63ba060 100644 --- a/plantit_cli/runner/runner.py +++ b/plantit_cli/runner/runner.py @@ -39,16 +39,14 @@ def __pull_input(self, plan: Plan) -> str: input_kind =
plan.input['kind'].lower() os.makedirs(input_dir, exist_ok=True) if input_kind == 'directory' or input_kind == 'files': - self.__store.download_directory(plan.input['from'], input_dir, - plan.input['pattern'] if 'pattern' in plan.input else None) + self.__store.download_directory(plan.input['from'], input_dir, plan.input['patterns'] if 'patterns' in plan.input else None) elif input_kind == 'file': self.__store.download_file(plan.input['from'], input_dir) else: raise ValueError(f"'input.kind' must be either 'file' or 'directory'") input_files = os.listdir(input_dir) if len(input_files) == 0: - raise PlantitException(f"No inputs found at path '{plan.input['from']}'" + ( - f" matching pattern '{plan.input['pattern']}'" if 'pattern' in plan.input else '')) + raise PlantitException(f"No inputs found at path '{plan.input['from']}'" + (f" matching patterns '{plan.input['patterns']}'" if 'patterns' in plan.input else '')) update_status(plan, 3, f"Pulled input(s): {', '.join(input_files)}") return input_dir @@ -56,10 +54,10 @@ def __push_output(self, plan: Plan): self.__store.upload_directory( join(plan.workdir, plan.output['from']) if 'from' in plan.output else plan.workdir, plan.output['to'], - (plan.output['include']['patterns'] if type(plan.output['include']['patterns']) is list else None) if 'include' in plan.output else None, - (plan.output['include']['names'] if type(plan.output['include']['names']) is list else None) if 'include' in plan.output else None, - (plan.output['exclude']['patterns'] if type(plan.output['exclude']['patterns']) is list else None) if 'exclude' in plan.output else None, - (plan.output['exclude']['names'] if type(plan.output['exclude']['names']) is list else None) if 'exclude' in plan.output else None) + (plan.output['include']['patterns'] if type(plan.output['include']['patterns']) is list else None) if 'include' in plan.output and 'patterns' in plan.output['include'] else None, + (plan.output['include']['names'] if 
type(plan.output['include']['names']) is list else None) if 'include' in plan.output and 'names' in plan.output['include'] else None, + (plan.output['exclude']['patterns'] if type(plan.output['exclude']['patterns']) is list else None) if 'exclude' in plan.output and 'patterns' in plan.output['exclude'] else None, + (plan.output['exclude']['names'] if type(plan.output['exclude']['names']) is list else None) if 'exclude' in plan.output and 'names' in plan.output['exclude'] else None) update_status(plan, 3, f"Pushed output(s)") diff --git a/plantit_cli/store/local_store.py b/plantit_cli/store/local_store.py index 8469a73..ddf4096 100644 --- a/plantit_cli/store/local_store.py +++ b/plantit_cli/store/local_store.py @@ -25,9 +25,9 @@ def download_file(self, from_path, to_path): update_status(self.plan, 3, f"Copying {from_path_file} to {to_path_file}") copyfileobj(from_file, to_file) - def download_directory(self, from_path, to_path, include_pattern): + def download_directory(self, from_path, to_path, patterns): from_paths = [p for p in self.list_directory(from_path) if - include_pattern.lower() in p.lower()] if include_pattern is not None else self.list_directory(from_path) + any(pattern.lower() in p.lower() for pattern in patterns)] if patterns is not None else self.list_directory(from_path) for path in from_paths: self.download_file(path, to_path) diff --git a/plantit_cli/store/store.py b/plantit_cli/store/store.py index 7d3b77d..b24b1f6 100644 --- a/plantit_cli/store/store.py +++ b/plantit_cli/store/store.py @@ -41,14 +41,14 @@ def download_file(self, from_path, to_path): pass @abstractmethod - def download_directory(self, from_path, to_path, include_pattern): + def download_directory(self, from_path, to_path, patterns): """ Downloads files from the remote directory to the local directory. Args: from_path: The remote directory path. to_path: The local directory path. - include_pattern: File pattern(s) to include. + patterns: List of substring patterns; a file is included if its path contains any of them (None includes every file).
""" pass diff --git a/plantit_cli/store/terrain_store.py b/plantit_cli/store/terrain_store.py index f611a83..950924b 100644 --- a/plantit_cli/store/terrain_store.py +++ b/plantit_cli/store/terrain_store.py @@ -1,4 +1,8 @@ +import multiprocessing +from contextlib import closing +from multiprocessing import Pool from os import listdir +from pprint import pprint from typing import List from os.path import isdir, isfile, basename, join @@ -22,11 +26,12 @@ def list_directory(self, path) -> List[str]: raise PlantitException(f"Path {path} does not exist") response.raise_for_status() content = response.json() return [file['path'] for file in content['files']] def download_file(self, from_path, to_path): to_path_full = f"{to_path}/{from_path.split('/')[-1]}" - if isfile(to_path_full) and ('overwrite' not in self.plan.input or ('overwrite' in self.plan.input and not self.plan.input['overwrite'])): + if isfile(to_path_full) and ('overwrite' not in self.plan.input or ( + 'overwrite' in self.plan.input and not self.plan.input['overwrite'])): update_status(self.plan, 3, f"File {to_path_full} already exists, skipping download") return else: @@ -40,15 +45,36 @@ def download_file(self, from_path, to_path): for chunk in response.iter_content(chunk_size=8192): file.write(chunk) + def __verify_inputs(self, from_path, paths):  # compares remote (name, md5) pairs with plan.checksums; raises PlantitException on any mismatch + with requests.post('https://de.cyverse.org/terrain/secured/filesystem/stat', + headers={'Authorization': f"Bearer {self.plan.cyverse_token}"}, + json={'paths': paths}) as response:  # send the path list as a JSON body rather than form-encoded fields + response.raise_for_status() + stats = response.json()['paths'].values()  # NOTE(review): assumes 'paths' maps each remote path to a dict holding 'label' and 'md5' -- confirm against the Terrain docs + pairs = [(stat['label'], stat['md5']) for stat in stats] + if len(pairs) != len(self.plan.checksums): + raise PlantitException( + f"{len(self.plan.checksums)} checksum pairs provided but {len(pairs)} files exist") + for name, md5 in pairs: + expected = next((pair for pair in self.plan.checksums if pair['name'] == name), None) + if
expected is None or expected['md5'] != md5:  # explicit raise: assert statements are stripped under python -O + raise PlantitException(f"Missing or mismatched checksum for input '{name}'") + def download_directory(self, from_path, to_path, - include_patterns=None): - from_paths = [p for p in self.list_directory(from_path) if - any(ip in p for ip in include_patterns)] if include_patterns is not None else self.list_directory(from_path) + patterns=None): + from_paths = [path for path in self.list_directory(from_path) if any(pattern in path for pattern in patterns)] if patterns is not None else self.list_directory(from_path) + + if self.plan.checksums is not None and len(self.plan.checksums) > 0: + self.__verify_inputs(from_path, from_paths) + update_status(self.plan, 3, f"Downloading directory '{from_path}' with {len(from_paths)} file(s)") - for path in from_paths: - self.download_file(path, to_path) + with closing(Pool(processes=multiprocessing.cpu_count())) as pool: + pool.starmap(self.download_file, [(path, to_path) for path in from_paths]) + + if self.plan.checksums is not None and len(self.plan.checksums) > 0: + self.__verify_inputs(from_path, from_paths) def upload_file(self, from_path, to_path): update_status(self.plan, 3, f"Uploading '{from_path}' to '{to_path}'") @@ -72,8 +98,8 @@ def upload_directory(self, elif is_dir: from_paths = list_files(from_path, include_patterns, include_names, exclude_patterns, exclude_names) update_status(self.plan, 3, f"Uploading directory '{from_path}' with {len(from_paths)} files") - for path in [str(p) for p in from_paths]: - self.upload_file(path, to_path) + with closing(Pool(processes=multiprocessing.cpu_count())) as pool: + pool.starmap(self.upload_file, [(path, to_path) for path in [str(p) for p in from_paths]]) elif is_file: self.upload_file(from_path, to_path) else: diff --git a/plantit_cli/tests/integration/test_plan_validation.py b/plantit_cli/tests/integration/test_plan_validation.py index a642306..2248221 100644 --- a/plantit_cli/tests/integration/test_plan_validation.py +++ b/plantit_cli/tests/integration/test_plan_validation.py @@
-297,6 +297,7 @@ def test_validate_plan_with_params_and_no_input_and_directory_output(remote_path clear_dir(testdir) delete_collection(remote_path, token) + def test_validate_plan_with_no_params_and_file_input_and_directory_output(remote_path, file_name_1): local_path = join(testdir, file_name_1) plan = Plan( @@ -466,6 +467,57 @@ def test_validate_plan_with_params_and_files_input_and_directory_output(remote_p delete_collection(remote_path, token) +def test_validate_plan_with_params_and_files_input_with_pattern_and_directory_output(remote_path, + file_name_1, + file_name_2): + local_input_file_path_1 = join(testdir, file_name_1) + local_input_file_path_2 = join(testdir, file_name_2) + plan = Plan( + identifier='test_validate_plan_with_params_and_files_input_with_pattern_and_directory_output', + workdir=testdir, + image="docker://alpine", + command='cat $INPUT | tee $INPUT.$TAG.output', + input={ + 'kind': 'files', + 'from': remote_path, + 'patterns': [ + file_name_1 + ] + }, + output={ + 'to': remote_path, + 'from': 'input', # write output files to input dir + 'include': { + 'patterns': ['output'], + 'names': [] + } + }, + params=[ + { + 'key': 'TAG', + 'value': message + }, + ], + cyverse_token=token) + + try: + # prep CyVerse collection + create_collection(remote_path, token) + + # prep file + with open(local_input_file_path_1, "w") as file1, open(local_input_file_path_2, "w") as file2: + file1.write('Hello, 1!') + file2.write('Hello, 2!') + upload_file(local_input_file_path_1, remote_path, token) + upload_file(local_input_file_path_2, remote_path, token) + + result = validate_plan(plan) + assert type(result) is bool and result + finally: + clear_dir(testdir) + delete_collection(remote_path, token) + + def test_validate_plan_with_no_params_and_directory_input_and_directory_output(remote_path, file_name_1, file_name_2): @@ -647,7 +699,8 @@ def test_validate_plan_with_params_and_directory_input_and_directory_output_when delete_collection(remote_path, token) -def 
test_validate_plan_with_no_params_and_no_input_and_directory_output_with_include_patterns_and_exclude_names(remote_path): +def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_include_patterns_and_exclude_names( + remote_path): plan = Plan( identifier='test_run_succeeds_with_no_params_and_no_input_and_directory_output_with_excludes', workdir=testdir, @@ -715,7 +768,8 @@ def test_validate_plan_with_params_and_no_input_and_directory_output_with_exclud delete_collection(remote_path, token) -def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes(remote_path): +def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes( + remote_path): plan = Plan( identifier='test_run_succeeds_with_no_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes', workdir=testdir, @@ -746,7 +800,8 @@ def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_non delete_collection(remote_path, token) -def test_validate_plan_with_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes(remote_path): +def test_validate_plan_with_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes( + remote_path): plan = Plan( identifier='test_run_succeeds_with_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes', workdir=testdir, diff --git a/plantit_cli/tests/integration/test_runner_with_terrain.py b/plantit_cli/tests/integration/test_runner_with_terrain.py index fa174eb..9870a3f 100644 --- a/plantit_cli/tests/integration/test_runner_with_terrain.py +++ b/plantit_cli/tests/integration/test_runner_with_terrain.py @@ -205,6 +205,57 @@ def test_run_succeeds_with_no_params_and_files_input_and_no_output( delete_collection(remote_path, token) +def test_run_succeeds_with_no_params_and_files_input_and_patterns_and_no_output( + 
remote_base_path, + file_name_1, + file_name_2): + local_path_1 = join(testdir, file_name_1) + local_path_2 = join(testdir, file_name_2) + remote_path = join(remote_base_path, "testCollection") + plan = Plan( + identifier='test_run_succeeds_with_no_params_and_files_input_and_patterns_and_no_output', + workdir=testdir, + image="docker://alpine:latest", + command='cat $INPUT | tee $INPUT.output', + input={ + 'kind': 'files', + 'from': join(remote_base_path, "testCollection"), + 'patterns': [ + file_name_1 + ] + }, + cyverse_token=token) + + try: + # prep CyVerse collection + create_collection(remote_path, token) + + # prep files + with open(local_path_1, "w") as file1, open(local_path_2, "w") as file2: + file1.write('Hello, 1!') + file2.write('Hello, 2!') + upload_file(local_path_1, remote_path, token) + upload_file(local_path_2, remote_path, token) + + # expect 1 container: only file_name_1 matches the input pattern + Runner(TerrainStore(plan)).run(plan) + + # check only the matching file was pulled + downloaded_path_1 = join(testdir, 'input', file_name_1) + downloaded_path_2 = join(testdir, 'input', file_name_2) + assert not isfile(downloaded_path_2) + check_hello(downloaded_path_1, 1) + remove(downloaded_path_1) + + # check local output files were written + output_1 = f"{downloaded_path_1}.output" + check_hello(output_1, 1) + remove(output_1) + finally: + clear_dir(testdir) + delete_collection(remote_path, token) + + def test_run_succeeds_with_params_and_files_input_and_no_output( remote_base_path, file_name_1, @@ -443,7 +494,7 @@ def test_run_succeeds_with_no_params_and_file_input_and_directory_output( output={ 'to': join(remote_base_path, "testCollection"), 'from': 'input', # write output files to input dir - 'include': {'pattern': ['output'], 'names': []} + 'include': {'patterns': ['output'], 'names': []} }, cyverse_token=token) @@ -603,7 +654,7 @@ def test_run_succeeds_with_params_and_directory_input_and_directory_output( output={ 'to': remote_path, 'from': '', - 'include': {'pattern': ['output'], 'names': []} + 'include':
{'patterns': ['output'], 'names': []} }, params=[ { diff --git a/plantit_cli/tests/unit/test_plan_validation.py b/plantit_cli/tests/unit/test_plan_validation.py index ce903da..ca27119 100644 --- a/plantit_cli/tests/unit/test_plan_validation.py +++ b/plantit_cli/tests/unit/test_plan_validation.py @@ -1,11 +1,7 @@ from os import environ from os.path import join -import pytest - from plantit_cli.plan import Plan -from plantit_cli.tests.integration.terrain_test_utils import create_collection, delete_collection -from plantit_cli.tests.test_utils import get_token from plantit_cli.utils import validate_plan message = "Message" diff --git a/plantit_cli/utils.py b/plantit_cli/utils.py index 8b16935..d133e8d 100644 --- a/plantit_cli/utils.py +++ b/plantit_cli/utils.py @@ -130,26 +130,34 @@ def validate_plan(plan: Plan): # token if plan.cyverse_token is None or plan.cyverse_token == '': errors.append(f"CyVerse token must be provided") + # kind if 'kind' not in plan.input: errors.append('Attribute \'input\' must include attribute \'kind\'') elif type(plan.input['kind']) is not str or not ( plan.input['kind'] == 'file' or plan.input['kind'] == 'files' or plan.input['kind'] == 'directory'): errors.append('Attribute \'input.kind\' must be either \'file\', \'files\', or \'directory\'') + # from if 'from' not in plan.input: errors.append('Attribute \'input\' must include attribute \'from\'') elif type(plan.input['from']) is not str or not cyverse_path_exists(plan.input['from'], plan.cyverse_token): errors.append(f"Attribute 'input.from' must be a valid path in the CyVerse Data Store") + # overwrite if 'overwrite' in plan.input and type(plan.input['overwrite']) is not bool: errors.append('Attribute \'input.overwrite\' must be a bool') - # pattern - if 'pattern' in plan.input and (type(plan.input['pattern']) is not str or plan.input['pattern'] == ''): - errors.append('Attribute \'input.pattern\' must be a non-empty str') + + # patterns (must be a list; a bare str would otherwise be checked character by character) + if 'patterns' in plan.input and not
(type(plan.input['patterns']) is list and all(type(pattern) is str and pattern != '' for pattern in plan.input['patterns'])): + errors.append('Attribute \'input.patterns\' must be a list of non-empty strs') # output if plan.output is not None: + # token + if plan.cyverse_token is None or plan.cyverse_token == '': + errors.append(f"CyVerse token must be provided") + # from if 'from' not in plan.output: errors.append('Attribute \'output\' must include attribute \'from\'')