Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: eventing testkit - supporting multiple subscription to the same … #1778

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,33 @@

package kalix.javasdk.testkit.impl

import java.time
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.{ List => JList }

import scala.compat.java8.DurationConverters.DurationOps
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.Await
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success

import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.pattern._
import akka.stream.BoundedSourceQueue
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Sink
Expand All @@ -30,9 +52,8 @@ import akka.util.BoxedType
import com.google.protobuf.ByteString
import com.google.protobuf.GeneratedMessageV3
import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.eventing.EventDestination
import kalix.eventing.EventDestination.Destination
import kalix.eventing.EventSource
import kalix.javasdk.JsonSupport
import kalix.javasdk.Metadata.{ MetadataEntry => SdkMetadataEntry }
import kalix.javasdk.impl.MessageCodec
import kalix.javasdk.impl.MetadataImpl
Expand All @@ -45,7 +66,6 @@ import kalix.javasdk.testkit.impl.TopicImpl.DefaultTimeout
import kalix.javasdk.{ Metadata => SdkMetadata }
import kalix.protocol.component.Metadata
import kalix.protocol.component.MetadataEntry
import kalix.javasdk.JsonSupport
import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand
import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult
import kalix.testkit.protocol.eventing_test_backend.EventStreamOutCommand
Expand All @@ -59,23 +79,6 @@ import kalix.testkit.protocol.eventing_test_backend.SourceElem
import org.slf4j.LoggerFactory
import scalapb.GeneratedMessage

import java.time
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.{ List => JList }
import scala.compat.java8.DurationConverters.DurationOps
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.util.Failure
import scala.util.Success
import scala.language.postfixOps

object EventingTestKitImpl {

/**
Expand Down Expand Up @@ -154,15 +157,17 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p
extends EventingTestKit {

private val log = LoggerFactory.getLogger(classOf[EventingTestServiceImpl])
private implicit val sys = system
private implicit val ec = sys.dispatcher
private implicit val sys: ActorSystem = system
private implicit val ec: ExecutionContextExecutor = sys.dispatcher

private val topics = new ConcurrentHashMap[String, TopicImpl]()

override def getTopic(topic: String): Topic = getTopicImpl(topic)

private def getTopicImpl(topic: String): TopicImpl =
topics.computeIfAbsent(topic, _ => new TopicImpl(TestProbe(), TestProbe(), codec))
topics.computeIfAbsent(
topic,
_ => new TopicImpl(TestProbe(), system.actorOf(Props[SourcesHolder](), "topic-source-holder-" + topic), codec))

final class ServiceImpl extends EventingTestKitService {
override def emitSingle(in: EmitSingleCommand): Future[EmitSingleResult] = {
Expand Down Expand Up @@ -201,7 +206,7 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p
eventSource)
val (queue, source) = Source.queue[SourceElem](10).preMaterialize()
val runningSourceProbe = RunningSourceProbe(serviceName, eventSource)(queue, source)
getTopicImpl(eventSource.getTopic).sourceProbe.ref ! runningSourceProbe
getTopicImpl(eventSource.getTopic).addSourceProbe(runningSourceProbe)
runningSourcePromise.success(runningSourceProbe)
Some(runningSourceProbe)

Expand Down Expand Up @@ -229,16 +234,21 @@ final class EventingTestServiceImpl(system: ActorSystem, val host: String, var p
}
}

private case class PublishedMessage(message: ByteString, metadata: SdkMetadata)

private[testkit] class TopicImpl(
private[testkit] val destinationProbe: TestProbe,
private[testkit] val sourceProbe: TestProbe,
private[testkit] val sourcesHolder: ActorRef,
codec: MessageCodec)
extends Topic {

private lazy val brokerProbe = sourceProbe.expectMsgType[RunningSourceProbe]

private val log = LoggerFactory.getLogger(classOf[TopicImpl])

def addSourceProbe(runningSourceProbe: RunningSourceProbe): Unit = {
val addSource = sourcesHolder.ask(SourcesHolder.AddSource(runningSourceProbe))(5.seconds)
Await.result(addSource, 10.seconds)
}

override def expectNone(): Unit = expectNone(DefaultTimeout)

override def expectNone(timeout: time.Duration): Unit = destinationProbe.expectNoMessage(timeout.toScala)
Expand Down Expand Up @@ -328,8 +338,10 @@ private[testkit] class TopicImpl(
override def publish(message: ByteString): Unit =
publish(message, SdkMetadata.EMPTY)

override def publish(message: ByteString, metadata: SdkMetadata): Unit =
brokerProbe.emit(message, metadata)
override def publish(message: ByteString, metadata: SdkMetadata): Unit = {
val addSource = sourcesHolder.ask(SourcesHolder.Publish(message, metadata))(5.seconds)
Await.result(addSource, 5.seconds)
}

override def publish(message: TestKitMessage[_]): Unit = message.getPayload match {
case javaPb: GeneratedMessageV3 => publish(javaPb.toByteString, message.getMetadata)
Expand All @@ -353,7 +365,7 @@ private[testkit] class TopicImpl(
}

private[testkit] object TopicImpl {
val DefaultTimeout = time.Duration.ofSeconds(3)
val DefaultTimeout: time.Duration = time.Duration.ofSeconds(3)
}

private[testkit] case class TestKitMessageImpl[P](payload: P, metadata: SdkMetadata) extends TestKitMessage[P] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kalix.javasdk.testkit.impl

import scala.collection.mutable.ArrayBuffer

import akka.actor.Actor
import akka.event.Logging
import com.google.protobuf.ByteString
import kalix.javasdk.testkit.KalixTestKit
import kalix.javasdk.testkit.impl.EventingTestKitImpl.RunningSourceProbe
import kalix.javasdk.testkit.impl.SourcesHolder.AddSource
import kalix.javasdk.testkit.impl.SourcesHolder.Publish
import kalix.javasdk.{ Metadata => SdkMetadata }
import org.slf4j.LoggerFactory

object SourcesHolder {

case class AddSource(runningSourceProbe: RunningSourceProbe)
case class Publish(message: ByteString, metadata: SdkMetadata)
}

class SourcesHolder extends Actor {

private val log = LoggerFactory.getLogger(classOf[KalixTestKit])

private val sources: ArrayBuffer[RunningSourceProbe] = ArrayBuffer.empty
private val publishedMessages: ArrayBuffer[PublishedMessage] = ArrayBuffer.empty

private case class PublishedMessage(message: ByteString, metadata: SdkMetadata)

override def receive: Receive = {
case AddSource(runningSourceProbe) =>
if (publishedMessages.nonEmpty) {
log.debug(
s"Emitting ${publishedMessages.size} messages to the new source from [${runningSourceProbe.serviceName}]")
publishedMessages.foreach { msg =>
runningSourceProbe.emit(msg.message, msg.metadata)
}
}
sources.addOne(runningSourceProbe)
log.debug(s"Source from [${runningSourceProbe.serviceName}] added")
sender() ! "ok"
case Publish(message, metadata) =>
sources.foreach { source =>
log.debug(s"Emitting message to the source from [${source.serviceName}]")
source.emit(message, metadata)
}
publishedMessages.addOne(PublishedMessage(message, metadata))
sender() ! "ok"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package kalix.javasdk.testkit.impl

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala

import akka.actor.ActorSystem
import akka.actor.Props
import akka.stream.BoundedSourceQueue
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.google.protobuf.ByteString
import kalix.eventing.EventDestination
Expand All @@ -33,22 +38,27 @@ import kalix.protocol.component.MetadataEntry.Value.StringValue
import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand
import kalix.testkit.protocol.eventing_test_backend.Message
import kalix.testkit.protocol.eventing_test_backend.SourceElem
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala

class TopicImplSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with Matchers with BeforeAndAfterEach {
class TopicImplSpec
extends TestKit(ActorSystem("MySpec"))
with AnyWordSpecLike
with Matchers
with BeforeAndAfterEach
with BeforeAndAfterAll {

private val anySupport = new AnySupport(Array(), getClass.getClassLoader)
private val outProbe = TestProbe()(system.classicSystem)
private val inProbe = TestProbe()(system.classicSystem)
private val topic = new TopicImpl(outProbe, inProbe, anySupport)
private val outProbe = TestProbe()(system)
private val topic =
new TopicImpl(outProbe, system.actorOf(Props[SourcesHolder](), "holder"), anySupport)
val queue = new DummyQueue(mutable.Queue.empty)
// establishing the dummy connection for the test broker
inProbe.ref ! RunningSourceProbe("dummy-service", EventSource.defaultInstance)(queue, Source.empty[SourceElem])

private val runningSourceProbe: RunningSourceProbe =
RunningSourceProbe("dummy-service", EventSource.defaultInstance)(queue, Source.empty[SourceElem])
topic.addSourceProbe(runningSourceProbe)

private val textPlainHeader = MetadataEntry("Content-Type", StringValue("text/plain; charset=utf-8"))
private val bytesHeader = MetadataEntry("Content-Type", StringValue("application/octet-stream"))
Expand Down Expand Up @@ -136,6 +146,10 @@ class TopicImplSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
topic.clear()
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

class DummyQueue(val elems: mutable.Queue[SourceElem]) extends BoundedSourceQueue[SourceElem] {
override def offer(elem: SourceElem): QueueOfferResult = {
elems.append(elem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.example.Main;
import com.example.wiring.TestkitConfig;
import com.example.wiring.eventsourcedentities.counter.CounterEvent.ValueIncreased;
import kalix.javasdk.client.ComponentClient;
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.EventingTestKit.Message;
import kalix.javasdk.testkit.KalixTestKit;
Expand All @@ -32,6 +31,7 @@
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.concurrent.TimeUnit;

Expand All @@ -51,7 +51,7 @@ public class EventingTestkitIntegrationTest {
private KalixTestKit kalixTestKit;
private EventingTestKit.Topic eventsTopic;
@Autowired
private ComponentClient componentClient;
private WebClient webClient;

@BeforeAll
public void beforeAll() {
Expand All @@ -62,8 +62,8 @@ public void beforeAll() {
public void shouldPublishEventWithTypeNameViaEventingTestkit() {
//given
String subject = "test";
ValueIncreased event1 = new ValueIncreased(123);
ValueIncreased event2 = new ValueIncreased(123);
ValueIncreased event1 = new ValueIncreased(1);
ValueIncreased event2 = new ValueIncreased(2);

//when
Message<ValueIncreased> test = kalixTestKit.getMessageBuilder().of(event1, subject);
Expand All @@ -77,7 +77,16 @@ public void shouldPublishEventWithTypeNameViaEventingTestkit() {
.untilAsserted(() -> {
var response = DummyCounterEventStore.get(subject);
assertThat(response).containsOnly(event1, event2);
});

var viewResponse = webClient
.get()
.uri("/counter-view-topic-sub/less-then/" + 4)
.retrieve()
.bodyToFlux(CounterView.class)
.toStream()
.toList();

assertThat(viewResponse).containsOnly(new CounterView("test", 3));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static kalix.javasdk.impl.MetadataImpl.CeSubject;


@Profile("docker-it-test")
@Profile({"docker-it-test", "eventing-testkit"})
@ViewId("counter_view_topic_sub")
@Table("counter_view_topic_sub")
@Subscribe.Topic(COUNTER_EVENTS_TOPIC)
Expand Down