Skip to content

Commit

Permalink
monkey patch for the concurrent problem.
Browse files Browse the repository at this point in the history
  • Loading branch information
karajan1001 committed Sep 27, 2022
1 parent c82ffa2 commit af70c33
Showing 1 changed file with 39 additions and 28 deletions.
67 changes: 39 additions & 28 deletions src/dvc_task/contrib/kombu_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Contains classes which need to be backported in kombu <5.3.0 via monkeypatch.
"""

import os
import shutil
import tempfile
import uuid
from collections import namedtuple
from contextlib import contextmanager
from pathlib import Path
from queue import Empty
from time import monotonic
Expand Down Expand Up @@ -46,7 +46,7 @@ def unlock(file):
elif os.name == "posix":

import fcntl
from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa
from fcntl import LOCK_EX, LOCK_SH

def lock(file, flags):
"""Create file lock."""
Expand All @@ -56,6 +56,11 @@ def unlock(file):
"""Remove file lock."""
fcntl.flock(file.fileno(), fcntl.LOCK_UN)

else:
raise RuntimeError(
"Filesystem plugin only defined for NT and POSIX platforms"
)


exchange_queue_t = namedtuple(
"exchange_queue_t", ["routing_key", "pattern", "queue"]
Expand All @@ -67,41 +72,46 @@ class FilesystemChannel(virtual.Channel):

supports_fanout = True

@contextmanager
def _get_exchange_file_obj(self, exchange, mode="rb"):
file = self.control_folder / f"{exchange}.exchange"
if "w" in mode:
self.control_folder.mkdir(exist_ok=True)
f_obj = file.open(mode)

try:
if "w" in mode:
lock(f_obj, LOCK_EX)
yield f_obj
except OSError:
raise ChannelError(f"Cannot open {file}")
finally:
if "w" in mode:
unlock(f_obj)
f_obj.close()

def get_table(self, exchange):
file = self.control_folder / f"{exchange}.exchange"
try:
with self._get_exchange_file_obj(exchange) as f_obj:
f_obj = file.open("r")
try:
lock(f_obj, LOCK_SH)
exchange_table = loads(bytes_to_str(f_obj.read()))
return [exchange_queue_t(*q) for q in exchange_table]
finally:
unlock(f_obj)
f_obj.close()
except FileNotFoundError:
return []
except OSError:
raise ChannelError(f"Cannot open {file}")

def _queue_bind(self, exchange, routing_key, pattern, queue):
queues = self.get_table(exchange)
file = self.control_folder / f"{exchange}.exchange"
self.control_folder.mkdir(exist_ok=True)
queue_val = exchange_queue_t(
routing_key or "", pattern or "", queue or ""
)
if queue_val not in queues:
queues.insert(0, queue_val)
with self._get_exchange_file_obj(exchange, "wb") as f_obj:
f_obj.write(str_to_bytes(dumps(queues)))
try:
if file.exists():
f_obj = file.open("rb+", buffering=0)
lock(f_obj, LOCK_EX)
exchange_table = loads(bytes_to_str(f_obj.read()))
queues = [exchange_queue_t(*q) for q in exchange_table]
if queue_val not in queues:
queues.insert(0, queue_val)
f_obj.seek(0)
f_obj.write(str_to_bytes(dumps(queues)))
else:
f_obj = file.open("wb", buffering=0)
lock(f_obj, LOCK_EX)
queues = [queue_val]
f_obj.write(str_to_bytes(dumps(queues)))
finally:
unlock(f_obj)
f_obj.close()

def _put_fanout(self, exchange, payload, routing_key, **kwargs):
for q in self.get_table(exchange):
Expand All @@ -115,7 +125,7 @@ def _put(self, queue, payload, **kwargs):
filename = os.path.join(self.data_folder_out, filename)

try:
f = open(filename, "wb")
f = open(filename, "wb", buffering=0)
lock(f, LOCK_EX)
f.write(str_to_bytes(dumps(payload)))
except OSError:
Expand Down Expand Up @@ -148,7 +158,8 @@ def _get(self, queue):
processed_folder,
)
except OSError:
pass # file could be locked, or removed in meantime so ignore
# file could be locked, or removed in meantime so ignore
continue

filename = os.path.join(processed_folder, filename)
try:
Expand Down

0 comments on commit af70c33

Please sign in to comment.