Skip to content

[WIP] Added docs for RavenDB as State Store #4600

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4f32042
Update conversation-overview.md
hhunter-ms Feb 27, 2025
3688a1d
Merge pull request #4559 from dapr/hhunter-ms-patch-6
hhunter-ms Feb 27, 2025
b74c247
update python examples for workflow; update conversation quickstart t…
hhunter-ms Feb 28, 2025
0992cd4
Updates latest version to 1.15.1 (#4562)
JoshVanL Feb 28, 2025
fb53445
Revert back to built-in template that includes GA
marcduiker Mar 3, 2025
287cfd7
Merge pull request #4564 from marcduiker/v1.15
hhunter-ms Mar 3, 2025
7c40430
spelling-fix: kubernetes-persisting-scheduler.md (#4566)
jake-engelberg Mar 3, 2025
19f812c
remove 3500, only use as an example in API docs
hhunter-ms Mar 3, 2025
178e0c8
Update kubernetes-persisting-scheduler.md (#4568)
jake-engelberg Mar 3, 2025
ac358cb
Merge branch 'v1.15' into issue_4519
hhunter-ms Mar 4, 2025
b7b273d
Merge pull request #4567 from hhunter-ms/issue_4519
hhunter-ms Mar 4, 2025
12b2f17
Update sidecar.md (#4570)
joneldominic Mar 4, 2025
f4848ae
Update v1.15.1 --> v1.15.2 (#4571)
hhunter-ms Mar 5, 2025
0682cd0
update keys to items for consistency (#4565)
hhunter-ms Mar 6, 2025
418ef7f
update the docs
msfussell Mar 13, 2025
fb8764b
Update multi-app template and arguments annotations for new configura…
antontroshin Mar 13, 2025
449fb9a
Merge branch 'v1.15' into pythondocs
hhunter-ms Mar 17, 2025
b24527e
Merge pull request #4576 from msfussell/pythondocs
hhunter-ms Mar 17, 2025
60d5332
update author and manage workflow how-tos
hhunter-ms Mar 17, 2025
8cbd37a
Merge branch 'v1.15' into issue_4410
hhunter-ms Mar 17, 2025
6e990e4
Added .NET streaming subscription example to pubsub page
WhitWaldo Mar 18, 2025
2be7c99
Added double quotes around .NET in the tabs markup
WhitWaldo Mar 18, 2025
dd0fd38
Merge pull request #4583 from WhitWaldo/net-streaming-subs
hhunter-ms Mar 18, 2025
79d0320
Merge branch 'v1.15' into issue_4410
hhunter-ms Mar 18, 2025
f612ab7
Merge pull request #4563 from hhunter-ms/issue_4410
hhunter-ms Mar 19, 2025
8a29b39
Fixed spelling mistake in secret-scope.md (#4593)
jamespegg Mar 24, 2025
aebf393
Update self-hosted-with-docker.md - make `scheduler` running (#4599)
mathieu-benoit Mar 25, 2025
4088858
Added docs for RavenDB as State Store
nmalocicvega Mar 26, 2025
ccef7f3
Merge branch 'v1.15' into 3318-ravendb-docs
mikeee Mar 26, 2025
b421483
Update actors-quickstart.md (#4601)
alicejgibbons Mar 27, 2025
028e554
Fixes to the supported features and to Ravendb page
nmalocic Mar 31, 2025
7f87f2b
Fixes to the supported features and to Ravendb page
nmalocic Mar 31, 2025
dcc6d70
Merge branch '3318-ravendb-docs' of github.com:nmalocic/dapr-docs int…
nmalocic Mar 31, 2025
7b4962c
Fixed spelling mistake in secret-scope.md (#4593)
jamespegg Mar 24, 2025
bd33953
Added docs for RavenDB as State Store
nmalocicvega Mar 26, 2025
44ce484
Update self-hosted-with-docker.md - make `scheduler` running (#4599)
mathieu-benoit Mar 25, 2025
a6c8cc4
Fixes to the supported features and to Ravendb page
nmalocic Mar 31, 2025
46a8c73
Update actors-quickstart.md (#4601)
alicejgibbons Mar 27, 2025
282c3b9
Merge branch '3318-ravendb-docs' of github.com:nmalocic/dapr-docs int…
nmalocic Mar 31, 2025
08fe273
Merge branch 'v1.15' into 3318-ravendb-docs
mikeee Apr 8, 2025
45bb23a
Update daprdocs/data/components/state_stores/generic.yaml
nmalocic Jul 18, 2025
a06284a
Update daprdocs/content/en/reference/components-reference/supported-s…
nmalocic Jul 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daprdocs/content/en/concepts/dapr-services/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ For a detailed list of all available arguments run `daprd --help` or see this [t
1. Specify the port your application is listening to

```bash
daprd --app-id --app-port 5000
daprd --app-id myapp --app-port 5000
```

1. If you are using several custom resources and want to specify the location of the resource definition files, use the `--resources-path` argument:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ using System.Threading.Tasks;
using Dapr.Client;

const string DAPR_CONFIGURATION_STORE = "configstore";
var CONFIGURATION_KEYS = new List<string> { "orderId1", "orderId2" };
var CONFIGURATION_ITEMS = new List<string> { "orderId1", "orderId2" };
var client = new DaprClientBuilder().Build();

// Subscribe for configuration changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Want to put the Dapr conversation API to the test? Walk through the following qu

| Quickstart/tutorial | Description |
| ------------------- | ----------- |
| [Conversation quickstart](todo) | TODO |
| [Conversation quickstart]({{< ref conversation-quickstart.md >}}) | Learn how to interact with Large Language Models (LLMs) using the conversation API. |

### Start using the conversation API directly in your app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,55 @@ As messages are sent to the given message handler code, there is no concept of r

The example below shows the different ways to stream subscribe to a topic.

{{< tabs Python Go >}}
{{< tabs ".NET" Python Go >}}

{{% codetab %}}

You can use the `SubscribeAsync` method on the `DaprPublishSubscribeClient` to configure the message handler to use to pull messages from the stream.

```c#
using System.Text;
using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient();
var app = builder.Build();

var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();

//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
HandleMessageAsync, cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
return;

//Process each message returned from the subscription
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}
```

[Learn more about streaming subscriptions using the .NET SDK client.]({{< ref "dotnet-messaging-pubsub-howto.md" >}})

{{% /codetab %}}


{{% codetab %}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.

```python
def hello_act(ctx: WorkflowActivityContext, input):
@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
global counter
counter += input
counter += wf_input
print(f'New counter value is: {counter}!', flush=True)
```

[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)


{{% /codetab %}}
Expand Down Expand Up @@ -226,19 +227,32 @@ Next, register and call the activites in a workflow.

<!--python-->

The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
The `hello_world_wf` function is a function derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.

```python
def hello_world_wf(ctx: DaprWorkflowContext, input):
print(f'{input}')
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

# Change in event handling: Use when_any to handle both event and timeout
event = ctx.wait_for_external_event(event_name)
timeout = ctx.create_timer(timedelta(seconds=30))
winner = yield when_any([event, timeout])

if winner == timeout:
print('Workflow timed out waiting for event')
return 'Timeout'

yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
return 'Completed'
```

[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)


{{% /codetab %}}
Expand Down Expand Up @@ -405,89 +419,177 @@ Finally, compose the application using the workflow.

<!--python-->

[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic Python hello world application using the Python SDK, your project code would include:
[In the following example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py), for a basic Python hello world application using the Python SDK, your project code would include:

- A Python package called `DaprClient` to receive the Python SDK capabilities.
- A builder with extensions called:
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
- `WorkflowRuntime`: Allows you to register the workflow runtime.
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
- API calls. In the example below, these calls start, pause, resume, purge, and completing the workflow.

```python
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
WorkflowRuntime,
DaprWorkflowContext,
WorkflowActivityContext,
RetryPolicy,
DaprWorkflowClient,
when_any,
)
from dapr.conf import Settings
from dapr.clients.exceptions import DaprInternalError

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'

retry_policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
)

wfr = WorkflowRuntime()


@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

# Change in event handling: Use when_any to handle both event and timeout
event = ctx.wait_for_external_event(event_name)
timeout = ctx.create_timer(timedelta(seconds=30))
winner = yield when_any([event, timeout])

if winner == timeout:
print('Workflow timed out waiting for event')
return 'Timeout'

yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
return 'Completed'


@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
global counter
counter += wf_input
print(f'New counter value is: {counter}!', flush=True)


@wfr.activity(name='hello_retryable_act')
def hello_retryable_act(ctx: WorkflowActivityContext):
global retry_count
if (retry_count % 2) == 0:
print(f'Retry count value is: {retry_count}!', flush=True)
retry_count += 1
raise ValueError('Retryable Error')
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
retry_count += 1


@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: DaprWorkflowContext):
global child_orchestrator_string, child_orchestrator_count
if not ctx.is_replaying:
child_orchestrator_count += 1
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
child_orchestrator_string += str(child_orchestrator_count)
yield ctx.call_activity(
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
)
if child_orchestrator_count < 3:
raise ValueError('Retryable Error')


@wfr.activity(name='act_for_child_wf')
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
global child_orchestrator_string, child_act_retry_count
inp_char = chr(96 + inp)
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
child_orchestrator_string += inp_char
if child_act_retry_count % 2 == 0:
child_act_retry_count += 1
raise ValueError('Retryable Error')
child_act_retry_count += 1

# ...

def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()

# Start workflow
print("==========Start Counter Increase as per Input:==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# ...

# Pause workflow
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

# Resume workflow
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

sleep(1)
# Raise workflow
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)

sleep(5)
# Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")

# Kick off another workflow for termination purposes
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# Terminate workflow
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

# Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")

workflowRuntime.shutdown()
wfr.start()
wf_client = DaprWorkflowClient()

print('==========Start Counter Increase as per Input:==========')
wf_client.schedule_new_workflow(
workflow=hello_world_wf, input=input_data, instance_id=instance_id
)

wf_client.wait_for_workflow_start(instance_id)

# Sleep to let the workflow run initial activities
sleep(12)

assert counter == 11
assert retry_count == 2
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
wf_client.pause_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')

# Resume Test
wf_client.resume_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')

sleep(2) # Give the workflow time to reach the event wait state
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)

print('========= Waiting for Workflow completion', flush=True)
try:
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
if state.runtime_status.name == 'COMPLETED':
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
else:
print(f'Workflow failed! Status: {state.runtime_status.name}')
except TimeoutError:
print('*** Workflow timed out!')

wf_client.purge_workflow(instance_id=instance_id)
try:
wf_client.get_workflow_state(instance_id=instance_id)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

wfr.shutdown()


if __name__ == '__main__':
main()
```


{{% /codetab %}}

{{% codetab %}}
Expand Down
Loading