class ArtifactCache:
    """Filesystem-backed cache of artifact blobs, keyed by SHA-256 of the key.

    Each entry is a single file in ``path`` named by the hex digest of its
    key.  A per-entry ``filelock`` file (``<digest>.lock``) serializes
    concurrent access to that entry; a separate ``cull.lock`` serializes
    whole-cache eviction.  ``cull()`` evicts oldest-``mtime`` entries first,
    and ``__getitem__`` touches entries so reads count as "recently used".
    """

    def __init__(self, path: Path, cache_size: int):
        # Directory holding the cached files; created eagerly so that every
        # other method can assume it exists.
        self.path = path
        # Soft upper bound in bytes, enforced by cull().
        self.cache_limit = cache_size

        self.path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def safe_key(key: str) -> str:
        """Map an arbitrary key to a filesystem-safe, fixed-length name."""
        return hashlib.sha256(key.encode("utf-8")).hexdigest()

    @contextlib.contextmanager
    def key_lock(self, key: str):
        """Hold the per-entry file lock for *key* (expects a safe key)."""
        with filelock.FileLock(self.path / (key + ".lock"), timeout=10):
            yield

    def __contains__(self, key: str) -> bool:
        safe_key = self.safe_key(key)
        test_path = self.path / safe_key
        # Deliberately lock-free: a slightly stale answer is acceptable for a
        # membership probe and avoids lock churn on this hot path.
        return test_path.exists()

    def __getitem__(self, key: str) -> bytes:
        """Return the cached bytes for *key*.

        Raises:
            KeyError: if no entry exists for *key*.
        """
        safe_key = self.safe_key(key)
        path = self.path / safe_key
        with self.key_lock(safe_key):
            if not path.exists():
                raise KeyError(key)
            # Refresh mtime so cull() (oldest-mtime-first) keeps hot entries.
            path.touch()
            return path.read_bytes()

    @contextlib.contextmanager
    def open(self, key: str, mode: str = "rb"):
        """Open the entry for *key* as a file object, under its lock.

        The lock is held for the lifetime of the context, so writers stream
        directly into the cache file without racing other lock users.

        Raises:
            KeyError: when reading (no ``"w"`` in *mode*) a missing entry.
        """
        safe_key = self.safe_key(key)
        with self.key_lock(safe_key):
            path = self.path / safe_key
            if "w" not in mode and not path.exists():
                raise KeyError(key)
            with path.open(mode) as fh:
                yield fh

    def put(self, key: str, buf: IO[bytes]) -> None:
        """Store the contents of *buf* under *key*, replacing any old entry."""
        safe_key = self.safe_key(key)
        path = self.path / safe_key
        with self.key_lock(safe_key):
            with path.open("wb") as fh:
                shutil.copyfileobj(buf, fh)

    def _entries(self):
        """Yield data files only, skipping ``*.lock`` bookkeeping files."""
        for file in self.path.iterdir():
            if file.suffix != ".lock":
                yield file

    def total_size(self) -> int:
        """Total bytes used by cached entries (lock files excluded)."""
        return sum(file.stat().st_size for file in self._entries())

    def __len__(self) -> int:
        """Number of cached entries (lock files excluded)."""
        return sum(1 for _ in self._entries())

    def cull(self) -> None:
        """Evict oldest entries until the cache fits under ``cache_limit``.

        Holds ``cull.lock`` for the whole pass so only one culler runs at a
        time.  Orphaned ``*.lock`` files (whose data file is gone) are
        cleaned up opportunistically along the way.
        """
        with filelock.FileLock(self.path / "cull.lock", timeout=30):
            size = self.total_size()
            logger.info(
                "Culling artifact cache: size=%d, max size=%d", size, self.cache_limit
            )
            deleted_bytes = 0
            num_deleted = 0
            if size > self.cache_limit:
                items = list(self.path.iterdir())
                # Oldest mtime first == least-recently-used first, since
                # __getitem__ touches entries on read.
                for item in sorted(items, key=lambda i: i.stat().st_mtime):
                    if item.name == "cull.lock":
                        # don't delete our current lock
                        continue
                    if item.suffix == ".lock":
                        actual_item = item.parent / item.stem
                        if not actual_item.exists() and item.exists():
                            item.unlink()  # delete lock if source file is gone
                        continue
                    item_size = item.stat().st_size  # stat once, reuse below
                    num_deleted += 1
                    deleted_bytes += item_size
                    size -= item_size
                    item.unlink()
                    item_lock = item.parent / (item.name + ".lock")
                    if item_lock.exists():
                        item_lock.unlink()
                    if size <= self.cache_limit:
                        break
            cache_cull_total.inc(num_deleted)
            logger.info("Culled %d items, %d bytes", num_deleted, deleted_bytes)
mode="w") t.add(tmpd, arcname=".", recursive=True) t.close() @@ -202,11 +191,9 @@ def get_artifact(self, token: str, repo: str, artifact_id: int): tar_fh.flush() tar_fh.seek(0) - shutil.copyfile(tar_fh.name, dt) compressor = zstandard.ZstdCompressor() - buf = io.BytesIO() - compressor.copy_stream(tar_fh, buf) - self._artifact_cache.set(key, buf.getvalue()) + with self._artifact_cache.open(key, "wb") as fh: + compressor.copy_stream(tar_fh, fh) logger.info( "Cache reports key %s created for artifact %d",