Skip to content

Commit

Permalink
FSApp: add a new method purge
Browse files Browse the repository at this point in the history
fix: #68
We currently lack a way to clean processed messages which makes our
queue status list longer and longer. This PR add a new method `purge` to
solve this problem.

1. Add a new method `purge` to remove processed messages like `queued`
   ones.
2. Unify the `iter_queued` method and `iter_processed` method.
3. Unify the `purge` and `reject` method.
4. Add a new test for `purge`
  • Loading branch information
karajan1001 authored and pmrowla committed Jun 25, 2022
1 parent dffef07 commit dc4c6c9
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 37 deletions.
119 changes: 82 additions & 37 deletions src/dvc_task/app/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""(Local) filesystem based Celery application."""
import logging
import os
from typing import Any, Dict, Generator, Optional
from typing import Any, Dict, Generator, Iterable, Optional

from celery import Celery
from kombu.message import Message
Expand Down Expand Up @@ -89,26 +89,33 @@ def __init__(
)
)
logger.debug("Initialized filesystem:// app in '%s'", wdir)
self._msg_path_cache: Dict[str, str] = {}
self._processed_msg_path_cache: Dict[str, str] = {}
self._queued_msg_path_cache: Dict[str, str] = {}

def __reduce_keys__(self) -> Dict[str, Any]:
keys = super().__reduce_keys__() # type: ignore[misc]
keys.update({"wdir": self.wdir})
return keys

def iter_queued(
self, queue: Optional[str] = None
def _iter_folder(
self,
path_name: str,
path_cache: Dict[str, str],
queue: Optional[str] = None,
) -> Generator[Message, None, None]:
"""Iterate over queued tasks which have not been taken by a worker.
"""Iterate over queued tasks inside a folder
Arguments:
path_name: the folder to iterate
path_cache: cache of message path.
queue: Optional name of queue.
"""
queue = queue or self.conf.task_default_queue
with self.connection_for_read() as conn: # type: ignore[attr-defined]
with conn.channel() as channel:
for filename in sorted(os.listdir(channel.data_folder_in)):
path = os.path.join(channel.data_folder_in, filename)
folder = getattr(channel, path_name)
for filename in sorted(os.listdir(folder)):
path = os.path.join(folder, filename)
try:
with open(path, "rb") as fobj:
payload = fobj.read()
Expand All @@ -120,52 +127,90 @@ def iter_queued(
msg = channel.Message(
loads(bytes_to_str(payload)), channel=channel
)
self._msg_path_cache[msg.delivery_tag] = path
path_cache[msg.delivery_tag] = path
delivery_info = msg.properties.get("delivery_info", {})
if delivery_info.get("routing_key") == queue:
yield msg

def reject(self, delivery_tag: str):
"""Reject the specified message.
def iter_queued(
self, queue: Optional[str] = None
) -> Generator[Message, None, None]:
"""Iterate over queued tasks which have not been taken by a worker.
Allows the caller to reject FS broker messages without establishing a
full Kombu consumer. Requeue is not supported.
Arguments:
queue: Optional name of queue.
"""
yield from self._iter_folder(
"data_folder_in",
self._queued_msg_path_cache,
queue,
)

def iter_processed(
self, queue: Optional[str] = None
) -> Generator[Message, None, None]:
"""Iterate over tasks which have been taken by a worker.
Arguments:
queue: Optional name of queue.
"""
yield from self._iter_folder(
"processed_folder",
self._processed_msg_path_cache,
queue,
)

@staticmethod
def _delete_msg(
delivery_tag: str,
msg_collection: Iterable[Message],
path_cache: Dict[str, str],
):
"""delete the specified message.
Arguments:
delivery_tag: delivery tag of the message to be deleted.
msg_collection: where to found this message.
path_cache: cache of message path.
Raises:
ValueError: Invalid delivery_tag
"""
path = self._msg_path_cache.get(delivery_tag)
path = path_cache.get(delivery_tag)
if path and os.path.exists(path):
remove(path)
del self._msg_path_cache[delivery_tag]
del path_cache[delivery_tag]
return

for msg in self.iter_queued():
for msg in msg_collection:
if msg.delivery_tag == delivery_tag:
remove(self._msg_path_cache[delivery_tag])
del self._msg_path_cache[delivery_tag]
remove(path_cache[delivery_tag])
del path_cache[delivery_tag]
return
raise ValueError(f"Message '{delivery_tag}' not found")

def iter_processed(
self, queue: Optional[str] = None
) -> Generator[Message, None, None]:
"""Iterate over tasks which have been taken by a worker.
def reject(self, delivery_tag: str):
"""Reject the specified message.
Arguments:
queue: Optional name of queue.
Allows the caller to reject FS broker messages without establishing a
full Kombu consumer. Requeue is not supported.
Raises:
ValueError: Invalid delivery_tag
"""
queue = queue or self.conf.task_default_queue
with self.connection_for_read() as conn: # type: ignore[attr-defined]
with conn.channel() as channel:
for filename in sorted(os.listdir(channel.processed_folder)):
with open(
os.path.join(channel.processed_folder, filename), "rb"
) as fobj:
payload = fobj.read()
msg = channel.Message(
loads(bytes_to_str(payload)), channel=channel
)
delivery_info = msg.properties.get("delivery_info", {})
if delivery_info.get("routing_key") == queue:
yield msg
self._delete_msg(
delivery_tag, self.iter_queued(), self._queued_msg_path_cache
)

def purge(self, delivery_tag: str):
"""Purge the specified processed message.
Allows the caller to purge completed FS broker messages without
establishing a full Kombu consumer. Requeue is not supported.
Raises:
ValueError: Invalid delivery_tag
"""
self._delete_msg(
delivery_tag, self.iter_processed(), self._processed_msg_path_cache
)
18 changes: 18 additions & 0 deletions tests/app/test_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,21 @@ def test_reject(tmp_dir: TmpDir):

with pytest.raises(ValueError):
app.reject(TEST_MSG["properties"]["delivery_tag"])


def test_purge(tmp_dir: TmpDir):
"""Purge message should be removed."""
app = FSApp(wdir=str(tmp_dir), mkdir=True)
tmp_dir.gen({"broker": {"processed": {"foo.msg": json.dumps(TEST_MSG)}}})

app.purge(TEST_MSG["properties"]["delivery_tag"])
assert not (tmp_dir / "broker" / "processed" / "foo.msg").exists()

tmp_dir.gen({"broker": {"processed": {"foo.msg": json.dumps(TEST_MSG)}}})
for msg in app.iter_processed():
assert msg.delivery_tag
app.purge(msg.delivery_tag)
assert not (tmp_dir / "broker" / "processed" / "foo.msg").exists()

with pytest.raises(ValueError):
app.purge(TEST_MSG["properties"]["delivery_tag"])

0 comments on commit dc4c6c9

Please sign in to comment.