Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change message acknowledgement #317

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion taskiq/message.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union

from pydantic import BaseModel

Expand All @@ -20,6 +20,7 @@ class TaskiqMessage(BaseModel):
labels_types: Optional[Dict[str, int]] = None
args: List[Any]
kwargs: Dict[str, Any]
ack: Optional[Callable[[], Union[None, Awaitable[None]]]] = None

def parse_labels(self) -> None:
"""
Expand Down
14 changes: 10 additions & 4 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,15 @@ async def callback( # noqa: C901, PLR0912
:param raise_err: raise an error if cannot save result in
result_backend.
"""
message_data = message.data if isinstance(message, AckableMessage) else message
message_data, message_ack = (
(message.data, message.ack)
if isinstance(message, AckableMessage)
else (message, None)
)
try:
taskiq_msg = self.broker.formatter.loads(message=message_data)
if message_ack:
taskiq_msg.ack = message_ack
taskiq_msg.parse_labels()
except Exception as exc:
logger.warning(
Expand Down Expand Up @@ -143,7 +149,7 @@ async def callback( # noqa: C901, PLR0912
message,
AckableMessage,
):
await maybe_awaitable(message.ack())
await maybe_awaitable(taskiq_msg.ack()) # type: ignore

result = await self.run_task(
target=task.original_func,
Expand All @@ -154,7 +160,7 @@ async def callback( # noqa: C901, PLR0912
message,
AckableMessage,
):
await maybe_awaitable(message.ack())
await maybe_awaitable(taskiq_msg.ack()) # type: ignore

for middleware in self.broker.middlewares:
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
Expand All @@ -181,7 +187,7 @@ async def callback( # noqa: C901, PLR0912
message,
AckableMessage,
):
await maybe_awaitable(message.ack())
await maybe_awaitable(taskiq_msg.ack()) # type: ignore

async def run_task( # noqa: C901, PLR0912, PLR0915
self,
Expand Down
3 changes: 2 additions & 1 deletion tests/formatters/test_json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ async def test_json_dumps() -> None:
b'{"task_id":"task-id","task_name":"task.name",'
b'"labels":{"label1":1,"label2":"text"},'
b'"labels_types":null,'
b'"args":[1,"a"],"kwargs":{"p1":"v1"}}'
b'"args":[1,"a"],"kwargs":{"p1":"v1"},'
b'"ack":null}'
),
labels={"label1": 1, "label2": "text"},
)
Expand Down
3 changes: 2 additions & 1 deletion tests/formatters/test_proxy_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ async def test_proxy_dumps() -> None:
b'{"task_id": "task-id", "task_name": "task.name", '
b'"labels": {"label1": 1, "label2": "text"}, '
b'"labels_types": null, '
b'"args": [1, "a"], "kwargs": {"p1": "v1"}}'
b'"args": [1, "a"], "kwargs": {"p1": "v1"}, '
b'"ack": null}'
),
labels={"label1": 1, "label2": "text"},
)
Expand Down