Skip to content

Commit

Permalink
cleaner input/output pattern logic, start implementing checksum verif…
Browse files Browse the repository at this point in the history
…ication (#18)
  • Loading branch information
wpbonelli committed Dec 18, 2020
1 parent 0ce3299 commit 42d6b50
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 34 deletions.
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Deploy workflows on laptops, servers, or HPC/HTC clusters.
- [Input file](#input-file)
- [Input files](#input-files)
- [Input directory](#input-directory)
- [Input patterns](#input-patterns)
- [Overwriting existing input files](#overwriting-existing-input-files)
- [Outputs](#outputs)
- [Bind mounts](#bind-mounts)
Expand Down Expand Up @@ -108,6 +109,34 @@ input:
from: /iplant/home/username/directory
```

#### Input patterns

To match and pull only certain input files `from` an input directory, use attribute `patterns`:

```yaml
input:
kind: directory
from: /iplant/home/username/directory
patterns:
- jpg
- png
```

<!--#### Verifying input file checksums

To verify checksums associated with input files, add a `checksums` attribute to the `input` section, with a list of `name`/`md5` pairs. For instance:

```yaml
input:
kind: directory
from: /iplant/home/username/directory
checksums:
- name: file1.txt
md5: 94fc3699a0f99317534736f0ec982dea
- name: file2.txt
md5: 8540f05638ac10899e8bc31c13d5074a
```-->

#### Overwriting existing input files

Note that by default, the CLI will check whether files already exist on the local filesystem, and will not re-download them if they do. To force a download and overwrite, add the flag `overwrite: True` to the `input` section, for instance:
Expand All @@ -116,7 +145,7 @@ Note that by default, the CLI will check whether files already exist on the loca
input:
kind: directory
from: /iplant/home/username/directory
overwrite: true
overwrite: True
```

### Outputs
Expand Down
4 changes: 3 additions & 1 deletion plantit_cli/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ def __init__(self,
params: list = None,
input: dict = None,
output: dict = None,
logging: dict = None):
logging: dict = None,
checksums: list = None):
self.identifier = identifier
self.workdir = workdir
self.image = image
Expand All @@ -26,3 +27,4 @@ def __init__(self,
self.input = input
self.output = output
self.logging = logging
self.checksums = checksums
14 changes: 6 additions & 8 deletions plantit_cli/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,25 @@ def __pull_input(self, plan: Plan) -> str:
input_kind = plan.input['kind'].lower()
os.makedirs(input_dir, exist_ok=True)
if input_kind == 'directory' or input_kind == 'files':
self.__store.download_directory(plan.input['from'], input_dir,
plan.input['pattern'] if 'pattern' in plan.input else None)
self.__store.download_directory(plan.input['from'], input_dir, plan.input['patterns'] if 'patterns' in plan.input else None)
elif input_kind == 'file':
self.__store.download_file(plan.input['from'], input_dir)
else:
raise ValueError(f"'input.kind' must be either 'file' or 'directory'")
input_files = os.listdir(input_dir)
if len(input_files) == 0:
raise PlantitException(f"No inputs found at path '{plan.input['from']}'" + (
f" matching pattern '{plan.input['pattern']}'" if 'pattern' in plan.input else ''))
raise PlantitException(f"No inputs found at path '{plan.input['from']}'" + (f" matching patterns '{plan.input['patterns']}'" if 'patterns' in plan.input else ''))
update_status(plan, 3, f"Pulled input(s): {', '.join(input_files)}")
return input_dir

def __push_output(self, plan: Plan):
self.__store.upload_directory(
join(plan.workdir, plan.output['from']) if 'from' in plan.output else plan.workdir,
plan.output['to'],
(plan.output['include']['patterns'] if type(plan.output['include']['patterns']) is list else None) if 'include' in plan.output else None,
(plan.output['include']['names'] if type(plan.output['include']['names']) is list else None) if 'include' in plan.output else None,
(plan.output['exclude']['patterns'] if type(plan.output['exclude']['patterns']) is list else None) if 'exclude' in plan.output else None,
(plan.output['exclude']['names'] if type(plan.output['exclude']['names']) is list else None) if 'exclude' in plan.output else None)
(plan.output['include']['patterns'] if type(plan.output['include']['patterns']) is list else None) if 'include' in plan.output and 'patterns' in plan.output['include'] else None,
(plan.output['include']['names'] if type(plan.output['include']['names']) is list else None) if 'include' in plan.output and 'names' in plan.output['include'] else None,
(plan.output['exclude']['patterns'] if type(plan.output['exclude']['patterns']) is list else None) if 'exclude' in plan.output and 'patterns' in plan.output['exclude'] else None,
(plan.output['exclude']['names'] if type(plan.output['exclude']['names']) is list else None) if 'exclude' in plan.output and 'names' in plan.output['exclude'] else None)

update_status(plan, 3, f"Pushed output(s)")

Expand Down
4 changes: 2 additions & 2 deletions plantit_cli/store/local_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def download_file(self, from_path, to_path):
update_status(self.plan, 3, f"Copying {from_path_file} to {to_path_file}")
copyfileobj(from_file, to_file)

def download_directory(self, from_path, to_path, include_pattern):
def download_directory(self, from_path, to_path, patterns):
from_paths = [p for p in self.list_directory(from_path) if
include_pattern.lower() in p.lower()] if include_pattern is not None else self.list_directory(from_path)
patterns.lower() in p.lower()] if patterns is not None else self.list_directory(from_path)
for path in from_paths:
self.download_file(path, to_path)

Expand Down
4 changes: 2 additions & 2 deletions plantit_cli/store/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ def download_file(self, from_path, to_path):
pass

@abstractmethod
def download_directory(self, from_path, to_path, include_pattern):
def download_directory(self, from_path, to_path, patterns):
"""
Downloads files from the remote directory to the local directory.
Args:
from_path: The remote directory path.
to_path: The local directory path.
include_pattern: File pattern(s) to include.
patterns: File patterns to include.
"""
pass

Expand Down
44 changes: 36 additions & 8 deletions plantit_cli/store/terrain_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import multiprocessing
from contextlib import closing
from multiprocessing import Pool
from os import listdir
from pprint import pprint
from typing import List
from os.path import isdir, isfile, basename, join

Expand All @@ -22,11 +26,14 @@ def list_directory(self, path) -> List[str]:
raise PlantitException(f"Path {path} does not exist")
response.raise_for_status()
content = response.json()
for file in content['files']:
pprint(file)
return [file['path'] for file in content['files']]

def download_file(self, from_path, to_path):
to_path_full = f"{to_path}/{from_path.split('/')[-1]}"
if isfile(to_path_full) and ('overwrite' not in self.plan.input or ('overwrite' in self.plan.input and not self.plan.input['overwrite'])):
if isfile(to_path_full) and ('overwrite' not in self.plan.input or (
'overwrite' in self.plan.input and not self.plan.input['overwrite'])):
update_status(self.plan, 3, f"File {to_path_full} already exists, skipping download")
return
else:
Expand All @@ -40,15 +47,36 @@ def download_file(self, from_path, to_path):
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)

def __verify_inputs(self, from_path, paths):
with requests.post('https://de.cyverse.org/terrain/secured/filesystem/stat',
headers={'Authorization': f"Bearer {self.plan.cyverse_token}"},
data={'paths': paths}) as response:
response.raise_for_status()
pairs = [(path[join(from_path, path)]['label'], path[join(from_path, path)]['md5']) for path in
response.json()['paths']]
if len(pairs) != len(self.plan.checksums):
raise PlantitException(
f"{len(self.plan.checksums)} checksum pairs provided but {len(pairs)} files exist")
for actual in pairs:
expected = [pair for pair in self.plan.checksums if pair['name'] == actual[0]][0]
assert expected['name'] == actual[0]
assert expected['md5'] == actual[1]

def download_directory(self,
from_path,
to_path,
include_patterns=None):
from_paths = [p for p in self.list_directory(from_path) if
any(ip in p for ip in include_patterns)] if include_patterns is not None else self.list_directory(from_path)
patterns=None):
from_paths = [path for path in self.list_directory(from_path) if any(pattern in path for pattern in patterns)] if patterns is not None else self.list_directory(from_path)

if self.plan.checksums is not None and len(self.plan.checksums) > 0:
self.__verify_inputs(from_path, from_paths)

update_status(self.plan, 3, f"Downloading directory '{from_path}' with {len(from_paths)} file(s)")
for path in from_paths:
self.download_file(path, to_path)
with closing(Pool(processes=multiprocessing.cpu_count())) as pool:
pool.starmap(self.download_file, [(path, to_path) for path in from_paths])

if self.plan.checksums is not None and len(self.plan.checksums) > 0:
self.__verify_inputs(from_path, from_paths)

def upload_file(self, from_path, to_path):
update_status(self.plan, 3, f"Uploading '{from_path}' to '{to_path}'")
Expand All @@ -72,8 +100,8 @@ def upload_directory(self,
elif is_dir:
from_paths = list_files(from_path, include_patterns, include_names, exclude_patterns, exclude_names)
update_status(self.plan, 3, f"Uploading directory '{from_path}' with {len(from_paths)} files")
for path in [str(p) for p in from_paths]:
self.upload_file(path, to_path)
with closing(Pool(processes=multiprocessing.cpu_count())) as pool:
pool.starmap(self.upload_file, [(path, to_path) for path in [str(p) for p in from_paths]])
elif is_file:
self.upload_file(from_path, to_path)
else:
Expand Down
61 changes: 58 additions & 3 deletions plantit_cli/tests/integration/test_plan_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def test_validate_plan_with_params_and_no_input_and_directory_output(remote_path
clear_dir(testdir)
delete_collection(remote_path, token)


def test_validate_plan_with_no_params_and_file_input_and_directory_output(remote_path, file_name_1):
local_path = join(testdir, file_name_1)
plan = Plan(
Expand Down Expand Up @@ -466,6 +467,57 @@ def test_validate_plan_with_params_and_files_input_and_directory_output(remote_p
delete_collection(remote_path, token)


def test_validate_plan_with_params_and_files_input_with_pattern_and_directory_output(remote_path,
file_name_1,
file_name_2):
local_input_file_path_1 = join(testdir, file_name_1)
local_input_file_path_2 = join(testdir, file_name_2)
plan = Plan(
identifier='test_validate_plan_with_params_and_files_input_with_pattern_and_directory_output',
workdir=testdir,
image="docker://alpine",
command='cat $INPUT | tee $INPUT.$TAG.output',
input={
'kind': 'files',
'from': remote_path,
'patterns': [
file_name_1
]
},
output={
'to': remote_path,
'from': 'input', # write output files to input dir
'include': {
'patterns': ['output'],
'names': []
}
},
params=[
{
'key': 'TAG',
'value': message
},
],
cyverse_token=token)

try:
# prep CyVerse collection
create_collection(remote_path, token)

# prep file
with open(local_input_file_path_1, "w") as file1, open(local_input_file_path_2, "w") as file2:
file1.write('Hello, 1!')
file2.write('Hello, 2!')
upload_file(local_input_file_path_1, remote_path, token)
upload_file(local_input_file_path_2, remote_path, token)

result = validate_plan(plan)
assert type(result) is bool and result
finally:
clear_dir(testdir)
delete_collection(remote_path, token)


def test_validate_plan_with_no_params_and_directory_input_and_directory_output(remote_path,
file_name_1,
file_name_2):
Expand Down Expand Up @@ -647,7 +699,8 @@ def test_validate_plan_with_params_and_directory_input_and_directory_output_when
delete_collection(remote_path, token)


def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_include_patterns_and_exclude_names(remote_path):
def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_include_patterns_and_exclude_names(
remote_path):
plan = Plan(
identifier='test_run_succeeds_with_no_params_and_no_input_and_directory_output_with_excludes',
workdir=testdir,
Expand Down Expand Up @@ -715,7 +768,8 @@ def test_validate_plan_with_params_and_no_input_and_directory_output_with_exclud
delete_collection(remote_path, token)


def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes(remote_path):
def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes(
remote_path):
plan = Plan(
identifier='test_run_succeeds_with_no_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes',
workdir=testdir,
Expand Down Expand Up @@ -746,7 +800,8 @@ def test_validate_plan_with_no_params_and_no_input_and_directory_output_with_non
delete_collection(remote_path, token)


def test_validate_plan_with_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes(remote_path):
def test_validate_plan_with_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes(
remote_path):
plan = Plan(
identifier='test_run_succeeds_with_params_and_no_input_and_directory_output_with_non_matching_case_pattern_and_excludes',
workdir=testdir,
Expand Down
55 changes: 53 additions & 2 deletions plantit_cli/tests/integration/test_runner_with_terrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,57 @@ def test_run_succeeds_with_no_params_and_files_input_and_no_output(
delete_collection(remote_path, token)


def test_run_succeeds_with_no_params_and_files_input_and_patterns_and_no_output(
remote_base_path,
file_name_1,
file_name_2):
local_path_1 = join(testdir, file_name_1)
local_path_2 = join(testdir, file_name_2)
remote_path = join(remote_base_path, "testCollection")
plan = Plan(
identifier='test_run_succeeds_with_no_params_and_files_input_and_no_output',
workdir=testdir,
image="docker://alpine:latest",
command='cat $INPUT | tee $INPUT.output',
input={
'kind': 'files',
'from': join(remote_base_path, "testCollection"),
'patterns': [
file_name_1
]
},
cyverse_token=token)

try:
# prep CyVerse collection
create_collection(remote_path, token)

# prep files
with open(local_path_1, "w") as file1, open(local_path_2, "w") as file2:
file1.write('Hello, 1!')
file2.write('Hello, 2!')
upload_file(local_path_1, remote_path, token)
upload_file(local_path_2, remote_path, token)

# expect 2 containers
Runner(TerrainStore(plan)).run(plan)

# check files were pulled
downloaded_path_1 = join(testdir, 'input', file_name_1)
downloaded_path_2 = join(testdir, 'input', file_name_2)
assert not isfile(downloaded_path_2)
check_hello(downloaded_path_1, 1)
remove(downloaded_path_1)

# check local output files were written
output_1 = f"{downloaded_path_1}.output"
check_hello(output_1, 1)
remove(output_1)
finally:
clear_dir(testdir)
delete_collection(remote_path, token)


def test_run_succeeds_with_params_and_files_input_and_no_output(
remote_base_path,
file_name_1,
Expand Down Expand Up @@ -443,7 +494,7 @@ def test_run_succeeds_with_no_params_and_file_input_and_directory_output(
output={
'to': join(remote_base_path, "testCollection"),
'from': 'input', # write output files to input dir
'include': {'pattern': ['output'], 'names': []}
'include': {'patterns': ['output'], 'names': []}
},
cyverse_token=token)

Expand Down Expand Up @@ -603,7 +654,7 @@ def test_run_succeeds_with_params_and_directory_input_and_directory_output(
output={
'to': remote_path,
'from': '',
'include': {'pattern': ['output'], 'names': []}
'include': {'patterns': ['output'], 'names': []}
},
params=[
{
Expand Down
4 changes: 0 additions & 4 deletions plantit_cli/tests/unit/test_plan_validation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from os import environ
from os.path import join

import pytest

from plantit_cli.plan import Plan
from plantit_cli.tests.integration.terrain_test_utils import create_collection, delete_collection
from plantit_cli.tests.test_utils import get_token
from plantit_cli.utils import validate_plan

message = "Message"
Expand Down
Loading

0 comments on commit 42d6b50

Please sign in to comment.