diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md index efdac157ce6..92f6d18a39f 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md @@ -28,9 +28,180 @@ There are several different kinds of tasks that a workflow can schedule, includi ### Workflow identity -Each workflow you define has a type name, and individual executions of a workflow require a unique _instance ID_. Workflow instance IDs can be generated by your app code, which is useful when workflows correspond to business entities like documents or jobs, or can be auto-generated UUIDs. A workflow's instance ID is useful for debugging and also for managing workflows using the [Workflow APIs]({{< ref workflow_api.md >}}). +Each workflow you define has a type name, and individual executions of a workflow require a unique _instance ID_. Only one workflow instance with a given ID can exist at any given time. Workflow instance IDs can be generated by your app code, which is useful when workflows correspond to business entities like documents or jobs, or can be auto-generated UUIDs. A workflow's instance ID is useful for debugging and also for managing workflows using the [Workflow APIs]({{< ref workflow_api.md >}}). -Only one workflow instance with a given ID can exist at any given time. However, if a workflow instance completes or fails, its ID can be reused by a new workflow instance. Note, however, that the new workflow instance effectively replaces the old one in the configured state store. +#### Reusing workflow identities + +If a workflow instance completes or fails, its ID can be reused by a new workflow instance. The new workflow instance effectively replaces the old one in the configured state store. + +You can use the following policies to reuse workflow IDs: + +| Policies | Description | +| -------- | ----------- | +| `api.REUSE_ID_ACTION_IGNORE` | Ignores the a new workflow being created with the same ID as an existing workflow if the existing workflow is `RUNNING`, `COMPLETED`, or `PENDING`. | +| `api.REUSE_ID_ACTION_TERMINATE` | Terminates a new workflow created with the same ID as an existing workflow if the existing workflow is `RUNNING`, `COMPLETED`, or `PENDING`. | + +**Example 1** + +The following example demonstrates the default behavior, erroring out the workflow when reusing the workflow ID. In the following test: + +1. The workflow calls a single activity with orchestration ID reuse policy. +1. The reuse ID policy specifies the action `IGNORE_IF_RUNNING_OR_COMPLETED` and the target statuses of `RUNNING`, `COMPLETED`, `PENDING`. +1. The second call to create a workflow with the same instance ID throws an error when trying to reuse the workflow ID. + +```go +func main() { + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + return fmt.Sprintf("Hello, %s!", name), nil + }) + + ctx := context.Background() + client, engine := startEngine(ctx, r) + + instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED") + reuseIDPolicy := &api.OrchestrationIdReusePolicy{ + Action: api.REUSE_ID_ACTION_IGNORE, + OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING}, + } + + // Run the orchestration. + id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID)) + if err != nil { + fmt.Println(err) + return + } + // Wait for orchestration to start... + client.WaitForOrchestrationStart(ctx, id) + // Schedule the workflow again using the same id. However it will error out since it already exists. + id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIDPolicy)) + if err != nil { + fmt.Println(err) + return + } +} +``` + + +**Example 2** + +Let's say you want to start an orchestration when not in `RUNNING` state. In the following test: + +1. The workflow calls a single activity with orchestration ID reuse policy. +1. The reuse ID policy specifies the action `IGNORE_IF_RUNNING_OR_COMPLETED` and the target statuses of `RUNNING`, `COMPLETED`, `PENDING`. +1. The second call to create a workflow with the same instance ID is expected to be ignored if the first workflow instance is one of the target statuses. + +```go +func main() { + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + return fmt.Sprintf("Hello, %s!", name), nil + }) + + ctx := context.Background() + client, engine := startEngine(ctx, r) + + instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED") + reuseIDPolicy := &api.OrchestrationIdReusePolicy{ + Action: api.REUSE_ID_ACTION_IGNORE, + OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING}, + } + + // Run the orchestration. + id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID)) + if err != nil { + fmt.Println(err) + return + } + // Wait for orchestration to start... + client.WaitForOrchestrationStart(ctx, id) + // Schedule the workflow again using the same id. However it will ignore creating the new orchestration, since the id is already in use. + id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIDPolicy)) + if err != nil { + fmt.Println(err) + return + } +} +``` + +**Example 3** + +In the following example: +1. The workflow calls a single activity with the orchestration ID reuse policy. +1. The reuse ID policy contains the action to `TERMINATE` and target statuses `RUNNING`, `COMPLETED`, and `PENDING`. +1. The second call to create a workflow with the same workflow instance ID is expected to terminate the first workflow instance and create a new workflow instance if in one of the target statuses. + +```go +func main() { + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + return fmt.Sprintf("Hello, %s!", name), nil + }) + + ctx := context.Background() + client, engine := startEngine(ctx, r) + + instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED") + reuseIDPolicy := &api.OrchestrationIdReusePolicy{ + Action: api.REUSE_ID_ACTION_TERMINATE, + OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING}, + } + + // Run the orchestration + id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID)) + if err != nil { + fmt.Println(err) + return + } + // Wait for orchestration to start + client.WaitForOrchestrationStart(ctx, id) + // Schedule again. This time, the workflow is successfully started (not ignored), since the policy terminates the existing workflow with the id and starts a new one + id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIDPolicy)) + if err != nil { + fmt.Println(err) + return + } +} +``` ### Workflow replay