Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for cleaning working dir in case of same uri #49313

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7e25f87
fix for cleaning working dir in case of same uri
ujjawal-khare Dec 17, 2024
ccb394c
master merge
ujjawal-khare Dec 23, 2024
64d936d
removed unused imports
ujjawal-khare Dec 23, 2024
262ebdf
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare Dec 23, 2024
5bc3556
timeout increased
ujjawal-khare Dec 24, 2024
e01f82c
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare Dec 24, 2024
bc4a2d5
timeout reverted
ujjawal-khare Dec 24, 2024
ddd9f5f
overwrite as optional
ujjawal-khare Dec 25, 2024
f86aaa4
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Dec 25, 2024
0867484
master merge
ujjawal-khare Dec 25, 2024
0212f71
gcs cleanup added
ujjawal-khare Dec 27, 2024
a2cae03
test added
ujjawal-khare Dec 27, 2024
e2951b4
cleanup
ujjawal-khare Dec 27, 2024
528fae3
cleanup
ujjawal-khare Dec 27, 2024
9a463b5
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Dec 27, 2024
708a688
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Dec 29, 2024
260f4d1
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Dec 30, 2024
a141159
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Dec 31, 2024
877f67f
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Jan 1, 2025
65af26d
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare Jan 2, 2025
3227384
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare Jan 2, 2025
88b1fbe
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Jan 3, 2025
e36c629
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare-27 Jan 3, 2025
368eeaf
lint fix
ujjawal-khare Jan 3, 2025
7b28ed4
Merge branch 'master' into fix/ray-working-dir
ujjawal-khare Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions python/ray/_private/runtime_env/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ def upload_package_if_needed(
package_file = package_file.with_name(
f"{time.time_ns()}_{os.getpid()}_{package_file.name}"
)

create_package(
module_path,
package_file,
Expand Down Expand Up @@ -656,6 +657,7 @@ async def download_and_unpack_package(
base_directory: str,
gcs_aio_client: Optional["GcsAioClient"] = None, # noqa: F821
logger: Optional[logging.Logger] = default_logger,
overwrite: bool = False,
) -> str:
"""Download the package corresponding to this URI and unpack it if zipped.
Expand All @@ -668,6 +670,7 @@ async def download_and_unpack_package(
directory for the unpacked files.
gcs_aio_client: Client to use for downloading from the GCS.
logger: The logger to use.
overwrite: If True, overwrite the existing package.
Returns:
Path to the local directory containing the unpacked package files.
Expand Down Expand Up @@ -695,10 +698,21 @@ async def download_and_unpack_package(

local_dir = get_local_dir_from_uri(pkg_uri, base_directory)
assert local_dir != pkg_file, "Invalid pkg_file!"
if local_dir.exists():

download_package: bool = True
if local_dir.exists() and not overwrite:
download_package = False
assert local_dir.is_dir(), f"{local_dir} is not a directory"
else:
elif local_dir.exists():
logger.info(f"Removing {local_dir} with pkg_file {pkg_file}")
shutil.rmtree(local_dir)

if download_package:
protocol, _ = parse_uri(pkg_uri)
logger.info(
f"Downloading package from {pkg_uri} to {pkg_file} "
f"with protocol {protocol}"
)
if protocol == Protocol.GCS:
if gcs_aio_client is None:
raise ValueError(
Expand Down
6 changes: 5 additions & 1 deletion python/ray/_private/runtime_env/working_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ async def create(
logger: logging.Logger = default_logger,
) -> int:
local_dir = await download_and_unpack_package(
uri, self._resources_dir, self._gcs_aio_client, logger=logger
uri,
self._resources_dir,
self._gcs_aio_client,
logger=logger,
overwrite=True,
)
return get_directory_size_bytes(local_dir)

Expand Down
30 changes: 30 additions & 0 deletions python/ray/tests/test_runtime_env_working_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,36 @@ def insert_test_dir_in_pythonpath():
yield


@pytest.mark.asyncio
async def test_working_dir_cleanup(tmpdir, ray_start_regular):
gcs_aio_client = gcs_utils.GcsAioClient(
address=ray.worker.global_worker.gcs_client.address
)

plugin = WorkingDirPlugin(tmpdir, gcs_aio_client)
await plugin.create(HTTPS_PACKAGE_URI, {}, RuntimeEnvContext())

print(f"tmpdir {tmpdir}")
files = os.listdir(f"{tmpdir}/working_dir_files")

# Iterate over the files and storing creation metadata.
creation_metadata = {}
for file in files:
file_metadata = os.stat(f"{tmpdir}/working_dir_files/{file}")
creation_time = file_metadata.st_ctime
creation_metadata[file] = creation_time

time.sleep(1)

await plugin.create(HTTPS_PACKAGE_URI, {}, RuntimeEnvContext())
files = os.listdir(f"{tmpdir}/working_dir_files")

for file in files:
file_metadata = os.stat(f"{tmpdir}/working_dir_files/{file}")
creation_time_after = file_metadata.st_ctime
assert creation_metadata[file] != creation_time_after


@pytest.mark.asyncio
async def test_create_delete_size_equal(tmpdir, ray_start_regular):
"""Tests that `create` and `delete_uri` return the same size for a URI."""
Expand Down
2 changes: 1 addition & 1 deletion python/ray/train/_internal/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def _list_at_fs_path(

selector = pyarrow.fs.FileSelector(fs_path, allow_not_found=True, recursive=False)
return [
os.path.relpath(file_info.path.lstrip("/"), start=fs_path.lstrip("/"))
os.path.relpath(os.path.abspath(file_info.path), start=os.path.abspath(fs_path))
for file_info in fs.get_file_info(selector)
if file_filter(file_info)
]
Expand Down
Loading