Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Add graceful stop and stream completion for CRDT entities
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Dec 21, 2020
1 parent 84fbab4 commit 6d83bd1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ object CrdtEntity {
private final case class Relay(actorRef: ActorRef)

/**
* This is sent by Akka streams when the gRPC stream to the user function has closed - which typically shouldn't
* happen unless it crashes for some reason.
* This is sent by Akka streams when the gRPC stream to the user function has closed - which is expected when the
* entity is stopping (such as for passivation) or when deleted.
*/
final case object EntityStreamClosed

/**
* This is sent by Akka streams when the gRPC stream to the user function has failed - which typically shouldn't
* happen unless it crashes for some reason.
*/
final case class EntityStreamFailed(cause: Throwable)

final case object Stop

private final case class AnyKey(_id: String) extends Key[ReplicatedData](_id)
Expand Down Expand Up @@ -149,6 +155,7 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en
private[this] final var outstanding = Map.empty[Long, Initiator]
private[this] final var streamedCalls = Map.empty[Long, ActorRef]
private[this] final var closingStreams = Set.empty[Long]
private[this] final var closing = false
private[this] final var stopping = false

implicit val ec = context.dispatcher
Expand All @@ -167,7 +174,7 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en
NotUsed
}
)
.runWith(Sink.actorRef(self, EntityStreamClosed))
.runWith(Sink.actorRef(self, EntityStreamClosed, EntityStreamFailed.apply))

// We initially do a read to get the initial state. Try a majority read first in case this is a new node.
replicator ! Get(key, ReadMajority(configuration.initialReadTimeout))
Expand Down Expand Up @@ -400,9 +407,10 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en
}

case EntityStreamClosed =>
crash("Unexpected entity termination due to stream closure")
if (closing) context.stop(self)
else crash("Unexpected entity termination due to stream closure")

case Status.Failure(cause) =>
case EntityStreamFailed(cause) =>
// Means the stream stopped unexpectedly
crash("Entity crashed", Some(cause))

Expand All @@ -414,7 +422,8 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en
actorRef ! Status.Success(Done)
streamedCalls -= commandId
}
context.stop(self)
relay ! Status.Success(())
closing = true // wait for stream to be closed before stopping actor
} else {
stopping = true
}
Expand Down Expand Up @@ -522,7 +531,8 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en
private def operationFinished(): Unit =
if (stopping) {
if (outstanding.isEmpty) {
context.stop(self)
relay ! Status.Success(())
closing = true // wait for stream to be closed before stopping actor
}
} else {
if (outstandingMutatingOperations > 1) {
Expand Down Expand Up @@ -659,6 +669,9 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en
case EntityStreamClosed =>
// Ignore

case EntityStreamFailed(_) =>
// Ignore

case ReceiveTimeout =>
context.parent ! CrdtEntityManager.Passivate

Expand Down
16 changes: 7 additions & 9 deletions tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1983,15 +1983,13 @@ trait CrdtEntityTCK extends TCKSpec {
}

"verify passivation timeout" in crdtConfiguredTest { id =>
pendingUntilFixed { // FIXME: we don't get stream completion, but a failed stream with PeerClosedStreamException
configuredClient.call(Request(id))
interceptor
.expectCrdtEntityConnection()
.expectClient(init(ServiceConfigured, id))
.expectClient(command(1, id, "Call", Request(id)))
.expectService(reply(1, Response()))
.expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis)
}
configuredClient.call(Request(id))
interceptor
.expectCrdtEntityConnection()
.expectClient(init(ServiceConfigured, id))
.expectClient(command(1, id, "Call", Request(id)))
.expectService(reply(1, Response()))
.expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis)
}
}
}

0 comments on commit 6d83bd1

Please sign in to comment.