Skip to content

Commit

Permalink
Adding an 's' to the functional names of open/list DataPipes (pytorch#479
Browse files Browse the repository at this point in the history
) (pytorch#489)

Summary: Pull Request resolved: pytorch#479

Test Plan: Imported from OSS

Reviewed By: ejguan

Differential Revision: D36785643

Pulled By: NivekT

fbshipit-source-id: 02c3071047ac00dd34cb83a9b392be0cfa3565b0
  • Loading branch information
NivekT authored Jun 1, 2022
1 parent 38b77c1 commit 74c5ba3
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 17 deletions.
4 changes: 2 additions & 2 deletions test/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def test_fsspec_file_loader_iterdatapipe(self):
# Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted
self._write_text_files()
lister_dp = FileLister(self.temp_dir.name, "*.text")
fsspec_file_loader_dp = FSSpecFileOpener(lister_dp, mode="rb")
fsspec_file_opener_dp = lister_dp.open_files_by_fsspec(mode="rb")

n_elements_before_reset = 2
res_before_reset, res_after_reset = reset_after_n_next_calls(fsspec_file_loader_dp, n_elements_before_reset)
res_before_reset, res_after_reset = reset_after_n_next_calls(fsspec_file_opener_dp, n_elements_before_reset)
self.assertEqual(2, len(res_before_reset))
self.assertEqual(3, len(res_after_reset))
for _name, stream in res_before_reset:
Expand Down
4 changes: 2 additions & 2 deletions test/test_local_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,10 +684,10 @@ def test_io_path_file_loader_iterdatapipe(self):
# Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted
self._write_text_files()
lister_dp = FileLister(self.temp_dir.name, "*.text")
iopath_file_loader_dp = IoPathFileOpener(lister_dp, mode="rb")
iopath_file_opener_dp = lister_dp.open_files_by_iopath(mode="rb")

n_elements_before_reset = 2
res_before_reset, res_after_reset = reset_after_n_next_calls(iopath_file_loader_dp, n_elements_before_reset)
res_before_reset, res_after_reset = reset_after_n_next_calls(iopath_file_opener_dp, n_elements_before_reset)
self.assertEqual(2, len(res_before_reset))
self.assertEqual(3, len(res_after_reset))
for _name, stream in res_before_reset:
Expand Down
6 changes: 3 additions & 3 deletions torchdata/datapipes/iter/load/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ Note: refer to the official documentation for detailed installation instructions

### S3FileLister

`S3FileLister` accepts a list of S3 prefixes and iterates all matching s3 urls. The functional API is `list_file_by_s3`.
Acceptable prefixes include `s3://bucket-name`, `s3://bucket-name/`, `s3://bucket-name/folder`,
`S3FileLister` accepts a list of S3 prefixes and iterates all matching s3 urls. The functional API is
`list_files_by_s3`. Acceptable prefixes include `s3://bucket-name`, `s3://bucket-name/`, `s3://bucket-name/folder`,
`s3://bucket-name/folder/`, and `s3://bucket-name/prefix`. You may also set `length`, `request_timeout_ms` (default 3000
ms in aws-sdk-cpp), and `region`. Note that:

Expand All @@ -48,7 +48,7 @@ ms in aws-sdk-cpp), and `region`. Note that:
### S3FileLoader

`S3FileLoader` accepts a list of S3 URLs and iterates all files in `BytesIO` format with `(url, BytesIO)` tuples. The
functional API is `load_file_by_s3`. You may also set `request_timeout_ms` (default 3000 ms in aws-sdk-cpp), `region`,
functional API is `load_files_by_s3`. You may also set `request_timeout_ms` (default 3000 ms in aws-sdk-cpp), `region`,
`buffer_size` (default 120Mb), and `multi_part_download` (default to use multi-part downloading). Note that:

1. Input **must** be a list and S3 URLs must be valid.
Expand Down
10 changes: 7 additions & 3 deletions torchdata/datapipes/iter/load/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ def __iter__(self) -> Iterator[str]:
yield abs_path


@functional_datapipe("open_file_by_fsspec")
@functional_datapipe("open_files_by_fsspec")
class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
r"""
Opens files from input datapipe which contains `fsspec` paths and yields a tuple of
pathname and opened file stream (functional name: ``open_file_by_fsspec``).
pathname and opened file stream (functional name: ``open_files_by_fsspec``).
Args:
source_datapipe: Iterable DataPipe that provides the pathnames or URLs
Expand All @@ -114,7 +114,7 @@ class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
Example:
>>> from torchdata.datapipes.iter import FSSpecFileLister
>>> datapipe = FSSpecFileLister(root=dir_path)
>>> file_dp = datapipe.open_file_by_fsspec()
>>> file_dp = datapipe.open_files_by_fsspec()
"""

def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r") -> None:
Expand All @@ -133,6 +133,10 @@ def __len__(self) -> int:
return len(self.source_datapipe)


# Register for functional API for backward compatibility
IterDataPipe.register_datapipe_as_function("open_file_by_fsspec", FSSpecFileOpenerIterDataPipe)


@functional_datapipe("save_by_fsspec")
class FSSpecSaverIterDataPipe(IterDataPipe[str]):
r"""
Expand Down
10 changes: 7 additions & 3 deletions torchdata/datapipes/iter/load/iopath.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ def __iter__(self) -> Iterator[str]:
yield os.path.join(path, file_name)


@functional_datapipe("open_file_by_iopath")
@functional_datapipe("open_files_by_iopath")
class IoPathFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
r"""
Opens files from input datapipe which contains pathnames or URLs,
and yields a tuple of pathname and opened file stream (functional name: ``open_file_by_iopath``).
and yields a tuple of pathname and opened file stream (functional name: ``open_files_by_iopath``).
Args:
source_datapipe: Iterable DataPipe that provides the pathnames or URLs
Expand All @@ -114,7 +114,7 @@ class IoPathFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
Example:
>>> from torchdata.datapipes.iter import IoPathFileLister
>>> datapipe = IoPathFileLister(root=S3URL)
>>> file_dp = datapipe.open_file_by_iopath()
>>> file_dp = datapipe.open_files_by_iopath()
"""

def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r", pathmgr=None) -> None:
Expand All @@ -141,6 +141,10 @@ def __len__(self) -> int:
return len(self.source_datapipe)


# Register for functional API for backward compatibility
IterDataPipe.register_datapipe_as_function("open_file_by_iopath", IoPathFileOpenerIterDataPipe)


@functional_datapipe("save_by_iopath")
class IoPathSaverIterDataPipe(IterDataPipe[str]):

Expand Down
8 changes: 4 additions & 4 deletions torchdata/datapipes/iter/load/s3io.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from torchdata.datapipes.utils import StreamWrapper


@functional_datapipe("list_file_by_s3")
@functional_datapipe("list_files_by_s3")
class S3FileListerIterDataPipe(IterDataPipe[str]):
r"""
Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: ``list_file_by_s3``).
Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: ``list_files_by_s3``).
Acceptable prefixes include ``s3://bucket-name``, ``s3://bucket-name/``, ``s3://bucket-name/folder``,
``s3://bucket-name/folder/``, and ``s3://bucket-name/prefix``. You may also set ``length``, ``request_timeout_ms``
(default 3000 ms in aws-sdk-cpp), and ``region``.
Expand Down Expand Up @@ -72,10 +72,10 @@ def __len__(self) -> int:
return self.length


@functional_datapipe("load_file_by_s3")
@functional_datapipe("load_files_by_s3")
class S3FileLoaderIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
r"""
Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: ``load_file_by_s3``).
Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: ``load_files_by_s3``).
``S3FileLoader`` iterates all given S3 URLs in ``BytesIO`` format with ``(url, BytesIO)`` tuples.
You may also set ``request_timeout_ms`` (default 3000 ms in aws-sdk-cpp), ``region``,
``buffer_size`` (default 120Mb), and ``multi_part_download`` (default to use multi-part downloading).
Expand Down

0 comments on commit 74c5ba3

Please sign in to comment.