diff --git a/.ci/workchains.py b/.ci/workchains.py index f5ab3872d7..4ae521540f 100644 --- a/.ci/workchains.py +++ b/.ci/workchains.py @@ -7,6 +7,7 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### +# pylint: disable=invalid-name from aiida.common import AttributeDict from aiida.engine import calcfunction, workfunction, WorkChain, ToContext, append_, while_, ExitCode from aiida.engine import BaseRestartWorkChain, process_handler, ProcessHandlerReport diff --git a/.pylintrc b/.pylintrc index 9e3adfb075..8fc7a6a3c3 100644 --- a/.pylintrc +++ b/.pylintrc @@ -50,7 +50,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=bad-continuation,locally-disabled,useless-suppression,django-not-available,bad-option-value,logging-format-interpolation,no-else-raise,import-outside-toplevel +disable=bad-continuation,locally-disabled,useless-suppression,django-not-available,bad-option-value,logging-format-interpolation,no-else-raise,import-outside-toplevel,cyclic-import # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/aiida/cmdline/commands/cmd_group.py b/aiida/cmdline/commands/cmd_group.py index d74e416bd5..16978379ae 100644 --- a/aiida/cmdline/commands/cmd_group.py +++ b/aiida/cmdline/commands/cmd_group.py @@ -361,3 +361,72 @@ def group_copy(source_group, destination_group): # Copy nodes dest_group.add_nodes(list(source_group.nodes)) echo.echo_success('Nodes copied from group<{}> to group<{}>'.format(source_group.label, dest_group.label)) + + +@verdi_group.group('path') +def verdi_group_path(): + """Inspect groups of nodes, with delimited label paths.""" + + +@verdi_group_path.command('ls') +@click.argument('path', type=click.STRING, required=False) +@click.option('-R', '--recursive', is_flag=True, default=False, help='Recursively list sub-paths encountered') +@click.option('-l', '--long', 'as_table', is_flag=True, default=False, help='List as a table, with sub-group count') +@click.option( + '-d', '--with-description', 'with_description', is_flag=True, default=False, help='Show also the group description' +) +@click.option( + '--no-virtual', + 'no_virtual', + is_flag=True, + default=False, + help='Only show paths that fully correspond to an existing group' +) +@click.option( + '-t', + '--type', + 'group_type', + type=types.LazyChoice(valid_group_type_strings), + default=user_defined_group, + help='Show groups of a specific type, instead of user-defined groups. Start with semicolumn if you want to ' + 'specify aiida-internal type' +) +@click.option('--no-warn', is_flag=True, default=False, help='Do not issue a warning if any paths are invalid.') +@with_dbenv() +def group_path_ls(path, recursive, as_table, no_virtual, group_type, with_description, no_warn): + # pylint: disable=too-many-arguments + """Show a list of existing group paths.""" + from aiida.tools.groups.paths import GroupPath, InvalidPath + + try: + path = GroupPath(path or '', type_string=group_type, warn_invalid_child=not no_warn) + except InvalidPath as err: + echo.echo_critical(str(err)) + + if recursive: + children = path.walk() + else: + children = path.children + + if as_table or with_description: + from tabulate import tabulate + headers = ['Path', 'Sub-Groups'] + if with_description: + headers.append('Description') + rows = [] + for child in sorted(children): + if no_virtual and child.is_virtual: + continue + row = [ + child.path if child.is_virtual else click.style(child.path, bold=True), + len([c for c in child.walk() if not c.is_virtual]) + ] + if with_description: + row.append('-' if child.is_virtual else child.get_group().description) + rows.append(row) + echo.echo(tabulate(rows, headers=headers)) + else: + for child in sorted(children): + if no_virtual and child.is_virtual: + continue + echo.echo(child.path, bold=not child.is_virtual) diff --git a/aiida/tools/groups/__init__.py b/aiida/tools/groups/__init__.py new file mode 100644 index 0000000000..7d429eeab7 --- /dev/null +++ b/aiida/tools/groups/__init__.py @@ -0,0 +1,11 @@ +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +# pylint: disable=wildcard-import,undefined-variable +"""Provides tools for interacting with AiiDA Groups.""" +from .paths import * + +__all__ = paths.__all__ diff --git a/aiida/tools/groups/paths.py b/aiida/tools/groups/paths.py new file mode 100644 index 0000000000..9d20ea9c55 --- /dev/null +++ b/aiida/tools/groups/paths.py @@ -0,0 +1,352 @@ +# -*- coding: utf-8 -*- +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Provides functionality for managing large numbers of AiiDA Groups, via label delimitation.""" +from collections import namedtuple +from functools import total_ordering +import re +from typing import Any, Iterable, List, Optional # pylint: disable=unused-import +import warnings + +from aiida import orm +from aiida.common.exceptions import NotExistent + +__all__ = ('GroupPath', 'InvalidPath') + +REGEX_ATTR = re.compile('^[a-zA-Z][\\_a-zA-Z0-9]*$') + + +class InvalidPath(Exception): + """An exception to indicate that a path is not valid.""" + + +class GroupNotFoundError(Exception): + """An exception raised when a path does not have an associated group.""" + + def __init__(self, grouppath): + msg = 'No such group: {}'.format(grouppath.path) + super().__init__(msg) + + +class GroupNotUniqueError(Exception): + """An exception raised when a path has multiple associated groups.""" + + def __init__(self, grouppath): + msg = 'The path is not unique: {}'.format(grouppath.path) + super().__init__(msg) + + +class NoGroupsInPathError(Exception): + """An exception raised when a path has multiple associated groups.""" + + def __init__(self, grouppath): + msg = 'The path does not contain any descendant groups: {}'.format(grouppath.path) + super().__init__(msg) + + +WalkNodeResult = namedtuple('WalkNodeResult', ['group_path', 'node']) + + +@total_ordering +class GroupPath: + """A class to provide label delimited access to groups. + + See tests for usage examples. + """ + + def __init__(self, path='', type_string=orm.GroupTypeString.USER.value, warn_invalid_child=True): + # type: (str, Optional[str], Optional[GroupPath]) + """Instantiate the class. + + :param path: The initial path of the group. + :param type_string: Used to query for and instantiate a ``Group`` with. + :param warn_invalid_child: Issue a warning, when iterating children, if a child path is invalid. + + """ + self._delimiter = '/' + if not isinstance(type_string, str): + raise TypeError('type_string must a str: {}'.format(type_string)) + self._type_string = type_string + self._path_string = self._validate_path(path) + self._path_list = self._path_string.split(self._delimiter) if path else [] + self._warn_invalid_child = warn_invalid_child + + def _validate_path(self, path): + """Validate the supplied path.""" + if path == self._delimiter: + return '' + if self._delimiter * 2 in path: + raise InvalidPath("The path may not contain a duplicate delimiter '{}': {}".format(self._delimiter, path)) + if (path.startswith(self._delimiter) or path.endswith(self._delimiter)): + raise InvalidPath("The path may not start/end with the delimiter '{}': {}".format(self._delimiter, path)) + return path + + def __repr__(self): + # type: () -> str + """Represent the instantiated class.""" + return "{}('{}', type='{}')".format(self.__class__.__name__, self.path, self.type_string) + + def __eq__(self, other): + # type: (Any) -> bool + """Compare equality of path and type string to another ``GroupPath`` object.""" + if not isinstance(other, GroupPath): + return NotImplemented + return (self.path, self.type_string) == (other.path, other.type_string) + + def __lt__(self, other): + # type: (Any) -> bool + """Compare less-than operator of path and type string to another ``GroupPath`` object.""" + if not isinstance(other, GroupPath): + return NotImplemented + return (self.path, self.type_string) < (other.path, other.type_string) + + @property + def path(self): + # type: () -> str + """Return the path string.""" + return self._path_string + + @property + def path_list(self): + # type: () -> List[str] + """Return a list of the path components.""" + return self._path_list[:] + + @property + def key(self): + # type: () -> str + """Return the final component of the the path.""" + if self._path_list: + return self._path_list[-1] + return None + + @property + def delimiter(self): + # type: () -> str + """Return the delimiter used to split path into components.""" + return self._delimiter + + @property + def type_string(self): + # type: () -> str + """Return the type_string used to query for and instantiate a ``Group`` with.""" + return self._type_string + + @property + def parent(self): + # type: () -> Optional[GroupPath] + """Return the parent path.""" + if self.path_list: + return GroupPath( + self.delimiter.join(self.path_list[:-1]), + type_string=self.type_string, + warn_invalid_child=self._warn_invalid_child + ) + return None + + def __truediv__(self, path): + # type: (str) -> GroupPath + """Return a child ``GroupPath``, with a new path formed by appending ``path`` to the current path.""" + if not isinstance(path, str): + raise TypeError('path is not a string: {}'.format(path)) + path = self._validate_path(path) + child = GroupPath( + path=self.path + self.delimiter + path if self.path else path, + type_string=self.type_string, + warn_invalid_child=self._warn_invalid_child + ) + return child + + def __getitem__(self, path): + # type: (str) -> GroupPath + """Return a child ``GroupPath``, with a new path formed by appending ``path`` to the current path.""" + return self.__truediv__(path) + + def get_group(self): + # type: () -> Optional[orm.Group] + """Return the concrete group associated with this path.""" + try: + return orm.Group.objects.get(label=self.path, type_string=self.type_string) + except NotExistent: + return None + + @property + def group_ids(self): + # type: () -> List[int] + """Return all the UUID associated with this GroupPath. + + :returns: and empty list, if no group associated with this label, + or can be multiple if type_string was None + + This is an efficient method for checking existence, + which does not require the (slow) loading of the ORM entity. + """ + query = orm.QueryBuilder() + filters = {'label': self.path} + if self.type_string is not None: + filters['type_string'] = self.type_string + query.append(orm.Group, filters=filters, project='id') + return [r[0] for r in query.all()] + + @property + def is_virtual(self): + # type: () -> bool + """Return whether there is one or more concrete groups associated with this path.""" + return len(self.group_ids) == 0 + + def get_or_create_group(self): + # type: () -> (orm.Group, bool) + """Return the concrete group associated with this path or, create it, if it does not already exist.""" + if self.type_string is not None: + return orm.Group.objects.get_or_create(label=self.path, type_string=self.type_string) + return orm.Group.objects.get_or_create(label=self.path) + + def delete_group(self): + """Delete the concrete group associated with this path. + + :raises: GroupNotFoundError, GroupNotUniqueError + """ + ids = self.group_ids + if not ids: + raise GroupNotFoundError(self) + if len(ids) > 1: + raise GroupNotUniqueError(self) + orm.Group.objects.delete(ids[0]) + + @property + def children(self): + # type: () -> Iterable[GroupPath] + """Iterate through all (direct) children of this path.""" + query = orm.QueryBuilder() + filters = {} + if self.path: + filters['label'] = {'like': self.path + self.delimiter + '%'} + if self.type_string is not None: + filters['type_string'] = self.type_string + query.append(orm.Group, filters=filters, project='label') + if query.count() == 0 and self.is_virtual: + raise NoGroupsInPathError(self) + + yielded = [] + for (label,) in query.iterall(): + path = label.split(self._delimiter) + if len(path) <= len(self._path_list): + continue + path_string = self._delimiter.join(path[:len(self._path_list) + 1]) + if (path_string not in yielded and path[:len(self._path_list)] == self._path_list): + yielded.append(path_string) + try: + yield GroupPath( + path=path_string, type_string=self.type_string, warn_invalid_child=self._warn_invalid_child + ) + except InvalidPath: + if self._warn_invalid_child: + warnings.warn('invalid path encountered: {}'.format(path_string)) # pylint: disable=no-member + + def __iter__(self): + # type: () -> Iterable[GroupPath] + """Iterate through all (direct) children of this path.""" + return self.children + + def __len__(self): + # type: () -> int + """Return the number of children for this path.""" + return sum(1 for _ in self.children) + + def __contains__(self, key): + # type: (str) -> bool + """Return whether a child exists for this key.""" + for child in self.children: + if child.path_list[-1] == key: + return True + return False + + def walk(self, return_virtual=True): + # type: () -> Iterable[GroupPath] + """Recursively iterate through all children of this path.""" + for child in self: + if return_virtual or not child.is_virtual: + yield child + for sub_child in child.walk(return_virtual=return_virtual): + if return_virtual or not sub_child.is_virtual: + yield sub_child + + def walk_nodes(self, filters=None, node_class=None, query_batch=None): + # type: () -> Iterable[WalkNodeResult] + """Recursively iterate through all nodes of this path and its children. + + :param filters: filters to apply to the node query + :param node_class: return only nodes of a certain class (or list of classes) + :param int batch_size: The size of the batches to ask the backend to batch results in subcollections. + You can optimize the speed of the query by tuning this parameter. + Be aware though that is only safe if no commit will take place during this transaction. + """ + query = orm.QueryBuilder() + group_filters = {} + if self.path: + group_filters['label'] = {'or': [{'==': self.path}, {'like': self.path + self.delimiter + '%'}]} + if self.type_string is not None: + group_filters['type_string'] = self.type_string + query.append(orm.Group, filters=group_filters, project='label', tag='group') + query.append( + orm.Node if node_class is None else node_class, + with_group='group', + filters=filters, + project=['*'], + ) + for (label, node) in query.iterall(query_batch) if query_batch else query.all(): + yield WalkNodeResult(GroupPath(label, type_string=self.type_string), node) + + @property + def browse(self): + """Return a ``GroupAttr`` instance, for attribute access to children.""" + return GroupAttr(self) + + +class GroupAttr: + """A class to provide attribute access to a ``GroupPath`` children. + + The only public attributes on this class are dynamically created from the ``GroupPath`` child keys. + NOTE: any child keys that do not conform to an acceptable (public) attribute string will be ignored. + The ``GroupPath`` can be retrieved *via* a function call, e.g.:: + + group_path = GroupPath() + group_attr = GroupAttr(group_path) + group_attr.a.b.c() == GroupPath("a/b/c") + + """ + + def __init__(self, group_path): + # type: (GroupPath) + """Instantiate the ``GroupPath``, and a mapping of its children.""" + self._group_path = group_path + + def __repr__(self): + # type: () -> str + """Represent the instantiated class.""" + return "{}('{}', type='{}')".format( + self.__class__.__name__, self._group_path.path, self._group_path.type_string + ) + + def __call__(self): + # type: () -> GroupPath + """Return the ``GroupPath``.""" + return self._group_path + + def __dir__(self): + """Return a list of available attributes.""" + return [c.path_list[-1] for c in self._group_path.children if REGEX_ATTR.match(c.path_list[-1])] + + def __getattr__(self, attr): + # type: (str) -> GroupAttr + """Return the requested attribute name.""" + for child in self._group_path.children: + if attr == child.path_list[-1]: + return GroupAttr(child) + raise AttributeError(attr) diff --git a/docs/source/verdi/verdi_user_guide.rst b/docs/source/verdi/verdi_user_guide.rst index 8265083e6d..1fc8bfabe8 100644 --- a/docs/source/verdi/verdi_user_guide.rst +++ b/docs/source/verdi/verdi_user_guide.rst @@ -436,6 +436,7 @@ Below is a list with all available subcommands. delete Delete a group. description Change the description of a group. list Show a list of existing groups. + path Inspect groups of nodes, with delimited label paths. relabel Change the label of a group. remove-nodes Remove nodes from a group. show Show information for a given group. diff --git a/tests/cmdline/commands/test_group_ls.py b/tests/cmdline/commands/test_group_ls.py new file mode 100644 index 0000000000..7cc01079a4 --- /dev/null +++ b/tests/cmdline/commands/test_group_ls.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Tests for GroupPath command line interface""" +# pylint: disable=redefined-outer-name,unused-argument +from textwrap import dedent + +from click.testing import CliRunner +import pytest + +from aiida import orm +from aiida.cmdline.commands.cmd_group import group_path_ls + + +@pytest.fixture +def setup_groups(clear_database_before_test): + """Setup some groups for testing.""" + for label in ['a', 'a/b', 'a/c/d', 'a/c/e/g', 'a/f']: + group, _ = orm.Group.objects.get_or_create(label, type_string=orm.GroupTypeString.USER.value) + group.description = 'A description of {}'.format(label) + orm.Group.objects.get_or_create('a/x', type_string=orm.GroupTypeString.UPFGROUP_TYPE.value) + yield + + +def test_with_no_opts(setup_groups): + """Test ``verdi group path ls``""" + + cli_runner = CliRunner() + + result = cli_runner.invoke(group_path_ls) + assert result.exit_code == 0, result.exception + assert result.output == 'a\n' + + result = cli_runner.invoke(group_path_ls, ['a']) + assert result.exit_code == 0, result.exception + assert result.output == 'a/b\na/c\na/f\n' + + result = cli_runner.invoke(group_path_ls, ['a/c']) + assert result.exit_code == 0, result.exception + assert result.output == 'a/c/d\na/c/e\n' + + +def test_recursive(setup_groups): + """Test ``verdi group path ls --recursive``""" + + cli_runner = CliRunner() + + for tag in ['-R', '--recursive']: + result = cli_runner.invoke(group_path_ls, [tag]) + assert result.exit_code == 0, result.exception + assert result.output == 'a\na/b\na/c\na/c/d\na/c/e\na/c/e/g\na/f\n' + + result = cli_runner.invoke(group_path_ls, [tag, 'a/c']) + assert result.exit_code == 0, result.exception + assert result.output == 'a/c/d\na/c/e\na/c/e/g\n' + + +@pytest.mark.parametrize('tag', ['-l', '--long']) +def test_long(setup_groups, tag): + """Test ``verdi group path ls --long``""" + + cli_runner = CliRunner() + + result = cli_runner.invoke(group_path_ls, [tag]) + assert result.exit_code == 0, result.exception + assert result.output == dedent( + """\ + Path Sub-Groups + ------ ------------ + a 4 + """ + ) + + result = cli_runner.invoke(group_path_ls, [tag, '-d', 'a']) + assert result.exit_code == 0, result.exception + assert result.output == dedent( + """\ + Path Sub-Groups Description + ------ ------------ -------------------- + a/b 0 A description of a/b + a/c 2 - + a/f 0 A description of a/f + """ + ) + + result = cli_runner.invoke(group_path_ls, [tag, '-R']) + assert result.exit_code == 0, result.exception + assert result.output == dedent( + """\ + Path Sub-Groups + ------- ------------ + a 4 + a/b 0 + a/c 2 + a/c/d 0 + a/c/e 1 + a/c/e/g 0 + a/f 0 + """ + ) + + +@pytest.mark.parametrize('tag', ['--no-virtual']) +def test_groups_only(setup_groups, tag): + """Test ``verdi group path ls --no-virtual``""" + + cli_runner = CliRunner() + + result = cli_runner.invoke(group_path_ls, [tag, '-l', '-R', '--with-description']) + assert result.exit_code == 0, result.exception + assert result.output == dedent( + """\ + Path Sub-Groups Description + ------- ------------ ------------------------ + a 4 A description of a + a/b 0 A description of a/b + a/c/d 0 A description of a/c/d + a/c/e/g 0 A description of a/c/e/g + a/f 0 A description of a/f + """ + ) diff --git a/tests/tools/groups/__init__.py b/tests/tools/groups/__init__.py new file mode 100644 index 0000000000..2776a55f97 --- /dev/null +++ b/tests/tools/groups/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### diff --git a/tests/tools/groups/test_paths.py b/tests/tools/groups/test_paths.py new file mode 100644 index 0000000000..a6f1cdb757 --- /dev/null +++ b/tests/tools/groups/test_paths.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Tests for GroupPath""" +# pylint: disable=redefined-outer-name,unused-argument +import pytest + +from aiida import orm +from aiida.tools.groups.paths import (GroupAttr, GroupPath, InvalidPath, GroupNotFoundError, NoGroupsInPathError) + + +@pytest.fixture +def setup_groups(clear_database_before_test): + """Setup some groups for testing.""" + for label in ['a', 'a/b', 'a/c/d', 'a/c/e/g', 'a/f']: + group, _ = orm.Group.objects.get_or_create(label, type_string=orm.GroupTypeString.USER.value) + group.description = 'A description of {}'.format(label) + yield + + +@pytest.mark.parametrize('path', ('/a', 'a/', '/a/', 'a//b')) +def test_invalid_paths(setup_groups, path): + """Invalid paths should raise an ``InvalidPath`` exception.""" + with pytest.raises(InvalidPath): + GroupPath(path=path) + + +def test_root_path(setup_groups): + """Test the root path properties""" + group_path = GroupPath() + assert group_path.path == '' + assert group_path.delimiter == '/' + assert group_path.parent is None + assert group_path.is_virtual + assert group_path.get_group() is None + + +def test_path_concatenation(setup_groups): + """Test methods to build a new path.""" + group_path = GroupPath() + assert (group_path / 'a').path == 'a' + assert (group_path / 'a' / 'b').path == 'a/b' + assert (group_path / 'a/b').path == 'a/b' + assert group_path['a/b'].path == 'a/b' + assert GroupPath('a/b/c') == GroupPath('a/b') / 'c' + + +def test_path_existence(setup_groups): + """Test existence of child "folders".""" + group_path = GroupPath() + assert 'a' in group_path + assert 'x' not in group_path + + +def test_group_retrieval(setup_groups): + """Test retrieval of the actual group from a path. + + The ``group`` attribute will return None + if no group is associated with the path + """ + group_path = GroupPath() + assert group_path['x'].is_virtual + assert not group_path['a'].is_virtual + assert group_path.get_group() is None + assert isinstance(group_path['a'].get_group(), orm.Group) + + +def test_group_creation(setup_groups): + """Test creation of new groups.""" + group_path = GroupPath() + group, created = group_path['a'].get_or_create_group() + assert isinstance(group, orm.Group) + assert created is False + group, created = group_path['x'].get_or_create_group() + assert isinstance(group, orm.Group) + assert created is True + + +def test_group_deletion(setup_groups): + """Test deletion of existing groups.""" + group_path = GroupPath() + assert not group_path['a'].is_virtual + group_path['a'].delete_group() + assert group_path['a'].is_virtual + with pytest.raises(GroupNotFoundError): + group_path['a'].delete_group() + + +def test_path_iteration(setup_groups): + """Test iteration of groups.""" + group_path = GroupPath() + assert len(group_path) == 1 + assert [(c.path, c.is_virtual) for c in group_path.children] == [('a', False)] + child = next(group_path.children) + assert child.parent == group_path + assert len(child) == 3 + assert [(c.path, c.is_virtual) for c in sorted(child)] == [('a/b', False), ('a/c', True), ('a/f', False)] + + +def test_path_with_no_groups(setup_groups): + """Test ``NoGroupsInPathError`` is raised if the path contains descendant groups.""" + group_path = GroupPath() + with pytest.raises(NoGroupsInPathError): + list(group_path['x']) + + +def test_walk(setup_groups): + """Test the ``GroupPath.walk()`` function.""" + group_path = GroupPath() + assert [c.path for c in sorted(group_path.walk())] == ['a', 'a/b', 'a/c', 'a/c/d', 'a/c/e', 'a/c/e/g', 'a/f'] + + +def test_walk_with_invalid_path(clear_database_before_test): + for label in ['a', 'a/b', 'a/c/d', 'a/c/e/g', 'a/f', 'bad//group', 'bad/other']: + orm.Group.objects.get_or_create(label, type_string=orm.GroupTypeString.USER.value) + group_path = GroupPath() + assert [c.path for c in sorted(group_path.walk()) + ] == ['a', 'a/b', 'a/c', 'a/c/d', 'a/c/e', 'a/c/e/g', 'a/f', 'bad', 'bad/other'] + + +def test_walk_nodes(clear_database): + """Test the ``GroupPath.walk_nodes()`` function.""" + group, _ = orm.Group.objects.get_or_create('a', type_string=orm.GroupTypeString.USER.value) + node = orm.Data() + node.set_attribute_many({'i': 1, 'j': 2}) + node.store() + group.add_nodes(node) + group_path = GroupPath() + assert [(r.group_path.path, r.node.attributes) for r in group_path.walk_nodes()] == [('a', {'i': 1, 'j': 2})] + + +def test_type_string(clear_database_before_test): + """Test that only the type_string instantiated in ``GroupPath`` is returned.""" + for label in ['a', 'a/b', 'a/c/d', 'a/c/e/g']: + orm.Group.objects.get_or_create(label, type_string=orm.GroupTypeString.USER.value) + for label in ['a/c/e', 'a/f']: + orm.Group.objects.get_or_create(label, type_string=orm.GroupTypeString.UPFGROUP_TYPE.value) + group_path = GroupPath() + assert sorted([c.path for c in group_path.walk()]) == ['a', 'a/b', 'a/c', 'a/c/d', 'a/c/e', 'a/c/e/g'] + group_path = GroupPath(type_string=orm.GroupTypeString.UPFGROUP_TYPE.value) + assert sorted([c.path for c in group_path.walk()]) == ['a', 'a/c', 'a/c/e', 'a/f'] + assert GroupPath('a/b/c') != GroupPath('a/b/c', type_string=orm.GroupTypeString.UPFGROUP_TYPE.value) + + +def test_attr(clear_database_before_test): + """Test ``GroupAttr``.""" + for label in ['a', 'a/b', 'a/c/d', 'a/c/e/g', 'a/f', 'bad space', 'bad@char', '_badstart']: + orm.Group.objects.get_or_create(label) + group_path = GroupPath() + assert isinstance(group_path.browse.a.c.d, GroupAttr) + assert isinstance(group_path.browse.a.c.d(), GroupPath) + assert group_path.browse.a.c.d().path == 'a/c/d' + assert not set(group_path.browse.__dir__()).intersection(['bad space', 'bad@char', '_badstart']) + with pytest.raises(AttributeError): + group_path.browse.a.c.x # pylint: disable=pointless-statement