Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions akka-docs/src/main/paradox/typed/durable-state/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ The state is typically defined as an immutable class and a new instance of the s
You may choose to use a mutable class for the state, and then the command handler may update the state instance, but
it must still pass the updated state to the `persist` effect.

@@@ div { .group-java }

If the state is mutable, it is important that the `emptyState` method creates a new State instance each time
it is called to ensure that the state is recreated in case of failure restarts.

@@@

@@@ div { .group-scala }

If the state is mutable, it is important to use the `DurableStateBehavior.withMutableState` factory method that
takes `emptyStateFactory: () => State` parameter to make sure that the state instance is recreated in case of
failure restarts.

@@@

More effects are explained in @ref:[Effects and Side Effects](#effects-and-side-effects).

In addition to returning the primary `Effect` for the command, `DurableStateBehavior`s can also
Expand Down Expand Up @@ -207,6 +222,14 @@ Scala
Java
: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects }

@@@ div { .group-scala }

It is recommended to use an immutable state class. If the state is mutable, it is important to use
the `DurableStateBehavior.withEnforcedRepliesMutableState` factory method that takes `emptyStateFactory: () => State`
parameter to make sure that the state instance is recreated in case of failure restarts.

@@@

### Side effects ordering and guarantees

Any side effects are executed on an at-most-once basis and will not be executed if the persist fails.
Expand Down
3 changes: 3 additions & 0 deletions akka-docs/src/main/paradox/typed/persistence-style.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ The above examples are using immutable state classes and below is corresponding
Java
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java) { #account-entity }

If the state is mutable, it is important that the `emptyState` method creates a new State instance each time
it is called to ensure that the state is recreated in case of failure restarts.

## Leveraging Java 21 features

When building event sourced entities in a project using Java 21 or newer, the @javadoc[EventSourcedOnCommandBehavior](akka.persistence.typed.javadsl.EventSourcedOnCommandBehavior)
Expand Down
19 changes: 19 additions & 0 deletions akka-docs/src/main/paradox/typed/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ Note that the concrete class does not contain any fields with state like a regul
lost when the actor is stopped or restarted. Updates to the State are always performed in the eventHandler
based on the events.

If the state is mutable, it is important that the `emptyState` method creates a new State instance each time
it is called to ensure that the state is recreated in case of failure restarts.

@@@

@@@ div { .group-scala }

It is recommended to use an immutable state class. If the state is mutable, it is important to use
the `EventSourcedBehavior.withMutableState` factory method that takes `emptyStateFactory: () => State` parameter
to make sure that the state instance is recreated in case of failure restarts.

@@@

Next we'll discuss each of these in detail.
Expand Down Expand Up @@ -453,6 +464,14 @@ is not used, but then there will be no compilation errors if the reply decision
Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific
command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.

@@@ div { .group-scala }

It is recommended to use an immutable state class. If the state is mutable, it is important to use
the `EventSourcedBehavior.withEnforcedRepliesMutableState` factory method that takes `emptyStateFactory: () => State`
parameter to make sure that the state instance is recreated in case of failure restarts.

@@@

## Serialization

The same @ref:[serialization](../serialization.md) mechanism as for actor messages is also used for persistent actors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private[akka] object Unpersistent {
findEventSourcedBehavior(behavior, context).fold {
throw new AssertionError("Did not find the expected EventSourcedBehavior")
} { esBehavior =>
val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState -> 0L)
val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState() -> 0L)
new WrappedEventSourcedBehavior(context, esBehavior, initialState, initialSequenceNr, onEvent, onSnapshot)
}
}
Expand All @@ -79,7 +79,7 @@ private[akka] object Unpersistent {
findDurableStateBehavior(behavior, context).fold {
throw new AssertionError("Did not find the expected DurableStateBehavior")
} { dsBehavior =>
val initialState = fromState.getOrElse(dsBehavior.emptyState)
val initialState = fromState.getOrElse(dsBehavior.emptyState())
new WrappedDurableStateBehavior(context, dsBehavior, initialState, onPersist)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ public String emptyState() {
@Override
public CommandHandler<String, String, String> commandHandler() {
return (state, command) -> {
probe.tell("persisting");
return Effect().persist(command);
if (command.equals("get")) {
probe.tell("state [" + state + "]");
return Effect().none();
} else {
probe.tell("persisting");
return Effect().persist(command);
}
};
}

Expand Down Expand Up @@ -120,16 +125,24 @@ public void persistEvents() throws Exception {
probe.expectMessage("persisting");
probe.expectMessage("one");
probe.expectMessage("starting");
c.tell("get");
probe.expectMessage("state []");

// fail
c.tell("two");
probe.expectMessage("persisting");
probe.expectMessage("two");
probe.expectMessage("starting");
c.tell("get");
probe.expectMessage("state []");

// work
c.tell("three");
probe.expectMessage("persisting");
probe.expectMessage("three");
// no starting as this one did not fail
probe.expectNoMessage();
c.tell("get");
probe.expectMessage("state [three]");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (C) 2018-2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.typed.javadsl;

import static akka.persistence.typed.scaladsl.EventSourcedBehaviorFailureSpec.conf;

import akka.actor.testkit.typed.TestException;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.RecoveryFailed;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;

class FailingEventSourcedActorWithMutableState
extends EventSourcedBehavior<
String, String, FailingEventSourcedActorWithMutableState.MutableState> {

static final class MutableState {
private String value;

MutableState(String value) {
this.value = value;
}

void add(String s) {
value = value + s;
}

String getValue() {
return value;
}
}

private final ActorRef<String> probe;
private final ActorRef<Throwable> recoveryFailureProbe;

FailingEventSourcedActorWithMutableState(
PersistenceId persistenceId,
ActorRef<String> probe,
ActorRef<Throwable> recoveryFailureProbe) {

super(
persistenceId,
SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
this.probe = probe;
this.recoveryFailureProbe = recoveryFailureProbe;
}

@Override
public SignalHandler<MutableState> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
RecoveryCompleted.instance(),
state -> {
probe.tell("starting");
})
.onSignal(
RecoveryFailed.class,
(state, signal) -> {
recoveryFailureProbe.tell(signal.getFailure());
})
.build();
}

@Override
public MutableState emptyState() {
return new MutableState("");
}

@Override
public CommandHandler<String, String, MutableState> commandHandler() {
return (state, command) -> {
if (command.equals("get")) {
probe.tell("state [" + state.getValue() + "]");
return Effect().none();
} else {
probe.tell("persisting");
return Effect().persist(command);
}
};
}

@Override
public EventHandler<MutableState, String> eventHandler() {
return (state, event) -> {
probe.tell(event);
state.add(event);
return state;
};
}
}

public class EventSourcedActorWithMutableStateFailureTest extends JUnitSuite {

public static final Config config = conf().withFallback(ConfigFactory.load());

@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);

@Rule public final LogCapturing logCapturing = new LogCapturing();

public static Behavior<String> fail(
PersistenceId pid, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
return new FailingEventSourcedActorWithMutableState(pid, probe, recoveryFailureProbe);
}

public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe) {
return fail(pid, probe, testKit.<Throwable>createTestProbe().ref());
}

@Test
public void notifyRecoveryFailure() {
TestProbe<String> probe = testKit.createTestProbe();
TestProbe<Throwable> recoveryFailureProbe = testKit.createTestProbe();
Behavior<String> p1 =
fail(
PersistenceId.ofUniqueId("fail-recovery-once"),
probe.ref(),
recoveryFailureProbe.ref());
testKit.spawn(p1);
recoveryFailureProbe.expectMessageClass(TestException.class);
}

@Test
public void persistEvents() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<String> p1 = fail(PersistenceId.ofUniqueId("fail-first-2"), probe.ref());
ActorRef<String> c = testKit.spawn(p1);
probe.expectMessage("starting");
// fail
c.tell("one");
probe.expectMessage("persisting");
probe.expectMessage("one");
probe.expectMessage("starting");
c.tell("get");
probe.expectMessage("state []");

// fail
c.tell("two");
probe.expectMessage("persisting");
probe.expectMessage("two");
probe.expectMessage("starting");
c.tell("get");
probe.expectMessage("state []");

// work
c.tell("three");
probe.expectMessage("persisting");
probe.expectMessage("three");
// no starting as this one did not fail
probe.expectNoMessage();
c.tell("get");
probe.expectMessage("state [three]");
}
}
Loading
Loading