Skip to content

Commit

Permalink
Implement events by slice in testkit (#1533)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrdom authored Oct 16, 2024
1 parent 9230df8 commit 0e10d29
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends Asy
.foreach { case (tag, (timestamp, highestSequenceNr)) =>
eventStream.publish(PersistenceTestKitPlugin.TagWrite(tag, timestamp, highestSequenceNr))
}
eventStream.publish(PersistenceTestKitPlugin.SliceWrite(aw.persistenceId, timestamp, aw.highestSequenceNr))
}
result
})))
Expand Down Expand Up @@ -130,6 +131,8 @@ object PersistenceTestKitPlugin {

private[testkit] case class Write(persistenceId: String, toSequenceNr: Long)
private[testkit] case class TagWrite(tag: String, timestamp: Long, highestSequenceNr: Long)
private[testkit] case class SliceWrite(persistenceId: String, timestamp: Long, highestSequenceNr: Long)

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.testkit.query.internal

import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.persistence.Persistence
import pekko.persistence.query.typed
import pekko.persistence.query.Sequence
import pekko.persistence.testkit.EventStorage
import pekko.persistence.testkit.PersistenceTestKitPlugin.SliceWrite
import pekko.persistence.typed.PersistenceId
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.GraphStageLogicWithLogging
import pekko.stream.stage.OutHandler
import pekko.stream.Attributes
import pekko.stream.Outlet
import pekko.stream.SourceShape

/**
* INTERNAL API
*/
@InternalApi
private[pekko] object EventsBySliceStage {
// PersistenceTestKitPlugin increments timestamp for each atomic write,
// which can only contain a single persistence ID,
// so we only need to track timestamp and sequence number within state,
// because same timestamp will not have multiple persistence IDs.
case class State(
currentTimestamp: Long,
lastSequenceNr: Long
) {
def isAfter(timestamp: Long, sequenceNr: Long): Boolean = {
timestamp > currentTimestamp || (timestamp == currentTimestamp && sequenceNr > lastSequenceNr)
}
}
}

/**
* INTERNAL API
*/
@InternalApi
final private[pekko] class EventsBySliceStage[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
storage: EventStorage,
persistence: Persistence
) extends GraphStage[SourceShape[typed.EventEnvelope[Event]]] {
import EventsBySliceStage._

val out: Outlet[typed.EventEnvelope[Event]] = Outlet("EventsByTagSource")
override def shape: SourceShape[typed.EventEnvelope[Event]] = SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogicWithLogging(shape) with OutHandler {
private var state = Option.empty[State]
private var stageActorRef: ActorRef = null
override def preStart(): Unit = {
stageActorRef = getStageActor(receiveNotifications).ref
materializer.system.eventStream.subscribe(stageActorRef, classOf[SliceWrite])
}

private def shouldFilter(persistenceId: String): Boolean = {
val slice = persistence.sliceForPersistenceId(persistenceId)
PersistenceId.extractEntityType(persistenceId) == entityType && slice >= minSlice && slice <= maxSlice
}

private def receiveNotifications(in: (ActorRef, Any)): Unit = {
val (_, msg) = in
(msg, state) match {
case (SliceWrite(persistenceId, timestamp, highestSequenceNr), maybeState)
if shouldFilter(persistenceId) && maybeState.forall(_.isAfter(timestamp, highestSequenceNr)) =>
tryPush()
case _ =>
}
}

private def tryPush(): Unit = {
if (isAvailable(out)) {
val maybeNextEvent = storage.tryRead(entityType, repr => shouldFilter(repr.persistenceId))
.sortBy(pr => (pr.timestamp, pr.sequenceNr))
.find { pr =>
state.forall(_.isAfter(pr.timestamp, pr.sequenceNr))
}

log.debug("tryPush available. State {} event {}", state, maybeNextEvent)

maybeNextEvent.foreach { pr =>
val slice = persistence.sliceForPersistenceId(pr.persistenceId)
push(out,
new typed.EventEnvelope[Event](Sequence(pr.sequenceNr), pr.persistenceId, pr.sequenceNr,
Some(pr.payload.asInstanceOf[Event]), pr.timestamp, pr.metadata, entityType, slice))

state = Some(State(pr.timestamp, pr.sequenceNr))
}
} else {
log.debug("tryPush, no demand")
}
}

override def onPull(): Unit = {
tryPush()
}

setHandler(out, this)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ import org.slf4j.LoggerFactory
import pekko.persistence.Persistence
import pekko.persistence.query.typed
import pekko.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
import pekko.persistence.typed.PersistenceId
import pekko.persistence.query.scaladsl.EventsByTagQuery
import pekko.persistence.testkit.query.internal.EventsByTagStage
import pekko.persistence.testkit.query.internal.EventsBySliceStage

import scala.collection.immutable

Expand All @@ -53,7 +55,8 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
with CurrentEventsByTagQuery
with CurrentEventsBySliceQuery
with PagedPersistenceIdsQuery
with EventsByTagQuery {
with EventsByTagQuery
with EventsBySliceQuery {

private val log = LoggerFactory.getLogger(getClass)

Expand Down Expand Up @@ -167,4 +170,16 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
}
Source.fromGraph(new EventsByTagStage(tag, storage))
}

override def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset
): Source[typed.EventEnvelope[Event], NotUsed] = {
if (offset != NoOffset) {
throw new UnsupportedOperationException("Offsets not supported for persistence test kit eventsBySlices yet")
}
Source.fromGraph(new EventsBySliceStage(entityType, minSlice, maxSlice, storage, persistence))
}
}
Loading

0 comments on commit 0e10d29

Please sign in to comment.