Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public DaprContainer daprContainer(Network daprNetwork, RabbitMQContainer rabbit
rabbitMqConfig.put("password", "guest");
rabbitMqConfig.put("requeueInFailure", "true");

return new DaprContainer("daprio/daprd:1.14.4").withAppName(applicationName)
return new DaprContainer("daprio/daprd:1.16.0").withAppName(applicationName)
.withNetwork(daprNetwork)
.withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig))
.withDaprLogLevel(DaprLogLevel.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public DaprContainer daprContainer(Network daprNetwork, RabbitMQContainer rabbit
rabbitMqConfig.put("password", "guest");
rabbitMqConfig.put("requeueInFailure", "true");

return new DaprContainer("daprio/daprd:1.14.4").withAppName(applicationName)
return new DaprContainer("daprio/daprd:1.16.0").withAppName(applicationName)
.withNetwork(daprNetwork)
.withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig))
.withDaprLogLevel(DaprLogLevel.INFO)
Expand Down
57 changes: 57 additions & 0 deletions messaging-modules/dapr/dapr-workflows/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.baeldung.dapr</groupId>
<artifactId>dapr</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>

<artifactId>dapr-workflows</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.dapr.spring</groupId>
<artifactId>dapr-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.dapr.spring</groupId>
<artifactId>dapr-spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.baeldung.dapr.pubsub.model;

public class RideRequest {
private String passengerId;
private String location;
private String destination;

public RideRequest() {
}

public RideRequest(String passengerId, String location, String destination) {
this.passengerId = passengerId;
this.location = location;
this.destination = destination;
}

public String getPassengerId() {
return passengerId;
}

public void setPassengerId(String passengerId) {
this.passengerId = passengerId;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}

public String getDestination() {
return destination;
}

public void setDestination(String destination) {
this.destination = destination;
}

@Override
public String toString() {
return "RideRequest [passengerId=" + passengerId + ", location=" + location + ", destination=" + destination
+ "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.baeldung.dapr.workflow;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import io.dapr.spring.workflows.config.EnableDaprWorkflows;

@EnableDaprWorkflows
@SpringBootApplication
public class DaprWorkflowApp {

public static void main(String[] args) {
SpringApplication.run(DaprWorkflowApp.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.baeldung.dapr.workflow;

import java.time.Duration;

import org.springframework.stereotype.Component;

import com.baeldung.dapr.workflow.activity.CalculateFareActivity;
import com.baeldung.dapr.workflow.activity.NotifyPassengerActivity;
import com.baeldung.dapr.workflow.activity.ValidateDriverActivity;
import com.baeldung.dapr.workflow.model.NotificationInput;
import com.baeldung.dapr.workflow.model.RideWorkflowRequest;
import com.baeldung.dapr.workflow.model.RideWorkflowStatus;

import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;

@Component
public class RideProcessingWorkflow implements Workflow {

@Override
public WorkflowStub create() {
return context -> {
String instanceId = context.getInstanceId();
context.getLogger()
.info("Starting ride processing workflow: {}", instanceId);

RideWorkflowRequest request = context.getInput(RideWorkflowRequest.class);

WorkflowTaskOptions options = taskOptions();

context.getLogger()
.info("Step 1: Validating driver {}", request.getDriverId());
boolean isValid = context.callActivity(ValidateDriverActivity.class.getName(), request, options, boolean.class)
.await();

if (!isValid) {
context.complete(new RideWorkflowStatus(request.getRideId(), "FAILED", "Driver validation failed"));
return;
}

context.getLogger()
.info("Step 2: Calculating fare");
double fare = context.callActivity(CalculateFareActivity.class.getName(), request, options, double.class)
.await();

context.getLogger()
.info("Step 3: Notifying passenger");
NotificationInput notificationInput = new NotificationInput(request, fare);
String notification = context.callActivity(NotifyPassengerActivity.class.getName(), notificationInput, options, String.class)
.await();

context.getLogger()
.info("Step 4: Waiting for passenger confirmation");
String confirmation = context.waitForExternalEvent("passenger-confirmation", Duration.ofMinutes(5), String.class)
.await();

if (!"confirmed".equalsIgnoreCase(confirmation)) {
context.complete(new RideWorkflowStatus(request.getRideId(), "CANCELLED", "Passenger did not confirm the ride within the timeout period"));
return;
}

String message = String.format("Ride confirmed and processed successfully. Fare: $%.2f. %s", fare, notification);
RideWorkflowStatus status = new RideWorkflowStatus(request.getRideId(), "COMPLETED", message);

context.getLogger()
.info("Workflow completed: {}", message);
context.complete(status);
};
}

private WorkflowTaskOptions taskOptions() {
int maxRetries = 3;
Duration backoffTimeout = Duration.ofSeconds(1);
double backoffCoefficient = 1.5;
Duration maxRetryInterval = Duration.ofSeconds(5);
Duration maxTimeout = Duration.ofSeconds(10);

WorkflowTaskRetryPolicy retryPolicy = new WorkflowTaskRetryPolicy(maxRetries, backoffTimeout, backoffCoefficient, maxRetryInterval, maxTimeout);
return new WorkflowTaskOptions(retryPolicy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.baeldung.dapr.workflow.activity;

import org.springframework.stereotype.Component;

import com.baeldung.dapr.workflow.model.RideWorkflowRequest;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;

@Component
public class CalculateFareActivity implements WorkflowActivity {

@Override
public Object run(WorkflowActivityContext context) {
RideWorkflowRequest request = context.getInput(RideWorkflowRequest.class);
context.getLogger()
.info("Calculating fare for ride: {}", request.getRideId());

double baseFare = 5.0;
double perMileFare = 2.5;
double estimatedMiles = 10.0;

double totalFare = baseFare + (perMileFare * estimatedMiles);
context.getLogger()
.info("Calculated fare: ${}", totalFare);

return totalFare;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.baeldung.dapr.workflow.activity;

import org.springframework.stereotype.Component;

import com.baeldung.dapr.workflow.model.NotificationInput;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;

@Component
public class NotifyPassengerActivity implements WorkflowActivity {

@Override
public Object run(WorkflowActivityContext context) {
NotificationInput input = context.getInput(NotificationInput.class);
context.getLogger()
.info("Notifying passenger: {}", input.request()
.getRideRequest()
.getPassengerId());

String message = String.format("Driver %s is on the way to %s. Estimated fare: $%.2f", input.request()
.getDriverId(),
input.request()
.getRideRequest()
.getLocation(),
input.fare());

context.getLogger()
.info("Notification sent: {}", message);
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.baeldung.dapr.workflow.activity;

import org.springframework.stereotype.Component;

import com.baeldung.dapr.workflow.model.RideWorkflowRequest;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;

@Component
public class ValidateDriverActivity implements WorkflowActivity {

@Override
public Object run(WorkflowActivityContext context) {
RideWorkflowRequest request = context.getInput(RideWorkflowRequest.class);
context.getLogger()
.info("Validating driver: {}", request.getDriverId());

if (request.getDriverId() != null && !request.getDriverId()
.isEmpty()) {
context.getLogger()
.info("Driver {} validated successfully", request.getDriverId());
return true;
}

throw new IllegalArgumentException("Invalid driver ID");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.baeldung.dapr.workflow.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.baeldung.dapr.workflow.RideProcessingWorkflow;
import com.baeldung.dapr.workflow.model.RideWorkflowRequest;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

@RestController
@RequestMapping("/workflow")
public class RideWorkflowController {

private static final Logger logger = LoggerFactory.getLogger(RideWorkflowController.class);

private final DaprWorkflowClient workflowClient;

public RideWorkflowController(DaprWorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}

@PostMapping("/start-ride")
public RideWorkflowRequest startRideWorkflow(@RequestBody RideWorkflowRequest request) {
logger.info("Starting workflow for ride: {}", request.getRideId());

String instanceId = workflowClient.scheduleNewWorkflow(RideProcessingWorkflow.class, request);

request.setWorkflowInstanceId(instanceId);
logger.info("Workflow started with instance ID: {}", instanceId);

return request;
}

@GetMapping("/status/{instanceId}")
public WorkflowInstanceStatus getWorkflowStatus(@PathVariable String instanceId) {
return workflowClient.getInstanceState(instanceId, true);
}

@PostMapping("/confirm/{instanceId}")
public void confirmRide(@PathVariable("instanceId") String instanceId, @RequestBody String confirmation) {
logger.info("Raising confirmation event for workflow: {}", instanceId);
workflowClient.raiseEvent(instanceId, "passenger-confirmation", confirmation);
logger.info("Confirmation event raised: {}", confirmation);
}
}
Loading