# --- src/bids_validator/bidsignore.py ---
"""Utilities for working with .bidsignore files."""

import os
import re
from functools import lru_cache
from typing import List, Protocol, Union


@lru_cache
def compile_pat(pattern: str) -> Union[re.Pattern, None]:
    """Compile one .gitignore-style ignore line to an anchored regular expression.

    Returns None for lines that match nothing (comments, blank lines).
    Matching is performed against paths relative to the tree root, where
    directory paths carry a trailing slash (cf. ``FileTree.relative_path``).

    NOTE(review): the original body of this function was garbled in transit;
    this implementation is reconstructed to satisfy the behavior pinned down
    by tests/test_bidsignore.py — confirm against the upstream source.
    """
    orig = pattern
    # A line starting with # serves as a comment.
    if pattern.startswith('#'):
        return None

    # An optional prefix "!" negates the pattern. Ignore.match() has no way
    # to un-ignore a previously ignored path, so fail loudly instead of
    # silently mis-matching.
    if pattern.startswith('!'):
        raise ValueError(f'Inverted patterns are not supported: {orig!r}')

    # Put a backslash ("\") in front of the first "#" or "!" to match them literally.
    if pattern.startswith((r'\#', r'\!')):
        pattern = pattern[1:]  # Unescape

    # Trailing spaces are ignored unless they are quoted with backslash ("\ ").
    pattern = re.sub(r'(?<!\\) +$', '', pattern)

    # A blank line matches no files, so it can serve as a separator for readability.
    if not pattern:
        return None

    # "/" alone matches only the root entry.
    if pattern == '/':
        return re.compile(r'/\Z')

    # A trailing "/" restricts the pattern to directories.
    dir_only = pattern.endswith('/')
    pattern = pattern.rstrip('/')

    # A separator at the start or in the middle anchors the pattern to the
    # root; otherwise it may match at any directory depth.
    anchored = '/' in pattern
    pattern = pattern.lstrip('/')

    def translate(segment: str) -> str:
        # Translate one slash-free glob segment into regex syntax.
        out = []
        i = 0
        while i < len(segment):
            ch = segment[i]
            if ch == '\\' and i + 1 < len(segment):
                out.append(re.escape(segment[i + 1]))  # backslash-quoted literal
                i += 2
            elif ch == '*':
                # "**" inside a segment behaves like a single "*":
                # match anything except a slash.
                out.append('[^/]*')
                i += 2 if segment[i : i + 2] == '**' else 1
            elif ch == '?':
                out.append('[^/]')
                i += 1
            else:
                out.append(re.escape(ch))
                i += 1
        return ''.join(out)

    segments = pattern.split('/')
    parts = []
    need_sep = False
    for idx, segment in enumerate(segments):
        if segment == '**':
            if idx == len(segments) - 1:
                # Trailing "/**" (or bare "**"): everything at or below here.
                parts.append('/.*' if need_sep else '.*')
                need_sep = False
            elif idx == 0:
                # Leading "**/": match in this directory and any subdirectory.
                parts.append('(?:.*/)?')
                need_sep = False
            else:
                # "a/**/b": zero or more intermediate directories.
                parts.append('(?:/[^/]+)*')
                need_sep = True
        else:
            if need_sep:
                parts.append('/')
            parts.append(translate(segment))
            need_sep = True

    prefix = '' if anchored else '(?:.*/)?'
    if dir_only:
        suffix = '/'  # directories only
    elif segments[-1] == '**':
        suffix = ''  # ".*" already swallows any tail
    else:
        suffix = '/?'  # match both files and directories
    return re.compile(prefix + ''.join(parts) + suffix + r'\Z')


class HasMatch(Protocol):  # noqa: D101
    def match(self, relpath: str) -> bool: ...  # noqa: D102
# NOTE(review): imports restored here; the surrounding diff context was garbled.
import attrs

from .types.files import FileTree


@attrs.define
class Ignore:
    """Collection of .gitignore-style patterns.

    Tracks successfully matched files for reporting.
    """

    patterns: List[str] = attrs.field(factory=list)
    history: List[str] = attrs.field(factory=list, init=False)

    @classmethod
    def from_file(cls, pathlike: os.PathLike):
        """Load Ignore contents from file."""
        with open(pathlike) as fobj:
            return cls([line.rstrip('\n') for line in fobj])

    def match(self, relpath: str) -> bool:
        """Match a relative path against a collection of ignore patterns."""
        # compile_pat() returns None for comments and blank lines; skip those
        # instead of crashing on None.match().
        compiled = (compile_pat(pattern) for pattern in self.patterns)
        if any(pat.match(relpath) for pat in compiled if pat is not None):
            self.history.append(relpath)
            return True
        return False


@attrs.define
class IgnoreMany:
    """Match against several ignore filters."""

    ignores: List[Ignore] = attrs.field()

    def match(self, relpath: str) -> bool:
        """Return true if any filters match the given file.

        Will short-circuit, so ordering is significant for side-effects,
        such as recording files ignored by a particular filter.
        """
        return any(ignore.match(relpath) for ignore in self.ignores)


def filter_file_tree(filetree: FileTree) -> FileTree:
    """Read .bidsignore and filter file tree."""
    bidsignore = filetree.children.get('.bidsignore')
    if not bidsignore:
        return filetree
    # The .bidsignore file itself is always excluded from the filtered tree.
    ignore = IgnoreMany([Ignore.from_file(bidsignore), Ignore(['/.bidsignore'])])
    return _filter(filetree, ignore)


def _filter(filetree: FileTree, ignore: HasMatch) -> FileTree:
    """Recursively filter a FileTree, dropping entries matched by *ignore*."""
    items = filetree.children.items()
    children = {
        name: _filter(child, ignore)
        for name, child in items
        if not ignore.match(child.relative_path)
    }

    # Only evolve when filtering changed something, so untouched subtrees
    # keep their identity.
    # XXX This check may not be worth the time. Profile this.
    if any(children.get(name) is not child for name, child in items):
        filetree = attrs.evolve(filetree, children=children)

    return filetree


# --- src/bids_validator/context.py ---
"""Validation context for schema-based BIDS validation."""

from .context_generator import get_schema, load_schema_into_namespace

schema = get_schema()
load_schema_into_namespace(schema['meta']['context'], globals(), 'Context')


__all__ = [  # noqa: F822
    'Context',
    'Schema',
    'Dataset',
    'DatasetDescription',
    'Tree',
    'Subjects',
    'Subject',
    'Sessions',
    'Entities',
    'Sidecar',
    'Associations',
    'Events',
    'Aslcontext',
    'M0scan',
    'Magnitude',
    'Magnitude1',
    'Bval',
    'Bvec',
    'Channels',
    'Coordsystem',
    'Columns',
    'Json',
    'Gzip',
    'NiftiHeader',
    'DimInfo',
    'XyztUnits',
    'Ome',
    'Tiff',
]
# --- src/bids_validator/context_generator.py ---
"""Utilities for generating validation context classes from a BIDS schema.

For default contexts based on the installed BIDS schema, use the `context` module.
These functions allow generating classes from alternative schemas.

Basic usage:

.. code-block:: python

    from bids_validator.context_generator import get_schema, load_schema_into_namespace

    schema = get_schema('https://bids-specification.readthedocs.io/en/stable/schema.json')
    load_schema_into_namespace(schema['meta']['context']['context'], globals(), 'Context')
"""

import json
from typing import Any, Dict, List, Union

import attrs
import bidsschematools as bst
import bidsschematools.schema
import httpx

LATEST_SCHEMA_URL = 'https://bids-specification.readthedocs.io/en/latest/schema.json'
STABLE_SCHEMA_URL = 'https://bids-specification.readthedocs.io/en/stable/schema.json'


def get_schema(url: Union[str, None] = None) -> Dict[str, Any]:
    """Load a BIDS schema from a URL or return the bundled schema if no URL is provided.

    Parameters
    ----------
    url : str | None
        The URL to load the schema from. If None, the bundled schema is returned.
        The strings 'latest' and 'stable' are also accepted as shortcuts.

    Returns
    -------
    Dict[str, Any]
        The loaded schema as a dictionary.

    """
    if url is None:
        return bst.schema.load_schema()

    if url == 'latest':
        url = LATEST_SCHEMA_URL
    elif url == 'stable':
        url = STABLE_SCHEMA_URL

    with httpx.Client() as client:
        response = client.get(url)
        # Fail with a clear HTTP error rather than a confusing JSON decode
        # error when the server returns a non-2xx response.
        response.raise_for_status()
        return response.json()


def snake_to_pascal(val: str):
    """Convert snake_case string to PascalCase."""
    return ''.join(sub.capitalize() for sub in val.split('_'))


def typespec_to_type(name: str, typespec: Dict[str, Any]):
    """Convert JSON-schema style specification to type and metadata dictionary."""
    tp = typespec.get('type')
    if not tp:
        raise ValueError(f'Invalid typespec: {json.dumps(typespec)}')
    metadata = {key: typespec[key] for key in ('name', 'description') if key in typespec}
    if tp == 'object':
        properties = typespec.get('properties')
        if properties:
            type_ = create_attrs_class(name, properties=properties, metadata=metadata)
        else:
            type_ = Dict[str, Any]
    elif tp == 'array':
        if 'items' in typespec:
            subtype, _ = typespec_to_type(name, typespec['items'])
        else:
            subtype = Any
        type_ = List[subtype]
    else:
        scalars = {
            'number': float,
            'float': float,  # Fix in schema
            'string': str,
            'integer': int,
            'int': int,  # Fix in schema
        }
        if tp not in scalars:
            # Raise the module's own error type instead of a bare KeyError.
            raise ValueError(f'Unhandled type {tp!r} in typespec: {json.dumps(typespec)}')
        type_ = scalars[tp]
    return type_, metadata


def _type_name(tp: type) -> str:
    """Best-effort human-readable name for a type or typing construct."""
    try:
        return tp.__name__
    except AttributeError:
        return str(tp)


def create_attrs_class(
    class_name: str,
    properties: Dict[str, Any],
    metadata: Dict[str, Any],
) -> type:
    """Dynamically create an attrs class with the given properties.

    Parameters
    ----------
    class_name
        The name of the class to create.
    properties
        A dictionary of property names and their corresponding schema information.
        If a nested object is encountered, a nested class is created.
    metadata
        A short description of the class, included in the docstring.

    Returns
    -------
    cls : type
        The dynamically created attrs class.

    """
    attributes = {}
    for prop_name, prop_info in properties.items():
        type_, md = typespec_to_type(prop_name, prop_info)
        attributes[prop_name] = attrs.field(
            type=type_, repr=prop_name != 'schema', default=None, metadata=md
        )

    return attrs.make_class(
        snake_to_pascal(class_name),
        attributes,
        class_body={
            '__doc__': f"""\
{metadata.get('description', '')}

attrs data class auto-generated from BIDS schema

Attributes
----------
"""
            + '\n'.join(
                [
                    # Properties may lack a description; don't crash on them.
                    f'{k}: {_type_name(v.type)}\n\t{v.metadata.get("description", "")}'
                    for k, v in attributes.items()
                ]
            ),
        },
    )


def generate_attrs_classes_from_schema(
    schema: Dict[str, Any],
    root_class_name: str,
) -> type:
    """Generate attrs classes from a JSON schema.

    Parameters
    ----------
    schema : Dict[str, Any]
        The JSON schema to generate classes from. Must contain a 'properties' field.
    root_class_name : str
        The name of the root class to create.

    Returns
    -------
    cls : type
        The root class created from the schema.

    """
    if 'properties' not in schema:
        raise ValueError("Invalid schema: 'properties' field is required")

    type_, _ = typespec_to_type(root_class_name, schema)
    return type_


def populate_namespace(attrs_class: type, namespace: Dict[str, Any]) -> None:
    """Populate a namespace with nested attrs classes.

    Parameters
    ----------
    attrs_class : type
        The root attrs class to add to the namespace.
    namespace : Dict[str, Any]
        The namespace to populate with nested classes.

    """
    for attr in attrs_class.__attrs_attrs__:
        attr_type = attr.type

        if isinstance(attr_type, type) and hasattr(attr_type, '__attrs_attrs__'):
            namespace[attr_type.__name__] = attr_type
            populate_namespace(attr_type, namespace)


def load_schema_into_namespace(
    schema: Dict[str, Any],
    namespace: Dict[str, Any],
    root_class_name: str,
) -> None:
    """Load a JSON schema into a namespace as attrs classes.

    Intended to be used with globals() or locals() to create classes in the current module.

    Parameters
    ----------
    schema : Dict[str, Any]
        The JSON schema to load into the namespace.
    namespace : Dict[str, Any]
        The namespace to load the schema into.
    root_class_name : str
        The name of the root class to create.

    """
    attrs_class = generate_attrs_classes_from_schema(schema, root_class_name)
    namespace[root_class_name] = attrs_class
    populate_namespace(attrs_class, namespace)


# --- src/bids_validator/types/__init__.py ---
"""Modules for providing types."""
# --- src/bids_validator/types/files.py ---
"""Types for working with file trees."""

import os
import posixpath
import stat
from functools import cached_property
from pathlib import Path
from typing import Dict, Union

import attrs
from typing_extensions import Self  # PY310

__all__ = ('FileTree',)


@attrs.define
class UserDirEntry:
    """Partial reimplementation of :class:`os.DirEntry`.

    :class:`os.DirEntry` can't be instantiated from Python, but this can.
    """

    path: str = attrs.field(repr=False, converter=os.fspath)
    name: str = attrs.field(init=False)
    _stat: os.stat_result = attrs.field(init=False, repr=False, default=None)
    _lstat: os.stat_result = attrs.field(init=False, repr=False, default=None)

    def __attrs_post_init__(self) -> None:
        self.name = os.path.basename(self.path)

    def __fspath__(self) -> str:
        return self.path

    def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
        """Return stat_result object for the entry; cached per entry."""
        # Two separate caches: one following symlinks, one not.
        if follow_symlinks:
            if self._stat is None:
                self._stat = os.stat(self.path, follow_symlinks=True)
            return self._stat
        if self._lstat is None:
            self._lstat = os.stat(self.path, follow_symlinks=False)
        return self._lstat

    def is_dir(self, *, follow_symlinks: bool = True) -> bool:
        """Return True if the entry is a directory; cached per entry."""
        return stat.S_ISDIR(self.stat(follow_symlinks=follow_symlinks).st_mode)

    def is_file(self, *, follow_symlinks: bool = True) -> bool:
        """Return True if the entry is a file; cached per entry."""
        return stat.S_ISREG(self.stat(follow_symlinks=follow_symlinks).st_mode)

    def is_symlink(self) -> bool:
        """Return True if the entry is a symlink; cached per entry."""
        return stat.S_ISLNK(self.stat(follow_symlinks=False).st_mode)


def as_direntry(obj: os.PathLike) -> Union[os.DirEntry, UserDirEntry]:
    """Convert PathLike into DirEntry-like object."""
    return obj if isinstance(obj, os.DirEntry) else UserDirEntry(obj)


@attrs.define
class FileTree:
    """Represent a FileTree with cached metadata."""

    direntry: Union[os.DirEntry, UserDirEntry] = attrs.field(repr=False, converter=as_direntry)
    parent: Union['FileTree', None] = attrs.field(repr=False, default=None)
    is_dir: bool = attrs.field(default=False)
    children: Dict[str, 'FileTree'] = attrs.field(repr=False, factory=dict)
    name: str = attrs.field(init=False)

    def __attrs_post_init__(self):
        self.name = self.direntry.name
        # Re-point each child's parent at this (possibly evolved) instance so
        # the tree stays internally consistent after attrs.evolve().
        self.children = {
            name: attrs.evolve(child, parent=self) for name, child in self.children.items()
        }

    @classmethod
    def read_from_filesystem(
        cls,
        direntry: os.PathLike,
        parent: Union['FileTree', None] = None,
    ) -> Self:
        """Read a FileTree from the filesystem.

        Uses :func:`os.scandir` to walk the directory tree.
        """
        self = cls(direntry, parent=parent)
        if self.direntry.is_dir():
            self.is_dir = True
            self.children = {
                entry.name: FileTree.read_from_filesystem(entry, parent=self)
                for entry in os.scandir(self.direntry)
            }
        return self

    def __contains__(self, relpath: os.PathLike) -> bool:
        parts = Path(relpath).parts
        if not parts:
            return False
        child = self.children.get(parts[0], False)
        # Recurse into the child for multi-component paths.
        return child and (len(parts) == 1 or posixpath.join(*parts[1:]) in child)

    def __fspath__(self):
        return self.direntry.path

    @cached_property
    def relative_path(self) -> str:
        """The path of the current FileTree, relative to the root.

        Follows parents up to the root and joins with POSIX separators (/).

        Directories include trailing slashes for simpler matching.
        """
        if self.parent is None:
            return ''
        return posixpath.join(
            self.parent.relative_path,
            f'{self.name}/' if self.is_dir else self.name,
        )
# --- tests/__init__.py ---
"""bids_validator tests."""

# --- tests/conftest.py ---
"""Pytest configuration."""

import importlib.resources
import os
from pathlib import Path

import pytest


@pytest.fixture(scope='session')
def examples() -> Path:
    """Get bids-examples from submodule, allow environment variable override."""
    ret = os.getenv('BIDS_EXAMPLES')
    if not ret:
        ret = importlib.resources.files(__package__) / 'data' / 'bids-examples'
    path = Path(ret)
    # Skip (rather than fail obscurely later) when the examples are absent —
    # this now applies to a BIDS_EXAMPLES override as well as the default
    # submodule location.
    if not path.exists():
        pytest.skip('Missing examples')
    return path


# --- tests/test_bidsignore.py ---
"""Test bids_validator.bidsignore."""

import pytest
from bids_validator.bidsignore import Ignore, compile_pat
from bids_validator.types.files import FileTree


@pytest.mark.parametrize(
    ('pattern', 'hits', 'misses'),
    [
        ('/', ['/'], ['dir/', 'file']),
        # Match file or directory named foo
        ('foo', ['foo', 'foo/', 'bar/foo', 'bar/foo/'], ['bar', 'foobar', 'barfoo', 'barfoo/']),
        # Directories named foo only
        ('foo/', ['foo/', 'bar/foo/'], ['foo', 'bar/foo', 'bar', 'foobar', 'barfoo', 'barfoo/']),
        # Files or directories at the root
        ('/foo', ['foo', 'foo/'], ['bar/foo', 'bar/foo/', 'bar', 'foobar', 'barfoo', 'barfoo/']),
        # doc/frotz/ examples from GITIGNORE(5)
        ('doc/frotz/', ['doc/frotz/'], ['a/doc/frotz/']),
        ('frotz/', ['frotz/', 'doc/frotz/', 'a/doc/frotz/'], []),
        # * matches everything because everything has a basename
        ('*', ['foo', 'foo/', 'foo/bar', 'foo/bar/'], []),
        # *o matches things with basename ending in o
        ('*o', ['foo', 'foo/', 'bar/foo', 'bar/foo/'], ['foo/bar', 'foo/bar/']),
        # Leading **/ matches in all directories
        ('**/foo', ['foo', 'foo/', 'bar/foo', 'bar/foo/'], ['foo/bar', 'foo/bar/', 'baz/foobar']),
        ('**/foo/bar', ['foo/bar', 'foo/bar/', 'a/foo/bar'], ['foo/', 'bar/foo', 'bar']),
        # Trailing /** matches everything inside a root-relative directory
        ('foo/**', ['foo/', 'foo/x', 'foo/x/y/z'], ['foo', 'bar/foo/x/y/z']),
        # /**/ matches zero or more directories
        ('a/**/b', ['a/b', 'a/x/b', 'a/x/y/b'], ['x/a/b', 'x/a/y/b']),
        # ** surrounded by something other than slashes acts like a regular *
        ('a/x**/b', ['a/x/b', 'a/xy/b'], ['x/a/b', 'x/a/y/b', 'a/x/y/b']),
    ],
)
def test_patterns(pattern, hits, misses):
    """Test expected hits and misses of ignore patterns."""
    regex = compile_pat(pattern)
    for fname in hits:
        assert regex.match(fname)
    for fname in misses:
        assert not regex.match(fname)


def test_skipped_patterns():
    """Test ignore patterns that should match nothing."""
    assert compile_pat('') is None
    assert compile_pat('# commented line') is None
    assert compile_pat('   ') is None


def test_Ignore_ds000117(examples):
    """Test that we can load a .bidsignore file and match a file."""
    ds000117 = FileTree.read_from_filesystem(examples / 'ds000117')
    ignore = Ignore.from_file(ds000117.children['.bidsignore'])
    assert 'run-*_echo-*_FLASH.json' in ignore.patterns
    assert 'sub-01/ses-mri/anat/sub-01_ses-mri_run-1_echo-1_FLASH.nii.gz' in ds000117
    assert ignore.match('sub-01/ses-mri/anat/sub-01_ses-mri_run-1_echo-1_FLASH.nii.gz')
    flash_file = (
        ds000117.children['sub-01']
        .children['ses-mri']
        .children['anat']
        .children['sub-01_ses-mri_run-1_echo-1_FLASH.nii.gz']
    )
    assert ignore.match(flash_file.relative_path)
# --- tests/types/__init__.py ---
"""Tests for bids_validator.types."""

# --- tests/types/test_files.py ---
"""Tests for bids_validator.types.files."""

import attrs
from bids_validator.types.files import FileTree


def test_FileTree(examples):
    """Test the FileTree class."""
    ds000117 = FileTree.read_from_filesystem(examples / 'ds000117')
    assert 'sub-01/ses-mri/anat/sub-01_ses-mri_acq-mprage_T1w.nii.gz' in ds000117
    assert ds000117.children['sub-01'].parent is ds000117

    # Verify that evolving FileTrees creates consistent structures:
    # children must be re-parented onto the evolved copy.
    evolved = attrs.evolve(ds000117)
    sub01 = evolved.children['sub-01']
    assert sub01.parent is not ds000117
    assert sub01.parent is evolved
    assert sub01.children['ses-mri'].parent is not ds000117.children['sub-01']
    assert sub01.children['ses-mri'].parent is sub01