Skip to content

Commit

Permalink
docs: eventing testkit in protocol-first sdks (#1688)
Browse files Browse the repository at this point in the history
* docs: eventing testkit sample in protocol-first sdks

* docs(java-pb): eventing testkit sample with metadata

* docs(java-pb): eventing testkit sample with clear topics

* added diagram and refactored small bits

* Update docs/src/modules/java-protobuf/pages/actions-publishing-subscribing.adoc

Co-authored-by: Renato Cavalcanti <[email protected]>

---------

Co-authored-by: Renato Cavalcanti <[email protected]>
  • Loading branch information
efgpinto and octonato authored Jun 22, 2023
1 parent 56f8bdd commit bcd9660
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,125 @@ Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the

Note that if changing the `topic` name after it has once been deployed for an event consumer means the consumer will start over from the beginning of the topic.

See https://docs.kalix.io/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.
See https://docs.kalix.io/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.


== Testing the Integration

When a Kalix service relies on a broker, it might be useful to use integration tests to assert that those boundaries work as intended. For such scenarios, you can either:

* Use TestKit's mocked topic:
** this offers a general API to inject messages into topics or read the messages written to another topic, regardless of the specific broker integration you have configured.
* Run an external broker instance:
** if you're interested in running your integration tests against a real instance, you need to provide the broker instance yourself by running it in a separate process in your local setup and make sure to disable the use of TestKit's test broker. Currently, **the only external broker supported in integration tests is Google PubSub Emulator.**

=== TestKit Mocked Topic

Following up on the counter entity example used above, let's consider an example (composed by 2 Actions and 1 Event Sourced entity) as pictured below:

ifdef::todo[TODO: convert this diagram once we have a standard language for this]
image::java:eventing-testkit-sample.svg[]

In this example:

* commands are consumed from an external topic `event-commands` and forwarded to a Counter entity;
* the Counter entity is an Event Sourced Entity and has its events published to another topic `counter-events`.

To test this flow, we will take advantage of the TestKit to be able to push commands into the `event-commands` topic and check what messages are produced to topic `counter-events`.

[.tabset]
Java::
+
[source,java]
.src/it/java/com/example/CounterTopicIntegrationTest.java
----
include::example$java-protobuf-eventsourced-counter/src/it/java/com/example/CounterTopicIntegrationTest.java[tag=test-topic]
----
<1> Start the TestKit, booting up both the service and its proxy.
<2> Get a mocked topic named `counter-commands` from the TestKit.
<3> Get a mocked topic named `counter-events` from the TestKit.
<4> Build 2 commands and publish both to the topic. Note the `counterId` is passed as the subject id of the message.
<5> Read 2 messages, one at a time. Note we pass in the expected class type for the next message.
<6> Assert the received messages have the same value as the commands sent.

Scala::
+
[source,scala]
.src/test/scala/com/example/CounterServiceIntegrationSpec.scala
----
include::example$scala-protobuf-eventsourced-counter/src/test/scala/com/example/CounterServiceIntegrationSpec.scala[tag=test-topic]
----
<1> Start the TestKit, booting up both the service and its proxy.
<2> Get a mocked topic named `counter-commands` from the TestKit.
<3> Get a mocked topic named `counter-events` from the TestKit.
<4> Build 2 commands and publish both to the topic. Note the `counterId` is passed as the subject id of the message.
<5> Read 2 messages, one at a time and assert the received messages values. Note we pass in the expected class type for the next message.

TIP: In the example above we take advantage of the TestKit to serialize / deserialize the messages and pass all the required metadata automatically for us. However, the API also offers the possibility to read and write raw bytes, construct your metadata or read multiple messages at once.

==== Metadata

Typically, messages are published with associated metadata. If you want to construct your own `Metadata` to be consumed by a service or make sure the messages published out of your service have specific metadata attached, you can do so using the TestKit, as shown below.

[.tabset]
Java::
+
[source,java,indent=0]
.src/it/java/com/example/CounterTopicIntegrationTest.java
----
include::example$java-protobuf-eventsourced-counter/src/it/java/com/example/CounterTopicIntegrationTest.java[tag=test-topic-metadata]
----
<1> Build a `CloudEvent` object with the 3 required attributes, respectively: `id`, `source` and `type`.
<2> Add the subject to which the message is related, that is the `counterId`.
<3> Set the mandatory header "Content-Type" accordingly.
<4> Publish the message along with its metadata to topic `commandsTopic`.
<5> Upon receiving the message, access the metadata.
<6> Assert the headers `Content-Type` and `ce-subject` (every CloudEvent header is prefixed with "ce-") have the expected values.

Scala::
+
[source,scala,indent=0]
.src/test/scala/com/example/CounterServiceIntegrationSpec.scala
----
include::example$scala-protobuf-eventsourced-counter/src/test/scala/com/example/CounterServiceIntegrationSpec.scala[tag=test-topic-metadata]
----
<1> Build a `CloudEvent` object with the 3 required attributes, respectively: `id`, `source` and `type`.
<2> Add the subject to which the message is related, that is the `counterId`.
<3> Set the mandatory header "Content-Type" accordingly.
<4> Publish the message along with its metadata to topic `commandsTopic`.
<5> Receive the message of correct type and extract `Metadata`.
<6> Assert the headers `Content-Type` and `ce-subject` (every CloudEvent header is prefixed with "ce-") have the expected values.


==== One Suite, Multiple Tests

When running multiple test cases under the same test suite and thus using a common TestKit instance, you might face some issues if unconsumed messages from previous tests mess up with the current one. To avoid this, be sure to:

- have the tests run in sequence, not in parallel;
- clear the contents of the topics in use before the test.

As an alternative, you can consider using different test suites which will use independent TestKit instances.


[.tabset]
Java::
+
[source,java,indent=0]
.src/it/java/com/example/CounterTopicIntegrationTest.java
----
include::example$java-protobuf-eventsourced-counter/src/it/java/com/example/CounterTopicIntegrationTest.java[tag=clear-topics]
----
<1> Run this before each test.
<2> Clear the topic ignoring any unread messages.

Scala::
+
[source,scala,indent=0]
.src/test/scala/com/example/CounterServiceIntegrationSpec.scala
----
include::example$scala-protobuf-eventsourced-counter/src/test/scala/com/example/CounterServiceIntegrationSpec.scala[tag=clear-topics]
----
<1> Override method from trait `BeforeAndAfterEach`.
<2> Clear the topic ignoring any unread messages.

NOTE: Despite the example, you are neither forced to clear all topics nor to do it before each test. You can do it selectively, or you might not even need it depending on your tests and the flows they test.
4 changes: 4 additions & 0 deletions docs/src/modules/java/images/eventing-testkit-sample.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ If you don't add `ignoreUnknown=true`, the action would fail when processing a `

Another possible usage for Actions is to consume events and act upon.

For example, you may consume events from one entity or from a topic, transform to commands and send to an another entity or an external system. This is similar to the usage explained in xref:actions-as-controller.adoc[Actions as Controller], except that the Action is driven by the flow of incoming events instead of external user requests.
For example, you may consume events from one entity or from a topic, transform to commands and send to another entity or an external system. This is similar to the usage explained in xref:actions-as-controller.adoc[Actions as Controller], except that the Action is driven by the flow of incoming events instead of external user requests.



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,102 @@
package com.example;

import com.example.actions.CounterTopicApi;
import kalix.javasdk.CloudEvent;
// tag::test-topic[]
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.junit.KalixTestKitResource;
// ...
// end::test-topic[]
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Test;

import java.util.concurrent.CompletionStage;
import java.net.URI;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;

// Example of an integration test calling our service via the Kalix proxy
// Run all test classes ending with "IntegrationTest" using `mvn verify -Pit`
// tag::test-topic[]

public class CounterTopicIntegrationTest {

/**
* The test kit starts both the service container and the Kalix proxy.
*/
@ClassRule
public static final KalixTestKitResource testKit =
new KalixTestKitResource(Main.createKalix());
new KalixTestKitResource(Main.createKalix()); // <1>

private static EventingTestKit.Topic commandsTopic;
private static EventingTestKit.Topic eventsTopic;
// end::test-topic[]

private static EventingTestKit.Topic eventsTopicWithMeta;

private final EventingTestKit.Topic commandsTopic;
private final EventingTestKit.Topic eventsTopic;
// tag::test-topic[]

public CounterTopicIntegrationTest() {
commandsTopic = testKit.getTopic("counter-commands");
eventsTopic = testKit.getTopic("counter-events");
commandsTopic = testKit.getTopic("counter-commands"); // <2>
eventsTopic = testKit.getTopic("counter-events"); // <3>
// end::test-topic[]
eventsTopicWithMeta = testKit.getTopic("counter-events-with-meta");
// tag::test-topic[]
}
// end::test-topic[]


// since multiple tests are using the same topics, make sure to reset them before each new test
// so unread messages from previous tests do not mess with the current one
// tag::clear-topics[]
@Before // <1>
public void clearTopics() {
commandsTopic.clear(); // <2>
eventsTopic.clear();
eventsTopicWithMeta.clear();
}
// end::clear-topics[]
// tag::test-topic[]

@Test
public void verifyCounterCommandsAndPublish() {
var counterId = "test-topic";

var increaseCmd = CounterApi.IncreaseValue.newBuilder().setCounterId(counterId).setValue(4).build();
var decreaseCmd = CounterApi.DecreaseValue.newBuilder().setCounterId(counterId).setValue(1).build();
commandsTopic.publish(increaseCmd, counterId);
commandsTopic.publish(increaseCmd, counterId); // <4>
commandsTopic.publish(decreaseCmd, counterId);

var increasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Increased.class);
var increasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Increased.class); // <5>
var decreasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Decreased.class);
assertEquals(increaseCmd.getValue(), increasedEvent.getPayload().getValue());
assertEquals(increaseCmd.getValue(), increasedEvent.getPayload().getValue()); // <6>
assertEquals(decreaseCmd.getValue(), decreasedEvent.getPayload().getValue());
}
// end::test-topic[]

// tag::test-topic-metadata[]
@Test
public void verifyCounterCommandsAndPublishWithMetadata() {
var counterId = "test-topic-metadata";
var increaseCmd = CounterApi.IncreaseValue.newBuilder().setCounterId(counterId).setValue(10).build();

var metadata = CloudEvent.of( // <1>
"cmd1",
URI.create("CounterTopicIntegrationTest"),
increaseCmd.getDescriptorForType().getFullName())
.withSubject(counterId) // <2>
.asMetadata()
.add("Content-Type", "application/protobuf"); // <3>

commandsTopic.publish(EventingTestKit.Message.of(increaseCmd, metadata)); // <4>

var increasedEvent = eventsTopicWithMeta.expectOneTyped(CounterTopicApi.Increased.class);
var actualMd = increasedEvent.getMetadata(); // <5>
assertEquals(counterId, actualMd.asCloudEvent().subject().get()); // <6>
assertEquals("application/protobuf", actualMd.get("Content-Type").get());
}
// end::test-topic-metadata[]
// tag::test-topic[]
}
// end::test-topic[]
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</appender>

<logger name="akka" level="INFO"/>
<logger name="kalix" level="INFO"/>
<logger name="kalix" level="DEBUG"/>
<logger name="com.github.dockerjava" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</appender>

<logger name="akka" level="INFO"/>
<logger name="kalix" level="INFO"/>
<logger name="kalix" level="DEBUG"/>
<logger name="com.github.dockerjava" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package com.example

import com.example.actions.{Decreased, Increased}
import kalix.scalasdk.CloudEvent

import java.net.URI
// tag::test-topic[]
import kalix.scalasdk.testkit.{KalixTestKit, Message}
import org.scalatest.BeforeAndAfterEach
// ...
// end::test-topic[]
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
Expand All @@ -17,20 +24,41 @@ import scala.language.postfixOps
// As long as this file exists it will not be overwritten: you can maintain it yourself,
// or delete it so it is regenerated as needed.

class CounterServiceIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
// tag::test-topic[]

class CounterServiceIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with ScalaFutures {

// end::test-topic[]
implicit private val patience: PatienceConfig =
PatienceConfig(Span(5, Seconds), Span(500, Millis))

private val testKit = KalixTestKit(Main.createKalix()).start()
// tag::test-topic[]
private val testKit = KalixTestKit(Main.createKalix()).start() // <1>
// end::test-topic[]

private val client = testKit.getGrpcClient(classOf[CounterService])

private val commandsTopic = testKit.getTopic("counter-commands")
private val eventsTopic = testKit.getTopic("counter-events")
// tag::test-topic[]
private val commandsTopic = testKit.getTopic("counter-commands") // <2>
private val eventsTopic = testKit.getTopic("counter-events") // <3>
// end::test-topic[]

private val eventsTopicWithMeta = testKit.getTopic("counter-events-with-meta")

// tag::clear-topics[]
override def beforeEach(): Unit = { // <1>
commandsTopic.clear() // <2>
eventsTopic.clear()
eventsTopicWithMeta.clear()
}
// end::clear-topics[]


// tag::test-topic[]

"CounterService" must {
val counterId = "xyz"
// end::test-topic[]

"handle side effect that adds the initial input multiplied by two and verify publishing" in {

Expand All @@ -57,27 +85,49 @@ class CounterServiceIntegrationSpec extends AnyWordSpec with Matchers with Befor
counter.value shouldBe 15

// verify message published to topic
val msg: Message[Decreased] = eventsTopic.expectOneTyped
val Message(payload, md) = msg
payload shouldBe Decreased(15)
val Message(decEvent, md): Message[Decreased] = eventsTopic.expectOneTyped
decEvent shouldBe Decreased(15)
md.get("ce-type") should contain(classOf[Decreased].getName)
md.get("Content-Type") should contain("application/protobuf")
}

"handle commands from topic and verify publishing" in {
commandsTopic.publish(IncreaseValue("abc", 4), "abc")
commandsTopic.publish(DecreaseValue("abc", 1), "abc")
// tag::test-topic[]
"handle commands from topic and publishing related events out" in {
commandsTopic.publish(IncreaseValue(counterId, 4), counterId) // <4>
commandsTopic.publish(DecreaseValue(counterId, 1), counterId)

val increaseEvent: Message[Increased] = eventsTopic.expectOneTyped
val decreaseEvent: Message[Decreased] = eventsTopic.expectOneTyped
increaseEvent.payload.value shouldBe 4
decreaseEvent.payload.value shouldBe 1
val Message(incEvent, _): Message[Increased] = eventsTopic.expectOneTyped // <5>
val Message(decEvent, _): Message[Decreased] = eventsTopic.expectOneTyped
incEvent shouldBe Increased(4) // <6>
decEvent shouldBe Decreased(1)
}

// end::test-topic[]

// tag::test-topic-metadata[]
"allow passing and reading metadata for messages" in {
val increaseCmd = IncreaseValue(counterId, 4)
val md = CloudEvent( // <1>
id = "cmd1",
source = URI.create("CounterServiceIntegrationSpec"),
`type` = increaseCmd.companion.javaDescriptor.getFullName)
.withSubject(counterId) // <2>
.asMetadata
.add("Content-Type", "application/protobuf"); // <3>

commandsTopic.publish(Message(increaseCmd, md)) // <4>

val Message(incEvent, actualMd): Message[Increased] = eventsTopicWithMeta.expectOneTyped // <5>
incEvent shouldBe Increased(4)
actualMd.get("Content-Type") should contain("application/protobuf") // <6>
actualMd.asCloudEvent.subject should contain(counterId)
}
// end::test-topic-metadata[]
// tag::test-topic[]
}

override def afterAll(): Unit = {
testKit.stop()
super.afterAll()
}
}
}
// end::test-topic[]
Loading

0 comments on commit bcd9660

Please sign in to comment.