Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support activity retry delay #571

Merged
merged 9 commits into from
Aug 19, 2024
6 changes: 6 additions & 0 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
overload,
)

import google.protobuf.duration_pb2
import google.protobuf.json_format
import google.protobuf.message
import google.protobuf.symbol_database
Expand Down Expand Up @@ -843,6 +844,10 @@ def _error_to_failure(
failure.application_failure_info.details.CopyFrom(
payload_converter.to_payloads_wrapper(error.details)
)
if error.next_retry_delay:
failure.application_failure_info.next_retry_delay.FromTimedelta(
error.next_retry_delay
)
elif isinstance(error, temporalio.exceptions.TimeoutError):
failure.timeout_failure_info.SetInParent()
failure.timeout_failure_info.timeout_type = (
Expand Down Expand Up @@ -928,6 +933,7 @@ def from_failure(
*payload_converter.from_payloads_wrapper(app_info.details),
type=app_info.type or None,
non_retryable=app_info.non_retryable,
next_retry_delay=app_info.next_retry_delay.ToTimedelta(),
)
elif failure.HasField("timeout_failure_info"):
timeout_info = failure.timeout_failure_info
Expand Down
12 changes: 12 additions & 0 deletions temporalio/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Common Temporal exceptions."""

import asyncio
from datetime import timedelta
from enum import IntEnum
from typing import Any, Optional, Sequence, Tuple

Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(
*details: Any,
type: Optional[str] = None,
non_retryable: bool = False,
next_retry_delay: Optional[timedelta] = None,
) -> None:
"""Initialize an application error."""
super().__init__(
Expand All @@ -88,6 +90,7 @@ def __init__(
self._details = details
self._type = type
self._non_retryable = non_retryable
self._next_retry_delay = next_retry_delay

@property
def details(self) -> Sequence[Any]:
Expand All @@ -109,6 +112,15 @@ def non_retryable(self) -> bool:
"""
return self._non_retryable

@property
def next_retry_delay(self) -> Optional[timedelta]:
"""Delay before the next activity retry attempt.

User activity code may set this when raising ApplicationError to specify
a delay before the next activity retry.
"""
return self._next_retry_delay


class CancelledError(FailureError):
"""Error raised on workflow/activity cancellation."""
Expand Down
9 changes: 8 additions & 1 deletion tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,14 @@ async def test_sync_activity_process_non_picklable_heartbeat_details(
picklable_activity_non_pickable_heartbeat_details,
worker_config={"activity_executor": executor},
)
assert "Can't pickle" in str(assert_activity_application_error(err.value))
msg = str(assert_activity_application_error(err.value))
# TODO: different messages can apparently be produced across runs/platforms
# See e.g. https://github.com/temporalio/sdk-python/actions/runs/10455232879/job/28949714969?pr=571
assert (
"Can't pickle" in msg
or "Can't get local object 'picklable_activity_non_pickable_heartbeat_details.<locals>.<lambda>'"
in msg
)


async def test_activity_error_non_retryable(client: Client, worker: ExternalWorker):
Expand Down
44 changes: 44 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5861,3 +5861,47 @@ async def test_timer_started_after_workflow_completion(client: Client):
)
await handle.signal(TimerStartedAfterWorkflowCompletionWorkflow.my_signal)
assert await handle.result() == "workflow-result"


@activity.defn
async def activity_with_retry_delay():
raise ApplicationError(
ActivitiesWithRetryDelayWorkflow.error_message,
next_retry_delay=ActivitiesWithRetryDelayWorkflow.next_retry_delay,
)


@workflow.defn
class ActivitiesWithRetryDelayWorkflow:
error_message = "Deliberately failing with next_retry_delay set"
next_retry_delay = timedelta(milliseconds=5)

@workflow.run
async def run(self) -> None:
await workflow.execute_activity(
activity_with_retry_delay,
retry_policy=RetryPolicy(maximum_attempts=2),
schedule_to_close_timeout=timedelta(minutes=5),
)


async def test_activity_retry_delay(client: Client):
async with new_worker(
client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay]
) as worker:
try:
await client.execute_workflow(
ActivitiesWithRetryDelayWorkflow.run,
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
)
except WorkflowFailureError as err:
assert isinstance(err.cause, ActivityError)
assert isinstance(err.cause.cause, ApplicationError)
assert (
str(err.cause.cause) == ActivitiesWithRetryDelayWorkflow.error_message
)
assert (
err.cause.cause.next_retry_delay
== ActivitiesWithRetryDelayWorkflow.next_retry_delay
)
Loading