Skip to content

Commit

Permalink
Add test for dirac fs delete()
Browse files Browse the repository at this point in the history
  • Loading branch information
sverhoeven committed Mar 11, 2024
1 parent 0d22412 commit 596817f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
31 changes: 15 additions & 16 deletions src/bartender/filesystems/dirac.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from pathlib import Path
from shutil import make_archive, unpack_archive
from typing import Any

from aiofiles.tempfile import TemporaryDirectory
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
Expand Down Expand Up @@ -63,9 +64,6 @@ async def upload(self, src: JobDescription, target: JobDescription) -> None:
Args:
src: Local directory to copy from.
target: Remote directory to copy to.
Raises:
RuntimeError: When upload failed.
"""
put = async_wrap(self.dm.putAndRegister)
async with TemporaryDirectory(prefix="bartender-upload") as tmpdirname:
Expand All @@ -80,9 +78,7 @@ async def upload(self, src: JobDescription, target: JobDescription) -> None:
fileName=str(archive_fn),
diracSE=self.storage_element,
)
if not result["OK"]:
logger.warning(result)
raise RuntimeError(result["Message"])
_check_for_failure(result)

async def download(self, src: JobDescription, target: JobDescription) -> None:
"""Download job directory of source description to job directory of target.
Expand All @@ -93,9 +89,6 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
Args:
src: Remote directory to copy from.
target: Local directory to copy to.
Raises:
RuntimeError: When download failed.
"""
archive_base_fn = "output.tar"
archive_fn_on_grid = Path(src.job_dir) / archive_base_fn
Expand All @@ -105,8 +98,7 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
str(archive_fn_on_grid),
tmpdirname,
)
if not result["OK"]:
raise RuntimeError(result["Message"])
_check_for_failure(result)
archive_fn_in_tmpdir = Path(tmpdirname) / archive_base_fn
if not archive_fn_in_tmpdir.exists():
# Failed job does not have a output.tar
Expand All @@ -124,15 +116,11 @@ async def delete(self, description: JobDescription) -> None:
Args:
description: The job description.
Raises:
RuntimeError: When deletion failed.
"""
result = await async_wrap(self.dm.cleanLogicalDirectory)(
str(description.job_dir),
)
if not result["OK"]:
raise RuntimeError(result["Message"])
_check_for_failure(result)

async def _pack(self, root_dir: Path, container_dir: Path) -> Path:
archive_base_fn = container_dir / "input"
Expand All @@ -143,3 +131,14 @@ async def _pack(self, root_dir: Path, container_dir: Path) -> Path:
root_dir,
)
return Path(archive_fn)


def _check_for_failure(result: Any) -> None:
if not result["OK"]:
raise RuntimeError(result["Message"])
if result["Value"]["Failed"]:
# All dm method are for single lfn,
# but failed is a dict with lfn as key
# so pick the last value as the error message
msg = list(result["Value"]["Failed"].values()).pop()
raise RuntimeError(msg)
52 changes: 52 additions & 0 deletions tests_dirac/test_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,55 @@ async def test_failing_job( # noqa: WPS217 single piece of code for readablilty
await fs.delete(gdescription)
await fs.close()
await scheduler.close()


@pytest.mark.anyio
async def test_filesystem_delete(
tmp_path: Path,
) -> None:
fs_config = DiracFileSystemConfig(
lfn_root="/tutoVO/user/c/ciuser/bartenderjobs",
storage_element="StorageElementOne",
)
fs = DiracFileSystem(fs_config)

job_dir = tmp_path / "job1"
job_dir.mkdir()
(job_dir / "input.tar").write_text("the input files")
(job_dir / "output.tar").write_text("the ouput files")
description = JobDescription(
job_dir=job_dir,
command="uptime",
)
gdescription = fs.localize_description(description, tmp_path)

try:
# A completed job will have a input.tar and output.tar on grid storage.
# need to use dirac data manager
# as DiracFileSystem does not allow uploading random files
input_put_result = fs.dm.putAndRegister(
lfn=str(gdescription.job_dir / "input.tar"),
fileName=str(description.job_dir / "input.tar"),
diracSE=fs.storage_element,
)
assert input_put_result["OK"] and not input_put_result["Value"]["Failed"]
output_put_result = fs.dm.putAndRegister(
lfn=str(gdescription.job_dir / "output.tar"),
fileName=str(description.job_dir / "output.tar"),
diracSE=fs.storage_element,
)
assert output_put_result["OK"] and not input_put_result["Value"]["Failed"]

await fs.delete(gdescription)

# Unable to get files after deletion of job dir on grid storage.
input_get_result = fs.dm.getFile(
str(gdescription.job_dir / "input.tar"),
tmp_path,
)
input_get_error = list(input_get_result["Value"]["Failed"].values()).pop()
assert input_get_error == "No such file or directory"
with pytest.raises(RuntimeError, match="No such file or directory"):
await fs.download(gdescription, description)
finally:
await fs.close()

0 comments on commit 596817f

Please sign in to comment.