From 90b9074896fbb4c6b4d357e65601c550258227e5 Mon Sep 17 00:00:00 2001 From: Andrzej Ludwikowski Date: Fri, 4 Aug 2023 15:34:13 +0300 Subject: [PATCH] docs: documenting workflow pause and timers (#1756) * docs: documenting workflow pause and timers * Update docs/src/modules/java/pages/workflows.adoc Co-authored-by: Eduardo Pinto --------- Co-authored-by: Eduardo Pinto --- .../java-protobuf/partials/timers-intro.adoc | 6 +- docs/src/modules/java/pages/workflows.adoc | 25 +++ .../TransferWorkflowIntegrationTest.java | 148 +++++++++++++----- .../com/example/transfer/TransferState.java | 14 +- .../example/transfer/TransferWorkflow.java | 82 +++++++++- .../java/com/example/wallet/WalletEntity.java | 10 ++ 6 files changed, 226 insertions(+), 59 deletions(-) diff --git a/docs/src/modules/java-protobuf/partials/timers-intro.adoc b/docs/src/modules/java-protobuf/partials/timers-intro.adoc index cbbc18588f..1052765f0f 100644 --- a/docs/src/modules/java-protobuf/partials/timers-intro.adoc +++ b/docs/src/modules/java-protobuf/partials/timers-intro.adoc @@ -5,7 +5,7 @@ Timers are persisted in the https://docs.kalix.io/reference/glossary.html#proxy[ When a timer is triggered, the scheduled call is executed. If successfully executed, the timer completes and is automatically removed. In case of a failure, the timer is rescheduled with a delay of 3 seconds. This process repeats until the call succeeds. -You can schedule a timer for any service method in Kalix, but you can only create a timer from within an Action and by passing a `DeferredCall`. +You can schedule a timer for any service method in Kalix, but you can only create a timer from within an Action or Workflow by passing a `DeferredCall`. **Timer features**: @@ -18,5 +18,5 @@ You can schedule a timer for any service method in Kalix, but you can only creat **Timer limitations**: -* Timers can only be scheduled from within an Action. -* Timers can only call other components (e.g., Actions and Entities), therefore external service calls must be wrapped by an Action in the deployed service. \ No newline at end of file +* Timers can only be scheduled from within an Action or Workflow. +* Timers can only call other components (e.g., Actions, Entities, Workflows), therefore external service calls must be wrapped by an Action in the deployed service. \ No newline at end of file diff --git a/docs/src/modules/java/pages/workflows.adoc b/docs/src/modules/java/pages/workflows.adoc index 7e85443ec9..0c87cf9623 100644 --- a/docs/src/modules/java/pages/workflows.adoc +++ b/docs/src/modules/java/pages/workflows.adoc @@ -135,6 +135,31 @@ IMPORTANT: For simplicity purposes, we are returning the internal state directly A full transfer workflow source code is available {sample-url}[here, {tab-icon}, window="new"]. Follow the `README` file to run and test it. +== Pausing workflow + +A long-running workflow can be paused while waiting for some additional information to continue processing. A special `pause` transition can be used to inform Kalix that the execution of the Workflow should be postponed. By launching a request to a Workflow endpoint, the user can then resume the processing. Additionally, a Kalix Timer can be scheduled to automatically inform the Workflow that the expected time for the additional data has passed. + + +[source,java,indent=0] +.src/main/java/com/example/transfer/TransferWorkflow.java +---- +include::example$java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java[tag=pausing] +---- +<1> Schedules a timer as a Workflow step action. Make sure that the timer name is unique for every Workflow instance. +<2> Pauses the Workflow execution. + +NOTE: Remember to cancel the timer once the Workflow is resumed. Also, adjust the Workflow timeout to match the timer schedule. + +Exposing additional mutational endpoints from the Workflow implementation should be done with special caution. Accepting a request from such endpoints should only be possible when the Workflow is in the expected state. + +[source,java,indent=0] +.src/main/java/com/example/transfer/TransferWorkflow.java +---- +include::example$java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java[tag=resuming] +---- +<1> Accepts the request only when status is `WAITING_FOR_ACCEPTATION`. +<2> Otherwise, rejects the requests. + == Error handling Design for failure is one of the key attributes of all Kalix components. Workflow has the richest set of configurations from all of them. It's essential to build robust and reliable solutions. diff --git a/samples/java-spring-transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java b/samples/java-spring-transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java index 5f8cbf9db1..b56badbc8f 100644 --- a/samples/java-spring-transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java +++ b/samples/java-spring-transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java @@ -2,6 +2,9 @@ import com.example.Main; import com.example.transfer.TransferState.Transfer; +import com.example.wallet.WalletEntity; +import com.google.protobuf.any.Any; +import kalix.javasdk.DeferredCall; import kalix.spring.testkit.KalixIntegrationTestKitSupport; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -12,10 +15,13 @@ import java.time.Duration; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static com.example.transfer.TransferState.TransferStatus.COMPENSATION_COMPLETED; import static com.example.transfer.TransferState.TransferStatus.REQUIRES_MANUAL_INTERVENTION; +import static com.example.transfer.TransferState.TransferStatus.TRANSFER_ACCEPTATION_TIMED_OUT; import static java.time.temporal.ChronoUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -24,9 +30,6 @@ @SpringBootTest(classes = Main.class) public class TransferWorkflowIntegrationTest extends KalixIntegrationTestKitSupport { - @Autowired - private WebClient webClient; - private Duration timeout = Duration.of(10, SECONDS); @Test @@ -36,15 +39,13 @@ public void shouldTransferMoney() { createWallet(walletId1, 100); createWallet(walletId2, 100); var transferId = randomId(); - var transferUrl = "/transfer/" + transferId; var transfer = new Transfer(walletId1, walletId2, 10); - String response = webClient.put().uri(transferUrl) - .bodyValue(transfer) - .retrieve() - .bodyToMono(Message.class) - .map(Message::value) - .block(timeout); + String response = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::startTransfer) + .params(transfer)) + .value(); assertThat(response).isEqualTo("transfer started"); @@ -59,21 +60,84 @@ public void shouldTransferMoney() { }); } + @Test + public void shouldTransferMoneyWithAcceptation() { + var walletId1 = randomId(); + var walletId2 = randomId(); + createWallet(walletId1, 2000); + createWallet(walletId2, 100); + var transferId = randomId(); + var transfer = new Transfer(walletId1, walletId2, 1001); + + String response = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::startTransfer) + .params(transfer)) + .value(); + + assertThat(response).isEqualTo("transfer started, waiting for acceptation"); + + String acceptationResponse = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::accept)) + .value(); + + assertThat(acceptationResponse).isEqualTo("transfer accepted"); + + await() + .atMost(10, TimeUnit.of(SECONDS)) + .untilAsserted(() -> { + var balance1 = getWalletBalance(walletId1); + var balance2 = getWalletBalance(walletId2); + + assertThat(balance1).isEqualTo(999); + assertThat(balance2).isEqualTo(1101); + }); + } + + @Test + public void shouldTimeoutTransferAcceptation() { + var walletId1 = randomId(); + var walletId2 = randomId(); + createWallet(walletId1, 2000); + createWallet(walletId2, 100); + var transferId = randomId(); + var transfer = new Transfer(walletId1, walletId2, 1001); + + String response = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::startTransfer) + .params(transfer)) + .value(); + assertThat(response).isEqualTo("transfer started, waiting for acceptation"); + + String acceptationResponse = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::acceptationTimeout)); + assertThat(acceptationResponse).contains("timed out"); + + var balance1 = getWalletBalance(walletId1); + var balance2 = getWalletBalance(walletId2); + assertThat(balance1).isEqualTo(2000); + assertThat(balance2).isEqualTo(100); + + TransferState transferState = getTransferState(transferId); + assertThat(transferState.status()).isEqualTo(TRANSFER_ACCEPTATION_TIMED_OUT); + } + @Test public void shouldCompensateFailedMoneyTransfer() { var walletId1 = randomId(); var walletId2 = randomId(); createWallet(walletId1, 100); var transferId = randomId(); - var transferUrl = "/transfer/" + transferId; var transfer = new Transfer(walletId1, walletId2, 10); //walletId2 not exists - String response = webClient.put().uri(transferUrl) - .bodyValue(transfer) - .retrieve() - .bodyToMono(Message.class) - .map(Message::value) - .block(timeout); + String response = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::startTransfer) + .params(transfer)) + .value(); assertThat(response).isEqualTo("transfer started"); @@ -81,7 +145,7 @@ public void shouldCompensateFailedMoneyTransfer() { .atMost(10, TimeUnit.of(SECONDS)) .ignoreExceptions() .untilAsserted(() -> { - TransferState transferState = getTransferState(transferUrl); + TransferState transferState = getTransferState(transferId); assertThat(transferState.status()).isEqualTo(COMPENSATION_COMPLETED); var balance1 = getWalletBalance(walletId1); @@ -95,15 +159,13 @@ public void shouldTimedOutTransferWorkflow() { var walletId1 = randomId(); var walletId2 = randomId(); var transferId = randomId(); - var transferUrl = "/transfer/" + transferId; var transfer = new Transfer(walletId1, walletId2, 10); //both not exists - String response = webClient.put().uri(transferUrl) - .bodyValue(transfer) - .retrieve() - .bodyToMono(Message.class) - .map(Message::value) - .block(timeout); + String response = execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::startTransfer) + .params(transfer)) + .value(); assertThat(response).isEqualTo("transfer started"); @@ -111,7 +173,7 @@ public void shouldTimedOutTransferWorkflow() { .atMost(10, TimeUnit.of(SECONDS)) .ignoreExceptions() .untilAsserted(() -> { - TransferState transferState = getTransferState(transferUrl); + TransferState transferState = getTransferState(transferId); assertThat(transferState.status()).isEqualTo(REQUIRES_MANUAL_INTERVENTION); }); } @@ -122,27 +184,31 @@ private String randomId() { } private void createWallet(String walletId, int amount) { - ResponseEntity response = webClient.post().uri("/wallet/" + walletId + "/create/" + amount) - .retrieve() - .toBodilessEntity() - .block(timeout); + String response = execute(componentClient + .forValueEntity(walletId) + .call(WalletEntity::create) + .params(walletId, amount)); - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response).contains("Ok"); } private int getWalletBalance(String walletId) { - Integer response = webClient.get().uri("/wallet/" + walletId) - .retrieve() - .bodyToMono(Integer.class) - .block(timeout); + return execute(componentClient + .forValueEntity(walletId) + .call(WalletEntity::get)); + } - return response; + private TransferState getTransferState(String transferId) { + return execute(componentClient + .forWorkflow(transferId) + .call(TransferWorkflow::getTransferState)); } - private TransferState getTransferState(String url) { - return webClient.get().uri(url) - .retrieve() - .bodyToMono(TransferState.class) - .block(timeout); + private T execute(DeferredCall deferredCall) { + try { + return deferredCall.execute().toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } } } \ No newline at end of file diff --git a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferState.java b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferState.java index 37fbd7537f..af652bf7fd 100644 --- a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferState.java +++ b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferState.java @@ -2,20 +2,20 @@ import static com.example.transfer.TransferState.TransferStatus.*; -public record TransferState(Transfer transfer, TransferStatus status) { +public record TransferState(String transferId, Transfer transfer, TransferStatus status) { - public record Transfer(String from, String to, int amount) { // <1> + public record Transfer(String from, String to, int amount) { } - public enum TransferStatus { // <2> - STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, REQUIRES_MANUAL_INTERVENTION + public enum TransferStatus { + STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, WAITING_FOR_ACCEPTATION, TRANSFER_ACCEPTATION_TIMED_OUT, REQUIRES_MANUAL_INTERVENTION } - public TransferState(Transfer transfer) { - this(transfer, STARTED); + public TransferState(String transferId, Transfer transfer) { + this(transferId, transfer, STARTED); } public TransferState withStatus(TransferStatus newStatus) { - return new TransferState(transfer, newStatus); + return new TransferState(transferId, transfer, newStatus); } } diff --git a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java index 61644158ab..7f72ad2120 100644 --- a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java +++ b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/transfer/TransferWorkflow.java @@ -1,5 +1,6 @@ package com.example.transfer; +import akka.Done; import com.example.transfer.TransferState.Transfer; import com.example.wallet.WalletEntity; import com.example.wallet.WalletEntity.DepositResult; @@ -15,6 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PatchMapping; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -25,8 +27,11 @@ import static com.example.transfer.TransferState.TransferStatus.COMPLETED; import static com.example.transfer.TransferState.TransferStatus.DEPOSIT_FAILED; import static com.example.transfer.TransferState.TransferStatus.REQUIRES_MANUAL_INTERVENTION; +import static com.example.transfer.TransferState.TransferStatus.TRANSFER_ACCEPTATION_TIMED_OUT; +import static com.example.transfer.TransferState.TransferStatus.WAITING_FOR_ACCEPTATION; import static com.example.transfer.TransferState.TransferStatus.WITHDRAW_FAILED; import static com.example.transfer.TransferState.TransferStatus.WITHDRAW_SUCCEED; +import static java.time.Duration.ofHours; import static java.time.Duration.ofSeconds; import static kalix.javasdk.workflow.Workflow.RecoverStrategy.maxRetries; @@ -54,11 +59,13 @@ public TransferWorkflow(ComponentClient componentClient) { public WorkflowDef definition() { Step withdraw = step("withdraw") - .call(Withdraw.class, cmd -> { + .asyncCall(Withdraw.class, cmd -> { logger.info("Running: " + cmd); - return componentClient.forValueEntity(cmd.from) - .call(WalletEntity::withdraw) - .params(cmd.amount); + // cancelling the timer in case it was scheduled + return timers().cancel("acceptationTimout-" + currentState().transferId()).thenCompose(__ -> + componentClient.forValueEntity(cmd.from) + .call(WalletEntity::withdraw) + .params(cmd.amount).execute()); }) .andThen(WithdrawResult.class, withdrawResult -> { if (withdrawResult instanceof WithdrawSucceed) { @@ -141,6 +148,21 @@ public WorkflowDef definition() { .timeout(ofSeconds(1)); // <1> // end::step-timeout[] + // tag::pausing[] + Step waitForAcceptation = + step("wait-for-acceptation") + .asyncCall(() -> { + String transferId = currentState().transferId(); + return timers().startSingleTimer( + "acceptationTimout-" + transferId, + ofHours(8), + componentClient.forWorkflow(transferId) + .call(TransferWorkflow::acceptationTimeout)); // <1> + }) + .andThen(Done.class, __ -> + effects().pause()); // <2> + // end::pausing[] + // tag::timeouts[] // tag::recover-strategy[] return workflow() @@ -157,18 +179,27 @@ public WorkflowDef definition() { .addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3> // end::recover-strategy[] .addStep(compensateWithdraw) + .addStep(waitForAcceptation) .addStep(failoverHandler); } @PutMapping public Effect startTransfer(@RequestBody Transfer transfer) { - if (transfer.amount() <= 0) { - return effects().error("transfer amount should be greater than zero"); - } else if (currentState() != null) { + if (currentState() != null) { return effects().error("transfer already started"); + } else if (transfer.amount() <= 0) { + return effects().error("transfer amount should be greater than zero"); + } else if (transfer.amount() > 1000) { + logger.info("Waiting for acceptation: " + transfer); + TransferState waitingForAcceptationState = new TransferState(commandContext().workflowId(), transfer) + .withStatus(WAITING_FOR_ACCEPTATION); + return effects() + .updateState(waitingForAcceptationState) + .transitionTo("wait-for-acceptation") + .thenReply(new Message("transfer started, waiting for acceptation")); } else { logger.info("Running: " + transfer); - TransferState initialState = new TransferState(transfer); + TransferState initialState = new TransferState(commandContext().workflowId(), transfer); Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount()); return effects() .updateState(initialState) @@ -177,6 +208,41 @@ public Effect startTransfer(@RequestBody Transfer transfer) { } } + @PatchMapping("/acceptation-timeout") + public Effect acceptationTimeout() { + if (currentState() == null) { + return effects().error("transfer not started"); + } else if (currentState().status() == WAITING_FOR_ACCEPTATION) { + return effects() + .updateState(currentState().withStatus(TRANSFER_ACCEPTATION_TIMED_OUT)) + .end() + .thenReply("timed out"); + } else { + logger.info("Ignoring acceptation timeout for status: " + currentState().status()); + return effects().reply("Ok"); + } + } + + // tag::resuming[] + @PatchMapping("/accept") + public Effect accept() { + if (currentState() == null) { + return effects().error("transfer not started"); + } else if (currentState().status() == WAITING_FOR_ACCEPTATION) { // <1> + Transfer transfer = currentState().transfer(); + // end::resuming[] + logger.info("Accepting transfer: " + transfer); + // tag::resuming[] + Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount()); + return effects() + .transitionTo("withdraw", withdrawInput) + .thenReply(new Message("transfer accepted")); + } else { // <2> + return effects().error("Cannot accept transfer with status: " + currentState().status()); + } + } + // end::resuming[] + @GetMapping public Effect getTransferState() { if (currentState() == null) { diff --git a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java index ad7fb0d5a4..6f25cbfa28 100644 --- a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java +++ b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java @@ -4,6 +4,8 @@ import com.example.wallet.WalletEntity.DepositResult.DepositSucceed; import com.example.wallet.WalletEntity.WithdrawResult.WithdrawFailed; import com.example.wallet.WalletEntity.WithdrawResult.WithdrawSucceed; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import kalix.javasdk.valueentity.ValueEntity; import kalix.javasdk.annotations.Id; import kalix.javasdk.annotations.TypeId; @@ -27,6 +29,10 @@ public Wallet deposit(int amount) { } } + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) + @JsonSubTypes({ + @JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class), + @JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class)}) public sealed interface WithdrawResult { record WithdrawFailed(String errorMsg) implements WithdrawResult { } @@ -35,6 +41,10 @@ record WithdrawSucceed() implements WithdrawResult { } } + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) + @JsonSubTypes({ + @JsonSubTypes.Type(value = DepositResult.DepositSucceed.class), + @JsonSubTypes.Type(value = DepositResult.DepositFailed.class)}) public sealed interface DepositResult { record DepositFailed(String errorMsg) implements DepositResult { }