From 53241d074f72ded86718b198530ba6c5fed6fa42 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 3 Aug 2022 15:14:49 +0900 Subject: [PATCH] contrib: add monkeypatched filesystem for kombu<5.3 --- pyproject.toml | 6 + setup.cfg | 3 +- src/dvc_task/__init__.py | 7 + src/dvc_task/app/filesystem.py | 14 +- src/dvc_task/contrib/__init__.py | 0 src/dvc_task/contrib/kombu_filesystem.py | 261 +++++++++++++++++++++++ 6 files changed, 277 insertions(+), 14 deletions(-) create mode 100644 src/dvc_task/contrib/__init__.py create mode 100644 src/dvc_task/contrib/kombu_filesystem.py diff --git a/pyproject.toml b/pyproject.toml index b04a634..8ae5929 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ addopts = "-ra" [tool.coverage.run] branch = true source = ["dvc_task", "tests"] +omit = ["dvc_task/contrib/*"] [tool.coverage.paths] source = ["src", "*/site-packages"] @@ -62,16 +63,21 @@ warn_no_return = true warn_redundant_casts = true warn_unreachable = true files = ["src", "tests"] +exclude = ["^src/dvc_task/contrib"] [[tool.mypy.overrides]] module = [ "celery.*", "funcy.*", "shortuuid", + "kombu.message.*", "kombu.utils.*", ] ignore_missing_imports = true +[tool.pylint.master] +ignore-paths = ["src/dvc_task/contrib"] + [tool.pylint.message_control] enable = ["c-extension-no-member", "no-else-return"] diff --git a/setup.cfg b/setup.cfg index 59aefdb..67df69c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,7 +24,7 @@ package_dir= packages = find: install_requires= celery>=5.2.0,<6 - kombu + kombu>=5.2.0,<6 funcy>=1.17 shortuuid>=1.0.8 pywin32>=225; sys_platform == 'win32' @@ -38,7 +38,6 @@ docs = mkdocstrings-python==0.7.1 tests = flaky==3.7.0 - kombu>=5.3.0a1 pytest==7.1.2 pytest-sugar==0.9.4 pytest-cov==3.0.0 diff --git a/src/dvc_task/__init__.py b/src/dvc_task/__init__.py index 266da8c..87b76bd 100644 --- a/src/dvc_task/__init__.py +++ b/src/dvc_task/__init__.py @@ -1 +1,8 @@ """DVC Task.""" + +__all__ = [ + "app", + "proc", + "utils", + "worker", +] diff 
--git a/src/dvc_task/app/filesystem.py b/src/dvc_task/app/filesystem.py index 1b48429..315e1fa 100644 --- a/src/dvc_task/app/filesystem.py +++ b/src/dvc_task/app/filesystem.py @@ -8,22 +8,13 @@ from kombu.utils.encoding import bytes_to_str from kombu.utils.json import loads -from ..exceptions import DvcTaskError +from ..contrib.kombu_filesystem import backport_filesystem_transport from ..utils import makedirs, remove, unc_path logger = logging.getLogger(__name__) -def _check_kombu_version(): - # pylint: disable=import-outside-toplevel - from kombu import VERSION, __version__ - - # FSApp requires kombu >= 5.3.0 - if VERSION.major < 5 or (VERSION.major == 5 and VERSION.minor < 3): - raise DvcTaskError( - f"Unsupported Kombu version '{__version__}' found. " - "dvc-task FSApp requires Kombu >=5.3.0." - ) +backport_filesystem_transport() def _get_fs_config( @@ -91,7 +82,6 @@ def __init__( Additional arguments will be passed into the Celery constructor. """ - _check_kombu_version() super().__init__(*args, **kwargs) self.wdir = wdir or os.getcwd() self.conf.update( diff --git a/src/dvc_task/contrib/__init__.py b/src/dvc_task/contrib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/dvc_task/contrib/kombu_filesystem.py b/src/dvc_task/contrib/kombu_filesystem.py new file mode 100644 index 0000000..209f5e6 --- /dev/null +++ b/src/dvc_task/contrib/kombu_filesystem.py @@ -0,0 +1,261 @@ +"""Kombu filesystem transport module. + +Contains classes which need to be backported in kombu <5.3.0 via monkeypatch. 
+"""
+import os
+import shutil
+import tempfile
+import uuid
+from collections import namedtuple
+from contextlib import contextmanager
+from pathlib import Path
+from queue import Empty
+from time import monotonic
+
+from kombu.exceptions import ChannelError
+from kombu.transport import virtual
+from kombu.utils.encoding import bytes_to_str, str_to_bytes
+from kombu.utils.json import dumps, loads
+from kombu.utils.objects import cached_property
+
+from ..exceptions import DvcTaskError
+
+# needs win32all to work on Windows
+if os.name == "nt":
+
+    import pywintypes
+    import win32con
+    import win32file
+
+    LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
+    # 0 is the default
+    LOCK_SH = 0
+    LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
+    __overlapped = pywintypes.OVERLAPPED()
+
+    def lock(file, flags):
+        """Create file lock on the open file object `file` using `flags`."""
+        hfile = win32file._get_osfhandle(file.fileno())
+        win32file.LockFileEx(hfile, flags, 0, 0xFFFF0000, __overlapped)
+
+    def unlock(file):
+        """Remove file lock from the open file object `file`."""
+        hfile = win32file._get_osfhandle(file.fileno())
+        win32file.UnlockFileEx(hfile, 0, 0xFFFF0000, __overlapped)
+
+elif os.name == "posix":
+
+    import fcntl
+    from fcntl import LOCK_EX, LOCK_NB, LOCK_SH  # noqa
+
+    def lock(file, flags):
+        """Create file lock on the open file object `file` using `flags`."""
+        fcntl.flock(file.fileno(), flags)
+
+    def unlock(file):
+        """Remove file lock from the open file object `file`."""
+        fcntl.flock(file.fileno(), fcntl.LOCK_UN)
+
+
+# One row of an exchange table as stored in a ``<exchange>.exchange`` file.
+exchange_queue_t = namedtuple(
+    "exchange_queue_t", ["routing_key", "pattern", "queue"]
+)
+
+
+class FilesystemChannel(virtual.Channel):
+    """Filesystem Channel.
+
+    Messages are stored as ``*.msg`` files and exchange bindings as
+    ``<exchange>.exchange`` files. All folders are read from the
+    connection's transport options.
+    """
+
+    supports_fanout = True
+
+    @contextmanager
+    def _get_exchange_file_obj(self, exchange, mode="rb"):
+        """Yield the open table file of `exchange`.
+
+        When `mode` contains ``"w"`` the control folder is created if needed
+        and the file is exclusively locked while the context is active.
+
+        Errors raised while opening the file (e.g. FileNotFoundError)
+        propagate unchanged; an OSError raised while the file is in use is
+        converted to ChannelError.
+        """
+        file = self.control_folder / f"{exchange}.exchange"
+        if "w" in mode:
+            self.control_folder.mkdir(exist_ok=True)
+        # Opened outside the ``try`` on purpose: get_table() relies on
+        # FileNotFoundError reaching it unconverted.
+        f_obj = file.open(mode)
+
+        try:
+            if "w" in mode:
+                lock(f_obj, LOCK_EX)
+            yield f_obj
+        except OSError as exc:
+            raise ChannelError(f"Cannot open {file}") from exc
+        finally:
+            if "w" in mode:
+                unlock(f_obj)
+            f_obj.close()
+
+    def get_table(self, exchange):
+        """Return the bindings of `exchange` as `exchange_queue_t` tuples.
+
+        An exchange without a table file yields an empty list.
+        """
+        try:
+            with self._get_exchange_file_obj(exchange) as f_obj:
+                exchange_table = loads(bytes_to_str(f_obj.read()))
+                return [exchange_queue_t(*q) for q in exchange_table]
+        except FileNotFoundError:
+            return []
+
+    def _queue_bind(self, exchange, routing_key, pattern, queue):
+        """Add the binding of `queue` to the table file of `exchange`.
+
+        The table is rewritten on every call; an already present binding is
+        not duplicated.
+        """
+        queues = self.get_table(exchange)
+        queue_val = exchange_queue_t(
+            routing_key or "", pattern or "", queue or ""
+        )
+        if queue_val not in queues:
+            queues.insert(0, queue_val)
+        with self._get_exchange_file_obj(exchange, "wb") as f_obj:
+            f_obj.write(str_to_bytes(dumps(queues)))
+
+    def _put_fanout(self, exchange, payload, routing_key, **kwargs):
+        """Put `payload` onto every queue bound to `exchange`."""
+        for q in self.get_table(exchange):
+            self._put(q.queue, payload, **kwargs)
+
+    def _put(self, queue, payload, **kwargs):
+        """Put `payload` onto `queue`.
+
+        The message is written to a new ``<ms>_<uuid>.<queue>.msg`` file in
+        the outgoing data folder.
+
+        Raises:
+            ChannelError: the message file could not be created or written.
+        """
+        timestamp = int(round(monotonic() * 1000))
+        filename = os.path.join(
+            self.data_folder_out, f"{timestamp}_{uuid.uuid4()}.{queue}.msg"
+        )
+
+        try:
+            # ``with`` only binds ``f`` once open() succeeded, so a failed
+            # open() surfaces as ChannelError instead of an unbound-name
+            # error raised from the cleanup code.
+            with open(filename, "wb") as f:
+                lock(f, LOCK_EX)
+                try:
+                    f.write(str_to_bytes(dumps(payload)))
+                finally:
+                    unlock(f)
+        except OSError as exc:
+            raise ChannelError(
+                f"Cannot add file {filename!r} to directory"
+            ) from exc
+
+    def _get(self, queue):
+        """Get next message from `queue`.
+
+        Candidate files are tried in sorted filename order. A file is
+        claimed by moving it out of the incoming folder; files that cannot
+        be moved are skipped.
+
+        Raises:
+            Empty: no message for `queue` could be claimed.
+            ChannelError: a claimed message file could not be read.
+        """
+        queue_find = "." + queue + ".msg"
+        if self.store_processed:
+            processed_folder = self.processed_folder
+        else:
+            processed_folder = tempfile.gettempdir()
+
+        for filename in sorted(os.listdir(self.data_folder_in)):
+            # only handle message for the requested queue
+            if queue_find not in filename:
+                continue
+
+            try:
+                # move the file to the tmp/processed folder
+                shutil.move(
+                    os.path.join(self.data_folder_in, filename),
+                    processed_folder,
+                )
+            except OSError:
+                # file could be locked, or removed in the meantime (e.g.
+                # claimed by another consumer), so try the next message
+                continue
+
+            filename = os.path.join(processed_folder, filename)
+            try:
+                with open(filename, "rb") as f:
+                    payload = f.read()
+                if not self.store_processed:
+                    os.remove(filename)
+            except OSError as exc:
+                raise ChannelError(
+                    f"Cannot read file {filename!r} from queue."
+                ) from exc
+
+            return loads(bytes_to_str(payload))
+
+        raise Empty()
+
+    def _purge(self, queue):
+        """Remove all messages from `queue`.
+
+        Returns:
+            The number of message files actually removed.
+        """
+        count = 0
+        queue_find = "." + queue + ".msg"
+
+        for filename in os.listdir(self.data_folder_in):
+            # only purge messages for the requested queue
+            if queue_find not in filename:
+                continue
+
+            try:
+                os.remove(os.path.join(self.data_folder_in, filename))
+            except OSError:
+                # we simply ignore its existence, as it was probably
+                # processed by another worker
+                continue
+
+            count += 1
+
+        return count
+
+    def _size(self, queue):
+        """Return the number of messages in `queue` as an :class:`int`."""
+        queue_find = f".{queue}.msg"
+        # only count messages for the requested queue
+        return sum(
+            1
+            for filename in os.listdir(self.data_folder_in)
+            if queue_find in filename
+        )
+
+    @property
+    def transport_options(self):
+        """Transport options of the client connection."""
+        return self.connection.client.transport_options
+
+    @cached_property
+    def data_folder_in(self):
+        """Folder messages are read from (default ``data_in``)."""
+        return self.transport_options.get("data_folder_in", "data_in")
+
+    @cached_property
+    def data_folder_out(self):
+        """Folder messages are written to (default ``data_out``)."""
+        return self.transport_options.get("data_folder_out", "data_out")
+
+    @cached_property
+    def store_processed(self):
+        """Whether consumed message files are kept (default ``False``)."""
+        return self.transport_options.get("store_processed", False)
+
+    @cached_property
+    def processed_folder(self):
+        """Folder for kept, consumed messages (default ``processed``)."""
+        return self.transport_options.get("processed_folder", "processed")
+
+    @property
+    def control_folder(self):
+        """Folder holding exchange table files, as a :class:`Path`."""
+        return Path(self.transport_options.get("control_folder", "control"))
+
+
+def _need_backport():
+    """Return True when the installed kombu is a 5.2.x release.
+
+    Raises:
+        DvcTaskError: the installed kombu is older than 5.2.0.
+    """
+    # pylint: disable=import-outside-toplevel
+    from kombu import VERSION, __version__
+
+    version = (VERSION.major, VERSION.minor)
+
+    # dvc-task requires kombu >= 5.2.0
+    if version < (5, 2):
+        raise DvcTaskError(
+            f"Unsupported Kombu version '{__version__}' found. "
+            "dvc-task requires Kombu >=5.2.0."
+        )
+    # only kombu < 5.3.0 needs the backported classes
+    return version < (5, 3)
+
+
+def backport_filesystem_transport():
+    """Monkeypatch kombu's filesystem transport when kombu is <5.3.0.
+
+    Replaces the transport's channel class with `FilesystemChannel` and
+    declares support for the "direct", "topic" and "fanout" exchange types.
+    Does nothing when no backport is needed.
+
+    Raises:
+        DvcTaskError: the installed kombu is older than 5.2.0.
+    """
+    if not _need_backport():
+        return
+
+    # pylint: disable=import-outside-toplevel
+    import kombu.transport.filesystem
+
+    transport = kombu.transport.filesystem.Transport
+    transport.implements = virtual.Transport.implements.extend(
+        asynchronous=False,
+        exchange_type=frozenset(["direct", "topic", "fanout"]),
+    )
+    transport.Channel = FilesystemChannel