Skip to content

Commit

Permalink
fsapp: add reject()
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Mar 22, 2022
1 parent 1696be8 commit d0ea7cc
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
31 changes: 27 additions & 4 deletions src/dvc_task/app/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads

from ..utils import makedirs
from ..utils import makedirs, remove

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -96,6 +96,7 @@ def __init__(
)
)
logger.debug("Initialized filesystem:// app in '%s'", wdir)
self._msg_path_cache: Dict[str, str] = {}

def iter_queued(
self, queue: Optional[str] = None
Expand All @@ -109,17 +110,39 @@ def iter_queued(
with self.connection_for_read() as conn: # type: ignore[attr-defined]
with conn.channel() as channel:
for filename in sorted(os.listdir(channel.data_folder_in)):
with open(
os.path.join(channel.data_folder_in, filename), "rb"
) as fobj:
path = os.path.join(channel.data_folder_in, filename)
with open(path, "rb") as fobj:
payload = fobj.read()
msg = channel.Message(
loads(bytes_to_str(payload)), channel=channel
)
self._msg_path_cache[msg.delivery_tag] = path
delivery_info = msg.properties.get("delivery_info", {})
if delivery_info.get("routing_key") == queue:
yield msg

def reject(self, delivery_tag: str):
"""Reject the specified message.
Allows the caller to reject FS broker messages without establishing a
full Kombu consumer. Requeue is not supported.
Raises:
ValueError: Invalid delivery_tag
"""
path = self._msg_path_cache.get(delivery_tag)
if path and os.path.exists(path):
remove(path)
del self._msg_path_cache[delivery_tag]
return

for msg in self.iter_queued():
if msg.delivery_tag == delivery_tag:
remove(self._msg_path_cache[delivery_tag])
del self._msg_path_cache[delivery_tag]
return
raise ValueError(f"Message '{delivery_tag}' not found")

def iter_processed(
self, queue: Optional[str] = None
) -> Generator[Message, None, None]:
Expand Down
22 changes: 20 additions & 2 deletions tests/app/test_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Filesystem app tests."""
import json
import os
from typing import Optional
from typing import Any, Dict, Optional

import pytest
from funcy import first
Expand All @@ -10,7 +10,7 @@

from dvc_task.app.filesystem import FSApp, _get_fs_config, _unc_path

TEST_MSG = {
TEST_MSG: Dict[str, Any] = {
"body": "",
"content-encoding": "utf-8",
"content-type": "application/json",
Expand Down Expand Up @@ -85,3 +85,21 @@ def test_iter_processed(tmp_dir: TmpDir):
attr = attr.decode("utf-8")
assert attr == value
assert first(app.iter_queued()) is None


def test_reject(tmp_dir: TmpDir):
"""Rejected message should be removed."""
app = FSApp(wdir=str(tmp_dir), mkdir=True)
tmp_dir.gen({"broker": {"in": {"foo.msg": json.dumps(TEST_MSG)}}})

app.reject(TEST_MSG["properties"]["delivery_tag"])
assert not (tmp_dir / "broker" / "in" / "foo.msg").exists()

tmp_dir.gen({"broker": {"in": {"foo.msg": json.dumps(TEST_MSG)}}})
for msg in app.iter_queued():
assert msg.delivery_tag
app.reject(msg.delivery_tag)
assert not (tmp_dir / "broker" / "in" / "foo.msg").exists()

with pytest.raises(ValueError):
app.reject(TEST_MSG["properties"]["delivery_tag"])

0 comments on commit d0ea7cc

Please sign in to comment.