Skip to content

Commit

Permalink
docs: documenting workflow pause and timers (#1756)
Browse files Browse the repository at this point in the history
* docs: documenting workflow pause and timers

* Update docs/src/modules/java/pages/workflows.adoc

Co-authored-by: Eduardo Pinto <[email protected]>

---------

Co-authored-by: Eduardo Pinto <[email protected]>
  • Loading branch information
aludwiko and efgpinto authored Aug 4, 2023
1 parent d05914a commit 90b9074
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 59 deletions.
6 changes: 3 additions & 3 deletions docs/src/modules/java-protobuf/partials/timers-intro.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Timers are persisted in the https://docs.kalix.io/reference/glossary.html#proxy[
When a timer is triggered, the scheduled call is executed. If successfully executed, the timer completes and is automatically removed.
In case of a failure, the timer is rescheduled with a delay of 3 seconds. This process repeats until the call succeeds.

You can schedule a timer for any service method in Kalix, but you can only create a timer from within an Action and by passing a `DeferredCall`.
You can schedule a timer for any service method in Kalix, but you can only create a timer from within an Action or Workflow by passing a `DeferredCall`.

**Timer features**:

Expand All @@ -18,5 +18,5 @@ You can schedule a timer for any service method in Kalix, but you can only creat
**Timer limitations**:

* Timers can only be scheduled from within an Action.
* Timers can only call other components (e.g., Actions and Entities), therefore external service calls must be wrapped by an Action in the deployed service.
* Timers can only be scheduled from within an Action or Workflow.
* Timers can only call other components (e.g., Actions, Entities, Workflows), therefore external service calls must be wrapped by an Action in the deployed service.
25 changes: 25 additions & 0 deletions docs/src/modules/java/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,31 @@ IMPORTANT: For simplicity purposes, we are returning the internal state directly

A full transfer workflow source code is available {sample-url}[here, {tab-icon}, window="new"]. Follow the `README` file to run and test it.

== Pausing workflow

A long-running workflow can be paused while waiting for some additional information to continue processing. A special `pause` transition can be used to inform Kalix that the execution of the Workflow should be postponed. By launching a request to a Workflow endpoint, the user can then resume the processing. Additionally, a Kalix Timer can be scheduled to automatically inform the Workflow that the expected time for the additional data has passed.


[source,java,indent=0]
.src/main/java/com/example/transfer/TransferWorkflow.java
----
include::example$java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java[tag=pausing]
----
<1> Schedules a timer as a Workflow step action. Make sure that the timer name is unique for every Workflow instance.
<2> Pauses the Workflow execution.

NOTE: Remember to cancel the timer once the Workflow is resumed. Also, adjust the Workflow timeout to match the timer schedule.

Exposing additional mutational endpoints from the Workflow implementation should be done with special caution. Accepting a request from such endpoints should only be possible when the Workflow is in the expected state.

[source,java,indent=0]
.src/main/java/com/example/transfer/TransferWorkflow.java
----
include::example$java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java[tag=resuming]
----
<1> Accepts the request only when status is `WAITING_FOR_ACCEPTATION`.
<2> Otherwise, rejects the requests.

== Error handling

Design for failure is one of the key attributes of all Kalix components. Workflow has the richest set of configurations from all of them. It's essential to build robust and reliable solutions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.example.Main;
import com.example.transfer.TransferState.Transfer;
import com.example.wallet.WalletEntity;
import com.google.protobuf.any.Any;
import kalix.javasdk.DeferredCall;
import kalix.spring.testkit.KalixIntegrationTestKitSupport;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -12,10 +15,13 @@

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.example.transfer.TransferState.TransferStatus.COMPENSATION_COMPLETED;
import static com.example.transfer.TransferState.TransferStatus.REQUIRES_MANUAL_INTERVENTION;
import static com.example.transfer.TransferState.TransferStatus.TRANSFER_ACCEPTATION_TIMED_OUT;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -24,9 +30,6 @@
@SpringBootTest(classes = Main.class)
public class TransferWorkflowIntegrationTest extends KalixIntegrationTestKitSupport {

@Autowired
private WebClient webClient;

private Duration timeout = Duration.of(10, SECONDS);

@Test
Expand All @@ -36,15 +39,13 @@ public void shouldTransferMoney() {
createWallet(walletId1, 100);
createWallet(walletId2, 100);
var transferId = randomId();
var transferUrl = "/transfer/" + transferId;
var transfer = new Transfer(walletId1, walletId2, 10);

String response = webClient.put().uri(transferUrl)
.bodyValue(transfer)
.retrieve()
.bodyToMono(Message.class)
.map(Message::value)
.block(timeout);
String response = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::startTransfer)
.params(transfer))
.value();

assertThat(response).isEqualTo("transfer started");

Expand All @@ -59,29 +60,92 @@ public void shouldTransferMoney() {
});
}

@Test
public void shouldTransferMoneyWithAcceptation() {
var walletId1 = randomId();
var walletId2 = randomId();
createWallet(walletId1, 2000);
createWallet(walletId2, 100);
var transferId = randomId();
var transfer = new Transfer(walletId1, walletId2, 1001);

String response = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::startTransfer)
.params(transfer))
.value();

assertThat(response).isEqualTo("transfer started, waiting for acceptation");

String acceptationResponse = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::accept))
.value();

assertThat(acceptationResponse).isEqualTo("transfer accepted");

await()
.atMost(10, TimeUnit.of(SECONDS))
.untilAsserted(() -> {
var balance1 = getWalletBalance(walletId1);
var balance2 = getWalletBalance(walletId2);

assertThat(balance1).isEqualTo(999);
assertThat(balance2).isEqualTo(1101);
});
}

@Test
public void shouldTimeoutTransferAcceptation() {
var walletId1 = randomId();
var walletId2 = randomId();
createWallet(walletId1, 2000);
createWallet(walletId2, 100);
var transferId = randomId();
var transfer = new Transfer(walletId1, walletId2, 1001);

String response = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::startTransfer)
.params(transfer))
.value();
assertThat(response).isEqualTo("transfer started, waiting for acceptation");

String acceptationResponse = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::acceptationTimeout));
assertThat(acceptationResponse).contains("timed out");

var balance1 = getWalletBalance(walletId1);
var balance2 = getWalletBalance(walletId2);
assertThat(balance1).isEqualTo(2000);
assertThat(balance2).isEqualTo(100);

TransferState transferState = getTransferState(transferId);
assertThat(transferState.status()).isEqualTo(TRANSFER_ACCEPTATION_TIMED_OUT);
}

@Test
public void shouldCompensateFailedMoneyTransfer() {
var walletId1 = randomId();
var walletId2 = randomId();
createWallet(walletId1, 100);
var transferId = randomId();
var transferUrl = "/transfer/" + transferId;
var transfer = new Transfer(walletId1, walletId2, 10); //walletId2 not exists

String response = webClient.put().uri(transferUrl)
.bodyValue(transfer)
.retrieve()
.bodyToMono(Message.class)
.map(Message::value)
.block(timeout);
String response = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::startTransfer)
.params(transfer))
.value();

assertThat(response).isEqualTo("transfer started");

await()
.atMost(10, TimeUnit.of(SECONDS))
.ignoreExceptions()
.untilAsserted(() -> {
TransferState transferState = getTransferState(transferUrl);
TransferState transferState = getTransferState(transferId);
assertThat(transferState.status()).isEqualTo(COMPENSATION_COMPLETED);

var balance1 = getWalletBalance(walletId1);
Expand All @@ -95,23 +159,21 @@ public void shouldTimedOutTransferWorkflow() {
var walletId1 = randomId();
var walletId2 = randomId();
var transferId = randomId();
var transferUrl = "/transfer/" + transferId;
var transfer = new Transfer(walletId1, walletId2, 10); //both not exists

String response = webClient.put().uri(transferUrl)
.bodyValue(transfer)
.retrieve()
.bodyToMono(Message.class)
.map(Message::value)
.block(timeout);
String response = execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::startTransfer)
.params(transfer))
.value();

assertThat(response).isEqualTo("transfer started");

await()
.atMost(10, TimeUnit.of(SECONDS))
.ignoreExceptions()
.untilAsserted(() -> {
TransferState transferState = getTransferState(transferUrl);
TransferState transferState = getTransferState(transferId);
assertThat(transferState.status()).isEqualTo(REQUIRES_MANUAL_INTERVENTION);
});
}
Expand All @@ -122,27 +184,31 @@ private String randomId() {
}

private void createWallet(String walletId, int amount) {
ResponseEntity<Void> response = webClient.post().uri("/wallet/" + walletId + "/create/" + amount)
.retrieve()
.toBodilessEntity()
.block(timeout);
String response = execute(componentClient
.forValueEntity(walletId)
.call(WalletEntity::create)
.params(walletId, amount));

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response).contains("Ok");
}

private int getWalletBalance(String walletId) {
Integer response = webClient.get().uri("/wallet/" + walletId)
.retrieve()
.bodyToMono(Integer.class)
.block(timeout);
return execute(componentClient
.forValueEntity(walletId)
.call(WalletEntity::get));
}

return response;
private TransferState getTransferState(String transferId) {
return execute(componentClient
.forWorkflow(transferId)
.call(TransferWorkflow::getTransferState));
}

private TransferState getTransferState(String url) {
return webClient.get().uri(url)
.retrieve()
.bodyToMono(TransferState.class)
.block(timeout);
private <T> T execute(DeferredCall<Any, T> deferredCall) {
try {
return deferredCall.execute().toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

import static com.example.transfer.TransferState.TransferStatus.*;

public record TransferState(Transfer transfer, TransferStatus status) {
public record TransferState(String transferId, Transfer transfer, TransferStatus status) {

public record Transfer(String from, String to, int amount) { // <1>
public record Transfer(String from, String to, int amount) {
}

public enum TransferStatus { // <2>
STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, REQUIRES_MANUAL_INTERVENTION
public enum TransferStatus {
STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, WAITING_FOR_ACCEPTATION, TRANSFER_ACCEPTATION_TIMED_OUT, REQUIRES_MANUAL_INTERVENTION
}

public TransferState(Transfer transfer) {
this(transfer, STARTED);
public TransferState(String transferId, Transfer transfer) {
this(transferId, transfer, STARTED);
}

public TransferState withStatus(TransferStatus newStatus) {
return new TransferState(transfer, newStatus);
return new TransferState(transferId, transfer, newStatus);
}
}
Loading

0 comments on commit 90b9074

Please sign in to comment.