-
Notifications
You must be signed in to change notification settings - Fork 129
/
state_store.py
113 lines (93 loc) · 3.82 KB
/
state_store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
dapr run python3 state_store.py
"""
import grpc
from dapr.clients import DaprClient
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
from dapr.clients.grpc._state import StateItem
with DaprClient() as d:
    # Name of the state store that every call below is addressed to.
    storeName = 'statestore'

    # Three key/value pairs used by the walkthrough, plus a replacement
    # value that is later written over the first key.
    key, value = 'key_1', 'value_1'
    updated_value = 'value_1_updated'
    another_key, another_value = 'key_2', 'value_2'
    yet_another_key, yet_another_value = 'key_3', 'value_3'

    # --- Save a single state -------------------------------------------
    d.save_state(store_name=storeName, key=key, value=value)
    print(f'State store has successfully saved {value} with {key} as key')

    # --- Save with an etag that differs from the stored one ------------
    try:
        d.save_state(
            store_name=storeName,
            key=key,
            value=another_value,
            etag='9999',
        )
    except grpc.RpcError as err:
        # The status code is expected to be StatusCode.ABORTED.
        print(f'Cannot save due to bad etag. ErrorCode={err.code()}')
        # For the detailed error message from the dapr runtime, also
        # print err.details().

    # --- Save multiple states in one call ------------------------------
    bulk_states = [
        StateItem(key=another_key, value=another_value),
        StateItem(key=yet_another_key, value=yet_another_value),
    ]
    d.save_bulk_state(store_name=storeName, states=bulk_states)
    print(f'State store has successfully saved {another_value} with {another_key} as key')
    print(f'State store has successfully saved {yet_another_value} with {yet_another_key} as key')

    # --- Bulk save with etags that differ from the stored ones ---------
    try:
        mismatched_states = [
            StateItem(key=another_key, value=another_value, etag='999'),
            StateItem(key=yet_another_key, value=yet_another_value, etag='999'),
        ]
        d.save_bulk_state(store_name=storeName, states=mismatched_states)
    except grpc.RpcError as err:
        # The status code is expected to be StatusCode.ABORTED.
        print(f'Cannot save bulk due to bad etags. ErrorCode={err.code()}')
        # For the detailed error message from the dapr runtime, also
        # print err.details().

    # --- Get one state by key ------------------------------------------
    state = d.get_state(
        store_name=storeName,
        key=key,
        state_metadata={'metakey': 'metavalue'},
    )
    print(f'Got value={state.data} eTag={state.etag}')

    # --- Transaction upsert --------------------------------------------
    # The first operation names its type explicitly and passes the etag
    # that was just read; the other two leave operation_type unset.
    upsert_operations = [
        TransactionalStateOperation(
            operation_type=TransactionOperationType.upsert,
            key=key,
            data=updated_value,
            etag=state.etag,
        ),
        TransactionalStateOperation(key=another_key, data=another_value),
        TransactionalStateOperation(key=yet_another_key, data=yet_another_value),
    ]
    d.execute_state_transaction(store_name=storeName, operations=upsert_operations)

    # --- Batch get -----------------------------------------------------
    bulk_response = d.get_bulk_state(
        store_name=storeName,
        keys=[key, another_key],
        states_metadata={'metakey': 'metavalue'},
    )
    items = bulk_response.items
    items_with_etags = [(item.data, item.etag) for item in items]
    print(f'Got items with etags: {items_with_etags}')

    # --- Transaction delete --------------------------------------------
    delete_operations = [
        TransactionalStateOperation(
            operation_type=TransactionOperationType.delete,
            key=doomed_key,
        )
        for doomed_key in (key, another_key)
    ]
    d.execute_state_transaction(store_name=storeName, operations=delete_operations)

    # --- Batch get again, for the two keys just deleted ----------------
    bulk_response = d.get_bulk_state(
        store_name=storeName,
        keys=[key, another_key],
        states_metadata={'metakey': 'metavalue'},
    )
    items = bulk_response.items
    values_after_delete = [item.data for item in items]
    print(f'Got values after transaction delete: {values_after_delete}')

    # --- Delete one state by key, then read it back --------------------
    d.delete_state(
        store_name=storeName,
        key=yet_another_key,
        state_metadata={'metakey': 'metavalue'},
    )
    deleted_lookup = d.get_state(store_name=storeName, key=yet_another_key)
    data = deleted_lookup.data
    print(f'Got value after delete: {data}')