Commit

improve structlog test

malmans2 committed Mar 25, 2024
1 parent 6e9be00 commit 7367893
Showing 5 changed files with 134 additions and 119 deletions.
64 changes: 30 additions & 34 deletions cacholote/clean.py
@@ -106,23 +106,25 @@ def __init__(self) -> None:
             if basename:
                 self.file_sizes[posixpath.join(urldir, basename)] += size
+        self.disk_usage = sum(self.file_sizes.values())
+        self.log_disk_usage()
 
-    @property
-    def disk_usage(self) -> int:
-        return sum(self.file_sizes.values())
+    def pop_file_size(self, file: str) -> int:
+        size = self.file_sizes.pop(file, 0)
+        self.disk_usage -= size
+        return size
 
     def log_disk_usage(self) -> None:
-        self.logger.info("disk usage check", disk_usage=self.disk_usage)
+        self.logger.info("check disk usage", disk_usage=self.disk_usage)
 
     def stop_cleaning(self, maxsize: int) -> bool:
         return self.disk_usage <= maxsize
 
-    def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int]:
+    def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
         self.logger.info("getting unknown files")
 
         utcnow = utils.utcnow()
-        files_to_skip = []
+        locked_files = set()
         for urlpath in self.file_sizes:
             if urlpath.endswith(".lock"):
                 modified = self.fs.modified(urlpath)
@@ -132,29 +132,27 @@ def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int]:
                 if lock_validity_period is None or delta < datetime.timedelta(
                     seconds=lock_validity_period
                 ):
-                    files_to_skip.append(urlpath)
-                    files_to_skip.append(urlpath.rsplit(".lock", 1)[0])
+                    locked_files.add(urlpath)
+                    locked_files.add(urlpath.rsplit(".lock", 1)[0])
 
-        unknown_sizes = {
-            k: v for k, v in self.file_sizes.items() if k not in files_to_skip
-        }
-        if unknown_sizes:
+        if unknown_files := (set(self.file_sizes) - locked_files):
             with config.get().instantiated_sessionmaker() as session:
                 for cache_entry in session.scalars(sa.select(database.CacheEntry)):
-                    for file in _get_files_from_cache_entry(cache_entry):
-                        unknown_sizes.pop(file, 0)
-        return unknown_sizes
+                    for known_file in _get_files_from_cache_entry(cache_entry):
+                        unknown_files.remove(known_file)
+                    if not unknown_files:
+                        break
+        return unknown_files
 
     def delete_unknown_files(
         self, lock_validity_period: float | None, recursive: bool
     ) -> None:
-        unknown_sizes = self.get_unknown_sizes(lock_validity_period)
-        for urlpath in unknown_sizes:
-            self.file_sizes.pop(urlpath, 0)
+        unknown_files = self.get_unknown_files(lock_validity_period)
+        for urlpath in unknown_files:
+            self.pop_file_size(urlpath)
         self.remove_files(
-            list(unknown_sizes),
+            list(unknown_files),
             recursive=recursive,
-            msg="deleting unknown files",
         )
+        self.log_disk_usage()

@@ -207,17 +207,13 @@ def remove_files(
         self,
         files: list[str],
         max_tries: int = 10,
-        msg: str = "deleting cache files",
         **kwargs: Any,
     ) -> None:
         assert max_tries >= 1
+        if not files:
+            return
 
-        if files:
-            self.logger.info(
-                msg,
-                number_of_files=len(files),
-                recursive=kwargs.get("recursive", False),
-            )
+        self.logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
 
         n_tries = 0
         while files:
@@ -226,6 +222,7 @@ def remove_files(
                 self.fs.rm(files, **kwargs)
                 return
             except FileNotFoundError:
+                # Another concurrent process might have deleted files
                 if n_tries >= max_tries:
                     raise
                 files = [file for file in files if self.fs.exists(file)]
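
The retry loop above tolerates races between concurrent cleaners: when fs.rm raises FileNotFoundError, the batch is narrowed to the files that still exist and retried, up to max_tries. A standalone sketch of the same pattern (the function name and call site are illustrative):

    import fsspec

    def remove_with_retries(
        fs: fsspec.AbstractFileSystem, files: list[str], max_tries: int = 10
    ) -> None:
        n_tries = 0
        while files:
            n_tries += 1
            try:
                fs.rm(files)
                return
            except FileNotFoundError:
                # Another concurrent process might have deleted some files.
                if n_tries >= max_tries:
                    raise
                # Keep only the files that still exist, then retry.
                files = [file for file in files if fs.exists(file)]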
@@ -246,18 +243,18 @@ def delete_cache_files(
         files_to_delete = []
         dirs_to_delete = []
         self.logger.info("getting cache entries to delete")
-        number_of_cache_entries = 0
+        n_entries_to_delete = 0
         with config.get().instantiated_sessionmaker() as session:
             for cache_entry in session.scalars(
                 sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
             ):
                 files = _get_files_from_cache_entry(cache_entry)
                 if files:
-                    number_of_cache_entries += 1
+                    n_entries_to_delete += 1
                     session.delete(cache_entry)
 
                     for file, file_type in files.items():
-                        self.file_sizes.pop(file, 0)
+                        self.pop_file_size(file)
                         if file_type == "application/vnd+zarr":
                             dirs_to_delete.append(file)
                         else:
@@ -266,10 +263,9 @@ def delete_cache_files(
                 if self.stop_cleaning(maxsize):
                     break
 
-            if number_of_cache_entries:
+            if n_entries_to_delete:
                 self.logger.info(
-                    "deleting cache entries",
-                    number_of_cache_entries=number_of_cache_entries,
+                    "deleting cache entries", n_entries_to_delete=n_entries_to_delete
                 )
                 database._commit_or_rollback(session)

@@ -282,7 +278,7 @@ def delete_cache_files(
             (
                 f"Unable to clean {self.dirname!r}."
                 f" Final disk usage: {self.disk_usage!r}."
-                f" Expected disk usage: {maxsize!r}"
+                f" Target disk usage: {maxsize!r}"
             )
         )

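Taken together, the clean.py changes replace the old disk_usage property, which re-summed file_sizes on every access, with a total computed once in __init__ and decremented by pop_file_size whenever a file leaves the bookkeeping. A minimal sketch of that pattern, using a hypothetical class and sample sizes:

    import collections

    class DiskUsageTracker:
        def __init__(self, sizes: dict[str, int]) -> None:
            self.file_sizes = collections.defaultdict(int, sizes)
            # Summed once up front; kept in sync incrementally afterwards.
            self.disk_usage = sum(self.file_sizes.values())

        def pop_file_size(self, file: str) -> int:
            # Unknown files pop as size 0, so repeated pops never skew the total.
            size = self.file_sizes.pop(file, 0)
            self.disk_usage -= size
            return size

    tracker = DiskUsageTracker({"a.grib": 100, "b.zarr": 50})
    assert tracker.pop_file_size("a.grib") == 100
    assert tracker.disk_usage == 50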
2 changes: 2 additions & 0 deletions ci/environment-ci.yml
@@ -31,3 +31,5 @@ dependencies:
   - types-requests
   - xarray>=2022.6.0
   - zarr
+  - pip:
+      - pytest-structlog
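
pytest-structlog, added to the CI environment above, provides a log fixture that captures structlog events as a list of plain dicts, so tests can compare whole event streams instead of parsing console output; that is what the rewritten tests below rely on. A minimal self-contained example (the function and event names here are illustrative, not from cacholote):

    import pytest_structlog
    import structlog

    def upload(size: int) -> None:
        structlog.get_logger().info("start upload", size=size)

    def test_upload(log: pytest_structlog.StructuredLogCapture) -> None:
        upload(size=0)
        # Each captured event is a dict of the bound key/value pairs,
        # plus "event" (the message) and "level".
        assert log.events == [{"size": 0, "event": "start upload", "level": "info"}]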
64 changes: 38 additions & 26 deletions tests/test_40_xarray_encoder.py
@@ -5,6 +5,7 @@
 
 import fsspec
 import pytest
+import pytest_structlog
 import structlog
 
 from cacholote import cache, config, decode, encode, extra_encoders, utils
@@ -163,37 +164,48 @@ def test_xr_corrupted_files(
     assert fs.exists(cached_path)
 
 
-def test_xr_logging(capsys: pytest.CaptureFixture[str]) -> None:
+def test_xr_logging(log: pytest_structlog.StructuredLogCapture) -> None:
     config.set(logger=structlog.get_logger(), raise_all_encoding_errors=True)
 
     # Cache dataset
     cfunc = cache.cacheable(get_grib_ds)
     cached_ds = cfunc()
-    captured = iter(capsys.readouterr().out.splitlines())
-
-    line = next(captured)
-    assert "start write tmp file" in line
-    assert "urlpath=" in line
-
-    line = next(captured)
-    assert "end write tmp file" in line
-    assert "urlpath=" in line
-    assert "write_tmp_file_time=" in line
-
-    line = next(captured)
-    assert "start upload" in line
-    assert f"urlpath=file://{cached_ds.encoding['source']}" in line
-    assert "size=22597" in line
-
-    line = next(captured)
-    assert "end upload" in line
-    assert f"urlpath=file://{cached_ds.encoding['source']}" in line
-    assert "upload_time=" in line
-    assert "size=22597" in line
-
-    line = next(captured)
-    assert "retrieve cache file" in line
-    assert f"urlpath=file://{cached_ds.encoding['source']}" in line
+
+    urlpath = f"file://{cached_ds.encoding['source']}"
+    tmpfile = log.events[0]["urlpath"]
+    assert urlpath.rsplit("/", 1)[1] == tmpfile.rsplit("/", 1)[1]
+
+    expected = [
+        {
+            "urlpath": tmpfile,
+            "event": "start write tmp file",
+            "level": "info",
+        },
+        {
+            "urlpath": tmpfile,
+            "write_tmp_file_time": log.events[1]["write_tmp_file_time"],
+            "event": "end write tmp file",
+            "level": "info",
+        },
+        {
+            "urlpath": urlpath,
+            "size": 22597,
+            "event": "start upload",
+            "level": "info",
+        },
+        {
+            "urlpath": urlpath,
+            "size": 22597,
+            "upload_time": log.events[3]["upload_time"],
+            "event": "end upload",
+            "level": "info",
+        },
+        {
+            "urlpath": urlpath,
+            "event": "retrieve cache file",
+            "level": "info",
+        },
+    ]
+    assert log.events == expected
 
 
 @pytest.mark.parametrize(
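Non-deterministic values such as write_tmp_file_time and upload_time are handled above by copying them out of the captured events into the expected list, which keeps the comparison exact. An alternative, not used by this commit, is to strip the volatile keys before comparing; a sketch under that assumption:

    from typing import Any

    def without_keys(
        events: list[dict[str, Any]], *keys: str
    ) -> list[dict[str, Any]]:
        # Drop volatile keys (e.g. timings) so the rest can be compared exactly.
        return [{k: v for k, v in event.items() if k not in keys} for event in events]

    # Usage: assert without_keys(log.events, "write_tmp_file_time", "upload_time") == expected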
69 changes: 41 additions & 28 deletions tests/test_50_io_encoder.py
@@ -11,6 +11,7 @@
 import fsspec
 import pytest
 import pytest_httpserver
+import pytest_structlog
 import structlog
 
 from cacholote import cache, config, decode, encode, extra_encoders, utils
@@ -187,37 +188,49 @@ def test_content_type(tmp_path: pathlib.Path, set_cache: str) -> None:
 
 
 @pytest.mark.parametrize("set_cache", ["cads"], indirect=True)
-def test_io_logging(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None:
+def test_io_logging(
+    log: pytest_structlog.StructuredLogCapture, tmp_path: pathlib.Path
+) -> None:
     config.set(logger=structlog.get_logger(), io_delete_original=True)
 
     # Cache file
     tmpfile = tmp_path / "test.txt"
     fsspec.filesystem("file").touch(tmpfile)
     cached_file = cached_open(tmpfile)
-    captured = iter(capsys.readouterr().out.splitlines())
-
-    line = next(captured)
-    assert "start upload" in line
-    assert f"urlpath=s3://{cached_file.path}" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "end upload" in line
-    assert f"urlpath=s3://{cached_file.path}" in line
-    assert "upload_time=" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "start remove" in line
-    assert f"urlpath=file://{tmpfile}" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "end remove" in line
-    assert f"urlpath=file://{tmpfile}" in line
-    assert "remove_time=" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "retrieve cache file" in line
-    assert f"urlpath=s3://{cached_file.path}" in line
+
+    tmp_urlpath = f"file://{tmpfile!s}"
+    cached_urlpath = f"s3://{cached_file.path}"
+    expected = [
+        {
+            "urlpath": cached_urlpath,
+            "size": 0,
+            "event": "start upload",
+            "level": "info",
+        },
+        {
+            "urlpath": cached_urlpath,
+            "size": 0,
+            "upload_time": log.events[1]["upload_time"],
+            "event": "end upload",
+            "level": "info",
+        },
+        {
+            "urlpath": tmp_urlpath,
+            "size": 0,
+            "event": "start remove",
+            "level": "info",
+        },
+        {
+            "urlpath": tmp_urlpath,
+            "size": 0,
+            "remove_time": log.events[3]["remove_time"],
+            "event": "end remove",
+            "level": "info",
+        },
+        {
+            "urlpath": cached_urlpath,
+            "event": "retrieve cache file",
+            "level": "info",
+        },
+    ]
+    assert log.events == expected
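
Comparing log.events against a full expected list, as these tests do, is order- and completeness-sensitive: an extra, missing, or reordered event fails the assertion. When only one event matters, pytest-structlog also ships a membership helper, log.has; a brief sketch reusing the hypothetical upload() from the earlier example:

    def test_upload_logged(log: pytest_structlog.StructuredLogCapture) -> None:
        upload(size=0)
        # True if any captured event has this message and matching context.
        assert log.has("start upload", size=0)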
54 changes: 23 additions & 31 deletions tests/test_60_clean.py
@@ -10,6 +10,7 @@
 import fsspec
 import pydantic
 import pytest
+import pytest_structlog
 import structlog
 
 from cacholote import cache, clean, config, utils
@@ -253,7 +254,7 @@ def test_clean_invalid_cache_entries(
 
 
 def test_cleaner_logging(
-    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
+    log: pytest_structlog.StructuredLogCapture, tmp_path: pathlib.Path
 ) -> None:
     # Cache file and create unknown
     tmpfile = tmp_path / "test.txt"
@@ -265,37 +266,28 @@ def test_cleaner_logging(
     # Clean
     config.set(logger=structlog.get_logger())
     clean.clean_cache_files(0, delete_unknown_files=True)
-    captured = iter(capsys.readouterr().out.splitlines())
 
-    sep = " " * 15
-    line = next(captured)
-    assert "getting disk usage" in line
-
-    line = next(captured)
-    assert line.endswith(f"disk usage check{sep}disk_usage=2")
-
-    line = next(captured)
-    assert "getting unknown files" in line
-
-    line = next(captured)
-    line.endswith(f"deleting unknown files{sep}number_of_files=1{sep}recursive=False")
-
-    line = next(captured)
-    line.endswith(f"disk usage check{sep}disk_usage=1")
-
-    line = next(captured)
-    assert "getting cache entries to delete" in line
-
-    line = next(captured)
-    line.endswith(f"deleting cache entries{sep}number_of_cache_entries=1")
-
-    line = next(captured)
-    line.endswith(f"deleting cache files{sep}number_of_files=1{sep}recursive=False")
-
-    line = next(captured)
-    line.endswith(f"disk usage check{sep}disk_usage=0")
-
-    assert next(captured, None) is None
+    assert log.events == [
+        {"event": "getting disk usage", "level": "info"},
+        {"disk_usage": 2, "event": "check disk usage", "level": "info"},
+        {"event": "getting unknown files", "level": "info"},
+        {
+            "n_files_to_delete": 1,
+            "recursive": False,
+            "event": "deleting files",
+            "level": "info",
+        },
+        {"disk_usage": 1, "event": "check disk usage", "level": "info"},
+        {"event": "getting cache entries to delete", "level": "info"},
+        {"n_entries_to_delete": 1, "event": "deleting cache entries", "level": "info"},
+        {
+            "n_files_to_delete": 1,
+            "recursive": False,
+            "event": "deleting files",
+            "level": "info",
+        },
+        {"disk_usage": 0, "event": "check disk usage", "level": "info"},
+    ]
 
 
 def test_clean_multiple_files(tmp_path: pathlib.Path) -> None:
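The disk_usage values in the expected events trace the cleaner's bookkeeping: one check up front, one after unknown files are deleted, one after cache entries are deleted. Assuming the collapsed setup lines write two one-byte files, one cached and one unknown (the actual sizes are hidden above), the arithmetic goes:

    sizes = {"cached.txt": 1, "unknown.txt": 1}  # hypothetical sizes
    disk_usage = sum(sizes.values())             # 2: initial "check disk usage"
    disk_usage -= sizes.pop("unknown.txt")       # 1: after "deleting files" (unknown)
    disk_usage -= sizes.pop("cached.txt")        # 0: after "deleting cache entries"
    assert disk_usage == 0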
