
read_storage() - filter by name pattern #1285


Draft: wants to merge 5 commits into base: main
45 changes: 41 additions & 4 deletions src/datachain/lib/dc/storage.py

@@ -2,7 +2,6 @@
 from collections.abc import Sequence
 from functools import reduce
 from typing import (
-    TYPE_CHECKING,
     Optional,
     Union,
 )
@@ -19,8 +18,35 @@
 )
 from datachain.query import Session

-if TYPE_CHECKING:
-    from .datachain import DataChain
+from .datachain import C, DataChain
+
+
+def _apply_pattern_filtering(
+    chain: DataChain, pattern: Optional[Union[str, list[str]]], column: str
+) -> DataChain:
+    """Apply pattern filtering to a storage chain.
+
+    Args:
+        chain: The DataChain to filter
+        pattern: Pattern(s) to filter by
+        column: The column name to apply filtering to (e.g., "file")
+
+    Returns:
+        Filtered DataChain
+    """
+    if not pattern:
+        return chain
+
+    pattern_list = pattern if isinstance(pattern, list) else [pattern]
+    filters = []
+
+    for pattern_item in pattern_list:
+        filters.append(C(f"{column}.path").glob(f"*{pattern_item}"))
Comment on lines +43 to +44 (Contributor):

suggestion: Pattern matching may be too permissive for certain use cases.

Consider whether using `*{pattern_item}` could unintentionally match more files than intended, especially if user-supplied patterns already include wildcards.

Suggested change:

-    for pattern_item in pattern_list:
-        filters.append(C(f"{column}.path").glob(f"*{pattern_item}"))
+    for pattern_item in pattern_list:
+        # If the pattern already contains glob wildcards, use as-is
+        if any(char in pattern_item for char in ["*", "?", "["]):
+            glob_pattern = pattern_item
+        else:
+            glob_pattern = f"*{pattern_item}*"
+        filters.append(C(f"{column}.path").glob(glob_pattern))
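To make the concern concrete, here is a small self-contained check using Python's `fnmatch`, whose wildcard semantics approximate (but are not guaranteed to be identical to) the `glob` applied to the path column here. Always prepending `*` turns a plain name into a suffix match, and the suggested `*{pattern}*` form widens it further to a substring match:

```python
from fnmatch import fnmatch

paths = ["cat1", "mydir/cat2", "bobcat", "concatenate.txt"]

# Current helper behavior: a user pattern "cat*" is wrapped as "*cat*",
# which matches any path merely containing "cat".
print([p for p in paths if fnmatch(p, "*cat*")])
# -> ['cat1', 'mydir/cat2', 'bobcat', 'concatenate.txt']

# A plain pattern "cat1" becomes "*cat1": a suffix match, so a path like
# "mydir/cat1" would match in addition to the exact name "cat1".
print([p for p in paths if fnmatch(p, "*cat1")])
# -> ['cat1']
```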


Comment on lines +41 to +45 (Contributor):

suggestion (code-quality): Convert for loop into list comprehension (list-comprehension).

Suggested change:

-    filters = []
-    for pattern_item in pattern_list:
-        filters.append(C(f"{column}.path").glob(f"*{pattern_item}"))
+    filters = [
+        C(f"{column}.path").glob(f"*{pattern_item}")
+        for pattern_item in pattern_list
+    ]

+    combined_filter = filters[0]
+    for filter_expr in filters[1:]:
+        combined_filter = combined_filter | filter_expr
+    return chain.filter(combined_filter)
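As an aside, `functools.reduce` is already imported at the top of this module, so the explicit fold above could be collapsed into one expression. A minimal sketch (the helper name `_combine_filters` is hypothetical, not part of this diff):

```python
import operator
from functools import reduce

def _combine_filters(filters):
    # OR all glob filters together; equivalent to the explicit loop above.
    # reduce() raises TypeError on an empty list, but the early
    # `if not pattern: return chain` guard guarantees filters is non-empty.
    return reduce(operator.or_, filters)
```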


 def read_storage(
@@ -34,6 +60,7 @@ def read_storage(
     column: str = "file",
     update: bool = False,
     anon: Optional[bool] = None,
+    pattern: Optional[Union[str, list[str]]] = None,
     delta: Optional[bool] = False,
     delta_on: Optional[Union[str, Sequence[str]]] = (
         "file.path",
@@ -57,6 +84,7 @@
         column : Created column name.
         update : force storage reindexing. Default is False.
         anon : If True, we will treat cloud bucket as public one
+        pattern : Optional pattern(s) to filter file names by, e.g. "*.jpg".
         client_config : Optional client configuration for the storage client.
         delta: If True, only process new or changed files instead of reprocessing
             everything. This saves time by skipping files that were already processed in
@@ -113,6 +141,15 @@
         ], session=session, recursive=True)
         ```

+        Filter by file patterns:
+        ```python
+        # Single pattern
+        chain = dc.read_storage("s3://my-bucket/my-dir", pattern="*.mp3")
+
+        # Multiple patterns
+        chain = dc.read_storage("s3://my-bucket/my-dir", pattern=["*.mp3", "*.wav"])
+        ```
+
     Note:
         When using multiple URIs with `update=True`, the function optimizes by
         avoiding redundant updates for URIs pointing to the same storage location.
@@ -220,4 +257,4 @@ def lst_fn(ds_name, lst_uri):
         delta_retry=delta_retry,
     )

-    return storage_chain
+    return _apply_pattern_filtering(storage_chain, pattern, column)
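For callers on a DataChain version without this parameter, the same effect should be achievable by filtering explicitly, since the helper is just a glob over the file path column. A sketch under that assumption, using the `C` column helper and `glob` shown in this diff:

```python
import datachain as dc
from datachain import C

# Roughly equivalent to dc.read_storage(..., pattern="*.mp3"):
# apply a glob filter over the created file column's path.
chain = dc.read_storage("s3://my-bucket/my-dir").filter(
    C("file.path").glob("*.mp3")
)
```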
40 changes: 40 additions & 0 deletions tests/unit/lib/test_datachain.py

@@ -4371,3 +4371,43 @@ class Signal2(DataModel):
     assert chain.max("signals.signal.i3") == 15
     assert chain.max("signals.signal.f3") == 7.5
     assert chain.max("signals.signal.s3") == "eee"
+
+
+def test_apply_pattern_filtering_helper(test_session):
+    from datachain.lib.dc.storage import _apply_pattern_filtering
+
+    chain = dc.read_values(
+        file_names=[
+            File(path="cat1"),
+            File(path="mydir/cat2"),
+            File(path="dog1"),
+            File(path="mydir/dog2"),
+            File(path="description"),
+        ],
+        session=test_session,
+    )
+
+    filtered_chain = _apply_pattern_filtering(chain, "cat*", "file_names")
+    assert filtered_chain.count() == 2
+    assert set(filtered_chain.to_values("file_names.path")) == {"cat1", "mydir/cat2"}
+
+    filtered_chain = _apply_pattern_filtering(
+        chain, ["cat*", "description"], "file_names"
+    )
+    assert filtered_chain.count() == 3
+    assert set(filtered_chain.to_values("file_names.path")) == {
+        "cat1",
+        "mydir/cat2",
+        "description",
+    }
+
+    filtered_chain = _apply_pattern_filtering(chain, None, "file_names")
+    assert filtered_chain.count() == 5
+    assert filtered_chain is chain  # Should be the same object
+
+    filtered_chain = _apply_pattern_filtering(chain, [], "file_names")
+    assert filtered_chain.count() == 5
+    assert filtered_chain is chain  # Should be the same object
+
+    filtered_chain = _apply_pattern_filtering(chain, "*.nonexistent", "file_names")
+    assert filtered_chain.count() == 0
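The new test can be run on its own with pytest's node-id syntax:

```
pytest tests/unit/lib/test_datachain.py::test_apply_pattern_filtering_helper
```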