Skip to content

Commit 48830dc

Browse files
authored
Add Durable Entities support (#63)
* Signalling working, some other logic * Entities kind of working * Entity lock incremental change * More entity implementing * Finish locking, add operationactions * Add samples, test, documentation * Linting * Linting * Linting * Remove circular import * Remove circular import * Remove resolved todos * Add SDK docs for expoded methods * Various - Return pending actions when orchestrations complete - Ensure locked entities are unlocked when orchestration ends (success/fail/continue_as_new) - Provide default "delete" operation and document deleting entities * Linting * Linting * Linting * Linting * Fix tests * 3.9 compat, fix tests for action output * Action output test fix
1 parent 48e6ac1 commit 48830dc

20 files changed

+2061
-45
lines changed

docs/features.md

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,92 @@ Orchestrations can schedule durable timers using the `create_timer` API. These t
4848

4949
Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. Sub-orchestrations can also be versioned in a similar manner to their parent orchestrations, however, they do not inherit the parent orchestrator's version. Instead, they will use the default_version defined in the current worker's VersioningOptions unless otherwise specified during `call_sub_orchestrator`.
5050

51+
### Entities
52+
53+
#### Concepts
54+
55+
Durable Entities provide a way to model small, stateful objects within your orchestration workflows. Each entity has a unique identity and maintains its own state, which is persisted durably. Entities can be interacted with by sending them operations (messages) that mutate or query their state. These operations are processed sequentially, ensuring consistency. Examples of uses for durable entities include counters, accumulators, or any other operation which requires state to persist across orchestrations.
56+
57+
Entities can be invoked from durable clients directly, or from durable orchestrators. They support features like automatic state persistence, concurrency control, and can be locked for exclusive access during critical operations.
58+
59+
Entities are accessed by a unique ID, implemented here as EntityInstanceId. This ID is comprised of two parts, an entity name referring to the function or class that defines the behavior of the entity, and a key which is any string defined in your code. Each entity instance, represented by a distinct EntityInstanceId, has its own state.
60+
61+
#### Syntax
62+
63+
##### Defining Entities
64+
65+
Entities can be defined using either function-based or class-based syntax.
66+
67+
```python
68+
# Funtion-based entity
69+
def counter(ctx: entities.EntityContext, input: int):
70+
state = ctx.get_state(int, 0)
71+
if ctx.operation == "add":
72+
state += input
73+
ctx.set_state(state)
74+
elif operation == "get":
75+
return state
76+
77+
# Class-based entity
78+
class Counter(entities.DurableEntity):
79+
def __init__(self):
80+
self.set_state(0)
81+
82+
def add(self, amount: int):
83+
self.set_state(self.get_state(int, 0) + amount)
84+
85+
def get(self):
86+
return self.get_state(int, 0)
87+
```
88+
89+
> Note that the object properties of class-based entities may not be preserved across invocations. Use the derived get_state and set_state methods to access the persisted entity data.
90+
91+
##### Invoking entities
92+
93+
Entities are invoked using the `signal_entity` or `call_entity` APIs. The Durable Client only allows `signal_entity`:
94+
95+
```python
96+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
97+
taskhub=taskhub_name, token_credential=None)
98+
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
99+
c.signal_entity(entity_id, "do_nothing")
100+
```
101+
102+
Whereas orchestrators can choose to use `signal_entity` or `call_entity`:
103+
104+
```python
105+
# Signal an entity (fire-and-forget)
106+
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
107+
ctx.signal_entity(entity_id, operation_name="add", input=5)
108+
109+
# Call an entity (wait for result)
110+
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
111+
result = yield ctx.call_entity(entity_id, operation_name="get")
112+
```
113+
114+
##### Entity actions
115+
116+
Entities can perform actions such signaling other entities or starting new orchestrations
117+
118+
- `ctx.signal_entity(entity_id, operation, input)`
119+
- `ctx.schedule_new_orchestration(orchestrator_name, input)`
120+
121+
##### Locking and concurrency
122+
123+
Because entites can be accessed from multiple running orchestrations at the same time, entities may also be locked by a single orchestrator ensuring exclusive access during the duration of the lock (also known as a critical section). Think semaphores:
124+
125+
```python
126+
with (yield ctx.lock_entities([entity_id_1, entity_id_2]):
127+
# Perform entity call operations that require exclusive access
128+
...
129+
```
130+
131+
Note that locked entities may not be signalled, and every call to a locked entity must return a result before another call to the same entity may be made from within the critical section. For more details and advanced usage, see the examples and API documentation.
132+
133+
##### Deleting entities
134+
135+
Entites are represented as orchestration instances in your Task Hub, and their state is persisted in the Task Hub as well. When using the Durable Task Scheduler as your durability provider, the backend will automatically clean up entities when their state is empty, this is effectively the "delete" operation to save space in the Task Hub. In the DTS Dashboard, "delete entity" simply signals the entity with the "delete" operation. In this SDK, we provide a default implementation for the "delete" operation to clear the state when using class-based entities, which end users are free to override as needed. Users must implement "delete" manually for function-based entities.
136+
51137
### External events
52138

53139
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.

durabletask/client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import logging
55
import uuid
66
from dataclasses import dataclass
7-
from datetime import datetime
7+
from datetime import datetime, timezone
88
from enum import Enum
99
from typing import Any, Optional, Sequence, TypeVar, Union
1010

1111
import grpc
1212
from google.protobuf import wrappers_pb2
1313

14+
from durabletask.entities import EntityInstanceId
1415
import durabletask.internal.helpers as helpers
1516
import durabletask.internal.orchestrator_service_pb2 as pb
1617
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
@@ -227,3 +228,16 @@ def purge_orchestration(self, instance_id: str, recursive: bool = True):
227228
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
228229
self._logger.info(f"Purging instance '{instance_id}'.")
229230
self._stub.PurgeInstances(req)
231+
232+
def signal_entity(self, entity_instance_id: EntityInstanceId, operation_name: str, input: Optional[Any] = None):
233+
req = pb.SignalEntityRequest(
234+
instanceId=str(entity_instance_id),
235+
name=operation_name,
236+
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input else None,
237+
requestId=str(uuid.uuid4()),
238+
scheduledTime=None,
239+
parentTraceContext=None,
240+
requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
241+
)
242+
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
243+
self._stub.SignalEntity(req, None) # TODO: Cancellation timeout?

durabletask/entities/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""Durable Task SDK for Python entities component"""
5+
6+
from durabletask.entities.entity_instance_id import EntityInstanceId
7+
from durabletask.entities.durable_entity import DurableEntity
8+
from durabletask.entities.entity_lock import EntityLock
9+
from durabletask.entities.entity_context import EntityContext
10+
11+
__all__ = ["EntityInstanceId", "DurableEntity", "EntityLock", "EntityContext"]
12+
13+
PACKAGE_NAME = "durabletask.entities"
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
from typing import Any, Optional, Type, TypeVar, Union, overload
2+
3+
from durabletask.entities.entity_context import EntityContext
4+
from durabletask.entities.entity_instance_id import EntityInstanceId
5+
6+
TState = TypeVar("TState")
7+
8+
9+
class DurableEntity:
10+
def _initialize_entity_context(self, context: EntityContext):
11+
self.entity_context = context
12+
13+
@overload
14+
def get_state(self, intended_type: Type[TState], default: TState) -> TState:
15+
...
16+
17+
@overload
18+
def get_state(self, intended_type: Type[TState]) -> Optional[TState]:
19+
...
20+
21+
@overload
22+
def get_state(self, intended_type: None = None, default: Any = None) -> Any:
23+
...
24+
25+
def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Union[None, TState, Any]:
26+
"""Get the current state of the entity, optionally converting it to a specified type.
27+
28+
Parameters
29+
----------
30+
intended_type : Type[TState] | None, optional
31+
The type to which the state should be converted. If None, the state is returned as-is.
32+
default : TState, optional
33+
The default value to return if the state is not found or cannot be converted.
34+
35+
Returns
36+
-------
37+
TState | Any
38+
The current state of the entity, optionally converted to the specified type.
39+
"""
40+
return self.entity_context.get_state(intended_type, default)
41+
42+
def set_state(self, state: Any):
43+
"""Set the state of the entity to a new value.
44+
45+
Parameters
46+
----------
47+
new_state : Any
48+
The new state to set for the entity.
49+
"""
50+
self.entity_context.set_state(state)
51+
52+
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Optional[Any] = None) -> None:
53+
"""Signal another entity to perform an operation.
54+
55+
Parameters
56+
----------
57+
entity_instance_id : EntityInstanceId
58+
The ID of the entity instance to signal.
59+
operation : str
60+
The operation to perform on the entity.
61+
input : Any, optional
62+
The input to provide to the entity for the operation.
63+
"""
64+
self.entity_context.signal_entity(entity_instance_id, operation, input)
65+
66+
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
67+
"""Schedule a new orchestration instance.
68+
69+
Parameters
70+
----------
71+
orchestration_name : str
72+
The name of the orchestration to schedule.
73+
input : Any, optional
74+
The input to provide to the new orchestration.
75+
instance_id : str, optional
76+
The instance ID to assign to the new orchestration. If None, a new ID will be generated.
77+
78+
Returns
79+
-------
80+
str
81+
The instance ID of the scheduled orchestration.
82+
"""
83+
return self.entity_context.schedule_new_orchestration(orchestration_name, input, instance_id=instance_id)
84+
85+
def delete(self, input: Any = None) -> None:
86+
"""Delete the entity instance.
87+
88+
Parameters
89+
----------
90+
input : Any, optional
91+
Unused: The input for the entity "delete" operation.
92+
"""
93+
self.set_state(None)
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
2+
from typing import Any, Optional, Type, TypeVar, Union, overload
3+
import uuid
4+
from durabletask.entities.entity_instance_id import EntityInstanceId
5+
from durabletask.internal import helpers, shared
6+
from durabletask.internal.entity_state_shim import StateShim
7+
import durabletask.internal.orchestrator_service_pb2 as pb
8+
9+
TState = TypeVar("TState")
10+
11+
12+
class EntityContext:
13+
def __init__(self, orchestration_id: str, operation: str, state: StateShim, entity_id: EntityInstanceId):
14+
self._orchestration_id = orchestration_id
15+
self._operation = operation
16+
self._state = state
17+
self._entity_id = entity_id
18+
19+
@property
20+
def orchestration_id(self) -> str:
21+
"""Get the ID of the orchestration instance that scheduled this entity.
22+
23+
Returns
24+
-------
25+
str
26+
The ID of the current orchestration instance.
27+
"""
28+
return self._orchestration_id
29+
30+
@property
31+
def operation(self) -> str:
32+
"""Get the operation associated with this entity invocation.
33+
34+
The operation is a string that identifies the specific action being
35+
performed on the entity. It can be used to distinguish between
36+
multiple operations that are part of the same entity invocation.
37+
38+
Returns
39+
-------
40+
str
41+
The operation associated with this entity invocation.
42+
"""
43+
return self._operation
44+
45+
@overload
46+
def get_state(self, intended_type: Type[TState], default: TState) -> TState:
47+
...
48+
49+
@overload
50+
def get_state(self, intended_type: Type[TState]) -> Optional[TState]:
51+
...
52+
53+
@overload
54+
def get_state(self, intended_type: None = None, default: Any = None) -> Any:
55+
...
56+
57+
def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Union[None, TState, Any]:
58+
"""Get the current state of the entity, optionally converting it to a specified type.
59+
60+
Parameters
61+
----------
62+
intended_type : Type[TState] | None, optional
63+
The type to which the state should be converted. If None, the state is returned as-is.
64+
default : TState, optional
65+
The default value to return if the state is not found or cannot be converted.
66+
67+
Returns
68+
-------
69+
TState | Any
70+
The current state of the entity, optionally converted to the specified type.
71+
"""
72+
return self._state.get_state(intended_type, default)
73+
74+
def set_state(self, new_state: Any):
75+
"""Set the state of the entity to a new value.
76+
77+
Parameters
78+
----------
79+
new_state : Any
80+
The new state to set for the entity.
81+
"""
82+
self._state.set_state(new_state)
83+
84+
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Optional[Any] = None) -> None:
85+
"""Signal another entity to perform an operation.
86+
87+
Parameters
88+
----------
89+
entity_instance_id : EntityInstanceId
90+
The ID of the entity instance to signal.
91+
operation : str
92+
The operation to perform on the entity.
93+
input : Any, optional
94+
The input to provide to the entity for the operation.
95+
"""
96+
encoded_input = shared.to_json(input) if input is not None else None
97+
self._state.add_operation_action(
98+
pb.OperationAction(
99+
sendSignal=pb.SendSignalAction(
100+
instanceId=str(entity_instance_id),
101+
name=operation,
102+
input=helpers.get_string_value(encoded_input),
103+
scheduledTime=None,
104+
requestTime=None,
105+
parentTraceContext=None,
106+
)
107+
)
108+
)
109+
110+
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
111+
"""Schedule a new orchestration instance.
112+
113+
Parameters
114+
----------
115+
orchestration_name : str
116+
The name of the orchestration to schedule.
117+
input : Any, optional
118+
The input to provide to the new orchestration.
119+
instance_id : str, optional
120+
The instance ID to assign to the new orchestration. If None, a new ID will be generated.
121+
122+
Returns
123+
-------
124+
str
125+
The instance ID of the scheduled orchestration.
126+
"""
127+
encoded_input = shared.to_json(input) if input is not None else None
128+
if not instance_id:
129+
instance_id = uuid.uuid4().hex
130+
self._state.add_operation_action(
131+
pb.OperationAction(
132+
startNewOrchestration=pb.StartNewOrchestrationAction(
133+
instanceId=instance_id,
134+
name=orchestration_name,
135+
input=helpers.get_string_value(encoded_input),
136+
version=None,
137+
scheduledTime=None,
138+
requestTime=None,
139+
parentTraceContext=None
140+
)
141+
)
142+
)
143+
return instance_id
144+
145+
@property
146+
def entity_id(self) -> EntityInstanceId:
147+
"""Get the ID of the entity instance.
148+
149+
Returns
150+
-------
151+
str
152+
The ID of the current entity instance.
153+
"""
154+
return self._entity_id

0 commit comments

Comments
 (0)