diff --git a/akka-docs/src/main/paradox/typed/durable-state/persistence.md b/akka-docs/src/main/paradox/typed/durable-state/persistence.md index 16a538057c4..7cd45593c42 100644 --- a/akka-docs/src/main/paradox/typed/durable-state/persistence.md +++ b/akka-docs/src/main/paradox/typed/durable-state/persistence.md @@ -104,6 +104,21 @@ The state is typically defined as an immutable class and a new instance of the s You may choose to use a mutable class for the state, and then the command handler may update the state instance, but it must still pass the updated state to the `persist` effect. +@@@ div { .group-java } + +If the state is mutable, it is important that the `emptyState` method creates a new State instance each time +it is called to ensure that the state is recreated in case of failure restarts. + +@@@ + +@@@ div { .group-scala } + +If the state is mutable, it is important to use the `DurableStateBehavior.withMutableState` factory method that +takes `emptyStateFactory: () => State` parameter to make sure that the state instance is recreated in case of +failure restarts. + +@@@ + More effects are explained in @ref:[Effects and Side Effects](#effects-and-side-effects). In addition to returning the primary `Effect` for the command, `DurableStateBehavior`s can also @@ -207,6 +222,14 @@ Scala Java : @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects } +@@@ div { .group-scala } + +It is recommended to use an immutable state class. If the state is mutable, it is important to use +the `DurableStateBehavior.withEnforcedRepliesMutableState` factory method that takes `emptyStateFactory: () => State` +parameter to make sure that the state instance is recreated in case of failure restarts. + +@@@ + ### Side effects ordering and guarantees Any side effects are executed on an at-most-once basis and will not be executed if the persist fails. diff --git a/akka-docs/src/main/paradox/typed/persistence-style.md b/akka-docs/src/main/paradox/typed/persistence-style.md index 5bde3e345b5..e4984d15dba 100644 --- a/akka-docs/src/main/paradox/typed/persistence-style.md +++ b/akka-docs/src/main/paradox/typed/persistence-style.md @@ -69,6 +69,9 @@ The above examples are using immutable state classes and below is corresponding Java : @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java) { #account-entity } +If the state is mutable, it is important that the `emptyState` method creates a new State instance each time +it is called to ensure that the state is recreated in case of failure restarts. + ## Leveraging Java 21 features When building event sourced entities in a project using Java 21 or newer, the @javadoc[EventSourcedOnCommandBehavior](akka.persistence.typed.javadsl.EventSourcedOnCommandBehavior) diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 1e170e68798..dc11234cf4a 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -94,6 +94,17 @@ Note that the concrete class does not contain any fields with state like a regul lost when the actor is stopped or restarted. Updates to the State are always performed in the eventHandler based on the events. +If the state is mutable, it is important that the `emptyState` method creates a new State instance each time +it is called to ensure that the state is recreated in case of failure restarts. + +@@@ + +@@@ div { .group-scala } + +It is recommended to use an immutable state class. If the state is mutable, it is important to use +the `EventSourcedBehavior.withMutableState` factory method that takes `emptyStateFactory: () => State` parameter +to make sure that the state instance is recreated in case of failure restarts. + @@@ Next we'll discuss each of these in detail. @@ -453,6 +464,14 @@ is not used, but then there will be no compilation errors if the reply decision Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services. +@@@ div { .group-scala } + +It is recommended to use an immutable state class. If the state is mutable, it is important to use +the `EventSourcedBehavior.withEnforcedRepliesMutableState` factory method that takes `emptyStateFactory: () => State` +parameter to make sure that the state instance is recreated in case of failure restarts. + +@@@ + ## Serialization The same @ref:[serialization](../serialization.md) mechanism as for actor messages is also used for persistent actors. diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/Unpersistent.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/Unpersistent.scala index 2c6fd95d64d..4cc7efba13b 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/Unpersistent.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/Unpersistent.scala @@ -54,7 +54,7 @@ private[akka] object Unpersistent { findEventSourcedBehavior(behavior, context).fold { throw new AssertionError("Did not find the expected EventSourcedBehavior") } { esBehavior => - val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState -> 0L) + val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState() -> 0L) new WrappedEventSourcedBehavior(context, esBehavior, initialState, initialSequenceNr, onEvent, onSnapshot) } } @@ -79,7 +79,7 @@ private[akka] object Unpersistent { findDurableStateBehavior(behavior, context).fold { throw new AssertionError("Did not find the expected DurableStateBehavior") } { dsBehavior => - val initialState = fromState.getOrElse(dsBehavior.emptyState) + val initialState = fromState.getOrElse(dsBehavior.emptyState()) new WrappedDurableStateBehavior(context, dsBehavior, initialState, onPersist) } } diff --git a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java index c0542b24947..39b212906f2 100644 --- a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java +++ b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java @@ -65,8 +65,13 @@ public String emptyState() { @Override public CommandHandler commandHandler() { return (state, command) -> { - probe.tell("persisting"); - return Effect().persist(command); + if (command.equals("get")) { + probe.tell("state [" + state + "]"); + return Effect().none(); + } else { + probe.tell("persisting"); + return Effect().persist(command); + } }; } @@ -120,16 +125,24 @@ public void persistEvents() throws Exception { probe.expectMessage("persisting"); probe.expectMessage("one"); probe.expectMessage("starting"); + c.tell("get"); + probe.expectMessage("state []"); + // fail c.tell("two"); probe.expectMessage("persisting"); probe.expectMessage("two"); probe.expectMessage("starting"); + c.tell("get"); + probe.expectMessage("state []"); + // work c.tell("three"); probe.expectMessage("persisting"); probe.expectMessage("three"); // no starting as this one did not fail probe.expectNoMessage(); + c.tell("get"); + probe.expectMessage("state [three]"); } } diff --git a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorWithMutableStateFailureTest.java b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorWithMutableStateFailureTest.java new file mode 100644 index 00000000000..3e78a97b5ee --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorWithMutableStateFailureTest.java @@ -0,0 +1,167 @@ +/* + * Copyright (C) 2018-2025 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl; + +import static akka.persistence.typed.scaladsl.EventSourcedBehaviorFailureSpec.conf; + +import akka.actor.testkit.typed.TestException; +import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; +import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.RecoveryCompleted; +import akka.persistence.typed.RecoveryFailed; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.time.Duration; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +class FailingEventSourcedActorWithMutableState + extends EventSourcedBehavior< + String, String, FailingEventSourcedActorWithMutableState.MutableState> { + + static final class MutableState { + private String value; + + MutableState(String value) { + this.value = value; + } + + void add(String s) { + value = value + s; + } + + String getValue() { + return value; + } + } + + private final ActorRef probe; + private final ActorRef recoveryFailureProbe; + + FailingEventSourcedActorWithMutableState( + PersistenceId persistenceId, + ActorRef probe, + ActorRef recoveryFailureProbe) { + + super( + persistenceId, + SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)); + this.probe = probe; + this.recoveryFailureProbe = recoveryFailureProbe; + } + + @Override + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + RecoveryCompleted.instance(), + state -> { + probe.tell("starting"); + }) + .onSignal( + RecoveryFailed.class, + (state, signal) -> { + recoveryFailureProbe.tell(signal.getFailure()); + }) + .build(); + } + + @Override + public MutableState emptyState() { + return new MutableState(""); + } + + @Override + public CommandHandler commandHandler() { + return (state, command) -> { + if (command.equals("get")) { + probe.tell("state [" + state.getValue() + "]"); + return Effect().none(); + } else { + probe.tell("persisting"); + return Effect().persist(command); + } + }; + } + + @Override + public EventHandler eventHandler() { + return (state, event) -> { + probe.tell(event); + state.add(event); + return state; + }; + } +} + +public class EventSourcedActorWithMutableStateFailureTest extends JUnitSuite { + + public static final Config config = conf().withFallback(ConfigFactory.load()); + + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + @Rule public final LogCapturing logCapturing = new LogCapturing(); + + public static Behavior fail( + PersistenceId pid, ActorRef probe, ActorRef recoveryFailureProbe) { + return new FailingEventSourcedActorWithMutableState(pid, probe, recoveryFailureProbe); + } + + public static Behavior fail(PersistenceId pid, ActorRef probe) { + return fail(pid, probe, testKit.createTestProbe().ref()); + } + + @Test + public void notifyRecoveryFailure() { + TestProbe probe = testKit.createTestProbe(); + TestProbe recoveryFailureProbe = testKit.createTestProbe(); + Behavior p1 = + fail( + PersistenceId.ofUniqueId("fail-recovery-once"), + probe.ref(), + recoveryFailureProbe.ref()); + testKit.spawn(p1); + recoveryFailureProbe.expectMessageClass(TestException.class); + } + + @Test + public void persistEvents() throws Exception { + TestProbe probe = testKit.createTestProbe(); + Behavior p1 = fail(PersistenceId.ofUniqueId("fail-first-2"), probe.ref()); + ActorRef c = testKit.spawn(p1); + probe.expectMessage("starting"); + // fail + c.tell("one"); + probe.expectMessage("persisting"); + probe.expectMessage("one"); + probe.expectMessage("starting"); + c.tell("get"); + probe.expectMessage("state []"); + + // fail + c.tell("two"); + probe.expectMessage("persisting"); + probe.expectMessage("two"); + probe.expectMessage("starting"); + c.tell("get"); + probe.expectMessage("state []"); + + // work + c.tell("three"); + probe.expectMessage("persisting"); + probe.expectMessage("three"); + // no starting as this one did not fail + probe.expectNoMessage(); + c.tell("get"); + probe.expectMessage("state [three]"); + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index a85d673f746..02aa247eed6 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -41,13 +41,13 @@ class ChaosJournal extends InmemJournal { override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { val pid = messages.head.persistenceId counts = counts.updated(pid, counts.getOrElse(pid, 0) + 1) - if (pid == "fail-first-2" && counts(pid) <= 2) { + if (pid.startsWith("fail-first-2") && counts(pid) <= 2) { Future.failed(TestException("database says no")) } else if (pid.startsWith("fail-fifth") && counts(pid) == 5) { Future.failed(TestException("database says no")) } else if (pid.startsWith("fail-persist")) { Future.failed(TestException("database says no")) - } else if (pid == "reject-first" && reject) { + } else if (pid.startsWith("reject-first") && reject) { reject = false Future.successful(messages.map(_ => Try { @@ -99,13 +99,18 @@ class EventSourcedBehaviorFailureSpec EventSourcedBehavior[String, String, String]( pid, "", - (_, cmd) => { - if (cmd == "wrong") + (state, cmd) => { + if (cmd == "get") { + probe.tell(s"state [$state]") + Effect.none + } else if (cmd == "wrong") { throw TestException("wrong command") - probe.tell("persisting") - Effect.persist(cmd).thenRun { _ => - probe.tell("persisted") - if (cmd == "wrong-callback") throw TestException("wrong command") + } else { + probe.tell("persisting") + Effect.persist(cmd).thenRun { _ => + probe.tell("persisted") + if (cmd == "wrong-callback") throw TestException("wrong command") + } } }, (state, event) => { @@ -122,6 +127,52 @@ class EventSourcedBehaviorFailureSpec probe.tell("restarting") }) + final class MutableState(var value: String) { + def add(s: String): Unit = + value = value + s + } + + def failingPersistentActorWithMutableState( + pid: PersistenceId, + probe: ActorRef[String], + additionalSignalHandler: PartialFunction[(MutableState, Signal), Unit] = PartialFunction.empty) + : EventSourcedBehavior[String, String, MutableState] = + EventSourcedBehavior + .withMutableState[String, String, MutableState]( + pid, + () => new MutableState(""), + (state, cmd) => { + if (cmd == "get") { + probe.tell(s"state [${state.value}]") + Effect.none + } else if (cmd == "wrong") { + // eh, don't try this at home + state.add("wrong") + throw TestException("wrong command") + } else { + probe.tell("persisting") + Effect.persist(cmd).thenRun { _ => + probe.tell("persisted") + if (cmd == "wrong-callback") throw TestException("wrong command") + } + } + }, + (state, event) => { + if (event == "wrong-event") + throw TestException("wrong event") + probe.tell(event) + state.add(event) + state + }) + .receiveSignal(additionalSignalHandler.orElse { + case (_, RecoveryCompleted) => + probe.tell("starting") + case (_, PostStop) => + probe.tell("stopped") + case (_, PreRestart) => + probe.tell("restarting") + }) + "A typed persistent actor (failures)" must { "signal RecoveryFailure when replay fails" in { @@ -236,12 +287,18 @@ class EventSourcedBehaviorFailureSpec probe.expectMessage("one") probe.expectMessage("restarting") probe.expectMessage("starting") + c ! "get" + probe.expectMessage("state []") + // fail c ! "two" probe.expectMessage("persisting") probe.expectMessage("two") probe.expectMessage("restarting") probe.expectMessage("starting") + c ! "get" + probe.expectMessage("state []") + // work! c ! "three" probe.expectMessage("persisting") @@ -249,6 +306,8 @@ class EventSourcedBehaviorFailureSpec probe.expectMessage("persisted") // no restart probe.expectNoMessage() + c ! "get" + probe.expectMessage("state [three]") } "restart with backoff for recovery" in { @@ -310,6 +369,9 @@ class EventSourcedBehaviorFailureSpec probe.expectMessage("starting") c ! "wrong" probe.expectMessage("restarting") + probe.expectMessage("starting") + c ! "get" + probe.expectMessage("state []") } } @@ -355,4 +417,59 @@ class EventSourcedBehaviorFailureSpec } } } + + "A typed persistent actor (failures) with mutable state" must { + "restart with backoff" in { + val probe = TestProbe[String]() + val behav = failingPersistentActorWithMutableState(PersistenceId.ofUniqueId("fail-first-2-mutable"), probe.ref) + .onPersistFailure( + SupervisorStrategy.restartWithBackoff(1.milli, 10.millis, 0.1).withLoggingEnabled(enabled = false)) + val c = spawn(behav) + probe.expectMessage("starting") + // fail + c ! "one" + probe.expectMessage("persisting") + probe.expectMessage("one") + probe.expectMessage("restarting") + probe.expectMessage("starting") + c ! "get" + probe.expectMessage("state []") + + // fail + c ! "two" + probe.expectMessage("persisting") + probe.expectMessage("two") + probe.expectMessage("restarting") + probe.expectMessage("starting") + c ! "get" + probe.expectMessage("state []") + + // work! + c ! "three" + probe.expectMessage("persisting") + probe.expectMessage("three") + probe.expectMessage("persisted") + // no restart + probe.expectNoMessage() + c ! "get" + probe.expectMessage("state [three]") + } + + "restart supervisor strategy if command handler throws" in { + LoggingTestKit.error[TestException].expect { + val probe = TestProbe[String]() + val behav = Behaviors + .supervise(failingPersistentActorWithMutableState(PersistenceId.ofUniqueId("wrong-command-2"), probe.ref)) + .onFailure[TestException](SupervisorStrategy.restart) + val c = spawn(behav) + probe.expectMessage("starting") + c ! "wrong" + probe.expectMessage("restarting") + probe.expectMessage("starting") + c ! "get" + probe.expectMessage("state []") + } + } + + } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index b154b926248..c7cb317ea3a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -103,7 +103,7 @@ private[akka] object EventSourcedBehaviorImpl { @InternalApi private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( persistenceId: PersistenceId, - emptyState: State, + emptyState: () => State, commandHandler: EventSourcedBehavior.CommandHandler[Command, Event, State], eventHandler: EventSourcedBehavior.EventHandler[State, Event], loggerClass: Class[_], @@ -208,7 +208,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( val eventSourcedSetup = new BehaviorSetup( ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, - emptyState, + emptyState(), commandHandler, eventHandler, WriterIdentity.newIdentity(), diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 74ba201429e..df2b7ae681f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -61,6 +61,9 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( * This object will be passed into this behaviors handlers, until a new state replaces it. * * Also known as "zero state" or "neutral state". + * + * If the state is mutable, it is important that this creates a new State instance each time it is called + * to ensure that the state is recreated in case of failure restarts. */ protected def emptyState: State @@ -230,7 +233,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( val eventHandlerInstance = eventHandler() var behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State]( persistenceId, - emptyState, + () => emptyState, (state, cmd) => commandHandlerInstance(state, cmd).asInstanceOf[EffectImpl[Event, State]], eventHandlerInstance(_, _), getClass) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandBehavior.scala index c6df3a9d09f..77bba7de21c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandBehavior.scala @@ -67,6 +67,9 @@ abstract class EventSourcedOnCommandBehavior[Command, Event, State]( * This object will be passed into this behaviors handlers, until a new state replaces it. * * Also known as "zero state" or "neutral state". + * + * If the state is mutable, it is important that this creates a new State instance each time it is called + * to ensure that the state is recreated in case of failure restarts. */ protected def emptyState: State @@ -217,7 +220,7 @@ abstract class EventSourcedOnCommandBehavior[Command, Event, State]( var behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State]( persistenceId, - emptyState, + () => emptyState, (state, cmd) => this.onCommand(state, cmd).asInstanceOf[EffectImpl[Event, State]], this.onEvent, getClass) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandWithReplyBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandWithReplyBehavior.scala index f902604e934..c32716f7ba5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandWithReplyBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedOnCommandWithReplyBehavior.scala @@ -69,6 +69,9 @@ abstract class EventSourcedOnCommandWithReplyBehavior[Command, Event, State]( * This object will be passed into this behaviors handlers, until a new state replaces it. * * Also known as "zero state" or "neutral state". + * + * If the state is mutable, it is important that this creates a new State instance each time it is called + * to ensure that the state is recreated in case of failure restarts. */ protected def emptyState: State @@ -219,7 +222,7 @@ abstract class EventSourcedOnCommandWithReplyBehavior[Command, Event, State]( var behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State]( persistenceId, - emptyState, + () => emptyState, (state, cmd) => this.onCommand(state, cmd).asInstanceOf[EffectImpl[Event, State]], this.onEvent, getClass) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index f99e066e728..713c6f36313 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -53,6 +53,9 @@ object EventSourcedBehavior { /** * Create a `Behavior` for a persistent actor. * + * This can be used when the state is immutable, but if the state is mutable, it is important to use + * the `withMutableState` that takes `emptyStateFactory: () => State` parameter. + * * @param persistenceId stable unique identifier for the event sourced behavior * @param emptyState the intial state for the entity before any events have been processed * @param commandHandler map commands to effects e.g. persisting events, replying to commands @@ -63,22 +66,60 @@ object EventSourcedBehavior { emptyState: State, commandHandler: (State, Command) => Effect[Event, State], eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = { + withMutableState(persistenceId, () => emptyState, commandHandler, eventHandler) + } + + /** + * Create a `Behavior` with mutable state for a persistent actor. + * + * When the state is mutable, it is important to use this variant to make sure that the state instance is + * recreated in case of failure restarts. + * + * @param persistenceId stable unique identifier for the event sourced behavior + * @param emptyStateFactory factory function of the intial state for the entity before any events have been processed + * @param commandHandler map commands to effects e.g. persisting events, replying to commands + * @param eventHandler compute the new state given the current state when an event has been persisted + */ + def withMutableState[Command, Event, State]( + persistenceId: PersistenceId, + emptyStateFactory: () => State, + commandHandler: (State, Command) => Effect[Event, State], + eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = { val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList) - EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass) + EventSourcedBehaviorImpl(persistenceId, emptyStateFactory, commandHandler, eventHandler, loggerClass) } /** * Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten. * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with [[Effect.reply]], [[Effect.noReply]], [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]]. + * + * This can be used when the state is immutable, but if the state is mutable, it is important to use + * the `withEnforcedRepliesMutableState` that takes `emptyStateFactory: () => State` parameter. */ def withEnforcedReplies[Command, Event, State]( persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) => ReplyEffect[Event, State], eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = { + withEnforcedRepliesMutableState(persistenceId, () => emptyState, commandHandler, eventHandler) + } + + /** + * Create a `Behavior` with mutable state for a persistent actor that is enforcing that replies to commands are not forgotten. + * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with [[Effect.reply]], [[Effect.noReply]], [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]]. + * + * When the state is mutable, it is important to use this variant to make sure that the state instance is + * recreated in case of failure restarts. + */ + def withEnforcedRepliesMutableState[Command, Event, State]( + persistenceId: PersistenceId, + emptyStateFactory: () => State, + commandHandler: (State, Command) => ReplyEffect[Event, State], + eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = { val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList) - EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass) + EventSourcedBehaviorImpl(persistenceId, emptyStateFactory, commandHandler, eventHandler, loggerClass) } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala index c68cc44b8be..7c47294d38d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala @@ -44,7 +44,7 @@ private[akka] object DurableStateBehaviorImpl { @InternalApi private[akka] final case class DurableStateBehaviorImpl[Command, State]( persistenceId: PersistenceId, - emptyState: State, + emptyState: () => State, commandHandler: DurableStateBehavior.CommandHandler[Command, State], loggerClass: Class[_], durableStateStorePluginId: Option[String] = None, @@ -101,7 +101,7 @@ private[akka] final case class DurableStateBehaviorImpl[Command, State]( val durableStateSetup = new BehaviorSetup( ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, - emptyState, + emptyState(), commandHandler, actualSignalHandler, tag, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateBehavior.scala index 2cfa9dad2c9..5e72e0e1673 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateBehavior.scala @@ -64,6 +64,9 @@ abstract class DurableStateBehavior[Command, State] private[akka] ( * This object will be passed into this behaviors handlers, until a new state replaces it. * * Also known as "zero state" or "neutral state". + * + * If the state is mutable, it is important that this creates a new State instance each time it is called + * to ensure that the state is recreated in case of failure restarts. */ protected def emptyState: State @@ -137,7 +140,7 @@ abstract class DurableStateBehavior[Command, State] private[akka] ( val behavior = new internal.DurableStateBehaviorImpl[Command, State]( persistenceId, - emptyState, + () => emptyState, (state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[State]], getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandBehavior.scala index df65ec6cf41..93201775e28 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandBehavior.scala @@ -131,7 +131,7 @@ abstract class DurableStateOnCommandBehavior[Command, State] private[akka] ( val behavior = new internal.DurableStateBehaviorImpl[Command, State]( persistenceId, - emptyState, + () => emptyState, onCommand(_, _).asInstanceOf[EffectImpl[State]], getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandWithReplyBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandWithReplyBehavior.scala index 12c04748da0..c7d61f8f0d6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandWithReplyBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/javadsl/DurableStateOnCommandWithReplyBehavior.scala @@ -133,7 +133,7 @@ abstract class DurableStateOnCommandWithReplyBehavior[Command, State] private[ak val behavior = new internal.DurableStateBehaviorImpl[Command, State]( persistenceId, - emptyState, + () => emptyState, onCommand(_, _).asInstanceOf[EffectImpl[State]], getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala index e926d4fc02f..81e265efda8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala @@ -39,6 +39,9 @@ object DurableStateBehavior { /** * Create a `Behavior` for a persistent actor with durable storage of its state. * + * This can be used when the state is immutable, but if the state is mutable, it is important to use + * the `withMutableState` that takes `emptyStateFactory: () => State` parameter. + * * @param persistenceId stable unique identifier for the `DurableStateBehavior` * @param emptyState the intial state for the entity before any state has been stored * @param commandHandler map commands to effects e.g. persisting state, replying to commands @@ -47,21 +50,56 @@ object DurableStateBehavior { persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) => Effect[State]): DurableStateBehavior[Command, State] = { + withMutableState(persistenceId, () => emptyState, commandHandler) + } + + /** + * Create a `Behavior` with mutable state for a persistent actor with durable storage of its state. + * + * When the state is mutable, it is important to use this variant to make sure that the state instance is + * recreated in case of failure restarts. + * + * @param persistenceId stable unique identifier for the `DurableStateBehavior` + * @param emptyStateFactory factory function of the intial state for the entity before any state has been stored + * @param commandHandler map commands to effects e.g. persisting state, replying to commands + */ + def withMutableState[Command, State]( + persistenceId: PersistenceId, + emptyStateFactory: () => State, + commandHandler: (State, Command) => Effect[State]): DurableStateBehavior[Command, State] = { val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[DurableStateBehavior[_, _]], logPrefixSkipList) - DurableStateBehaviorImpl(persistenceId, emptyState, commandHandler, loggerClass) + DurableStateBehaviorImpl(persistenceId, emptyStateFactory, commandHandler, loggerClass) } /** * Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten. * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with [[Effect.reply]], [[Effect.noReply]], [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]]. + * + * This can be used when the state is immutable, but if the state is mutable, it is important to use + * the `withEnforcedRepliesMutableState` that takes `emptyStateFactory: () => State` parameter. */ def withEnforcedReplies[Command, State]( persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) => ReplyEffect[State]): DurableStateBehavior[Command, State] = { + withEnforcedRepliesMutableState(persistenceId, () => emptyState, commandHandler) + } + + /** + * Create a `Behavior` with mutable state for a persistent actor that is enforcing that replies to commands are not forgotten. + * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with [[Effect.reply]], [[Effect.noReply]], [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]]. + * + * When the state is mutable, it is important to use this variant to make sure that the state instance is + * recreated in case of failure restarts. + */ + def withEnforcedRepliesMutableState[Command, State]( + persistenceId: PersistenceId, + emptyStateFactory: () => State, + commandHandler: (State, Command) => ReplyEffect[State]): DurableStateBehavior[Command, State] = { val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[DurableStateBehavior[_, _]], logPrefixSkipList) - DurableStateBehaviorImpl(persistenceId, emptyState, commandHandler, loggerClass) + DurableStateBehaviorImpl(persistenceId, emptyStateFactory, commandHandler, loggerClass) } /**