Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a filter predicate to recursive Collection operations #244

Open
wants to merge 2 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 82 additions & 10 deletions src/partisan/irods.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright © 2020, 2021, 2022, 2023, 2024 Genome Research Ltd. All
# Copyright © 2020, 2021, 2022, 2023, 2024, 2025 Genome Research Ltd. All
# rights reserved.
#
# This program is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -2827,6 +2827,7 @@ def put(
local_checksum=None,
compare_checksums=False,
fill=False,
filter_fn: callable = lambda _: False,
force=True,
timeout=None,
tries=1,
Expand All @@ -2837,6 +2838,11 @@ def put(
local_path: The local path of a directory to put into iRODS at the path
specified by this collection.
recurse: Recurse through subdirectories.
filter_fn: A predicate accepting a single pathlib.Path argument to which
each local path (directories and files) will be passed before putting
into iRODS. If the predicate returns True, the path will be filtered
i.e. not be put into iRODS. Filtering collections will result in them
being pruned. Filtering data objects will result in them being skipped.
calculate_checksum: Calculate remote checksums for all data object replicas.
See DataObject.put() for more information.
verify_checksum: Verify the local checksum calculated by the iRODS C API
Expand Down Expand Up @@ -2868,15 +2874,28 @@ def put(
self.create(exist_ok=True, timeout=timeout, tries=tries)

if recurse:
for root, dirs, files in os.walk(local_path):
for d in dirs:
p = Path(root, d)
for dirpath, dirnames, filenames in os.walk(local_path, topdown=True):
# As topdown is True, we can prune the walk by removing from dirnames
# in-place. N.B that we iterate over a shallow copy of dirnames.
for d in dirnames[:]:
p = Path(dirpath, d)
r = PurePath(self.path, p.relative_to(local_path))
log.debug("Creating collection", local_path=d, remote_path=r)

if filter_fn(p):
log.debug("Skipping directory put", local_path=p, remote_path=r)
dirnames.remove(d)
continue

log.debug("Creating collection", local_path=p, remote_path=r)
Collection(r).create(exist_ok=True, timeout=timeout)
for f in files:
p = Path(root, f)
for f in filenames:
p = Path(dirpath, f)
r = PurePath(self.path, p.relative_to(local_path))

if filter_fn(p):
log.debug("Skipping file put", local_path=p, remote_path=r)
continue

log.debug("Putting data object", local_path=p, remote_path=r)
DataObject(r).put(
p,
Expand All @@ -2893,10 +2912,20 @@ def put(
for p in Path(local_path).iterdir():
if p.is_dir():
r = PurePath(self.path, p.relative_to(local_path))

if filter_fn(p):
log.debug("Skipping directory put", local_path=p, remote_path=r)
continue

log.debug("Creating collection", local_path=p, remote_path=r)
Collection(r).create(exist_ok=True, timeout=timeout)
else:
r = PurePath(self.path, p.name)

if filter_fn(p):
log.debug("Skipping file put", local_path=p, remote_path=r)
continue

log.debug("Putting data object", local_path=p, remote_path=r)
DataObject(r).put(
p,
Expand All @@ -2911,14 +2940,25 @@ def put(

return self

def add_permissions(self, *acs: AC, recurse=False, timeout=None, tries=1) -> int:
def add_permissions(
self,
*acs: AC,
recurse=False,
filter_fn: callable = lambda _: False,
timeout=None,
tries=1,
) -> int:
"""Add access controls to the collection. Return the number of access
controls added. If some argument access controls are already present,
those arguments will be ignored.

Args:
*acs: Access controls.
recurse: Recursively add access controls.
filter_fn: A predicate accepting a single RodsItem argument to which each
iRODS path will be passed during recursive operations, before adding
permissions. If the predicate returns True, the path will be filtered
i.e. not have permissions added.
timeout: Operation timeout in seconds.
tries: Number of times to try the operation.

Expand All @@ -2927,17 +2967,32 @@ def add_permissions(self, *acs: AC, recurse=False, timeout=None, tries=1) -> int
num_added = super().add_permissions(*acs, timeout=timeout, tries=tries)
if recurse:
for item in self.iter_contents(recurse=recurse):
if filter_fn(item):
log.debug("Skipping permissions add", path=item, acl=acs)
continue

num_added += item.add_permissions(*acs, timeout=timeout, tries=tries)
return num_added

def remove_permissions(self, *acs: AC, recurse=False, timeout=None, tries=1) -> int:
def remove_permissions(
self,
*acs: AC,
recurse=False,
filter_fn: callable = lambda _: False,
timeout=None,
tries=1,
) -> int:
"""Remove access controls from the collection. Return the number of access
controls removed. If some argument access controls are not present, those
arguments will be ignored.

Args:
*acs: Access controls.
recurse: Recursively remove access controls.
filter_fn: A predicate accepting a single RodsItem argument to which each
iRODS path will be passed during recursive operations, before removing
permissions. If the predicate returns True, the path will be filtered
i.e. not have permissions removed.
timeout: Operation timeout in seconds.
tries: Number of times to try the operation.

Expand All @@ -2946,13 +3001,22 @@ def remove_permissions(self, *acs: AC, recurse=False, timeout=None, tries=1) ->
num_removed = super().remove_permissions(*acs, timeout=timeout, tries=tries)
if recurse:
for item in self.iter_contents(recurse=recurse):
if filter_fn(item):
log.debug("Skipping permissions remove", path=item, acl=acs)
continue

num_removed += item.remove_permissions(
*acs, timeout=timeout, tries=tries
)
return num_removed

def supersede_permissions(
self, *acs: AC, recurse=False, timeout=None, tries=1
self,
*acs: AC,
recurse=False,
filter_fn: callable = lambda _: False,
timeout=None,
tries=1,
) -> tuple[int, int]:
"""Remove all access controls from the collection, replacing them with the
specified access controls. Return the numbers of access controls
Expand All @@ -2961,6 +3025,10 @@ def supersede_permissions(
Args:
*acs: Access controls.
recurse: Recursively supersede access controls.
filter_fn: A predicate accepting a single RodsItem argument to which each
iRODS path will be passed during recursive operations, before
superseding permissions. If the predicate returns True, the path will be
filtered i.e. not have permissions superseded.
timeout: Operation timeout in seconds.
tries: Number of times to try the operation.

Expand All @@ -2971,6 +3039,10 @@ def supersede_permissions(
)
if recurse:
for item in self.iter_contents(recurse=recurse):
if filter_fn(item):
log.debug("Skipping permissions supersede", path=item, acl=acs)
continue

nr, na = item.supersede_permissions(*acs, timeout=timeout, tries=tries)
num_removed += nr
num_added += na
Expand Down
156 changes: 151 additions & 5 deletions tests/test_irods.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright © 2020, 2021, 2023, 2024 Genome Research Ltd. All rights reserved.
# Copyright © 2020, 2021, 2023, 2024, 2025 Genome Research Ltd. All
# rights reserved.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -639,6 +640,48 @@ def test_add_ac_collection_recurse(self, full_collection):
public_read,
], "Collection content ACL updated"

@m.it("Can have access controls added, recursively, with a filter")
def test_add_ac_collection_recurse_filter(self, full_collection):
zone = "testZone"
coll = Collection(full_collection)
irods_own = AC("irods", Permission.OWN, zone=zone)
public_read = AC("public", Permission.READ, zone=zone)

assert coll.acl() == [irods_own]
for item in coll.contents(recurse=True):
assert item.acl() == [irods_own]

assert (
coll.add_permissions(
irods_own,
recurse=True,
filter_fn=lambda x: x.rods_type == DataObject and x.name == "leaf1.txt",
)
== 0
), "Nothing is added when new ACL == all old ACL, recursively"
for item in coll.contents(recurse=True):
assert item.acl() == [irods_own]

tree = [
"recurse",
"level1/",
"level1/level2/",
"level1/level2/leaf2.txt",
]
assert coll.add_permissions(
public_read,
recurse=True,
filter_fn=lambda x: x.rods_type == DataObject and x.name == "leaf1.txt",
) == len(tree), "Access control added recursively"

for item in coll.contents(recurse=True):
expected = (
[irods_own]
if item.rods_type == DataObject and item.name == "leaf1.txt"
else [irods_own, public_read]
)
assert item.acl() == expected, "Collection content ACL updated"

@m.it("Can have access controls removed, non-recursively")
def test_rem_ac_collection(self, full_collection):
zone = "testZone"
Expand Down Expand Up @@ -689,6 +732,38 @@ def test_rem_ac_collection_recurse(self, full_collection):
for item in coll.contents(recurse=True):
assert item.acl() == [irods_own], "Collection content ACL updated"

@m.it("Can have access controls removed, recursively, with a filter")
def test_rem_ac_collection_recurse_filter(self, full_collection):
zone = "testZone"
coll = Collection(full_collection)
irods_own = AC("irods", Permission.OWN, zone=zone)
public_read = AC("public", Permission.READ, zone=zone)

coll.add_permissions(public_read, recurse=True)
assert coll.acl() == [irods_own, public_read]
for item in coll.contents(recurse=True):
assert item.acl() == [irods_own, public_read]

tree = [
"recurse",
"level1/",
"level1/level2/",
"level1/level2/leaf2.txt",
]
assert coll.remove_permissions(
public_read,
recurse=True,
filter_fn=lambda x: x.rods_type == DataObject and x.name == "leaf1.txt",
) == len(tree), "Access control removed recursively"

for item in coll.contents(recurse=True):
expected = (
[irods_own, public_read]
if item.rods_type == DataObject and item.name == "leaf1.txt"
else [irods_own]
)
assert item.acl() == expected, "Collection content ACL updated"

@m.it("Can have access controls superseded, non-recursively")
def test_super_ac_collection(self, full_collection):
zone = "testZone"
Expand Down Expand Up @@ -739,8 +814,39 @@ def test_super_ac_collection_recur(self, full_collection):
for item in coll.contents(recurse=True):
assert item.acl() == new_acl, "Collection content ACL updated"

@m.context("When a Collection does not exist")
@m.it("When put non-recursively")
@m.it("Can have access controls superseded, recursively, with a filter")
def test_super_ac_collection_recur_filter(self, full_collection):
zone = "testZone"
coll = Collection(full_collection)
irods_own = AC("irods", Permission.OWN, zone=zone)
public_read = AC("public", Permission.READ, zone=zone)
study_01_read = AC("ss_study_01", Permission.READ, zone=zone)
study_02_read = AC("ss_study_02", Permission.READ, zone=zone)

coll.add_permissions(study_01_read, study_02_read, recurse=True)
assert coll.acl() == [irods_own, study_01_read, study_02_read]
for item in coll.contents(recurse=True):
assert item.acl() == [irods_own, study_01_read, study_02_read]

new_acl = [irods_own, public_read]
num_removed, num_added = coll.supersede_permissions(
*new_acl,
recurse=True,
filter_fn=lambda x: x.rods_type == DataObject and x.name == "leaf1.txt",
)
assert num_removed == 2 * (5 - 1) # study access
assert num_added == 1 * (5 - 1) # public access
assert coll.acl() == new_acl, "Access superseded removed recursively"

for item in coll.contents(recurse=True):
expected = (
[irods_own, study_01_read, study_02_read]
if item.rods_type == DataObject and item.name == "leaf1.txt"
else new_acl
)
assert item.acl() == expected, "Collection content ACL updated"

@m.context("When a Collection does not exist and is put non-recursively")
@m.it("Is created, with its immediate contents")
def test_put_collection(self, simple_collection):
coll = Collection(simple_collection / "sub")
Expand All @@ -751,8 +857,7 @@ def test_put_collection(self, simple_collection):
assert coll.exists()
assert coll.contents() == [Collection(coll.path / "empty")]

@m.context("When a Collection does not exist")
@m.it("When put recursively")
@m.context("When a Collection does not exist and is put recursively")
@m.it("Is created, with descendants and their contents")
def test_put_collection_recur(self, simple_collection):
coll = Collection(simple_collection / "sub")
Expand Down Expand Up @@ -781,6 +886,47 @@ def test_put_collection_recur(self, simple_collection):
assert utf8.size() == 2522
assert utf8.checksum() == "500cec3fbb274064e2a25fa17a69638a"

@m.context("When a Collection does not exist and is put recursively")
@m.it("Collection paths can be pruned by providing a filter predicate")
def test_put_collection_filter(self, simple_collection):
coll = Collection(simple_collection / "sub")
assert not coll.exists()

local_path = Path("./tests/data/simple").absolute()
coll.put(
local_path,
recurse=True,
verify_checksum=True,
filter_fn=lambda path: path.name == "data_object", # Prune this path
)
assert coll.exists()

sub1 = Collection(coll.path / "collection")
assert coll.contents() == [sub1]

empty_coll = Collection(sub1.path / "empty")
assert sub1.contents() == [empty_coll]

@m.context("When a Collection does not exist and is put recursively")
@m.it("Data object paths can be skipped by providing a filter predicate")
def test_put_collection_filter(self, simple_collection):
coll = Collection(simple_collection / "sub")
assert not coll.exists()

local_path = Path("./tests/data/simple").absolute()
coll.put(
local_path,
recurse=True,
verify_checksum=True,
filter_fn=lambda path: path.stat().st_size == 0, # Skip empty files
)
assert coll.exists()

sub2 = Collection(coll.path / "data_object")
lorem = DataObject(sub2.path / "lorem.txt")
utf8 = DataObject(sub2.path / "utf-8.txt")
assert sub2.contents() == [lorem, utf8]


@m.describe("DataObject")
class TestDataObject:
Expand Down