Skip to content

Commit

Permalink
fix: eventing testkit - supporting multiple subscription to the same …
Browse files Browse the repository at this point in the history
…topic
  • Loading branch information
aludwiko committed Aug 28, 2023
1 parent 3b1adcb commit 4754a01
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,33 @@

package kalix.javasdk.testkit.impl

import java.time
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.{ List => JList }

import scala.compat.java8.DurationConverters.DurationOps
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.Await
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success

import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.pattern._
import akka.stream.BoundedSourceQueue
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Sink
Expand All @@ -30,9 +52,8 @@ import akka.util.BoxedType
import com.google.protobuf.ByteString
import com.google.protobuf.GeneratedMessageV3
import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.eventing.EventDestination
import kalix.eventing.EventDestination.Destination
import kalix.eventing.EventSource
import kalix.javasdk.JsonSupport
import kalix.javasdk.Metadata.{ MetadataEntry => SdkMetadataEntry }
import kalix.javasdk.impl.MessageCodec
import kalix.javasdk.impl.MetadataImpl
Expand All @@ -45,7 +66,6 @@ import kalix.javasdk.testkit.impl.TopicImpl.DefaultTimeout
import kalix.javasdk.{ Metadata => SdkMetadata }
import kalix.protocol.component.Metadata
import kalix.protocol.component.MetadataEntry
import kalix.javasdk.JsonSupport
import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand
import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult
import kalix.testkit.protocol.eventing_test_backend.EventStreamOutCommand
Expand All @@ -59,23 +79,6 @@ import kalix.testkit.protocol.eventing_test_backend.SourceElem
import org.slf4j.LoggerFactory
import scalapb.GeneratedMessage

import java.time
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.{ List => JList }
import scala.compat.java8.DurationConverters.DurationOps
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.util.Failure
import scala.util.Success
import scala.language.postfixOps

object EventingTestKitImpl {

/**
Expand Down Expand Up @@ -154,15 +157,17 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p
extends EventingTestKit {

private val log = LoggerFactory.getLogger(classOf[EventingTestServiceImpl])
private implicit val sys = system
private implicit val ec = sys.dispatcher
private implicit val sys: ActorSystem = system
private implicit val ec: ExecutionContextExecutor = sys.dispatcher

private val topics = new ConcurrentHashMap[String, TopicImpl]()

override def getTopic(topic: String): Topic = getTopicImpl(topic)

private def getTopicImpl(topic: String): TopicImpl =
topics.computeIfAbsent(topic, _ => new TopicImpl(TestProbe(), TestProbe(), codec))
topics.computeIfAbsent(
topic,
_ => new TopicImpl(TestProbe(), system.actorOf(Props[SourcesHolder](), "topic-source-holder-" + topic), codec))

final class ServiceImpl extends EventingTestKitService {
override def emitSingle(in: EmitSingleCommand): Future[EmitSingleResult] = {
Expand Down Expand Up @@ -201,7 +206,7 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p
eventSource)
val (queue, source) = Source.queue[SourceElem](10).preMaterialize()
val runningSourceProbe = RunningSourceProbe(serviceName, eventSource)(queue, source)
getTopicImpl(eventSource.getTopic).sourceProbe.ref ! runningSourceProbe
getTopicImpl(eventSource.getTopic).addSourceProbe(runningSourceProbe)
runningSourcePromise.success(runningSourceProbe)
Some(runningSourceProbe)

Expand Down Expand Up @@ -229,16 +234,21 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p
}
}

private case class PublishedMessage(message: ByteString, metadata: SdkMetadata)

private[testkit] class TopicImpl(
private[testkit] val destinationProbe: TestProbe,
private[testkit] val sourceProbe: TestProbe,
private[testkit] val sourcesHolder: ActorRef,
codec: MessageCodec)
extends Topic {

private lazy val brokerProbe = sourceProbe.expectMsgType[RunningSourceProbe]

private val log = LoggerFactory.getLogger(classOf[TopicImpl])

def addSourceProbe(runningSourceProbe: RunningSourceProbe): Unit = {
val addSource = sourcesHolder.ask(SourcesHolder.AddSource(runningSourceProbe))(5.seconds)
Await.result(addSource, 10.seconds)
}

override def expectNone(): Unit = expectNone(DefaultTimeout)

override def expectNone(timeout: time.Duration): Unit = destinationProbe.expectNoMessage(timeout.toScala)
Expand Down Expand Up @@ -328,8 +338,10 @@ private[testkit] class TopicImpl(
override def publish(message: ByteString): Unit =
publish(message, SdkMetadata.EMPTY)

override def publish(message: ByteString, metadata: SdkMetadata): Unit =
brokerProbe.emit(message, metadata)
override def publish(message: ByteString, metadata: SdkMetadata): Unit = {
val addSource = sourcesHolder.ask(SourcesHolder.Publish(message, metadata))(5.seconds)
Await.result(addSource, 5.seconds)
}

override def publish(message: TestKitMessage[_]): Unit = message.getPayload match {
case javaPb: GeneratedMessageV3 => publish(javaPb.toByteString, message.getMetadata)
Expand All @@ -353,7 +365,7 @@ private[testkit] class TopicImpl(
}

private[testkit] object TopicImpl {
val DefaultTimeout = time.Duration.ofSeconds(3)
val DefaultTimeout: time.Duration = time.Duration.ofSeconds(3)
}

private[testkit] case class TestKitMessageImpl[P](payload: P, metadata: SdkMetadata) extends TestKitMessage[P] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kalix.javasdk.testkit.impl

import scala.collection.mutable.ArrayBuffer

import akka.actor.Actor
import akka.event.Logging
import com.google.protobuf.ByteString
import kalix.javasdk.testkit.KalixTestKit
import kalix.javasdk.testkit.impl.EventingTestKitImpl.RunningSourceProbe
import kalix.javasdk.testkit.impl.SourcesHolder.AddSource
import kalix.javasdk.testkit.impl.SourcesHolder.Publish
import kalix.javasdk.{ Metadata => SdkMetadata }
import org.slf4j.LoggerFactory

object SourcesHolder {

case class AddSource(runningSourceProbe: RunningSourceProbe)
case class Publish(message: ByteString, metadata: SdkMetadata)
}

class SourcesHolder extends Actor {

private val log = LoggerFactory.getLogger(classOf[KalixTestKit])

private val sources: ArrayBuffer[RunningSourceProbe] = ArrayBuffer.empty
private val publishedMessages: ArrayBuffer[PublishedMessage] = ArrayBuffer.empty

private case class PublishedMessage(message: ByteString, metadata: SdkMetadata)

override def receive: Receive = {
case AddSource(runningSourceProbe) =>
if (publishedMessages.nonEmpty) {
log.debug(
s"Emitting ${publishedMessages.size} messages to the new source from [${runningSourceProbe.serviceName}]")
publishedMessages.foreach { msg =>
runningSourceProbe.emit(msg.message, msg.metadata)
}
}
sources.addOne(runningSourceProbe)
log.debug(s"Source from [${runningSourceProbe.serviceName}] added")
sender() ! "ok"
case Publish(message, metadata) =>
sources.foreach { source =>
log.debug(s"Emitting message to the source from [${source.serviceName}]")
source.emit(message, metadata)
}
publishedMessages.addOne(PublishedMessage(message, metadata))
sender() ! "ok"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package kalix.javasdk.testkit.impl

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala

import akka.actor.ActorSystem
import akka.actor.Props
import akka.stream.BoundedSourceQueue
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.google.protobuf.ByteString
import kalix.eventing.EventDestination
Expand All @@ -33,22 +38,27 @@ import kalix.protocol.component.MetadataEntry.Value.StringValue
import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand
import kalix.testkit.protocol.eventing_test_backend.Message
import kalix.testkit.protocol.eventing_test_backend.SourceElem
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala

class TopicImplSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with Matchers with BeforeAndAfterEach {
class TopicImplSpec
extends TestKit(ActorSystem("MySpec"))
with AnyWordSpecLike
with Matchers
with BeforeAndAfterEach
with BeforeAndAfterAll {

private val anySupport = new AnySupport(Array(), getClass.getClassLoader)
private val outProbe = TestProbe()(system.classicSystem)
private val inProbe = TestProbe()(system.classicSystem)
private val topic = new TopicImpl(outProbe, inProbe, anySupport)
private val outProbe = TestProbe()(system)
private val topic =
new TopicImpl(outProbe, system.actorOf(Props[SourcesHolder](), "holder"), anySupport)
val queue = new DummyQueue(mutable.Queue.empty)
// establishing the dummy connection for the test broker
inProbe.ref ! RunningSourceProbe("dummy-service", EventSource.defaultInstance)(queue, Source.empty[SourceElem])

private val runningSourceProbe: RunningSourceProbe =
RunningSourceProbe("dummy-service", EventSource.defaultInstance)(queue, Source.empty[SourceElem])
topic.addSourceProbe(runningSourceProbe)

private val textPlainHeader = MetadataEntry("Content-Type", StringValue("text/plain; charset=utf-8"))
private val bytesHeader = MetadataEntry("Content-Type", StringValue("application/octet-stream"))
Expand Down Expand Up @@ -136,6 +146,10 @@ class TopicImplSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
topic.clear()
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

class DummyQueue(val elems: mutable.Queue[SourceElem]) extends BoundedSourceQueue[SourceElem] {
override def offer(elem: SourceElem): QueueOfferResult = {
elems.append(elem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.example.Main;
import com.example.wiring.TestkitConfig;
import com.example.wiring.eventsourcedentities.counter.CounterEvent.ValueIncreased;
import kalix.javasdk.client.ComponentClient;
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.EventingTestKit.Message;
import kalix.javasdk.testkit.KalixTestKit;
Expand All @@ -32,6 +31,7 @@
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.concurrent.TimeUnit;

Expand All @@ -51,7 +51,7 @@ public class EventingTestkitIntegrationTest {
private KalixTestKit kalixTestKit;
private EventingTestKit.Topic eventsTopic;
@Autowired
private ComponentClient componentClient;
private WebClient webClient;

@BeforeAll
public void beforeAll() {
Expand All @@ -62,8 +62,8 @@ public void beforeAll() {
public void shouldPublishEventWithTypeNameViaEventingTestkit() {
//given
String subject = "test";
ValueIncreased event1 = new ValueIncreased(123);
ValueIncreased event2 = new ValueIncreased(123);
ValueIncreased event1 = new ValueIncreased(1);
ValueIncreased event2 = new ValueIncreased(2);

//when
Message<ValueIncreased> test = kalixTestKit.getMessageBuilder().of(event1, subject);
Expand All @@ -77,7 +77,16 @@ public void shouldPublishEventWithTypeNameViaEventingTestkit() {
.untilAsserted(() -> {
var response = DummyCounterEventStore.get(subject);
assertThat(response).containsOnly(event1, event2);
});

var viewResponse = webClient
.get()
.uri("/counter-view-topic-sub/less-then/" + 4)
.retrieve()
.bodyToFlux(CounterView.class)
.toStream()
.toList();

assertThat(viewResponse).containsOnly(new CounterView("test", 3));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static kalix.javasdk.impl.MetadataImpl.CeSubject;


@Profile("docker-it-test")
@Profile({"docker-it-test", "eventing-testkit"})
@ViewId("counter_view_topic_sub")
@Table("counter_view_topic_sub")
@Subscribe.Topic(COUNTER_EVENTS_TOPIC)
Expand Down

0 comments on commit 4754a01

Please sign in to comment.