From 4754a01114de78264ec4db04237a92efcbbd1596 Mon Sep 17 00:00:00 2001 From: Andrzej Ludwikowski Date: Mon, 28 Aug 2023 11:46:49 +0200 Subject: [PATCH] fix: eventing testkit - supporting multiple subscription to the same topic --- .../testkit/impl/EventingTestKitImpl.scala | 72 +++++++++++-------- .../javasdk/testkit/impl/SourcesHolder.scala | 66 +++++++++++++++++ .../javasdk/testkit/impl/TopicImplSpec.scala | 34 ++++++--- .../EventingTestkitIntegrationTest.java | 19 +++-- .../pubsub/ViewFromCounterEventsTopic.java | 2 +- 5 files changed, 147 insertions(+), 46 deletions(-) create mode 100644 sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/SourcesHolder.scala diff --git a/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/EventingTestKitImpl.scala b/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/EventingTestKitImpl.scala index 7953c6f032..8925a8c294 100644 --- a/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/EventingTestKitImpl.scala +++ b/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/EventingTestKitImpl.scala @@ -16,11 +16,33 @@ package kalix.javasdk.testkit.impl +import java.time +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.{ List => JList } + +import scala.compat.java8.DurationConverters.DurationOps +import scala.compat.java8.OptionConverters.RichOptionalGeneric +import scala.concurrent.Await +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters.IteratorHasAsScala +import scala.jdk.CollectionConverters.SeqHasAsJava +import scala.jdk.CollectionConverters._ +import scala.language.postfixOps +import scala.util.Failure +import scala.util.Success + import akka.NotUsed +import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.actor.Props import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse +import akka.pattern._ import akka.stream.BoundedSourceQueue import akka.stream.QueueOfferResult import akka.stream.scaladsl.Sink @@ -30,9 +52,8 @@ import akka.util.BoxedType import com.google.protobuf.ByteString import com.google.protobuf.GeneratedMessageV3 import com.google.protobuf.any.{ Any => ScalaPbAny } -import kalix.eventing.EventDestination -import kalix.eventing.EventDestination.Destination import kalix.eventing.EventSource +import kalix.javasdk.JsonSupport import kalix.javasdk.Metadata.{ MetadataEntry => SdkMetadataEntry } import kalix.javasdk.impl.MessageCodec import kalix.javasdk.impl.MetadataImpl @@ -45,7 +66,6 @@ import kalix.javasdk.testkit.impl.TopicImpl.DefaultTimeout import kalix.javasdk.{ Metadata => SdkMetadata } import kalix.protocol.component.Metadata import kalix.protocol.component.MetadataEntry -import kalix.javasdk.JsonSupport import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult import kalix.testkit.protocol.eventing_test_backend.EventStreamOutCommand @@ -59,23 +79,6 @@ import kalix.testkit.protocol.eventing_test_backend.SourceElem import org.slf4j.LoggerFactory import scalapb.GeneratedMessage -import java.time -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap -import java.util.{ List => JList } -import scala.compat.java8.DurationConverters.DurationOps -import scala.compat.java8.OptionConverters.RichOptionalGeneric -import scala.concurrent.Await -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.concurrent.duration._ -import scala.jdk.CollectionConverters.CollectionHasAsScala -import scala.jdk.CollectionConverters.IteratorHasAsScala -import scala.jdk.CollectionConverters.SeqHasAsJava -import scala.util.Failure -import scala.util.Success -import scala.language.postfixOps - object EventingTestKitImpl { /** @@ -154,15 +157,17 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p extends EventingTestKit { private val log = LoggerFactory.getLogger(classOf[EventingTestServiceImpl]) - private implicit val sys = system - private implicit val ec = sys.dispatcher + private implicit val sys: ActorSystem = system + private implicit val ec: ExecutionContextExecutor = sys.dispatcher private val topics = new ConcurrentHashMap[String, TopicImpl]() override def getTopic(topic: String): Topic = getTopicImpl(topic) private def getTopicImpl(topic: String): TopicImpl = - topics.computeIfAbsent(topic, _ => new TopicImpl(TestProbe(), TestProbe(), codec)) + topics.computeIfAbsent( + topic, + _ => new TopicImpl(TestProbe(), system.actorOf(Props[SourcesHolder](), "topic-source-holder-" + topic), codec)) final class ServiceImpl extends EventingTestKitService { override def emitSingle(in: EmitSingleCommand): Future[EmitSingleResult] = { @@ -201,7 +206,7 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p eventSource) val (queue, source) = Source.queue[SourceElem](10).preMaterialize() val runningSourceProbe = RunningSourceProbe(serviceName, eventSource)(queue, source) - getTopicImpl(eventSource.getTopic).sourceProbe.ref ! runningSourceProbe + getTopicImpl(eventSource.getTopic).addSourceProbe(runningSourceProbe) runningSourcePromise.success(runningSourceProbe) Some(runningSourceProbe) @@ -229,16 +234,21 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p } } +private case class PublishedMessage(message: ByteString, metadata: SdkMetadata) + private[testkit] class TopicImpl( private[testkit] val destinationProbe: TestProbe, - private[testkit] val sourceProbe: TestProbe, + private[testkit] val sourcesHolder: ActorRef, codec: MessageCodec) extends Topic { - private lazy val brokerProbe = sourceProbe.expectMsgType[RunningSourceProbe] - private val log = LoggerFactory.getLogger(classOf[TopicImpl]) + def addSourceProbe(runningSourceProbe: RunningSourceProbe): Unit = { + val addSource = sourcesHolder.ask(SourcesHolder.AddSource(runningSourceProbe))(5.seconds) + Await.result(addSource, 10.seconds) + } + override def expectNone(): Unit = expectNone(DefaultTimeout) override def expectNone(timeout: time.Duration): Unit = destinationProbe.expectNoMessage(timeout.toScala) @@ -328,8 +338,10 @@ private[testkit] class TopicImpl( override def publish(message: ByteString): Unit = publish(message, SdkMetadata.EMPTY) - override def publish(message: ByteString, metadata: SdkMetadata): Unit = - brokerProbe.emit(message, metadata) + override def publish(message: ByteString, metadata: SdkMetadata): Unit = { + val addSource = sourcesHolder.ask(SourcesHolder.Publish(message, metadata))(5.seconds) + Await.result(addSource, 5.seconds) + } override def publish(message: TestKitMessage[_]): Unit = message.getPayload match { case javaPb: GeneratedMessageV3 => publish(javaPb.toByteString, message.getMetadata) @@ -353,7 +365,7 @@ private[testkit] class TopicImpl( } private[testkit] object TopicImpl { - val DefaultTimeout = time.Duration.ofSeconds(3) + val DefaultTimeout: time.Duration = time.Duration.ofSeconds(3) } private[testkit] case class TestKitMessageImpl[P](payload: P, metadata: SdkMetadata) extends TestKitMessage[P] { diff --git a/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/SourcesHolder.scala b/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/SourcesHolder.scala new file mode 100644 index 0000000000..8e5af4d13a --- /dev/null +++ b/sdk/java-sdk-protobuf-testkit/src/main/scala/kalix/javasdk/testkit/impl/SourcesHolder.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.testkit.impl + +import scala.collection.mutable.ArrayBuffer + +import akka.actor.Actor +import akka.event.Logging +import com.google.protobuf.ByteString +import kalix.javasdk.testkit.KalixTestKit +import kalix.javasdk.testkit.impl.EventingTestKitImpl.RunningSourceProbe +import kalix.javasdk.testkit.impl.SourcesHolder.AddSource +import kalix.javasdk.testkit.impl.SourcesHolder.Publish +import kalix.javasdk.{ Metadata => SdkMetadata } +import org.slf4j.LoggerFactory + +object SourcesHolder { + + case class AddSource(runningSourceProbe: RunningSourceProbe) + case class Publish(message: ByteString, metadata: SdkMetadata) +} + +class SourcesHolder extends Actor { + + private val log = LoggerFactory.getLogger(classOf[KalixTestKit]) + + private val sources: ArrayBuffer[RunningSourceProbe] = ArrayBuffer.empty + private val publishedMessages: ArrayBuffer[PublishedMessage] = ArrayBuffer.empty + + private case class PublishedMessage(message: ByteString, metadata: SdkMetadata) + + override def receive: Receive = { + case AddSource(runningSourceProbe) => + if (publishedMessages.nonEmpty) { + log.debug( + s"Emitting ${publishedMessages.size} messages to the new source from [${runningSourceProbe.serviceName}]") + publishedMessages.foreach { msg => + runningSourceProbe.emit(msg.message, msg.metadata) + } + } + sources.addOne(runningSourceProbe) + log.debug(s"Source from [${runningSourceProbe.serviceName}] added") + sender() ! "ok" + case Publish(message, metadata) => + sources.foreach { source => + log.debug(s"Emitting message to the source from [${source.serviceName}]") + source.emit(message, metadata) + } + publishedMessages.addOne(PublishedMessage(message, metadata)) + sender() ! "ok" + } +} diff --git a/sdk/java-sdk-protobuf-testkit/src/test/scala/kalix/javasdk/testkit/impl/TopicImplSpec.scala b/sdk/java-sdk-protobuf-testkit/src/test/scala/kalix/javasdk/testkit/impl/TopicImplSpec.scala index 6309d091ed..452f9070c8 100644 --- a/sdk/java-sdk-protobuf-testkit/src/test/scala/kalix/javasdk/testkit/impl/TopicImplSpec.scala +++ b/sdk/java-sdk-protobuf-testkit/src/test/scala/kalix/javasdk/testkit/impl/TopicImplSpec.scala @@ -16,10 +16,15 @@ package kalix.javasdk.testkit.impl -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import scala.collection.mutable +import scala.jdk.CollectionConverters.CollectionHasAsScala + +import akka.actor.ActorSystem +import akka.actor.Props import akka.stream.BoundedSourceQueue import akka.stream.QueueOfferResult import akka.stream.scaladsl.Source +import akka.testkit.TestKit import akka.testkit.TestProbe import com.google.protobuf.ByteString import kalix.eventing.EventDestination @@ -33,22 +38,27 @@ import kalix.protocol.component.MetadataEntry.Value.StringValue import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand import kalix.testkit.protocol.eventing_test_backend.Message import kalix.testkit.protocol.eventing_test_backend.SourceElem +import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterEach import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import scala.collection.mutable -import scala.jdk.CollectionConverters.CollectionHasAsScala - -class TopicImplSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with Matchers with BeforeAndAfterEach { +class TopicImplSpec + extends TestKit(ActorSystem("MySpec")) + with AnyWordSpecLike + with Matchers + with BeforeAndAfterEach + with BeforeAndAfterAll { private val anySupport = new AnySupport(Array(), getClass.getClassLoader) - private val outProbe = TestProbe()(system.classicSystem) - private val inProbe = TestProbe()(system.classicSystem) - private val topic = new TopicImpl(outProbe, inProbe, anySupport) + private val outProbe = TestProbe()(system) + private val topic = + new TopicImpl(outProbe, system.actorOf(Props[SourcesHolder](), "holder"), anySupport) val queue = new DummyQueue(mutable.Queue.empty) - // establishing the dummy connection for the test broker - inProbe.ref ! RunningSourceProbe("dummy-service", EventSource.defaultInstance)(queue, Source.empty[SourceElem]) + + private val runningSourceProbe: RunningSourceProbe = + RunningSourceProbe("dummy-service", EventSource.defaultInstance)(queue, Source.empty[SourceElem]) + topic.addSourceProbe(runningSourceProbe) private val textPlainHeader = MetadataEntry("Content-Type", StringValue("text/plain; charset=utf-8")) private val bytesHeader = MetadataEntry("Content-Type", StringValue("application/octet-stream")) @@ -136,6 +146,10 @@ class TopicImplSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with topic.clear() } + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + class DummyQueue(val elems: mutable.Queue[SourceElem]) extends BoundedSourceQueue[SourceElem] { override def offer(elem: SourceElem): QueueOfferResult = { elems.append(elem) diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/EventingTestkitIntegrationTest.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/EventingTestkitIntegrationTest.java index a0a4edf4f9..f239679083 100644 --- a/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/EventingTestkitIntegrationTest.java +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/EventingTestkitIntegrationTest.java @@ -19,7 +19,6 @@ import com.example.Main; import com.example.wiring.TestkitConfig; import com.example.wiring.eventsourcedentities.counter.CounterEvent.ValueIncreased; -import kalix.javasdk.client.ComponentClient; import kalix.javasdk.testkit.EventingTestKit; import kalix.javasdk.testkit.EventingTestKit.Message; import kalix.javasdk.testkit.KalixTestKit; @@ -32,6 +31,7 @@ import org.springframework.context.annotation.Import; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.TestPropertySource; +import org.springframework.web.reactive.function.client.WebClient; import java.util.concurrent.TimeUnit; @@ -51,7 +51,7 @@ public class EventingTestkitIntegrationTest { private KalixTestKit kalixTestKit; private EventingTestKit.Topic eventsTopic; @Autowired - private ComponentClient componentClient; + private WebClient webClient; @BeforeAll public void beforeAll() { @@ -62,8 +62,8 @@ public void beforeAll() { public void shouldPublishEventWithTypeNameViaEventingTestkit() { //given String subject = "test"; - ValueIncreased event1 = new ValueIncreased(123); - ValueIncreased event2 = new ValueIncreased(123); + ValueIncreased event1 = new ValueIncreased(1); + ValueIncreased event2 = new ValueIncreased(2); //when Message test = kalixTestKit.getMessageBuilder().of(event1, subject); @@ -77,7 +77,16 @@ public void shouldPublishEventWithTypeNameViaEventingTestkit() { .untilAsserted(() -> { var response = DummyCounterEventStore.get(subject); assertThat(response).containsOnly(event1, event2); - }); + var viewResponse = webClient + .get() + .uri("/counter-view-topic-sub/less-then/" + 4) + .retrieve() + .bodyToFlux(CounterView.class) + .toStream() + .toList(); + + assertThat(viewResponse).containsOnly(new CounterView("test", 3)); + }); } } diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/ViewFromCounterEventsTopic.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/ViewFromCounterEventsTopic.java index 827a4fda81..2c0b75a0fe 100644 --- a/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/ViewFromCounterEventsTopic.java +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/pubsub/ViewFromCounterEventsTopic.java @@ -34,7 +34,7 @@ import static kalix.javasdk.impl.MetadataImpl.CeSubject; -@Profile("docker-it-test") +@Profile({"docker-it-test", "eventing-testkit"}) @ViewId("counter_view_topic_sub") @Table("counter_view_topic_sub") @Subscribe.Topic(COUNTER_EVENTS_TOPIC)