diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala index c2d9174de..956bacce9 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala @@ -46,11 +46,17 @@ object CrdtEntity { private final case class Relay(actorRef: ActorRef) /** - * This is sent by Akka streams when the gRPC stream to the user function has closed - which typically shouldn't - * happen unless it crashes for some reason. + * This is sent by Akka streams when the gRPC stream to the user function has closed - which is expected when the + * entity is stopping (such as for passivation) or when deleted. */ final case object EntityStreamClosed + /** + * This is sent by Akka streams when the gRPC stream to the user function has failed - which typically shouldn't + * happen unless it crashes for some reason. + */ + final case class EntityStreamFailed(cause: Throwable) + final case object Stop private final case class AnyKey(_id: String) extends Key[ReplicatedData](_id) @@ -149,6 +155,7 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en private[this] final var outstanding = Map.empty[Long, Initiator] private[this] final var streamedCalls = Map.empty[Long, ActorRef] private[this] final var closingStreams = Set.empty[Long] + private[this] final var closing = false private[this] final var stopping = false implicit val ec = context.dispatcher @@ -167,7 +174,7 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en NotUsed } ) - .runWith(Sink.actorRef(self, EntityStreamClosed)) + .runWith(Sink.actorRef(self, EntityStreamClosed, EntityStreamFailed.apply)) // We initially do a read to get the initial state. Try a majority read first in case this is a new node. replicator ! Get(key, ReadMajority(configuration.initialReadTimeout)) @@ -400,9 +407,10 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en } case EntityStreamClosed => - crash("Unexpected entity termination due to stream closure") + if (closing) context.stop(self) + else crash("Unexpected entity termination due to stream closure") - case Status.Failure(cause) => + case EntityStreamFailed(cause) => // Means the stream stopped unexpectedly crash("Entity crashed", Some(cause)) @@ -414,7 +422,8 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en actorRef ! Status.Success(Done) streamedCalls -= commandId } - context.stop(self) + relay ! Status.Success(()) + closing = true // wait for stream to be closed before stopping actor } else { stopping = true } @@ -522,7 +531,8 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en private def operationFinished(): Unit = if (stopping) { if (outstanding.isEmpty) { - context.stop(self) + relay ! Status.Success(()) + closing = true // wait for stream to be closed before stopping actor } } else { if (outstandingMutatingOperations > 1) { @@ -659,6 +669,9 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en case EntityStreamClosed => // Ignore + case EntityStreamFailed(_) => + // Ignore + case ReceiveTimeout => context.parent ! CrdtEntityManager.Passivate diff --git a/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala b/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala index f7d6ea32a..307f1a847 100644 --- a/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala @@ -1983,15 +1983,13 @@ trait CrdtEntityTCK extends TCKSpec { } "verify passivation timeout" in crdtConfiguredTest { id => - pendingUntilFixed { // FIXME: we don't get stream completion, but a failed stream with PeerClosedStreamException - configuredClient.call(Request(id)) - interceptor - .expectCrdtEntityConnection() - .expectClient(init(ServiceConfigured, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) - .expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis) - } + configuredClient.call(Request(id)) + interceptor + .expectCrdtEntityConnection() + .expectClient(init(ServiceConfigured, id)) + .expectClient(command(1, id, "Call", Request(id))) + .expectService(reply(1, Response())) + .expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis) } } }