Skip to content

Commit

Permalink
contrib: add monkeypatched filesystem for kombu<5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Aug 3, 2022
1 parent c0af209 commit 53241d0
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ addopts = "-ra"
[tool.coverage.run]
branch = true
source = ["dvc_task", "tests"]
omit = ["dvc_task/contrib/*"]

[tool.coverage.paths]
source = ["src", "*/site-packages"]
Expand Down Expand Up @@ -62,16 +63,21 @@ warn_no_return = true
warn_redundant_casts = true
warn_unreachable = true
files = ["src", "tests"]
exclude = ["^src/dvc_task/contrib"]

[[tool.mypy.overrides]]
module = [
"celery.*",
"funcy.*",
"shortuuid",
"kombu.message.*",
"kombu.utils.*",
]
ignore_missing_imports = true

[tool.pylint.master]
ignore-paths = ["src/dvc_task/contrib"]

[tool.pylint.message_control]
enable = ["c-extension-no-member", "no-else-return"]

Expand Down
3 changes: 1 addition & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package_dir=
packages = find:
install_requires=
celery>=5.2.0,<6
kombu
kombu>=5.2.0,<6
funcy>=1.17
shortuuid>=1.0.8
pywin32>=225; sys_platform == 'win32'
Expand All @@ -38,7 +38,6 @@ docs =
mkdocstrings-python==0.7.1
tests =
flaky==3.7.0
kombu>=5.3.0a1
pytest==7.1.2
pytest-sugar==0.9.4
pytest-cov==3.0.0
Expand Down
7 changes: 7 additions & 0 deletions src/dvc_task/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
"""DVC Task."""

__all__ = [
"app",
"proc",
"utils",
"worker",
]
14 changes: 2 additions & 12 deletions src/dvc_task/app/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,13 @@
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads

from ..exceptions import DvcTaskError
from ..contrib.kombu_filesystem import backport_filesystem_transport
from ..utils import makedirs, remove, unc_path

logger = logging.getLogger(__name__)


def _check_kombu_version():
# pylint: disable=import-outside-toplevel
from kombu import VERSION, __version__

# FSApp requires kombu >= 5.3.0
if VERSION.major < 5 or (VERSION.major == 5 and VERSION.minor < 3):
raise DvcTaskError(
f"Unsupported Kombu version '{__version__}' found. "
"dvc-task FSApp requires Kombu >=5.3.0."
)
backport_filesystem_transport()


def _get_fs_config(
Expand Down Expand Up @@ -91,7 +82,6 @@ def __init__(
Additional arguments will be passed into the Celery constructor.
"""
_check_kombu_version()
super().__init__(*args, **kwargs)
self.wdir = wdir or os.getcwd()
self.conf.update(
Expand Down
Empty file.
261 changes: 261 additions & 0 deletions src/dvc_task/contrib/kombu_filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
"""Kombu filesystem transport module.
Contains classes which need to be backported in kombu <5.3.0 via monkeypatch.
"""
import os
import shutil
import tempfile
import uuid
from collections import namedtuple
from contextlib import contextmanager
from pathlib import Path
from queue import Empty
from time import monotonic

from kombu.exceptions import ChannelError
from kombu.transport import virtual
from kombu.utils.encoding import bytes_to_str, str_to_bytes
from kombu.utils.json import dumps, loads
from kombu.utils.objects import cached_property

from ..exceptions import DvcTaskError

# Platform-specific advisory file locking.
# The Windows branch needs win32all (pywin32) to work.
if os.name == "nt":

    import pywintypes
    import win32con
    import win32file

    LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
    # 0 is the default
    LOCK_SH = 0
    LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
    __overlapped = pywintypes.OVERLAPPED()

    def lock(file, flags):
        """Lock ``file`` via ``LockFileEx`` using the given ``flags``."""
        os_handle = win32file._get_osfhandle(file.fileno())
        win32file.LockFileEx(os_handle, flags, 0, 0xFFFF0000, __overlapped)

    def unlock(file):
        """Release the lock on ``file`` via ``UnlockFileEx``."""
        os_handle = win32file._get_osfhandle(file.fileno())
        win32file.UnlockFileEx(os_handle, 0, 0xFFFF0000, __overlapped)

elif os.name == "posix":

    import fcntl
    from fcntl import LOCK_EX, LOCK_NB, LOCK_SH  # noqa

    def lock(file, flags):
        """Lock ``file`` via ``fcntl.flock`` using the given ``flags``."""
        descriptor = file.fileno()
        fcntl.flock(descriptor, flags)

    def unlock(file):
        """Release the lock on ``file`` via ``fcntl.flock(LOCK_UN)``."""
        descriptor = file.fileno()
        fcntl.flock(descriptor, fcntl.LOCK_UN)


# One (routing_key, pattern, queue) binding entry of an exchange table.
exchange_queue_t = namedtuple(
    "exchange_queue_t",
    ("routing_key", "pattern", "queue"),
)


class FilesystemChannel(virtual.Channel):
    """Filesystem Channel.

    Messages are stored as ``<timestamp>_<uuid>.<queue>.msg`` files and
    exchange bindings as JSON ``<exchange>.exchange`` files inside the
    control folder. Folder locations come from the connection's
    ``transport_options``.
    """

    supports_fanout = True

    @contextmanager
    def _get_exchange_file_obj(self, exchange, mode="rb"):
        """Yield the opened binding-table file of ``exchange``.

        When ``mode`` contains ``"w"`` the control folder is created if
        needed and the file is held under an exclusive lock while in use.

        Raises:
            FileNotFoundError: if the file does not exist (read modes).
            ChannelError: if an ``OSError`` occurs while the file is in use.
        """
        file = self.control_folder / f"{exchange}.exchange"
        writing = "w" in mode
        if writing:
            self.control_folder.mkdir(exist_ok=True)
        # Opened outside the ``try`` on purpose: ``get_table`` relies on
        # ``FileNotFoundError`` propagating unchanged for a missing table.
        f_obj = file.open(mode)

        try:
            if writing:
                lock(f_obj, LOCK_EX)
            yield f_obj
        except OSError as exc:
            raise ChannelError(f"Cannot open {file}") from exc
        finally:
            # Always close the file, even if releasing the lock fails.
            try:
                if writing:
                    unlock(f_obj)
            finally:
                f_obj.close()

    def get_table(self, exchange):
        """Return the bindings of ``exchange`` as ``exchange_queue_t`` items.

        An exchange without a binding file yields an empty list.
        """
        try:
            with self._get_exchange_file_obj(exchange) as f_obj:
                exchange_table = loads(bytes_to_str(f_obj.read()))
        except FileNotFoundError:
            return []
        return [exchange_queue_t(*q) for q in exchange_table]

    def _queue_bind(self, exchange, routing_key, pattern, queue):
        """Record the binding of ``queue`` to ``exchange`` on disk."""
        queues = self.get_table(exchange)
        queue_val = exchange_queue_t(
            routing_key or "", pattern or "", queue or ""
        )
        if queue_val not in queues:
            queues.insert(0, queue_val)
        with self._get_exchange_file_obj(exchange, "wb") as f_obj:
            f_obj.write(str_to_bytes(dumps(queues)))

    def _put_fanout(self, exchange, payload, routing_key, **kwargs):
        """Put ``payload`` onto every queue bound to ``exchange``."""
        for q in self.get_table(exchange):
            self._put(q.queue, payload, **kwargs)

    def _put(self, queue, payload, **kwargs):
        """Put `message` onto `queue`.

        Raises:
            ChannelError: if the message file cannot be created or written.
        """
        timestamp = int(round(monotonic() * 1000))
        filename = os.path.join(
            self.data_folder_out, f"{timestamp}_{uuid.uuid4()}.{queue}.msg"
        )

        try:
            # ``with`` guarantees the handle only gets closed when it was
            # actually opened; a failing ``open`` now surfaces as
            # ChannelError instead of an UnboundLocalError from cleanup.
            with open(filename, "wb") as f_obj:
                lock(f_obj, LOCK_EX)
                try:
                    f_obj.write(str_to_bytes(dumps(payload)))
                finally:
                    unlock(f_obj)
        except OSError as exc:
            raise ChannelError(
                f"Cannot add file {filename!r} to directory"
            ) from exc

    def _get(self, queue):
        """Get next message from `queue`.

        Raises:
            Empty: if no message for ``queue`` could be claimed.
            ChannelError: if a claimed message file cannot be read.
        """
        queue_find = "." + queue + ".msg"
        if self.store_processed:
            processed_folder = self.processed_folder
        else:
            processed_folder = tempfile.gettempdir()

        # NOTE(review): names are sorted as strings, so ordering follows the
        # textual timestamp prefix rather than its numeric value.
        for filename in sorted(os.listdir(self.data_folder_in)):
            # only handle message for the requested queue
            if queue_find not in filename:
                continue

            try:
                # move the file to the tmp/processed folder
                shutil.move(
                    os.path.join(self.data_folder_in, filename),
                    processed_folder,
                )
            except OSError:
                # file could be locked, or removed in meantime: it was not
                # claimed by us, so try the next candidate instead of
                # attempting to read a file that was never moved
                continue

            claimed = os.path.join(processed_folder, filename)
            try:
                with open(claimed, "rb") as f_obj:
                    payload = f_obj.read()
                if not self.store_processed:
                    os.remove(claimed)
            except OSError as exc:
                raise ChannelError(
                    f"Cannot read file {claimed!r} from queue."
                ) from exc

            return loads(bytes_to_str(payload))

        raise Empty()

    def _purge(self, queue):
        """Remove all messages from `queue` and return how many were removed."""
        count = 0
        queue_find = "." + queue + ".msg"

        for filename in os.listdir(self.data_folder_in):
            # only purge messages for the requested queue
            if queue_find not in filename:
                continue
            try:
                os.remove(os.path.join(self.data_folder_in, filename))
            except OSError:
                # we simply ignore its existence, as it was probably
                # processed by another worker
                continue
            count += 1

        return count

    def _size(self, queue):
        """Return the number of messages in `queue` as an :class:`int`."""
        queue_find = f".{queue}.msg"
        return sum(
            1
            for filename in os.listdir(self.data_folder_in)
            if queue_find in filename
        )

    @property
    def transport_options(self):
        """Transport options of the connection's client."""
        return self.connection.client.transport_options

    @cached_property
    def data_folder_in(self):
        """Folder messages are read from (default ``"data_in"``)."""
        return self.transport_options.get("data_folder_in", "data_in")

    @cached_property
    def data_folder_out(self):
        """Folder messages are written to (default ``"data_out"``)."""
        return self.transport_options.get("data_folder_out", "data_out")

    @cached_property
    def store_processed(self):
        """Whether consumed message files are kept (default ``False``)."""
        return self.transport_options.get("store_processed", False)

    @cached_property
    def processed_folder(self):
        """Folder consumed messages are moved to (default ``"processed"``)."""
        return self.transport_options.get("processed_folder", "processed")

    @property
    def control_folder(self):
        """Folder holding exchange binding files, as a :class:`Path`."""
        return Path(self.transport_options.get("control_folder", "control"))


def _need_backport():
    """Return True when the installed kombu needs the filesystem backport.

    Raises:
        DvcTaskError: if the installed kombu is older than 5.2.0.
    """
    # pylint: disable=import-outside-toplevel
    from kombu import VERSION, __version__

    installed = (VERSION.major, VERSION.minor)

    # dvc-task requires kombu >= 5.2.0
    if installed < (5, 2):
        raise DvcTaskError(
            f"Unsupported Kombu version '{__version__}' found. "
            "dvc-task requires Kombu >=5.2.0."
        )

    # Only the 5.x releases before 5.3 need the backported channel.
    return installed < (5, 3)


def backport_filesystem_transport():
    """Monkeypatch kombu's filesystem transport when a backport is needed.

    Replaces ``Transport.implements`` and ``Transport.Channel`` on
    ``kombu.transport.filesystem``; does nothing otherwise.
    """
    if not _need_backport():
        return

    import kombu.transport.filesystem

    transport_cls = kombu.transport.filesystem.Transport
    transport_cls.implements = virtual.Transport.implements.extend(
        asynchronous=False,
        exchange_type=frozenset(["direct", "topic", "fanout"]),
    )
    transport_cls.Channel = FilesystemChannel

0 comments on commit 53241d0

Please sign in to comment.