-
Notifications
You must be signed in to change notification settings - Fork 528
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Saga pattern quickstart in Java (#957)
* copy code example of java workflow as code base for saga Signed-off-by: Sky Ao <[email protected]> * rename for saga Signed-off-by: Sky Ao <[email protected]> * update workflow and activities for saga pattern support Signed-off-by: Sky Ao <[email protected]> * add compentation log Signed-off-by: Sky Ao <[email protected]> * rename DistributionActivity to DelieveryActivity Signed-off-by: Sky Ao <[email protected]> * change to saga-in-workflow solution Signed-off-by: Sky Ao <[email protected]> * improve code according to proposal Signed-off-by: Sky Ao <[email protected]> * update saga quickstart because CompensatableWorkflowActivity is removed Signed-off-by: Sky Ao <[email protected]> * update after saga context added Signed-off-by: Sky Ao <[email protected]> * trigger CI Signed-off-by: Sky Ao <[email protected]> * trigger CI Signed-off-by: Sky Ao <[email protected]> * update expected stdout lines for compensated inventory and payment Signed-off-by: Sky Ao <[email protected]> --------- Signed-off-by: Sky Ao <[email protected]>
- Loading branch information
Showing
26 changed files
with
1,039 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
apiVersion: dapr.io/v1alpha1 | ||
kind: Component | ||
metadata: | ||
name: statestore | ||
spec: | ||
type: state.redis | ||
version: v1 | ||
initTimeout: 1m | ||
metadata: | ||
- name: redisHost | ||
value: localhost:6379 | ||
- name: redisPassword | ||
value: "" | ||
- name: actorStateStore | ||
value: "true" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
# Dapr workflows | ||
|
||
In this quickstart, you'll create a simple console application to demonstrate Dapr's workflow programming model and saga pattern support. The console app starts and manages the lifecycle of a workflow that stores and retrieves data in a state store. | ||
|
||
This quickstart includes one project: | ||
|
||
- Java console app `order-processor-saga` | ||
|
||
The quickstart contains 1 workflow to simulate purchasing items from a store, and 6 unique activities within the workflow. These 6 activities are as follows: | ||
|
||
- NotifyActivity: This activity utilizes a logger to print out messages throughout the workflow. These messages notify the user when there is insufficient inventory, their payment couldn't be processed, and more. | ||
- ReserveInventoryActivity: This activity checks the state store to ensure that there is enough inventory present for purchase. | ||
- RequestApprovalActivity: This activity requests approval for orders over a certain threshold | ||
- ProcessPaymentActivity: This activity is responsible for processing and authorizing the payment. | ||
- UpdateInventoryActivity: This activity updates the state store with the new remaining inventory value. | ||
- DistributionActivity: This activity starts the distribution. In this quickstart, it will allways be failed to trigger saga compensation. | ||
|
||
### Run the order processor workflow with multi-app-run | ||
|
||
1. Open a new terminal window and navigate to `order-processor` directory: | ||
|
||
<!-- STEP | ||
name: Install Java dependencies | ||
--> | ||
|
||
```bash | ||
cd ./order-processor-saga | ||
mvn clean install | ||
cd .. | ||
``` | ||
|
||
<!-- END_STEP --> | ||
2. Run the console app with Dapr: | ||
|
||
<!-- STEP | ||
name: Run order-processor service | ||
expected_stdout_lines: | ||
- 'Compensated inventory for order' | ||
- '== APP - SagaConsoleApp == there are now 100 cars left in stock' | ||
- 'Compensated payment for request ID' | ||
- '== APP - SagaConsoleApp == workflow instance completed' | ||
expected_stderr_lines: | ||
output_match_mode: substring | ||
background: true | ||
sleep: 15 | ||
timeout_seconds: 120 | ||
--> | ||
|
||
```bash | ||
dapr run -f . | ||
``` | ||
|
||
<!-- END_STEP --> | ||
|
||
3. Expected output | ||
|
||
|
||
``` | ||
== APP - SagaConsoleApp == *** Welcome to the Dapr saga console app sample! | ||
== APP - SagaConsoleApp == *** Using this app, you can place orders that start workflows. | ||
== APP - SagaConsoleApp == Start workflow runtime | ||
== APP - SagaConsoleApp == Oct 24, 2023 7:00:56 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock | ||
== APP - SagaConsoleApp == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:33907. | ||
== APP - SagaConsoleApp == ==========Begin the purchase of item:========== | ||
== APP - SagaConsoleApp == Starting order workflow, purchasing 10 of cars | ||
== APP - SagaConsoleApp == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: 48a551c1-a8ac-4622-ab28-ae89647066f3 | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.saga.OrderProcessingWorkflow | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): 48a551c1-a8ac-4622-ab28-ae89647066f3 | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-10-24T07:01:00.264Z | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] | ||
== APP - SagaConsoleApp == workflow instance 48a551c1-a8ac-4622-ab28-ae89647066f3 started | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ReserveInventoryActivity - Reserving inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ReserveInventoryActivity - There are 100 cars available for purchase | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ReserveInventoryActivity - Reserved inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ProcessPaymentActivity - Processing payment: 48a551c1-a8ac-4622-ab28-ae89647066f3 for 10 cars at $150000 | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ProcessPaymentActivity - Payment for request ID '48a551c1-a8ac-4622-ab28-ae89647066f3' processed successfully | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Updating inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Updated inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3': there are now 90 cars left in stock | ||
== APP - SagaConsoleApp == there are now 90 cars left in stock | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Compensating inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Compensated inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3': there are now 100 cars left in stock | ||
== APP - SagaConsoleApp == there are now 100 cars left in stock | ||
== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ProcessPaymentActivity - Compensating payment for request ID '48a551c1-a8ac-4622-ab28-ae89647066f3' at $150000 | ||
== APP - SagaConsoleApp == workflow instance completed, out is: {"processed":false,"compensated":true} | ||
``` | ||
|
||
### View workflow output with Zipkin | ||
|
||
For a more detailed view of the workflow activities (duration, progress etc.), try using Zipkin. | ||
|
||
1. Launch Zipkin container - The [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) docker container is launched on running `dapr init`. Check to make sure the container is running. If it's not, launch the Zipkin docker container with the following command. | ||
|
||
```bash | ||
docker run -d -p 9411:9411 openzipkin/zipkin | ||
``` | ||
|
||
2. View Traces in Zipkin UI - In your browser go to http://localhost:9411 to view the workflow trace spans in the Zipkin web UI. The order-processor workflow should be viewable with the following output in the Zipkin web UI. | ||
|
||
<img src="img/workflow-trace-spans-zipkin.png"> | ||
|
||
### What happened? | ||
|
||
When you ran `dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp` | ||
|
||
1. A unique order ID for the workflow is generated (in the above example, `95d33f7c-3af8-4960-ba11-4ecea83b0509`) and the workflow is scheduled. | ||
2. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. | ||
3. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. | ||
4. Your workflow starts and notifies you of its status. | ||
5. The `RequestApprovalActivity` workflow activity requests approval for order `95d33f7c-3af8-4960-ba11-4ecea83b0509` | ||
6. The `ProcessPaymentActivity` workflow activity begins processing payment for order `95d33f7c-3af8-4960-ba11-4ecea83b0509` and confirms if successful. | ||
7. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. | ||
8. The `DistributionActivity` workflow activity failed (to trigger saga compensation). | ||
9. Saga compensation is triggered and compensated for `ProcessPaymentActivity` and `UpdateInventoryActivity` | ||
10. The workflow terminates as completed with processed=false and compensated=true. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
version: 1 | ||
common: | ||
resourcesPath: ../../components | ||
apps: | ||
- appID: SagaConsoleApp | ||
appDirPath: ./order-processor-saga/target | ||
command: ["java", "-jar", "SagaService-0.0.1-SNAPSHOT.jar", "io.dapr.quickstarts.saga.SagaConsoleApp"] |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
include ../../../docker.mk | ||
include ../../../validate.mk |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>com.service</groupId> | ||
<artifactId>SagaService</artifactId> | ||
<version>0.0.1-SNAPSHOT</version> | ||
<name>SagaService</name> | ||
<description>Demo for Dapr workflow Saga</description> | ||
<properties> | ||
<maven.compiler.source>11</maven.compiler.source> | ||
<maven.compiler.target>11</maven.compiler.target> | ||
<slf4jVersion>1.6.1</slf4jVersion> | ||
</properties> | ||
<dependencies> | ||
<dependency> | ||
<groupId>io.dapr</groupId> | ||
<artifactId>dapr-sdk-workflows</artifactId> | ||
<version>0.11.0-SNAPSHOT</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-simple</artifactId> | ||
<version>1.7.36</version> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-maven-plugin</artifactId> | ||
<version>2.6.3</version> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>repackage</goal> | ||
</goals> | ||
<configuration> | ||
<mainClass> | ||
io.dapr.quickstarts.saga.SagaConsoleApp | ||
</mainClass> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
<repositories> | ||
<repository> | ||
<id>maven-SNAPSHOT</id> | ||
<url>https://oss.sonatype.org/content/repositories/snapshots/</url> | ||
<snapshots> | ||
<enabled>true</enabled> | ||
</snapshots> | ||
</repository> | ||
</repositories> | ||
</project> |
128 changes: 128 additions & 0 deletions
128
.../order-processor-saga/src/main/java/io/dapr/quickstarts/saga/OrderProcessingWorkflow.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package io.dapr.quickstarts.saga; | ||
|
||
import org.slf4j.Logger; | ||
|
||
import io.dapr.quickstarts.saga.activities.DeliveryActivity; | ||
import io.dapr.quickstarts.saga.activities.NotifyActivity; | ||
import io.dapr.quickstarts.saga.activities.ProcessPaymentActivity; | ||
import io.dapr.quickstarts.saga.activities.ProcessPaymentCompensationActivity; | ||
import io.dapr.quickstarts.saga.activities.RequestApprovalActivity; | ||
import io.dapr.quickstarts.saga.activities.ReserveInventoryActivity; | ||
import io.dapr.quickstarts.saga.activities.UpdateInventoryActivity; | ||
import io.dapr.quickstarts.saga.activities.UpdateInventoryCompensationActivity; | ||
import io.dapr.quickstarts.saga.models.ApprovalResult; | ||
import io.dapr.quickstarts.saga.models.InventoryRequest; | ||
import io.dapr.quickstarts.saga.models.InventoryResult; | ||
import io.dapr.quickstarts.saga.models.Notification; | ||
import io.dapr.quickstarts.saga.models.OrderPayload; | ||
import io.dapr.quickstarts.saga.models.OrderResult; | ||
import io.dapr.quickstarts.saga.models.PaymentRequest; | ||
import io.dapr.workflows.Workflow; | ||
import io.dapr.workflows.WorkflowStub; | ||
import io.dapr.workflows.saga.SagaOption; | ||
|
||
public class OrderProcessingWorkflow extends Workflow { | ||
|
||
@Override | ||
public WorkflowStub create() { | ||
return ctx -> { | ||
Logger logger = ctx.getLogger(); | ||
String orderId = ctx.getInstanceId(); | ||
logger.info("Starting Workflow: " + ctx.getName()); | ||
logger.info("Instance ID(order ID): " + orderId); | ||
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); | ||
|
||
OrderPayload order = ctx.getInput(OrderPayload.class); | ||
logger.info("Received Order: " + order.toString()); | ||
OrderResult orderResult = new OrderResult(); | ||
|
||
// step1: notify the user that an order has come through | ||
Notification notification = new Notification(); | ||
notification.setMessage("Received Order: " + order.toString()); | ||
ctx.callActivity(NotifyActivity.class.getName(), notification).await(); | ||
|
||
// step2: determine if there is enough of the item available for purchase by | ||
// checking the inventory | ||
InventoryRequest inventoryRequest = new InventoryRequest(); | ||
inventoryRequest.setRequestId(orderId); | ||
inventoryRequest.setItemName(order.getItemName()); | ||
inventoryRequest.setQuantity(order.getQuantity()); | ||
InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(), | ||
inventoryRequest, InventoryResult.class).await(); | ||
|
||
// If there is insufficient inventory, fail and let the user know | ||
if (!inventoryResult.isSuccess()) { | ||
notification.setMessage("Insufficient inventory for order : " + order.getItemName()); | ||
ctx.callActivity(NotifyActivity.class.getName(), notification).await(); | ||
ctx.complete(orderResult); | ||
return; | ||
} | ||
|
||
// step3: require orders over a certain threshold to be approved | ||
if (order.getTotalCost() > 5000) { | ||
ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(), | ||
order, ApprovalResult.class).await(); | ||
if (approvalResult != ApprovalResult.Approved) { | ||
notification.setMessage("Order " + order.getItemName() + " was not approved."); | ||
ctx.callActivity(NotifyActivity.class.getName(), notification).await(); | ||
ctx.complete(orderResult); | ||
return; | ||
} | ||
} | ||
|
||
// There is enough inventory available so the user can purchase the item(s). | ||
// step4: Process their payment (need compensation) | ||
PaymentRequest paymentRequest = new PaymentRequest(); | ||
paymentRequest.setRequestId(orderId); | ||
paymentRequest.setItemBeingPurchased(order.getItemName()); | ||
paymentRequest.setQuantity(order.getQuantity()); | ||
paymentRequest.setAmount(order.getTotalCost()); | ||
boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(), | ||
paymentRequest, boolean.class).await(); | ||
if (!isOK) { | ||
notification.setMessage("Payment failed for order : " + orderId); | ||
ctx.callActivity(NotifyActivity.class.getName(), notification).await(); | ||
ctx.complete(orderResult); | ||
return; | ||
} | ||
ctx.getSagaContext().registerCompensation(ProcessPaymentCompensationActivity.class.getName(), paymentRequest); | ||
|
||
// step5: Update the inventory (need compensation) | ||
inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(), | ||
inventoryRequest, InventoryResult.class).await(); | ||
if (!inventoryResult.isSuccess()) { | ||
// Let users know their payment processing failed | ||
notification.setMessage("Order failed to update inventory! : " + orderId); | ||
ctx.callActivity(NotifyActivity.class.getName(), notification).await(); | ||
|
||
// trigger saga compensation gracefully | ||
ctx.getSagaContext().compensate(); | ||
orderResult.setCompensated(true); | ||
ctx.complete(orderResult); | ||
return; | ||
} | ||
ctx.getSagaContext().registerCompensation(UpdateInventoryCompensationActivity.class.getName(), | ||
inventoryRequest); | ||
|
||
// step6: delevery (allways be failed to trigger compensation) | ||
ctx.callActivity(DeliveryActivity.class.getName()).await(); | ||
|
||
// step7: Let user know their order was processed(won't be executed if step6 | ||
// failed) | ||
notification.setMessage("Order completed! : " + orderId); | ||
ctx.callActivity(NotifyActivity.class.getName(), notification).await(); | ||
|
||
// Complete the workflow with order result is processed(won't be executed if | ||
// step6 failed) | ||
orderResult.setProcessed(true); | ||
ctx.complete(orderResult); | ||
}; | ||
} | ||
|
||
@Override | ||
public SagaOption getSagaOption() { | ||
return SagaOption.newBuilder().setParallelCompensation(false) | ||
.setContinueWithError(true).build(); | ||
} | ||
|
||
} |
Oops, something went wrong.