Skip to content

Commit 6b93537

Browse files
authored
fix: Recreate state for EventSourcedBehavior with mutable state (#32822)
In case of failure restarts the state instance must be recreated if the state is mutable, same for DurableState
1 parent 670c60a commit 6b93537

File tree

17 files changed

+459
-26
lines changed

17 files changed

+459
-26
lines changed

akka-docs/src/main/paradox/typed/durable-state/persistence.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ The state is typically defined as an immutable class and a new instance of the s
104104
You may choose to use a mutable class for the state, and then the command handler may update the state instance, but
105105
it must still pass the updated state to the `persist` effect.
106106

107+
@@@ div { .group-java }
108+
109+
If the state is mutable, it is important that the `emptyState` method creates a new State instance each time
110+
it is called to ensure that the state is recreated in case of failure restarts.
111+
112+
@@@
113+
114+
@@@ div { .group-scala }
115+
116+
If the state is mutable, it is important to use the `DurableStateBehavior.withMutableState` factory method that
117+
takes `emptyStateFactory: () => State` parameter to make sure that the state instance is recreated in case of
118+
failure restarts.
119+
120+
@@@
121+
107122
More effects are explained in @ref:[Effects and Side Effects](#effects-and-side-effects).
108123

109124
In addition to returning the primary `Effect` for the command, `DurableStateBehavior`s can also
@@ -207,6 +222,14 @@ Scala
207222
Java
208223
: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects }
209224

225+
@@@ div { .group-scala }
226+
227+
It is recommended to use an immutable state class. If the state is mutable, it is important to use
228+
the `DurableStateBehavior.withEnforcedRepliesMutableState` factory method that takes `emptyStateFactory: () => State`
229+
parameter to make sure that the state instance is recreated in case of failure restarts.
230+
231+
@@@
232+
210233
### Side effects ordering and guarantees
211234

212235
Any side effects are executed on an at-most-once basis and will not be executed if the persist fails.

akka-docs/src/main/paradox/typed/persistence-style.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ The above examples are using immutable state classes and below is corresponding
6969
Java
7070
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java) { #account-entity }
7171

72+
If the state is mutable, it is important that the `emptyState` method creates a new State instance each time
73+
it is called to ensure that the state is recreated in case of failure restarts.
74+
7275
## Leveraging Java 21 features
7376

7477
When building event sourced entities in a project using Java 21 or newer, the @javadoc[EventSourcedOnCommandBehavior](akka.persistence.typed.javadsl.EventSourcedOnCommandBehavior)

akka-docs/src/main/paradox/typed/persistence.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,17 @@ Note that the concrete class does not contain any fields with state like a regul
9494
lost when the actor is stopped or restarted. Updates to the State are always performed in the eventHandler
9595
based on the events.
9696

97+
If the state is mutable, it is important that the `emptyState` method creates a new State instance each time
98+
it is called to ensure that the state is recreated in case of failure restarts.
99+
100+
@@@
101+
102+
@@@ div { .group-scala }
103+
104+
It is recommended to use an immutable state class. If the state is mutable, it is important to use
105+
the `EventSourcedBehavior.withMutableState` factory method that takes `emptyStateFactory: () => State` parameter
106+
to make sure that the state instance is recreated in case of failure restarts.
107+
97108
@@@
98109

99110
Next we'll discuss each of these in detail.
@@ -453,6 +464,14 @@ is not used, but then there will be no compilation errors if the reply decision
453464
Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific
454465
command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.
455466

467+
@@@ div { .group-scala }
468+
469+
It is recommended to use an immutable state class. If the state is mutable, it is important to use
470+
the `EventSourcedBehavior.withEnforcedRepliesMutableState` factory method that takes `emptyStateFactory: () => State`
471+
parameter to make sure that the state instance is recreated in case of failure restarts.
472+
473+
@@@
474+
456475
## Serialization
457476

458477
The same @ref:[serialization](../serialization.md) mechanism as for actor messages is also used for persistent actors.

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/Unpersistent.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private[akka] object Unpersistent {
5454
findEventSourcedBehavior(behavior, context).fold {
5555
throw new AssertionError("Did not find the expected EventSourcedBehavior")
5656
} { esBehavior =>
57-
val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState -> 0L)
57+
val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState() -> 0L)
5858
new WrappedEventSourcedBehavior(context, esBehavior, initialState, initialSequenceNr, onEvent, onSnapshot)
5959
}
6060
}
@@ -79,7 +79,7 @@ private[akka] object Unpersistent {
7979
findDurableStateBehavior(behavior, context).fold {
8080
throw new AssertionError("Did not find the expected DurableStateBehavior")
8181
} { dsBehavior =>
82-
val initialState = fromState.getOrElse(dsBehavior.emptyState)
82+
val initialState = fromState.getOrElse(dsBehavior.emptyState())
8383
new WrappedDurableStateBehavior(context, dsBehavior, initialState, onPersist)
8484
}
8585
}

akka-persistence-typed-tests/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,13 @@ public String emptyState() {
6565
@Override
6666
public CommandHandler<String, String, String> commandHandler() {
6767
return (state, command) -> {
68-
probe.tell("persisting");
69-
return Effect().persist(command);
68+
if (command.equals("get")) {
69+
probe.tell("state [" + state + "]");
70+
return Effect().none();
71+
} else {
72+
probe.tell("persisting");
73+
return Effect().persist(command);
74+
}
7075
};
7176
}
7277

@@ -120,16 +125,24 @@ public void persistEvents() throws Exception {
120125
probe.expectMessage("persisting");
121126
probe.expectMessage("one");
122127
probe.expectMessage("starting");
128+
c.tell("get");
129+
probe.expectMessage("state []");
130+
123131
// fail
124132
c.tell("two");
125133
probe.expectMessage("persisting");
126134
probe.expectMessage("two");
127135
probe.expectMessage("starting");
136+
c.tell("get");
137+
probe.expectMessage("state []");
138+
128139
// work
129140
c.tell("three");
130141
probe.expectMessage("persisting");
131142
probe.expectMessage("three");
132143
// no starting as this one did not fail
133144
probe.expectNoMessage();
145+
c.tell("get");
146+
probe.expectMessage("state [three]");
134147
}
135148
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (C) 2018-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.typed.javadsl;
6+
7+
import static akka.persistence.typed.scaladsl.EventSourcedBehaviorFailureSpec.conf;
8+
9+
import akka.actor.testkit.typed.TestException;
10+
import akka.actor.testkit.typed.javadsl.LogCapturing;
11+
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
12+
import akka.actor.testkit.typed.javadsl.TestProbe;
13+
import akka.actor.typed.ActorRef;
14+
import akka.actor.typed.Behavior;
15+
import akka.actor.typed.SupervisorStrategy;
16+
import akka.persistence.typed.PersistenceId;
17+
import akka.persistence.typed.RecoveryCompleted;
18+
import akka.persistence.typed.RecoveryFailed;
19+
import com.typesafe.config.Config;
20+
import com.typesafe.config.ConfigFactory;
21+
import java.time.Duration;
22+
import org.junit.ClassRule;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
import org.scalatestplus.junit.JUnitSuite;
26+
27+
class FailingEventSourcedActorWithMutableState
28+
extends EventSourcedBehavior<
29+
String, String, FailingEventSourcedActorWithMutableState.MutableState> {
30+
31+
static final class MutableState {
32+
private String value;
33+
34+
MutableState(String value) {
35+
this.value = value;
36+
}
37+
38+
void add(String s) {
39+
value = value + s;
40+
}
41+
42+
String getValue() {
43+
return value;
44+
}
45+
}
46+
47+
private final ActorRef<String> probe;
48+
private final ActorRef<Throwable> recoveryFailureProbe;
49+
50+
FailingEventSourcedActorWithMutableState(
51+
PersistenceId persistenceId,
52+
ActorRef<String> probe,
53+
ActorRef<Throwable> recoveryFailureProbe) {
54+
55+
super(
56+
persistenceId,
57+
SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
58+
this.probe = probe;
59+
this.recoveryFailureProbe = recoveryFailureProbe;
60+
}
61+
62+
@Override
63+
public SignalHandler<MutableState> signalHandler() {
64+
return newSignalHandlerBuilder()
65+
.onSignal(
66+
RecoveryCompleted.instance(),
67+
state -> {
68+
probe.tell("starting");
69+
})
70+
.onSignal(
71+
RecoveryFailed.class,
72+
(state, signal) -> {
73+
recoveryFailureProbe.tell(signal.getFailure());
74+
})
75+
.build();
76+
}
77+
78+
@Override
79+
public MutableState emptyState() {
80+
return new MutableState("");
81+
}
82+
83+
@Override
84+
public CommandHandler<String, String, MutableState> commandHandler() {
85+
return (state, command) -> {
86+
if (command.equals("get")) {
87+
probe.tell("state [" + state.getValue() + "]");
88+
return Effect().none();
89+
} else {
90+
probe.tell("persisting");
91+
return Effect().persist(command);
92+
}
93+
};
94+
}
95+
96+
@Override
97+
public EventHandler<MutableState, String> eventHandler() {
98+
return (state, event) -> {
99+
probe.tell(event);
100+
state.add(event);
101+
return state;
102+
};
103+
}
104+
}
105+
106+
public class EventSourcedActorWithMutableStateFailureTest extends JUnitSuite {
107+
108+
public static final Config config = conf().withFallback(ConfigFactory.load());
109+
110+
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
111+
112+
@Rule public final LogCapturing logCapturing = new LogCapturing();
113+
114+
public static Behavior<String> fail(
115+
PersistenceId pid, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
116+
return new FailingEventSourcedActorWithMutableState(pid, probe, recoveryFailureProbe);
117+
}
118+
119+
public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe) {
120+
return fail(pid, probe, testKit.<Throwable>createTestProbe().ref());
121+
}
122+
123+
@Test
124+
public void notifyRecoveryFailure() {
125+
TestProbe<String> probe = testKit.createTestProbe();
126+
TestProbe<Throwable> recoveryFailureProbe = testKit.createTestProbe();
127+
Behavior<String> p1 =
128+
fail(
129+
PersistenceId.ofUniqueId("fail-recovery-once"),
130+
probe.ref(),
131+
recoveryFailureProbe.ref());
132+
testKit.spawn(p1);
133+
recoveryFailureProbe.expectMessageClass(TestException.class);
134+
}
135+
136+
@Test
137+
public void persistEvents() throws Exception {
138+
TestProbe<String> probe = testKit.createTestProbe();
139+
Behavior<String> p1 = fail(PersistenceId.ofUniqueId("fail-first-2"), probe.ref());
140+
ActorRef<String> c = testKit.spawn(p1);
141+
probe.expectMessage("starting");
142+
// fail
143+
c.tell("one");
144+
probe.expectMessage("persisting");
145+
probe.expectMessage("one");
146+
probe.expectMessage("starting");
147+
c.tell("get");
148+
probe.expectMessage("state []");
149+
150+
// fail
151+
c.tell("two");
152+
probe.expectMessage("persisting");
153+
probe.expectMessage("two");
154+
probe.expectMessage("starting");
155+
c.tell("get");
156+
probe.expectMessage("state []");
157+
158+
// work
159+
c.tell("three");
160+
probe.expectMessage("persisting");
161+
probe.expectMessage("three");
162+
// no starting as this one did not fail
163+
probe.expectNoMessage();
164+
c.tell("get");
165+
probe.expectMessage("state [three]");
166+
}
167+
}

0 commit comments

Comments
 (0)