Skip to content

Commit f87ba19

Browse files
authored
Merge pull request #24 from filintod/filinto/fix-continue-as-new-only
fix continue as new bug where we missed router on the multi-app change
2 parents 996b454 + abeb0c3 commit f87ba19

File tree

5 files changed

+88
-13
lines changed

5 files changed

+88
-13
lines changed

durabletask/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,14 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
127127

128128
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
129129

130+
input_pb = (
131+
wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None
132+
)
133+
130134
req = pb.CreateInstanceRequest(
131135
name=name,
132136
instanceId=instance_id if instance_id else uuid.uuid4().hex,
133-
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
137+
input=input_pb,
134138
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
135139
version=wrappers_pb2.StringValue(value=""),
136140
orchestrationIdReusePolicy=reuse_id_policy,

durabletask/internal/helpers.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,25 @@ def get_string_value(val: Optional[str]) -> Optional[wrappers_pb2.StringValue]:
158158

159159

160160
def new_complete_orchestration_action(
161-
id: int,
162-
status: pb.OrchestrationStatus,
163-
result: Optional[str] = None,
164-
failure_details: Optional[pb.TaskFailureDetails] = None,
165-
carryover_events: Optional[list[pb.HistoryEvent]] = None) -> pb.OrchestratorAction:
161+
id: int,
162+
status: pb.OrchestrationStatus,
163+
result: Optional[str] = None,
164+
failure_details: Optional[pb.TaskFailureDetails] = None,
165+
carryover_events: Optional[list[pb.HistoryEvent]] = None,
166+
router: Optional[pb.TaskRouter] = None,
167+
) -> pb.OrchestratorAction:
166168
completeOrchestrationAction = pb.CompleteOrchestrationAction(
167169
orchestrationStatus=status,
168170
result=get_string_value(result),
169171
failureDetails=failure_details,
170-
carryoverEvents=carryover_events)
172+
carryoverEvents=carryover_events,
173+
)
171174

172-
return pb.OrchestratorAction(id=id, completeOrchestration=completeOrchestrationAction)
175+
return pb.OrchestratorAction(
176+
id=id,
177+
completeOrchestration=completeOrchestrationAction,
178+
router=router,
179+
)
173180

174181

175182
def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction:

durabletask/internal/shared.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dataclasses
55
import json
66
import logging
7+
import os
78
from types import SimpleNamespace
89
from typing import Any, Optional, Sequence, Union
910

@@ -13,7 +14,7 @@
1314
grpc.UnaryUnaryClientInterceptor,
1415
grpc.UnaryStreamClientInterceptor,
1516
grpc.StreamUnaryClientInterceptor,
16-
grpc.StreamStreamClientInterceptor
17+
grpc.StreamStreamClientInterceptor,
1718
]
1819

1920
# Field name used to indicate that an object was automatically serialized
@@ -25,6 +26,27 @@
2526

2627

2728
def get_default_host_address() -> str:
29+
"""Resolve the default Durable Task sidecar address.
30+
31+
Honors environment variables if present; otherwise defaults to localhost:4001.
32+
33+
Supported environment variables (checked in order):
34+
- DURABLETASK_GRPC_ENDPOINT (e.g., "localhost:4001", "grpcs://host:443")
35+
- DURABLETASK_GRPC_HOST and DURABLETASK_GRPC_PORT
36+
"""
37+
38+
# Full endpoint overrides
39+
endpoint = os.environ.get("DAPR_GRPC_ENDPOINT")
40+
if endpoint:
41+
return endpoint
42+
43+
# Host/port split overrides
44+
host = os.environ.get("DAPR_GRPC_HOST") or os.environ.get("DAPR_RUNTIME_HOST")
45+
if host:
46+
port = os.environ.get("DAPR_GRPC_PORT", "4001")
47+
return f"{host}:{port}"
48+
49+
# Default to durabletask-go default port
2850
return "localhost:4001"
2951

3052

durabletask/worker.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,10 @@ def set_complete(
643643
if result is not None:
644644
result_json = result if is_result_encoded else shared.to_json(result)
645645
action = ph.new_complete_orchestration_action(
646-
self.next_sequence_number(), status, result_json
646+
self.next_sequence_number(),
647+
status,
648+
result_json,
649+
router=pb.TaskRouter(sourceAppID=self._app_id) if self._app_id else None,
647650
)
648651
self._pending_actions[action.id] = action
649652

@@ -660,6 +663,7 @@ def set_failed(self, ex: Exception):
660663
pb.ORCHESTRATION_STATUS_FAILED,
661664
None,
662665
ph.new_failure_details(ex),
666+
router=pb.TaskRouter(sourceAppID=self._app_id) if self._app_id else None,
663667
)
664668
self._pending_actions[action.id] = action
665669

@@ -692,11 +696,10 @@ def get_actions(self) -> list[pb.OrchestratorAction]:
692696
action = ph.new_complete_orchestration_action(
693697
self.next_sequence_number(),
694698
pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW,
695-
result=shared.to_json(self._new_input)
696-
if self._new_input is not None
697-
else None,
699+
result=shared.to_json(self._new_input) if self._new_input is not None else None,
698700
failure_details=None,
699701
carryover_events=carryover_events,
702+
router=pb.TaskRouter(sourceAppID=self._app_id) if self._app_id else None,
700703
)
701704
return [action]
702705
else:

tests/durabletask/test_orchestration_e2e.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,45 @@ def orchestrator(ctx: task.OrchestrationContext, input: int):
366366
assert all_results == [1, 2, 3, 4, 5]
367367

368368

369+
def test_continue_as_new_with_activity_e2e():
370+
"""E2E test for continue_as_new with activities (generator-based)."""
371+
activity_results = []
372+
373+
def double_activity(ctx: task.ActivityContext, value: int) -> int:
374+
"""Activity that doubles the value."""
375+
result = value * 2
376+
activity_results.append(result)
377+
return result
378+
379+
def orchestrator(ctx: task.OrchestrationContext, counter: int):
380+
# Call activity to process the counter
381+
processed = yield ctx.call_activity(double_activity, input=counter)
382+
383+
# Continue as new up to 3 times
384+
if counter < 3:
385+
ctx.continue_as_new(counter + 1, save_events=False)
386+
else:
387+
return {"counter": counter, "processed": processed, "all_results": activity_results}
388+
389+
with worker.TaskHubGrpcWorker() as w:
390+
w.add_activity(double_activity)
391+
w.add_orchestrator(orchestrator)
392+
w.start()
393+
394+
task_hub_client = client.TaskHubGrpcClient()
395+
id = task_hub_client.schedule_new_orchestration(orchestrator, input=1)
396+
397+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
398+
assert state is not None
399+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
400+
401+
output = json.loads(state.serialized_output)
402+
# Should have called activity 3 times with input values 1, 2, 3
403+
assert activity_results == [2, 4, 6]
404+
assert output["counter"] == 3
405+
assert output["processed"] == 6
406+
407+
369408
# NOTE: This test fails when running against durabletask-go with sqlite because the sqlite backend does not yet
370409
# support orchestration ID reuse. This gap is being tracked here:
371410
# https://github.com/microsoft/durabletask-go/issues/42

0 commit comments

Comments
 (0)