-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade validation methods - make pynwb.validate behave similarly for io and paths #1911
base: dev
Are you sure you want to change the base?
Changes from all commits
dbadbd5
fe81ad3
18826f8
923c1d3
41d9b9d
8c921a3
95556a8
87b1f1e
6a748a5
250513e
76a4f0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,8 +29,10 @@ | |
return validator.validate(builder) | ||
|
||
|
||
def _get_cached_namespaces_to_validate( | ||
path: str, driver: Optional[str] = None, aws_region: Optional[str] = None, | ||
def _get_cached_namespaces_to_validate(path: Optional[str] = None, | ||
driver: Optional[str] = None, | ||
aws_region: Optional[str] = None, | ||
io: Optional[HDMFIO] = None | ||
) -> Tuple[List[str], BuildManager, Dict[str, str]]: | ||
""" | ||
Determine the most specific namespace(s) that are cached in the given NWBFile that can be used for validation. | ||
|
@@ -44,9 +46,10 @@ | |
>>> path = "my_nwb_file.nwb" | ||
>>> validate_namespaces, manager, cached_namespaces = _get_cached_namespaces_to_validate(path) | ||
>>> with NWBHDF5IO(path, "r", manager=manager) as reader: | ||
>>> errors = [] | ||
>>> validation_errors = [] | ||
>>> for ns in validate_namespaces: | ||
>>> errors += validate(io=reader, namespace=ns) | ||
errors, status = validate(io=reader, namespace=ns) | ||
>>> validation_errors += errors | ||
:param path: Path for the NWB file | ||
:return: Tuple with: | ||
- List of strings with the most specific namespace(s) to use for validation. | ||
|
@@ -58,12 +61,15 @@ | |
catalog = NamespaceCatalog( | ||
group_spec_cls=NWBGroupSpec, dataset_spec_cls=NWBDatasetSpec, spec_namespace_cls=NWBNamespace | ||
) | ||
namespace_dependencies = NWBHDF5IO.load_namespaces( | ||
namespace_catalog=catalog, | ||
path=path, | ||
driver=driver, | ||
aws_region=aws_region | ||
) | ||
|
||
if io is not None: | ||
namespace_dependencies = io.load_namespaces(namespace_catalog=catalog, | ||
file=io._HDF5IO__file) | ||
else: | ||
namespace_dependencies = NWBHDF5IO.load_namespaces(namespace_catalog=catalog, | ||
path=path, | ||
driver=driver, | ||
aws_region=aws_region) | ||
|
||
# Determine which namespaces are the most specific (i.e. extensions) and validate against those | ||
candidate_namespaces = set(namespace_dependencies.keys()) | ||
|
@@ -82,6 +88,71 @@ | |
|
||
return cached_namespaces, manager, namespace_dependencies | ||
|
||
def _check_namespaces_to_validate(io: Optional[HDMFIO] = None, | ||
path: Optional[str] = None, | ||
use_cached_namespaces: Optional[bool] = False, | ||
namespace: Optional[str] = None, | ||
verbose: Optional[bool] = False, | ||
driver: Optional[str] = None) -> Tuple[int, List[str], Dict[str, str], str]: | ||
status = 0 | ||
namespaces_to_validate = [] | ||
namespace_message = "PyNWB namespace information" | ||
io_kwargs = dict(path=path, mode="r", driver=driver) | ||
|
||
if use_cached_namespaces: | ||
cached_namespaces, manager, namespace_dependencies = _get_cached_namespaces_to_validate(path=path, | ||
driver=driver, | ||
io=io) | ||
io_kwargs.update(manager=manager) | ||
|
||
if any(cached_namespaces): | ||
namespaces_to_validate = cached_namespaces | ||
namespace_message = "cached namespace information" | ||
else: | ||
namespaces_to_validate = [CORE_NAMESPACE] | ||
if verbose: | ||
print( | ||
f"The file {f'{path} ' if path is not None else ''}has no cached namespace information. Falling back to {namespace_message}.", | ||
file=sys.stderr, | ||
) | ||
else: | ||
io_kwargs.update(load_namespaces=False) | ||
namespaces_to_validate = [CORE_NAMESPACE] | ||
|
||
if namespace is not None: | ||
if namespace in namespaces_to_validate: | ||
namespaces_to_validate = [namespace] | ||
elif use_cached_namespaces and namespace in namespace_dependencies: # validating against a dependency | ||
for namespace_dependency in namespace_dependencies: | ||
if namespace in namespace_dependencies[namespace_dependency]: | ||
status = 1 | ||
print( | ||
f"The namespace '{namespace}' is included by the namespace " | ||
f"'{namespace_dependency}'. Please validate against that namespace instead.", | ||
file=sys.stderr, | ||
) | ||
else: | ||
status = 1 | ||
print( | ||
f"The namespace '{namespace}' could not be found in {namespace_message} as only " | ||
f"{namespaces_to_validate} is present.", | ||
file=sys.stderr, | ||
) | ||
|
||
return status, namespaces_to_validate, io_kwargs, namespace_message | ||
|
||
def _validate_against_namespaces(io: Optional[HDMFIO] = None, | ||
path: Optional[str] = None, | ||
namespaces_to_validate: Optional[List[str]] = None, | ||
namespace_message: Optional[str] = None, | ||
verbose: Optional[bool] = False): | ||
validation_errors = [] | ||
for validation_namespace in namespaces_to_validate: | ||
if verbose: | ||
print(f"Validating {f'{path} ' if path is not None else ''}against " | ||
f"{namespace_message} using namespace '{validation_namespace}'.") | ||
validation_errors += _validate_helper(io=io, namespace=validation_namespace) | ||
return validation_errors | ||
|
||
@docval( | ||
{ | ||
|
@@ -126,9 +197,6 @@ | |
) | ||
def validate(**kwargs): | ||
"""Validate NWB file(s) against a namespace or its cached namespaces. | ||
|
||
NOTE: If an io object is provided and no namespace name is specified, then the file will be validated | ||
against the core namespace, even if use_cached_namespaces is True. | ||
""" | ||
from . import NWBHDF5IO # TODO: modularize to avoid circular import | ||
|
||
|
@@ -137,65 +205,34 @@ | |
) | ||
assert io != paths, "Both 'io' and 'paths' were specified! Please choose only one." | ||
|
||
if io is not None: | ||
validation_errors = _validate_helper(io=io, namespace=namespace or CORE_NAMESPACE) | ||
return validation_errors | ||
|
||
status = 0 | ||
validation_errors = list() | ||
for path in paths: | ||
namespaces_to_validate = [] | ||
namespace_message = "PyNWB namespace information" | ||
io_kwargs = dict(path=path, mode="r", driver=driver) | ||
|
||
if use_cached_namespaces: | ||
cached_namespaces, manager, namespace_dependencies = _get_cached_namespaces_to_validate( | ||
path=path, driver=driver | ||
if io is not None: | ||
status, namespaces_to_validate, io_kwargs, namespace_message = _check_namespaces_to_validate( | ||
io=io, use_cached_namespaces=use_cached_namespaces, namespace=namespace, verbose=verbose, | ||
) | ||
if status == 0: | ||
validation_errors += _validate_against_namespaces(io=io, | ||
namespaces_to_validate=namespaces_to_validate, | ||
namespace_message=namespace_message, | ||
verbose=verbose) | ||
else: | ||
for path in paths: | ||
path_status, namespaces_to_validate, io_kwargs, namespace_message = _check_namespaces_to_validate( | ||
path=path, driver=driver, use_cached_namespaces=use_cached_namespaces, namespace=namespace, | ||
verbose=verbose, | ||
) | ||
io_kwargs.update(manager=manager) | ||
|
||
if any(cached_namespaces): | ||
namespaces_to_validate = cached_namespaces | ||
namespace_message = "cached namespace information" | ||
else: | ||
namespaces_to_validate = [CORE_NAMESPACE] | ||
if verbose: | ||
print( | ||
f"The file {path} has no cached namespace information. Falling back to {namespace_message}.", | ||
file=sys.stderr, | ||
) | ||
else: | ||
io_kwargs.update(load_namespaces=False) | ||
namespaces_to_validate = [CORE_NAMESPACE] | ||
|
||
if namespace is not None: | ||
if namespace in namespaces_to_validate: | ||
namespaces_to_validate = [namespace] | ||
elif use_cached_namespaces and namespace in namespace_dependencies: # validating against a dependency | ||
for namespace_dependency in namespace_dependencies: | ||
if namespace in namespace_dependencies[namespace_dependency]: | ||
status = 1 | ||
print( | ||
f"The namespace '{namespace}' is included by the namespace " | ||
f"'{namespace_dependency}'. Please validate against that namespace instead.", | ||
file=sys.stderr, | ||
) | ||
else: | ||
status = 1 | ||
print( | ||
f"The namespace '{namespace}' could not be found in {namespace_message} as only " | ||
f"{namespaces_to_validate} is present.", | ||
file=sys.stderr, | ||
) | ||
status = status or path_status | ||
if path_status == 1: | ||
continue | ||
|
||
if status == 1: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. from what I could tell, this status check was never reset to 0, so if any of the paths return a nonzero status, |
||
continue | ||
with NWBHDF5IO(**io_kwargs) as io: | ||
validation_errors += _validate_against_namespaces(io=io, | ||
path=path, | ||
namespaces_to_validate=namespaces_to_validate, | ||
namespace_message=namespace_message, | ||
verbose=verbose) | ||
|
||
with NWBHDF5IO(**io_kwargs) as io: | ||
for validation_namespace in namespaces_to_validate: | ||
if verbose: | ||
print(f"Validating {path} against {namespace_message} using namespace '{validation_namespace}'.") | ||
validation_errors += _validate_helper(io=io, namespace=validation_namespace) | ||
return validation_errors, status | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -194,42 +194,42 @@ def get_io(self, path): | |
def test_validate_io_no_cache(self): | ||
"""Test that validating a file with no cached spec against the core namespace succeeds.""" | ||
with self.get_io('tests/back_compat/1.0.2_nwbfile.nwb') as io: | ||
errors = validate(io) | ||
errors, _ = validate(io) | ||
self.assertEqual(errors, []) | ||
|
||
def test_validate_io_no_cache_bad_ns(self): | ||
"""Test that validating a file with no cached spec against a specified, unknown namespace fails.""" | ||
with self.get_io('tests/back_compat/1.0.2_nwbfile.nwb') as io: | ||
with self.assertRaisesWith(KeyError, "\"'notfound' not a namespace\""): | ||
validate(io, 'notfound') | ||
with patch("sys.stderr", new=StringIO()) as fake_err: | ||
with patch("sys.stdout", new=StringIO()) as fake_out: | ||
with self.get_io('tests/back_compat/1.0.2_nwbfile.nwb') as io: | ||
results, status = validate(io=io, namespace='notfound') | ||
self.assertEqual(results, []) | ||
self.assertEqual(status, 1) | ||
stderr_regex = ( | ||
r"The namespace 'notfound' could not be found in PyNWB namespace information as only " | ||
r"\['core'\] is present.\n" | ||
) | ||
self.assertRegex(fake_err.getvalue(), stderr_regex) | ||
self.assertEqual(fake_out.getvalue(), "") | ||
|
||
def test_validate_io_cached(self): | ||
"""Test that validating a file with cached spec against its cached namespace succeeds.""" | ||
with self.get_io('tests/back_compat/1.1.2_nwbfile.nwb') as io: | ||
errors = validate(io) | ||
errors, _ = validate(io) | ||
self.assertEqual(errors, []) | ||
|
||
def test_validate_io_cached_extension(self): | ||
"""Test that validating a file with cached spec against its cached namespaces succeeds.""" | ||
with self.get_io('tests/back_compat/2.1.0_nwbfile_with_extension.nwb') as io: | ||
errors = validate(io) | ||
errors, _ = validate(io) | ||
self.assertEqual(errors, []) | ||
|
||
def test_validate_io_cached_extension_pass_ns(self): | ||
"""Test that validating a file with cached extension spec against the extension namespace succeeds.""" | ||
with self.get_io('tests/back_compat/2.1.0_nwbfile_with_extension.nwb') as io: | ||
errors = validate(io, 'ndx-testextension') | ||
errors, _ = validate(io, 'ndx-testextension') | ||
self.assertEqual(errors, []) | ||
|
||
def test_validate_io_cached_core_with_io(self): | ||
""" | ||
For back-compatability, test that validating a file with cached extension spec against the core | ||
namespace succeeds when using the `io` + `namespace` keywords. | ||
""" | ||
with self.get_io(path='tests/back_compat/2.1.0_nwbfile_with_extension.nwb') as io: | ||
results = validate(io=io, namespace="core") | ||
self.assertEqual(results, []) | ||
|
||
def test_validate_file_cached_extension(self): | ||
""" | ||
Test that validating a file with cached extension spec against the core | ||
|
@@ -280,16 +280,55 @@ def test_validate_file_cached_no_cache_bad_ns(self): | |
self.assertRegex(fake_err.getvalue(), stderr_regex) | ||
self.assertEqual(fake_out.getvalue(), "") | ||
|
||
def test_validate_io_cached_bad_ns(self): | ||
"""Test that validating a file with cached spec against a specified, unknown namespace fails.""" | ||
with self.get_io('tests/back_compat/1.1.2_nwbfile.nwb') as io: | ||
with self.assertRaisesWith(KeyError, "\"'notfound' not a namespace\""): | ||
validate(io, 'notfound') | ||
|
||
def test_validate_io_cached_hdmf_common(self): | ||
"""Test that validating a file with cached spec against the hdmf-common namespace fails.""" | ||
with self.get_io('tests/back_compat/1.1.2_nwbfile.nwb') as io: | ||
# TODO this error should not be different from the error when using the validate script above | ||
msg = "builder must have data type defined with attribute 'data_type'" | ||
with self.assertRaisesWith(ValueError, msg): | ||
validate(io, 'hdmf-common') | ||
with patch("sys.stderr", new=StringIO()) as fake_err: | ||
with patch("sys.stdout", new=StringIO()) as fake_out: | ||
with self.get_io(path='tests/back_compat/1.1.2_nwbfile.nwb') as io: | ||
results, status = validate(io=io, namespace="hdmf-common", verbose=True) | ||
self.assertEqual(results, []) | ||
self.assertEqual(status, 1) | ||
self.assertEqual( | ||
fake_err.getvalue(), | ||
( | ||
"The namespace 'hdmf-common' is included by the namespace 'core'. " | ||
"Please validate against that namespace instead.\n" | ||
) | ||
) | ||
self.assertEqual(fake_out.getvalue(), "") | ||
|
||
def test_validate_io_and_path_same(self): | ||
"""Test that validating a file with an io object and a path return the same results.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if all of these are necessary to include, but wanted to double check they were all equivalent |
||
tests = [('tests/back_compat/1.0.2_nwbfile.nwb', None), | ||
('tests/back_compat/1.0.2_nwbfile.nwb', 'notfound'), | ||
('tests/back_compat/1.1.2_nwbfile.nwb', None), | ||
('tests/back_compat/1.1.2_nwbfile.nwb', 'core'), | ||
('tests/back_compat/1.1.2_nwbfile.nwb', 'notfound'), | ||
('tests/back_compat/1.1.2_nwbfile.nwb', 'hdmf-common'), | ||
('tests/back_compat/2.1.0_nwbfile_with_extension.nwb', None), | ||
('tests/back_compat/2.1.0_nwbfile_with_extension.nwb', 'core'), | ||
('tests/back_compat/2.1.0_nwbfile_with_extension.nwb', 'ndx-testextension'),] | ||
|
||
for path, ns in tests: | ||
with patch("sys.stderr", new=StringIO()) as fake_err: | ||
with patch("sys.stdout", new=StringIO()) as fake_out: | ||
with self.get_io(path=path) as io: | ||
results_io, status_io = validate(io=io, namespace=ns, verbose=True) | ||
fake_err_io = fake_err.getvalue() | ||
fake_out_io = fake_out.getvalue() | ||
|
||
with patch("sys.stderr", new=StringIO()) as fake_err: | ||
with patch("sys.stdout", new=StringIO()) as fake_out: | ||
results_path, status_path = validate(paths=[path], namespace=ns, verbose=True) | ||
fake_err_path = fake_err.getvalue() | ||
fake_out_path = fake_out.getvalue() | ||
|
||
# remove path from error messages since it will not be included in io outputs | ||
fake_err_path = fake_err_path.replace(f'{path} ', '') | ||
fake_out_path = fake_out_path.replace(f'{path} ', '') | ||
|
||
self.assertEqual(results_io, results_path) | ||
self.assertEqual(status_io, status_path) | ||
self.assertEqual(fake_err_io, fake_err_path) | ||
self.assertEqual(fake_out_io, fake_out_path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure if there is a better way to load the namespaces from the io object, I realize this is HDF5 specific, but it looks like the original namespace loading was as well? I was wondering how this works for Zarr validation?