From 461cb1899f2d74cef9bf359c2fb92f5e16b3e9a4 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 13 May 2019 15:13:27 -0500 Subject: [PATCH] MINOR: port more tests to TopologyTestDriver (#248) Continue the work of #242 , #245 , and #246 by porting the rest of the integration tests that don't use Schema Registry. The Schema Registry tests will get ported in a later PR. --- .../algebird/ProbabilisticCounter.scala | 4 +- ...entDeduplicationLambdaIntegrationTest.java | 141 ++++++++--------- .../streams/FanoutLambdaIntegrationTest.java | 119 ++++++-------- ...gCorruptedInputRecordsIntegrationTest.java | 142 ++++++++--------- .../MapFunctionLambdaIntegrationTest.java | 84 ++++------ .../MixAndMatchLambdaIntegrationTest.java | 103 ++++++------ .../streams/PassThroughIntegrationTest.java | 51 +++--- .../StateStoresInTheDSLIntegrationTest.java | 47 +++--- .../StreamToStreamJoinIntegrationTest.java | 101 ++++++------ .../StreamToTableJoinIntegrationTest.java | 147 +++++++++--------- .../streams/SumLambdaIntegrationTest.java | 54 +++---- .../TableToTableJoinIntegrationTest.java | 126 ++++++++------- ...rCountsPerRegionLambdaIntegrationTest.java | 54 ++++--- ...teractiveQueriesLambdaIntegrationTest.java | 108 ++++++------- .../WordCountLambdaIntegrationTest.java | 132 +++++++--------- ...bilisticCountingScalaIntegrationTest.scala | 106 +++++-------- ...treamToTableJoinScalaIntegrationTest.scala | 124 ++++++++------- .../WordCountScalaIntegrationTest.scala | 90 ++++------- 18 files changed, 786 insertions(+), 947 deletions(-) diff --git a/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala b/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala index d75705965a..b22ae349b0 100644 --- a/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala +++ b/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala @@ -9,7 +9,7 @@ import org.apache.kafka.streams.processor.ProcessorContext * estimate. 
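As context for the ports below: these tests no longer spin up an EmbeddedSingleNodeKafkaCluster but drive the topology synchronously through TopologyTestDriver. The IntegrationTestUtils.produceKeyValuesSynchronously / drainStreamOutput overloads used throughout this patch appear to wrap the plain kafka-streams-test-utils API; the following is only a rough sketch of that underlying pattern (class, topic, and config names here are illustrative and not part of this patch), assuming the kafka-streams-test-utils artifact matching the Kafka version is on the test classpath.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TopologyTestDriverSketch {
  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic").to("output-topic");   // trivial stand-in topology

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");   // never contacted by the driver
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
    try {
      final ConsumerRecordFactory<String, String> factory =
          new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
      driver.pipeInput(factory.create("input-topic", "hello", "world"));   // processed synchronously, no broker involved

      final ProducerRecord<String, String> output =
          driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
      System.out.println(output.key() + " -> " + output.value());   // hello -> world
    } finally {
      driver.close();
    }
  }
}

Each ported test below follows this same produce-then-drain shape, just routed through the repository's helper methods.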
*/ class ProbabilisticCounter(val cmsStoreName: String) - extends Transformer[Array[Byte], String, KeyValue[String, Long]] { + extends Transformer[String, String, KeyValue[String, Long]] { private var cmsState: CMSStore[String] = _ private var processorContext: ProcessorContext = _ @@ -19,7 +19,7 @@ class ProbabilisticCounter(val cmsStoreName: String) cmsState = this.processorContext.getStateStore(cmsStoreName).asInstanceOf[CMSStore[String]] } - override def transform(key: Array[Byte], value: String): KeyValue[String, Long] = { + override def transform(key: String, value: String): KeyValue[String, Long] = { // Count the record value, think: "+ 1" cmsState.put(value, this.processorContext.timestamp()) diff --git a/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java index 346748efc6..9006710db6 100644 --- a/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java @@ -15,18 +15,13 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Transformer; @@ -36,8 +31,6 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.TestUtils; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; import java.util.Arrays; @@ -45,56 +38,45 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; /** * End-to-end integration test that demonstrates how to remove duplicate records from an input * stream. - * + *

* Here, a stateful {@link org.apache.kafka.streams.kstream.Transformer} (from the Processor API) * detects and discards duplicate input records based on an "event id" that is embedded in each * input record. This transformer is then included in a topology defined via the DSL. - * + *

* In this simplified example, the values of input records represent the event ID by which * duplicates will be detected. In practice, record values would typically be a more complex data * structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID * is but one example of how to perform de-duplication in general. The code example below can be * adapted to other de-duplication approaches. - * + *

* IMPORTANT: Kafka, including its Streams API, supports exactly-once semantics since version 0.11. * With this feature available, most use cases will no longer need to worry about duplicate messages * or duplicate processing. That said, there will still be some use cases where you have your own * business rules that define when two events are considered to be "the same" and need to be * de-duplicated (e.g. two events having the same payload but different timestamps). The example * below demonstrates how to implement your own business rules for event de-duplication. - * + *
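The "business rule" is captured entirely by the idExtractor handed to the transformer in this test; the test simply uses the record value as the event ID, but with richer payloads the extractor would pull out whichever field the rule compares. A small sketch with a hypothetical OrderEvent type (the type and field names are illustrative only, not part of this patch):

import org.apache.kafka.streams.kstream.KeyValueMapper;

class OrderEvent {
  final String eventId;
  final String payload;
  OrderEvent(final String eventId, final String payload) {
    this.eventId = eventId;
    this.payload = payload;
  }
}

class DedupExtractors {
  // De-duplicate on an event ID embedded in the value:
  static final KeyValueMapper<byte[], OrderEvent, String> BY_EVENT_ID = (key, event) -> event.eventId;

  // Or treat "same payload, different timestamp" as the same event, per the rule described above:
  static final KeyValueMapper<byte[], OrderEvent, String> BY_PAYLOAD = (key, event) -> event.payload;
}

Either mapper could be passed to the DeduplicationTransformer below in place of the test's (key, value) -> value.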

* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class EventDeduplicationLambdaIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - - private static String inputTopic = "inputTopic"; - private static String outputTopic = "outputTopic"; - - private static String storeName = "eventId-store"; - - @BeforeClass - public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(inputTopic); - CLUSTER.createTopic(outputTopic); - } + private static final String storeName = "eventId-store"; /** * Discards duplicate records from the input stream. - * + *

* Duplicate records are detected based on an event ID; in this simplified example, the record * value is the event ID. The transformer remembers known event IDs in an associated window state * store, which automatically purges/expires event IDs from the store after a certain amount of * time has passed to prevent the store from growing indefinitely. - * + *
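The purging described above comes directly from the window store's retention. The patch below keeps the pre-2.1, long-based Stores.persistentWindowStore overload; for reference, a hedged sketch of the same store on the Duration-based API (assuming a Kafka 2.1+ classpath; not part of this patch):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

class DedupStoreSketch {
  static StoreBuilder<WindowStore<String, Long>> dedupStore(final Duration dedupWindow) {
    return Stores.windowStoreBuilder(
        // Retention equals the de-duplication window, so an event ID is forgotten --
        // and would be accepted again -- once it is older than that window.
        Stores.persistentWindowStore("eventId-store", dedupWindow, dedupWindow, false),
        Serdes.String(),
        Serdes.Long());
  }
}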

* Note: This code is for demonstration purposes and was not tested for production usage. */ private static class DeduplicationTransformer implements Transformer> { @@ -118,9 +100,9 @@ private static class DeduplicationTransformer implements Transformer idExtractor) { if (maintainDurationPerEventInMs < 1) { @@ -158,9 +140,9 @@ public KeyValue transform(final K key, final V value) { private boolean isDuplicate(final E eventId) { final long eventTime = context.timestamp(); final WindowStoreIterator timeIterator = eventIdStore.fetch( - eventId, - eventTime - leftDurationMs, - eventTime + rightDurationMs); + eventId, + eventTime - leftDurationMs, + eventTime + rightDurationMs); final boolean isDuplicate = timeIterator.hasNext(); timeIterator.close(); return isDuplicate; @@ -189,12 +171,12 @@ public void close() { } @Test - public void shouldRemoveDuplicatesFromTheInput() throws Exception { + public void shouldRemoveDuplicatesFromTheInput() { final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8" final String secondId = UUID.randomUUID().toString(); final String thirdId = UUID.randomUUID().toString(); final List inputValues = Arrays.asList(firstId, secondId, firstId, firstId, secondId, thirdId, - thirdId, firstId, secondId); + thirdId, firstId, secondId); final List expectedValues = Arrays.asList(firstId, secondId, thirdId); // @@ -204,13 +186,9 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(10)); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -232,54 +210,57 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception { final long retentionPeriod = maintainDurationPerEventInMs; final StoreBuilder> dedupStoreBuilder = Stores.windowStoreBuilder( - Stores.persistentWindowStore(storeName, - retentionPeriod, - numberOfSegments, - maintainDurationPerEventInMs, - false - ), - Serdes.String(), - Serdes.Long()); + Stores.persistentWindowStore(storeName, + retentionPeriod, + numberOfSegments, + maintainDurationPerEventInMs, + false + ), + Serdes.String(), + Serdes.Long()); builder.addStateStore(dedupStoreBuilder); + final String inputTopic = "inputTopic"; + final String outputTopic = "outputTopic"; + final KStream input = builder.stream(inputTopic); final KStream deduplicated = input.transform( - // In this example, we assume that the record value as-is represents a unique event ID by - // which we can perform de-duplication. 
If your records are different, adapt the extractor - // function as needed. - () -> new DeduplicationTransformer<>(maintainDurationPerEventInMs, (key, value) -> value), - storeName); + // In this example, we assume that the record value as-is represents a unique event ID by + // which we can perform de-duplication. If your records are different, adapt the extractor + // function as needed. + () -> new DeduplicationTransformer<>(maintainDurationPerEventInMs, (key, value) -> value), + storeName); deduplicated.to(outputTopic); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); - - // - // Step 2: Produce some input data to the input topic. - // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig); - - // - // Step 3: Verify the application's output data. - // - final Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "deduplication-integration-test-standard-consumer"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - final List actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, - outputTopic, expectedValues.size()); - streams.close(); - assertThat(actualValues).containsExactlyElementsOf(expectedValues); + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); + + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. 
+ // + final List actualValues = IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringDeserializer() + ).stream().map(kv -> kv.value).collect(Collectors.toList()); + assertThat(actualValues).containsExactlyElementsOf(expectedValues); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java index 4c780f2951..63786f5c29 100644 --- a/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java @@ -15,20 +15,15 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; -import org.junit.BeforeClass; -import org.junit.ClassRule; +import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.Test; import java.util.Arrays; @@ -40,7 +35,7 @@ /** * End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster. - * + *

* This example shows how you can read from one input topic/stream, transform the data (here: * trivially) in two different ways via two intermediate streams, and then write the respective * results to two output topics. @@ -56,27 +51,13 @@ * * } * - * + *
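A minimal sketch of the fan-out shape described above (topic names match the test below; value serdes are assumed to come from the default String serde in the Streams config):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

class FanoutSketch {
  static void buildFanout(final StreamsBuilder builder) {
    final KStream<byte[], String> stream1 = builder.stream("A");
    stream1.mapValues(v -> v.toUpperCase()).to("B");   // first branch of the fan-out
    stream1.mapValues(v -> v.toLowerCase()).to("C");   // second branch, fed from the same source stream
  }
}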

* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class FanoutLambdaIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - - private static String inputTopicA = "A"; - private static String outputTopicB = "B"; - private static String outputTopicC = "C"; - - @BeforeClass - public static void startKafkaCluster() { - CLUSTER.createTopic(inputTopicA); - CLUSTER.createTopic(outputTopicB); - CLUSTER.createTopic(outputTopicC); - } - @Test - public void shouldFanoutTheInput() throws Exception { + public void shouldFanoutTheInput() { final List inputValues = Arrays.asList("Hello", "World"); final List expectedValuesForB = inputValues.stream().map(String::toUpperCase).collect(Collectors.toList()); final List expectedValuesForC = inputValues.stream().map(String::toLowerCase).collect(Collectors.toList()); @@ -88,56 +69,56 @@ public void shouldFanoutTheInput() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-lambda-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + final String inputTopicA = "A"; + final String outputTopicB = "B"; + final String outputTopicC = "C"; final KStream stream1 = builder.stream(inputTopicA); - final KStream stream2 = stream1.mapValues((v -> v.toUpperCase())); - final KStream stream3 = stream1.mapValues(v -> v.toLowerCase()); + final KStream stream2 = stream1.mapValues(s -> s.toUpperCase()); + final KStream stream3 = stream1.mapValues(s -> s.toLowerCase()); stream2.to(outputTopicB); stream3.to(outputTopicC); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); - - // - // Step 2: Produce some input data to the input topic. - // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopicA, inputValues, producerConfig); - - // - // Step 3: Verify the application's output data. 
- // - - // Verify output topic B - final Properties consumerConfigB = new Properties(); - consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-lambda-integration-test-standard-consumer-topicB"); - consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - final List actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB, - outputTopicB, inputValues.size()); - assertThat(actualValuesForB).isEqualTo(expectedValuesForB); - - // Verify output topic C - final Properties consumerConfigC = new Properties(); - consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-lambda-integration-test-standard-consumer-topicC"); - consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - final List actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC, - outputTopicC, inputValues.size()); - streams.close(); - assertThat(actualValuesForC).isEqualTo(expectedValuesForC); + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); + + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopicA, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. 
+ // + + // Verify output topic B + final List actualValuesForB = IntegrationTestUtils.drainStreamOutput( + outputTopicB, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringDeserializer() + ).stream().map(kv -> kv.value).collect(Collectors.toList()); + assertThat(actualValuesForB).isEqualTo(expectedValuesForB); + + // Verify output topic C + final List actualValuesForC = IntegrationTestUtils.drainStreamOutput( + outputTopicC, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringDeserializer() + ).stream().map(kv -> kv.value).collect(Collectors.toList()); + assertThat(actualValuesForC).isEqualTo(expectedValuesForC); + } finally { + topologyTestDriver.close(); + } } } \ No newline at end of file diff --git a/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java b/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java index 8b2ac7efb2..cf638aaa5f 100644 --- a/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java @@ -15,25 +15,18 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; import java.util.Arrays; @@ -48,27 +41,15 @@ * End-to-end integration test that demonstrates how to handle corrupt input records (think: poison * pill messages) in a Kafka topic, which would normally lead to application failures due to * (de)serialization exceptions. - * + *

* In this example we choose to ignore/skip corrupted input records. We describe further options at * http://docs.confluent.io/current/streams/faq.html, e.g. sending corrupted records to a quarantine * topic (think: dead letter queue). - * + *
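The quarantine-topic option mentioned above is not exercised by this test. As a rough sketch of one way to route unparseable records to a dead letter topic with KStream#branch (the topic name "quarantine-topic" and the double deserialization are illustrative simplifications, not the patch's approach):

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

class QuarantineSketch {
  private static final Serde<Long> LONG_SERDE = Serdes.Long();

  static void buildTopology(final StreamsBuilder builder) {
    final KStream<byte[], byte[]> input = builder.stream("inputTopic");

    // branches[0]: values that deserialize as Long; branches[1]: everything else.
    final KStream<byte[], byte[]>[] branches = input.branch(
        (key, value) -> value != null && isValidLong(value),
        (key, value) -> true
    );

    // Valid records are processed (here: doubled, as in the test below) and written to the output topic.
    branches[0]
        .mapValues(value -> 2 * LONG_SERDE.deserializer().deserialize("inputTopic", value))
        .to("outputTopic", Produced.with(Serdes.ByteArray(), Serdes.Long()));

    // Corrupt records are forwarded untouched to a quarantine topic (dead letter queue) for inspection.
    branches[1].to("quarantine-topic");
  }

  private static boolean isValidLong(final byte[] value) {
    try {
      LONG_SERDE.deserializer().deserialize("inputTopic", value);
      return true;
    } catch (final SerializationException e) {
      return false;
    }
  }
}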

* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class HandlingCorruptedInputRecordsIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - - private static String inputTopic = "inputTopic"; - private static String outputTopic = "outputTopic"; - - @BeforeClass - public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(inputTopic); - CLUSTER.createTopic(outputTopic); - } - @Test public void shouldIgnoreCorruptInputRecords() throws Exception { final List inputValues = Arrays.asList(1L, 2L, 3L); @@ -81,79 +62,80 @@ public void shouldIgnoreCorruptInputRecords() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "failure-handling-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final Serde stringSerde = Serdes.String(); final Serde longSerde = Serdes.Long(); + final String inputTopic = "inputTopic"; + final String outputTopic = "outputTopic"; + final KStream input = builder.stream(inputTopic); // Note how the returned stream is of type `KStream`. final KStream doubled = input.flatMap( - (k, v) -> { - try { - // Attempt deserialization - final String key = stringSerde.deserializer().deserialize("input-topic", k); - final long value = longSerde.deserializer().deserialize("input-topic", v); - - // Ok, the record is valid (not corrupted). Let's take the - // opportunity to also process the record in some way so that - // we haven't paid the deserialization cost just for "poison pill" - // checking. - return Collections.singletonList(KeyValue.pair(key, 2 * value)); - } catch (final SerializationException e) { - // Ignore/skip the corrupted record by catching the exception. - // Optionally, we can log the fact that we did so: - System.err.println("Could not deserialize record: " + e.getMessage()); - } - return Collections.emptyList(); + (k, v) -> { + try { + // Attempt deserialization + final String key = stringSerde.deserializer().deserialize("input-topic", k); + final long value = longSerde.deserializer().deserialize("input-topic", v); + + // Ok, the record is valid (not corrupted). Let's take the + // opportunity to also process the record in some way so that + // we haven't paid the deserialization cost just for "poison pill" + // checking. + return Collections.singletonList(KeyValue.pair(key, 2 * value)); + } catch (final SerializationException e) { + // Ignore/skip the corrupted record by catching the exception. + // Optionally, we can log the fact that we did so: + System.err.println("Could not deserialize record: " + e.getMessage()); } + return Collections.emptyList(); + } ); // Write the processing results (which was generated from valid records only) to Kafka. doubled.to(outputTopic, Produced.with(stringSerde, longSerde)); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); - - // - // Step 2: Produce some corrupt input data to the input topic. 
- // - final Properties producerConfigForCorruptRecords = new Properties(); - producerConfigForCorruptRecords.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfigForCorruptRecords.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfigForCorruptRecords.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfigForCorruptRecords.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerConfigForCorruptRecords.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopic, - Collections.singletonList("corrupt"), producerConfigForCorruptRecords); - - // - // Step 3: Produce some (valid) input data to the input topic. - // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig); - - // - // Step 4: Verify the application's output data. - // - final Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-lambda-integration-test-standard-consumer"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - final List actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, - outputTopic, expectedValues.size()); - streams.close(); - assertThat(actualValues).isEqualTo(expectedValues); + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); + + try { + // + // Step 2: Produce some corrupt input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + Collections.singletonList(new KeyValue<>(null, "corrupt")), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); + + // + // Step 3: Produce some (valid) input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new LongSerializer() + ); + + // + // Step 4: Verify the application's output data. 
+ // + final List actualValues = IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new LongDeserializer() + ).stream().map(kv -> kv.value).collect(Collectors.toList()); + assertThat(actualValues).isEqualTo(expectedValues); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java index acf1b66350..63a944c013 100644 --- a/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java @@ -15,20 +15,15 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; -import org.junit.BeforeClass; -import org.junit.ClassRule; +import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.Test; import java.util.Arrays; @@ -40,25 +35,13 @@ /** * End-to-end integration test based on MapFunctionLambdaExample, using an embedded Kafka cluster. - * + *

* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class MapFunctionLambdaIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - - private static String inputTopic = "inputTopic"; - private static String outputTopic = "outputTopic"; - - @BeforeClass - public static void startKafkaCluster() { - CLUSTER.createTopic(inputTopic); - CLUSTER.createTopic(outputTopic); - } - @Test - public void shouldUppercaseTheInput() throws Exception { + public void shouldUppercaseTheInput() { final List inputValues = Arrays.asList("hello", "world"); final List expectedValues = inputValues.stream().map(String::toUpperCase).collect(Collectors.toList()); @@ -69,41 +52,42 @@ public void shouldUppercaseTheInput() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + final String inputTopic = "inputTopic"; + final String outputTopic = "outputTopic"; final KStream input = builder.stream(inputTopic); - final KStream uppercased = input.mapValues(v -> v.toUpperCase()); + final KStream uppercased = input.mapValues(s -> s.toUpperCase()); uppercased.to(outputTopic); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Produce some input data to the input topic. - // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig); + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); - // - // Step 3: Verify the application's output data. 
- // - final Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-lambda-integration-test-standard-consumer"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - final List actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, - outputTopic, expectedValues.size()); - streams.close(); - assertThat(actualValues).isEqualTo(expectedValues); + // + // Step 3: Verify the application's output data. + // + final List actualValues = IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringDeserializer() + ).stream().map(kv -> kv.value).collect(Collectors.toList()); + assertThat(actualValues).isEqualTo(expectedValues); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java index 54f300ad26..0f892de6eb 100644 --- a/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java @@ -15,76 +15,58 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; /** * End-to-end integration test that demonstrates how to mix and match the DSL and the Processor * API of Kafka Streams. - * + *

* More concretely, we show how to use the {@link KStream#transform(TransformerSupplier, String...)} * method to include a custom {@link org.apache.kafka.streams.kstream.Transformer} (from the * Processor API) in a topology defined via the DSL. The fictitious use case is to anonymize * IPv4 addresses contained in the input data. - * + *

* Tip: Users that want to use {@link KStream#process(ProcessorSupplier, String...)} would need to * include a custom {@link org.apache.kafka.streams.processor.Processor}. Keep in mind though that * the return type of {@link KStream#process(ProcessorSupplier, String...)} is `void`, which means * you cannot add further operators (such as `to()` as we do below) after having called `process()`. * If you need to add further operators, you'd have to use `transform()` as we do in this example. - * + *
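To make the tip above concrete, a short schematic sketch contrasting the two calls (not part of this patch; the transform() half is only referenced in the comment because the test's own transformer is defined further below):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;

class ProcessVersusTransformSketch {
  static void illustrate(final StreamsBuilder builder) {
    final KStream<byte[], String> input = builder.stream("inputTopic");

    // transform() returns a KStream, so further operators (and a final to()) can be chained --
    // exactly what the test below does with its AnonymizeIpAddressTransformer.

    // process(), by contrast, is terminal: its return type is void, so nothing can be chained
    // after it. It suits pure side effects, e.g. writing to an external system.
    input.process(() -> new AbstractProcessor<byte[], String>() {
      @Override
      public void process(final byte[] key, final String value) {
        // side effects only; no downstream records are produced from here
      }
    });
  }
}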

* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class MixAndMatchLambdaIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - - private static String inputTopic = "inputTopic"; - private static String outputTopic = "outputTopic"; - - @BeforeClass - public static void startKafkaCluster() { - CLUSTER.createTopic(inputTopic); - CLUSTER.createTopic(outputTopic); - } - /** * Performs rudimentary anonymization of IPv4 address in the input data. * You should use this for demonstration purposes only. */ private static class AnonymizeIpAddressTransformer implements Transformer> { - private static Pattern ipv4AddressPattern = - Pattern.compile("(?[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.)(?[0-9]{1,3})"); + private static final Pattern ipv4AddressPattern = + Pattern.compile("(?[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.)(?[0-9]{1,3})"); @Override - @SuppressWarnings("unchecked") public void init(final ProcessorContext context) { // Not needed. } @@ -100,7 +82,7 @@ public KeyValue transform(final byte[] recordKey, final String r /** * Anonymizes the provided IPv4 address (in string representation) by replacing the fourth byte * with "XXX". For example, "192.168.1.234" is anonymized to "192.168.1.XXX". - * + *
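The hunk above only touches the edges of the anonymization method, so its body is not visible here. A standalone sketch of the kind of last-octet masking the javadoc describes (not necessarily the class's actual implementation; the pattern and names below are my own):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class IpMaskingSketch {
  // Keep the first three octets (group 1), mask the last one.
  private static final Pattern LAST_OCTET =
      Pattern.compile("([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.)[0-9]{1,3}");

  static String maskLastOctet(final String text) {
    final Matcher matcher = LAST_OCTET.matcher(text);
    return matcher.replaceAll("$1XXX");   // "192.168.1.234" -> "192.168.1.XXX"
  }

  public static void main(final String[] args) {
    System.out.println(maskLastOctet("HELLO, 1.2.3.4!"));   // HELLO, 1.2.3.XXX!
  }
}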

* Note: This method is for illustration purposes only. The anonymization is both lackluster * (in terms of the achieved anonymization) and slow/inefficient (in terms of implementation). * @@ -126,7 +108,7 @@ public void close() { @Test - public void shouldAnonymizeTheInput() throws Exception { + public void shouldAnonymizeTheInput() { final List inputValues = Arrays.asList("Hello, 1.2.3.4!", "foo 192.168.1.55 bar"); final List expectedValues = Arrays.asList("HELLO, 1.2.3.XXX!", "FOO 192.168.1.XXX BAR"); @@ -137,44 +119,47 @@ public void shouldAnonymizeTheInput() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mix-and-match-lambda-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final String inputTopic = "inputTopic"; + final String outputTopic = "outputTopic"; final KStream input = builder.stream(inputTopic); final KStream uppercasedAndAnonymized = input - .mapValues(v -> v.toUpperCase()) - .transform(AnonymizeIpAddressTransformer::new); + .mapValues(s -> s.toUpperCase()) + .transform(AnonymizeIpAddressTransformer::new); uppercasedAndAnonymized.to(outputTopic); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); - - // - // Step 2: Produce some input data to the input topic. - // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig); - - // - // Step 3: Verify the application's output data. - // - final Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "mix-and-match-lambda-integration-test-standard-consumer"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - final List actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, - outputTopic, expectedValues.size()); - streams.close(); - assertThat(actualValues).isEqualTo(expectedValues); + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); + + try { + // + // Step 2: Produce some input data to the input topic. 
+ // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. + // + final List actualValues = IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringDeserializer() + ).stream().map(kv -> kv.value).collect(Collectors.toList()); + assertThat(actualValues).isEqualTo(expectedValues); + } finally { + topologyTestDriver.close(); + } } + } diff --git a/src/test/java/io/confluent/examples/streams/PassThroughIntegrationTest.java b/src/test/java/io/confluent/examples/streams/PassThroughIntegrationTest.java index 624d8596a3..fd607de0ad 100644 --- a/src/test/java/io/confluent/examples/streams/PassThroughIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/PassThroughIntegrationTest.java @@ -59,39 +59,42 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Write the input data as-is to the output topic. builder.stream(inputTopic).to(outputTopic); final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Produce some input data to the input topic. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - inputTopic, - inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), - topologyTestDriver, - new IntegrationTestUtils.NothingSerde<>(), - new StringSerializer() - ); - - // - // Step 3: Verify the application's output data. - // - final List actualValues = - IntegrationTestUtils.drainStreamOutput( - outputTopic, + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), topologyTestDriver, new IntegrationTestUtils.NothingSerde<>(), - new StringDeserializer() - ) - .stream() - .map(kv -> kv.value) - .collect(Collectors.toList()); + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. 
+ // + final List actualValues = + IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringDeserializer() + ) + .stream() + .map(kv -> kv.value) + .collect(Collectors.toList()); - assertThat(actualValues).isEqualTo(inputValues); + assertThat(actualValues).isEqualTo(inputValues); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java b/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java index b3f1c28668..1f58723400 100644 --- a/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java @@ -139,7 +139,6 @@ public void shouldAllowStateStoreAccessFromDSL() { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -171,27 +170,31 @@ public void shouldAllowStateStoreAccessFromDSL() { final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Produce some input data to the input topic. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - inputTopic, - inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), - topologyTestDriver, - new IntegrationTestUtils.NothingSerde<>(), - new StringSerializer() - ); - - // - // Step 3: Verify the application's output data. - // - final List> actualValues = IntegrationTestUtils.drainStreamOutput( - outputTopic, - topologyTestDriver, - new StringDeserializer(), - new LongDeserializer() - ); - assertThat(actualValues).isEqualTo(expectedRecords); + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. 
+ // + final List> actualValues = IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer(), + new LongDeserializer() + ); + assertThat(actualValues).isEqualTo(expectedRecords); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java b/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java index 0a7f659d04..4f2138d1eb 100644 --- a/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java @@ -54,44 +54,39 @@ public class StreamToStreamJoinIntegrationTest { private static final String outputTopic = "output-topic"; @Test - public void shouldJoinTwoStreams() throws Exception { + public void shouldJoinTwoStreams() { // Input 1: Ad impressions final List> inputAdImpressions = Arrays.asList( - new KeyValue<>("car-advertisement", "shown"), - new KeyValue<>("newspaper-advertisement", "shown"), - new KeyValue<>("gadget-advertisement", "shown") + new KeyValue<>("car-advertisement", "shown"), + new KeyValue<>("newspaper-advertisement", "shown"), + new KeyValue<>("gadget-advertisement", "shown") ); // Input 2: Ad clicks final List> inputAdClicks = Arrays.asList( - new KeyValue<>("newspaper-advertisement", "clicked"), - new KeyValue<>("gadget-advertisement", "clicked"), - new KeyValue<>("newspaper-advertisement", "clicked") + new KeyValue<>("newspaper-advertisement", "clicked"), + new KeyValue<>("gadget-advertisement", "clicked"), + new KeyValue<>("newspaper-advertisement", "clicked") ); final List> expectedResults = Arrays.asList( - new KeyValue<>("car-advertisement", "shown/null"), - new KeyValue<>("newspaper-advertisement", "shown/null"), - new KeyValue<>("gadget-advertisement", "shown/null"), - new KeyValue<>("newspaper-advertisement", "shown/clicked"), - new KeyValue<>("gadget-advertisement", "shown/clicked"), - new KeyValue<>("newspaper-advertisement", "shown/clicked") + new KeyValue<>("car-advertisement", "shown/null"), + new KeyValue<>("newspaper-advertisement", "shown/null"), + new KeyValue<>("gadget-advertisement", "shown/null"), + new KeyValue<>("newspaper-advertisement", "shown/clicked"), + new KeyValue<>("gadget-advertisement", "shown/clicked"), + new KeyValue<>("newspaper-advertisement", "shown/clicked") ); // // Step 1: Configure and start the processor topology. // - final Serde stringSerde = Serdes.String(); final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-stream-join-lambda-integration-test"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. 
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -104,48 +99,52 @@ public void shouldJoinTwoStreams() throws Exception { // for the same join key (e.g. "newspaper-advertisement"), we receive an update from either of // the two joined streams during the defined join window. final KStream impressionsAndClicks = alerts.outerJoin(incidents, - (impressionValue, clickValue) -> impressionValue + "/" + clickValue, - // KStream-KStream joins are always windowed joins, hence we must provide a join window. - JoinWindows.of(TimeUnit.SECONDS.toMillis(5))); + (impressionValue, clickValue) -> impressionValue + "/" + clickValue, + // KStream-KStream joins are always windowed joins, hence we must provide a join window. + JoinWindows.of(TimeUnit.SECONDS.toMillis(5))); // Write the results to the output topic. impressionsAndClicks.to(outputTopic); final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Publish ad impressions. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - adImpressionsTopic, - inputAdImpressions, - topologyTestDriver, - new StringSerializer(), - new StringSerializer() - ); - - // - // Step 3: Publish ad clicks. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - adClicksTopic, - inputAdClicks, - topologyTestDriver, - new StringSerializer(), - new StringSerializer() - ); + try { + // + // Step 2: Publish ad impressions. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + adImpressionsTopic, + inputAdImpressions, + topologyTestDriver, + new StringSerializer(), + new StringSerializer() + ); - // - // Step 4: Verify the application's output data. - // - final List> actualResults = - IntegrationTestUtils.drainStreamOutput( - outputTopic, + // + // Step 3: Publish ad clicks. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + adClicksTopic, + inputAdClicks, topologyTestDriver, - new StringDeserializer(), - new StringDeserializer() + new StringSerializer(), + new StringSerializer() ); - assertThat(actualResults).containsExactlyElementsOf(expectedResults); + + // + // Step 4: Verify the application's output data. 
+ // + final List> actualResults = + IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer(), + new StringDeserializer() + ); + assertThat(actualResults).containsExactlyElementsOf(expectedResults); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/StreamToTableJoinIntegrationTest.java b/src/test/java/io/confluent/examples/streams/StreamToTableJoinIntegrationTest.java index b687780b2c..b13f6a4b52 100644 --- a/src/test/java/io/confluent/examples/streams/StreamToTableJoinIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/StreamToTableJoinIntegrationTest.java @@ -15,7 +15,6 @@ */ package io.confluent.examples.streams; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serde; @@ -65,7 +64,7 @@ private static final class RegionWithClicks { private final String region; private final long clicks; - public RegionWithClicks(final String region, final long clicks) { + RegionWithClicks(final String region, final long clicks) { if (region == null || region.isEmpty()) { throw new IllegalArgumentException("region must be set"); } @@ -76,39 +75,39 @@ public RegionWithClicks(final String region, final long clicks) { this.clicks = clicks; } - public String getRegion() { + String getRegion() { return region; } - public long getClicks() { + long getClicks() { return clicks; } } @Test - public void shouldCountClicksPerRegion() throws Exception { + public void shouldCountClicksPerRegion() { // Input 1: Clicks per user (multiple records allowed per user). final List> userClicks = Arrays.asList( - new KeyValue<>("alice", 13L), - new KeyValue<>("bob", 4L), - new KeyValue<>("chao", 25L), - new KeyValue<>("bob", 19L), - new KeyValue<>("dave", 56L), - new KeyValue<>("eve", 78L), - new KeyValue<>("alice", 40L), - new KeyValue<>("fang", 99L) + new KeyValue<>("alice", 13L), + new KeyValue<>("bob", 4L), + new KeyValue<>("chao", 25L), + new KeyValue<>("bob", 19L), + new KeyValue<>("dave", 56L), + new KeyValue<>("eve", 78L), + new KeyValue<>("alice", 40L), + new KeyValue<>("fang", 99L) ); // Input 2: Region per user (multiple records allowed per user). final List> userRegions = Arrays.asList( - new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */ - new KeyValue<>("bob", "americas"), - new KeyValue<>("chao", "asia"), - new KeyValue<>("dave", "europe"), - new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */ - new KeyValue<>("eve", "americas"), - new KeyValue<>("fang", "asia") + new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */ + new KeyValue<>("bob", "americas"), + new KeyValue<>("chao", "asia"), + new KeyValue<>("dave", "europe"), + new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. 
*/ + new KeyValue<>("eve", "americas"), + new KeyValue<>("fang", "asia") ); final Map expectedClicksPerRegion = mkMap( @@ -128,10 +127,6 @@ public void shouldCountClicksPerRegion() throws Exception { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -161,64 +156,68 @@ public void shouldCountClicksPerRegion() throws Exception { // The resulting KTable is continuously being updated as new data records are arriving in the // input KStream `userClicksStream` and input KTable `userRegionsTable`. final KTable clicksPerRegion = userClicksStream - // Join the stream against the table. - // - // Null values possible: In general, null values are possible for region (i.e. the value of - // the KTable we are joining against) so we must guard against that (here: by setting the - // fallback region "UNKNOWN"). In this specific example this is not really needed because - // we know, based on the test setup, that all users have appropriate region entries at the - // time we perform the join. - // - // Also, we need to return a tuple of (region, clicks) for each user. But because Java does - // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to - // achieve the same effect. - .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)) - // Change the stream from -> to -> - .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks())) - // Compute the total per region by summing the individual click counts per region. - .groupByKey(Serialized.with(stringSerde, longSerde)) - .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks); + // Join the stream against the table. + // + // Null values possible: In general, null values are possible for region (i.e. the value of + // the KTable we are joining against) so we must guard against that (here: by setting the + // fallback region "UNKNOWN"). In this specific example this is not really needed because + // we know, based on the test setup, that all users have appropriate region entries at the + // time we perform the join. + // + // Also, we need to return a tuple of (region, clicks) for each user. But because Java does + // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to + // achieve the same effect. + .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)) + // Change the stream from -> to -> + .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks())) + // Compute the total per region by summing the individual click counts per region. 
+ .groupByKey(Serialized.with(stringSerde, longSerde)) + .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks); // Write the (continuously updating) results to the output topic. clicksPerRegion.toStream().to(outputTopic, Produced.with(stringSerde, longSerde)); final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Publish user-region information. - // - // To keep this code example simple and easier to understand/reason about, we publish all - // user-region records before any user-click records (cf. step 3). In practice though, - // data records would typically be arriving concurrently in both input streams/topics. - IntegrationTestUtils.produceKeyValuesSynchronously( - userRegionsTopic, - userRegions, - topologyTestDriver, - new StringSerializer(), - new StringSerializer() - ); - - // - // Step 3: Publish some user click events. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - userClicksTopic, - userClicks, - topologyTestDriver, - new StringSerializer(), - new LongSerializer()); - - // - // Step 4: Verify the application's output data. - // - final Map actualClicksPerRegion = - IntegrationTestUtils.drainTableOutput( - outputTopic, + try { + // + // Step 2: Publish user-region information. + // + // To keep this code example simple and easier to understand/reason about, we publish all + // user-region records before any user-click records (cf. step 3). In practice though, + // data records would typically be arriving concurrently in both input streams/topics. + IntegrationTestUtils.produceKeyValuesSynchronously( + userRegionsTopic, + userRegions, topologyTestDriver, - new StringDeserializer(), - new LongDeserializer() + new StringSerializer(), + new StringSerializer() ); - assertThat(actualClicksPerRegion).isEqualTo(expectedClicksPerRegion); + + // + // Step 3: Publish some user click events. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + userClicksTopic, + userClicks, + topologyTestDriver, + new StringSerializer(), + new LongSerializer()); + + // + // Step 4: Verify the application's output data. 
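+      // `drainTableOutput` reads everything the driver wrote to the output topic and keeps only the
+      // latest value per key, i.e. the final table state, which we compare against
+      // `expectedClicksPerRegion` as a Map.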
+ // + final Map actualClicksPerRegion = + IntegrationTestUtils.drainTableOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer(), + new LongDeserializer() + ); + assertThat(actualClicksPerRegion).isEqualTo(expectedClicksPerRegion); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/SumLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/SumLambdaIntegrationTest.java index f614f39404..db1764f2b1 100644 --- a/src/test/java/io/confluent/examples/streams/SumLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/SumLambdaIntegrationTest.java @@ -15,7 +15,6 @@ */ package io.confluent.examples.streams; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -40,11 +39,8 @@ */ public class SumLambdaIntegrationTest { - private static String inputTopic = SumLambdaExample.NUMBERS_TOPIC; - private static String outputTopic = SumLambdaExample.SUM_OF_ODD_NUMBERS_TOPIC; - @Test - public void shouldSumEvenNumbers() throws Exception { + public void shouldSumEvenNumbers() { final List inputValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); final List> expectedValues = Arrays.asList( new KeyValue<>(1, 1), @@ -62,37 +58,37 @@ public void shouldSumEvenNumbers() throws Exception { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(SumLambdaExample.getTopology(), streamsConfiguration); - // - // Step 2: Produce some input data to the input topic. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - inputTopic, - inputValues.stream().map(i -> new KeyValue(null, i)).collect(Collectors.toList()), - topologyTestDriver, - new IntegrationTestUtils.NothingSerde<>(), - new IntegerSerializer() - ); + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + SumLambdaExample.NUMBERS_TOPIC, + inputValues.stream().map(i -> new KeyValue(null, i)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new IntegerSerializer() + ); - // - // Step 3: Verify the application's output data. - // - final List> actualValues = IntegrationTestUtils.drainStreamOutput( - outputTopic, - topologyTestDriver, - new IntegerDeserializer(), - new IntegerDeserializer() - ); - assertThat(actualValues).isEqualTo(expectedValues); + // + // Step 3: Verify the application's output data. 
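+      // `drainStreamOutput` returns every record written to the output topic, in order, so the
+      // assertion below checks each intermediate update to the running sum rather than just the
+      // final value.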
+ // + final List> actualValues = IntegrationTestUtils.drainStreamOutput( + SumLambdaExample.SUM_OF_ODD_NUMBERS_TOPIC, + topologyTestDriver, + new IntegerDeserializer(), + new IntegerDeserializer() + ); + assertThat(actualValues).isEqualTo(expectedValues); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/TableToTableJoinIntegrationTest.java b/src/test/java/io/confluent/examples/streams/TableToTableJoinIntegrationTest.java index 739c3feec6..01e619b68a 100644 --- a/src/test/java/io/confluent/examples/streams/TableToTableJoinIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/TableToTableJoinIntegrationTest.java @@ -15,7 +15,6 @@ */ package io.confluent.examples.streams; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -57,31 +56,31 @@ public class TableToTableJoinIntegrationTest { public void shouldJoinTwoTables() { // Input: Region per user (multiple records allowed per user). final List> userRegionRecords = Arrays.asList( - new KeyValue<>("alice", "asia"), - new KeyValue<>("bob", "europe"), - new KeyValue<>("alice", "europe"), - new KeyValue<>("charlie", "europe"), - new KeyValue<>("bob", "asia") + new KeyValue<>("alice", "asia"), + new KeyValue<>("bob", "europe"), + new KeyValue<>("alice", "europe"), + new KeyValue<>("charlie", "europe"), + new KeyValue<>("bob", "asia") ); // Input 2: Timestamp of last login per user (multiple records allowed per user) final List> userLastLoginRecords = Arrays.asList( - new KeyValue<>("alice", 1485500000L), - new KeyValue<>("bob", 1485520000L), - new KeyValue<>("alice", 1485530000L), - new KeyValue<>("bob", 1485560000L) + new KeyValue<>("alice", 1485500000L), + new KeyValue<>("bob", 1485520000L), + new KeyValue<>("alice", 1485530000L), + new KeyValue<>("bob", 1485560000L) ); final List> expectedResults = Arrays.asList( - new KeyValue<>("alice", "europe/1485500000"), - new KeyValue<>("bob", "asia/1485520000"), - new KeyValue<>("alice", "europe/1485530000"), - new KeyValue<>("bob", "asia/1485560000") + new KeyValue<>("alice", "europe/1485500000"), + new KeyValue<>("bob", "asia/1485520000"), + new KeyValue<>("alice", "europe/1485530000"), + new KeyValue<>("bob", "asia/1485560000") ); final List> expectedResultsForJoinStateStore = Arrays.asList( - new KeyValue<>("alice", "europe/1485530000"), - new KeyValue<>("bob", "asia/1485560000") + new KeyValue<>("alice", "europe/1485530000"), + new KeyValue<>("bob", "asia/1485560000") ); // @@ -95,12 +94,6 @@ public void shouldJoinTwoTables() { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // For didactic reasons: disable record caching so we can observe every individual update record being sent downstream - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. 
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -110,53 +103,56 @@ public void shouldJoinTwoTables() { final String storeName = "joined-store"; userRegions.join(userLastLogins, - (regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue, - Materialized.as(storeName)) - .toStream() - .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + (regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue, + Materialized.as(storeName)) + .toStream() + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Publish user regions. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - userRegionTopic, - userRegionRecords, - topologyTestDriver, - new StringSerializer(), - new StringSerializer() - ); - - // - // Step 3: Publish user's last login timestamps. - // - IntegrationTestUtils.produceKeyValuesSynchronously( - userLastLoginTopic, - userLastLoginRecords, - topologyTestDriver, - new StringSerializer(), - new LongSerializer() - ); - - // - // Step 4: Verify the application's output data. - // - final List> actualResults = IntegrationTestUtils.drainStreamOutput( - outputTopic, - topologyTestDriver, - new StringDeserializer(), - new StringDeserializer() - ); - - // Verify the (local) state store of the joined table. - // For a comprehensive demonstration of interactive queries please refer to KafkaMusicExample. - final ReadOnlyKeyValueStore readOnlyKeyValueStore = + try { + // + // Step 2: Publish user regions. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + userRegionTopic, + userRegionRecords, + topologyTestDriver, + new StringSerializer(), + new StringSerializer() + ); + + // + // Step 3: Publish user's last login timestamps. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + userLastLoginTopic, + userLastLoginRecords, + topologyTestDriver, + new StringSerializer(), + new LongSerializer() + ); + + // + // Step 4: Verify the application's output data. + // + final List> actualResults = IntegrationTestUtils.drainStreamOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer(), + new StringDeserializer() + ); + + // Verify the (local) state store of the joined table. + // For a comprehensive demonstration of interactive queries please refer to KafkaMusicExample. 
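+      // `TopologyTestDriver#getKeyValueStore` gives direct access to the store we materialized via
+      // `Materialized.as(storeName)` above; unlike a running KafkaStreams instance, there is no need
+      // to wait for the store to become queryable.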
+ final ReadOnlyKeyValueStore readOnlyKeyValueStore = topologyTestDriver.getKeyValueStore(storeName); - final KeyValueIterator keyValueIterator = readOnlyKeyValueStore.all(); - assertThat(keyValueIterator).containsExactlyElementsOf(expectedResultsForJoinStateStore); + final KeyValueIterator keyValueIterator = readOnlyKeyValueStore.all(); + assertThat(keyValueIterator).containsExactlyElementsOf(expectedResultsForJoinStateStore); - assertThat(actualResults).containsExactlyElementsOf(expectedResults); + assertThat(actualResults).containsExactlyElementsOf(expectedResults); + } finally { + topologyTestDriver.close(); + } } - } diff --git a/src/test/java/io/confluent/examples/streams/UserCountsPerRegionLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/UserCountsPerRegionLambdaIntegrationTest.java index 57ee5d37e3..7b91fe2876 100644 --- a/src/test/java/io/confluent/examples/streams/UserCountsPerRegionLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/UserCountsPerRegionLambdaIntegrationTest.java @@ -15,7 +15,6 @@ */ package io.confluent.examples.streams; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -95,10 +94,6 @@ public void shouldCountUsersPerRegion() { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -116,29 +111,32 @@ public void shouldCountUsersPerRegion() { final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); - // - // Step 2: Publish user-region information. - // - - IntegrationTestUtils.produceKeyValuesSynchronously( - inputTopic, - userRegionRecords, - topologyTestDriver, - new StringSerializer(), - new StringSerializer() - ); - - // - // Step 3: Verify the application's output data. - // - - final Map actualClicksPerRegion = IntegrationTestUtils.drainTableOutput( - outputTopic, - topologyTestDriver, - new StringDeserializer(), - new LongDeserializer() - ); - assertThat(actualClicksPerRegion).isEqualTo(expectedUsersPerRegion); + try { + // + // Step 2: Publish user-region information. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + userRegionRecords, + topologyTestDriver, + new StringSerializer(), + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. 
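+      // Because the TopologyTestDriver processes each piped-in record synchronously, the per-region
+      // counts are already complete by the time we read the output below.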
+ // + + final Map actualClicksPerRegion = IntegrationTestUtils.drainTableOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer(), + new LongDeserializer() + ); + assertThat(actualClicksPerRegion).isEqualTo(expectedUsersPerRegion); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java index 06bce104d9..0542fbbdca 100644 --- a/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java @@ -15,16 +15,15 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.TimeWindows; @@ -32,8 +31,6 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.test.TestUtils; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; import java.util.Arrays; @@ -45,33 +42,23 @@ /** * Demonstrates how to validate an application's expected state through interactive queries. - * + *
<p>
* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class ValidateStateWithInteractiveQueriesLambdaIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - - private static String inputTopic = "inputTopic"; - - @BeforeClass - public static void startKafkaCluster() { - CLUSTER.createTopic(inputTopic); - } - @Test public void shouldComputeMaxValuePerKey() throws Exception { // A user may be listed multiple times. final List> inputUserClicks = Arrays.asList( - new KeyValue<>("alice", 13L), - new KeyValue<>("bob", 4L), - new KeyValue<>("chao", 25L), - new KeyValue<>("bob", 19L), - new KeyValue<>("chao", 56L), - new KeyValue<>("alice", 78L), - new KeyValue<>("alice", 40L), - new KeyValue<>("bob", 3L) + new KeyValue<>("alice", 13L), + new KeyValue<>("bob", 4L), + new KeyValue<>("chao", 25L), + new KeyValue<>("bob", 19L), + new KeyValue<>("chao", 56L), + new KeyValue<>("alice", 78L), + new KeyValue<>("alice", 40L), + new KeyValue<>("bob", 3L) ); final Map expectedMaxClicksPerUser = new HashMap() { @@ -89,7 +76,7 @@ public void shouldComputeMaxValuePerKey() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "validating-with-interactive-queries-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -99,50 +86,55 @@ public void shouldComputeMaxValuePerKey() throws Exception { // Use a temporary directory for storing state, which will be automatically removed after the test. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + final String inputTopic = "inputTopic"; + final KStream input = builder.stream(inputTopic); // rolling MAX() aggregation final String maxStore = "max-store"; input.groupByKey().aggregate( - () -> Long.MIN_VALUE, - (aggKey, value, aggregate) -> Math.max(value, aggregate), - Materialized.as(maxStore) + () -> Long.MIN_VALUE, + (aggKey, value, aggregate) -> Math.max(value, aggregate), + Materialized.as(maxStore) ); // windowed MAX() aggregation final String maxWindowStore = "max-window-store"; input.groupByKey() - .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1L)).until(TimeUnit.MINUTES.toMillis(5L))) - .aggregate( - () -> Long.MIN_VALUE, - (aggKey, value, aggregate) -> Math.max(value, aggregate), - Materialized.as(maxWindowStore)); - - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); - - // - // Step 2: Produce some input data to the input topic. 
- // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - IntegrationTestUtils.produceKeyValuesSynchronously(inputTopic, inputUserClicks, producerConfig); - - // - // Step 3: Validate the application's state by interactively querying its state stores. - // - final ReadOnlyKeyValueStore keyValueStore = - IntegrationTestUtils.waitUntilStoreIsQueryable(maxStore, QueryableStoreTypes.keyValueStore(), streams); - final ReadOnlyWindowStore windowStore = - IntegrationTestUtils.waitUntilStoreIsQueryable(maxWindowStore, QueryableStoreTypes.windowStore(), streams); - - IntegrationTestUtils.assertThatKeyValueStoreContains(keyValueStore, expectedMaxClicksPerUser); - IntegrationTestUtils.assertThatOldestWindowContains(windowStore, expectedMaxClicksPerUser); - streams.close(); + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1L)).until(TimeUnit.MINUTES.toMillis(5L))) + .aggregate( + () -> Long.MIN_VALUE, + (aggKey, value, aggregate) -> Math.max(value, aggregate), + Materialized.as(maxWindowStore)); + + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); + + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputUserClicks, + topologyTestDriver, + new StringSerializer(), + new LongSerializer() + ); + + // + // Step 3: Validate the application's state by interactively querying its state stores. 
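+      // Both stores can be queried directly from the driver: `getKeyValueStore` for the rolling
+      // MAX() aggregate and `getWindowStore` for the windowed MAX() aggregate.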
+ // + final ReadOnlyKeyValueStore keyValueStore = + topologyTestDriver.getKeyValueStore(maxStore); + + final ReadOnlyWindowStore windowStore = + topologyTestDriver.getWindowStore(maxWindowStore); + + IntegrationTestUtils.assertThatKeyValueStoreContains(keyValueStore, expectedMaxClicksPerUser); + IntegrationTestUtils.assertThatOldestWindowContains(windowStore, expectedMaxClicksPerUser); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/java/io/confluent/examples/streams/WordCountLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/WordCountLambdaIntegrationTest.java index 177a9de727..9c76a1d006 100644 --- a/src/test/java/io/confluent/examples/streams/WordCountLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/WordCountLambdaIntegrationTest.java @@ -15,79 +15,69 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import static io.confluent.examples.streams.IntegrationTestUtils.mkEntry; +import static io.confluent.examples.streams.IntegrationTestUtils.mkMap; import static org.assertj.core.api.Assertions.assertThat; /** * End-to-end integration test based on {@link WordCountLambdaExample}, using an embedded Kafka * cluster. - * + *
<p>
* See {@link WordCountLambdaExample} for further documentation. - * + *
<p>
* See {@link WordCountScalaIntegrationTest} for the equivalent Scala example. - * + *
<p>
* Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class WordCountLambdaIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - private static final String inputTopic = "inputTopic"; private static final String outputTopic = "outputTopic"; - @BeforeClass - public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(inputTopic); - CLUSTER.createTopic(outputTopic); - } - @Test - public void shouldCountWords() throws Exception { + public void shouldCountWords() { final List inputValues = Arrays.asList( - "Hello Kafka Streams", - "All streams lead to Kafka", - "Join Kafka Summit", - "И теперь пошли русские слова" + "Hello Kafka Streams", + "All streams lead to Kafka", + "Join Kafka Summit", + "И теперь пошли русские слова" ); - final List> expectedWordCounts = Arrays.asList( - new KeyValue<>("hello", 1L), - new KeyValue<>("all", 1L), - new KeyValue<>("streams", 2L), - new KeyValue<>("lead", 1L), - new KeyValue<>("to", 1L), - new KeyValue<>("join", 1L), - new KeyValue<>("kafka", 3L), - new KeyValue<>("summit", 1L), - new KeyValue<>("и", 1L), - new KeyValue<>("теперь", 1L), - new KeyValue<>("пошли", 1L), - new KeyValue<>("русские", 1L), - new KeyValue<>("слова", 1L) + final Map expectedWordCounts = mkMap( + mkEntry("hello", 1L), + mkEntry("all", 1L), + mkEntry("streams", 2L), + mkEntry("lead", 1L), + mkEntry("to", 1L), + mkEntry("join", 1L), + mkEntry("kafka", 3L), + mkEntry("summit", 1L), + mkEntry("и", 1L), + mkEntry("теперь", 1L), + mkEntry("пошли", 1L), + mkEntry("русские", 1L), + mkEntry("слова", 1L) ); // @@ -98,13 +88,9 @@ public void shouldCountWords() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a temporary directory for storing state, which will be automatically removed after the test. 
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); @@ -115,40 +101,40 @@ public void shouldCountWords() throws Exception { final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); final KTable wordCounts = textLines - .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) - // no need to specify explicit serdes because the resulting key and value types match our default serde settings - .groupBy((key, word) -> word) - .count(); + .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) + // no need to specify explicit serdes because the resulting key and value types match our default serde settings + .groupBy((key, word) -> word) + .count(); wordCounts.toStream().to(outputTopic, Produced.with(stringSerde, longSerde)); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); - - // - // Step 2: Produce some input data to the input topic. - // - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig); - - // - // Step 3: Verify the application's output data. - // - final Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-lambda-integration-test-standard-consumer"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - final List> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, - outputTopic, expectedWordCounts.size()); - streams.close(); - assertThat(actualWordCounts).containsExactlyElementsOf(expectedWordCounts); + final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration); + + try { + // + // Step 2: Produce some input data to the input topic. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()), + topologyTestDriver, + new IntegrationTestUtils.NothingSerde<>(), + new StringSerializer() + ); + + // + // Step 3: Verify the application's output data. 
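+      // The expected result is a Map of final counts per word (see `mkMap` above) rather than the
+      // ordered list of change records that the embedded-cluster version of this test asserted.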
+ // + final Map actualWordCounts = IntegrationTestUtils.drainTableOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer(), + new LongDeserializer() + ); + assertThat(actualWordCounts).isEqualTo(expectedWordCounts); + } finally { + topologyTestDriver.close(); + } } } diff --git a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala index 04687deae8..14d4852eb6 100644 --- a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala +++ b/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala @@ -18,18 +18,18 @@ package io.confluent.examples.streams import java.util import java.util.Properties +import io.confluent.examples.streams.IntegrationTestUtils.NothingSerde import io.confluent.examples.streams.algebird.{CMSStore, CMSStoreBuilder, ProbabilisticCounter} -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization._ +import org.apache.kafka.streams._ import org.apache.kafka.streams.kstream.{KStream, Produced, TransformerSupplier} -import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig} import org.apache.kafka.test.TestUtils import org.assertj.core.api.Assertions.assertThat import org.junit._ import org.scalatest.junit.AssertionsForJUnit +import scala.collection.JavaConverters._ + /** * End-to-end integration test that demonstrates how to probabilistically count items in an input stream. * @@ -38,23 +38,12 @@ import org.scalatest.junit.AssertionsForJUnit */ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { - private val privateCluster: EmbeddedSingleNodeKafkaCluster = new EmbeddedSingleNodeKafkaCluster - - @Rule def cluster: EmbeddedSingleNodeKafkaCluster = privateCluster - private val inputTopic = "inputTopic" private val outputTopic = "output-topic" - @Before - def startKafkaCluster() { - cluster.createTopic(inputTopic) - cluster.createTopic(outputTopic) - } - @Test def shouldProbabilisticallyCountWords() { // To convert between Scala's `Tuple2` and Streams' `KeyValue`. - import KeyValueImplicits._ val inputTextLines: Seq[String] = Seq( "Hello Kafka Streams", @@ -62,7 +51,7 @@ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { "Join Kafka Summit" ) - val expectedWordCounts: Seq[KeyValue[String, Long]] = Seq( + val expectedWordCounts: Map[String, Long] = Map( ("hello", 1L), ("kafka", 1L), ("streams", 1L), @@ -82,13 +71,9 @@ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { val streamsConfiguration: Properties = { val p = new Properties() p.put(StreamsConfig.APPLICATION_ID_CONFIG, "probabilistic-counting-scala-integration-test") - p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config") p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName) p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. 
- p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Use a temporary directory for storing state, which will be automatically removed after the test. p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath) p @@ -99,63 +84,52 @@ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { builder.addStateStore(createCMSStoreBuilder(cmsStoreName)) // Read the input from Kafka. - val textLines: KStream[Array[Byte], String] = builder.stream(inputTopic) + val textLines: KStream[String, String] = builder.stream(inputTopic) // Scala-Java interoperability: to convert `scala.collection.Iterable` to `java.util.Iterable` // in `flatMapValues()` below. import collection.JavaConverters.asJavaIterableConverter val approximateWordCounts: KStream[String, Long] = textLines - .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) - .transform( - new TransformerSupplier[Array[Byte], String, KeyValue[String, Long]] { - override def get() = new ProbabilisticCounter(cmsStoreName) - }, - cmsStoreName) + .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) + .transform( + new TransformerSupplier[String, String, KeyValue[String, Long]] { + override def get() = new ProbabilisticCounter(cmsStoreName) + }, + cmsStoreName) // Trick to re-use Kafka's serde for java.lang.Long for scala.Long. val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]] // Write the results back to Kafka. approximateWordCounts.to(outputTopic, Produced.`with`(Serdes.String(), longSerde)) - val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) - streams.start() - - // - // Step 2: Publish some input text lines. - // - val producerConfig: Properties = { - val p = new Properties() - p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") - p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) - p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p - } - import collection.JavaConverters._ - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputTextLines.asJava, producerConfig) - - // - // Step 3: Verify the application's output data. - // - val consumerConfig = { - val p = new Properties() - p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ConsumerConfig.GROUP_ID_CONFIG, "probabilistic-counting-consumer") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) - p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer]) - p + val topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration) + + try { + // + // Step 2: Publish some input text lines. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputTextLines.map(v => new KeyValue(null, v)).asJava, + topologyTestDriver, + new NothingSerde[Null], + new StringSerializer + ) + + // + // Step 3: Verify the application's output data. + // + val actualWordCounts = + IntegrationTestUtils.drainTableOutput(outputTopic, topologyTestDriver, new StringDeserializer, new LongDeserializer) + + // Note: This example only processes a small amount of input data, for which the word counts + // will actually be exact counts. 
However, for large amounts of input data we would expect to + // observe approximate counts (where the approximate counts would be >= true exact counts). + assertThat(actualWordCounts).isEqualTo(expectedWordCounts.asJava) + } finally { + topologyTestDriver.close() } - val actualWordCounts: java.util.List[KeyValue[String, Long]] = - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size) - streams.close() - - // Note: This example only processes a small amount of input data, for which the word counts - // will actually be exact counts. However, for large amounts of input data we would expect to - // observe approximate counts (where the approximate counts would be >= true exact counts). - assertThat(actualWordCounts).containsExactlyElementsOf(expectedWordCounts.asJava) } private def createCMSStoreBuilder(cmsStoreName: String): CMSStoreBuilder[String] = { @@ -168,7 +142,7 @@ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { cfg } new CMSStoreBuilder[String](cmsStoreName, Serdes.String()) - .withLoggingEnabled(changelogConfig) + .withLoggingEnabled(changelogConfig) } } diff --git a/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala index 8f03fc82c7..40b38c766f 100644 --- a/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala +++ b/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala @@ -21,8 +21,8 @@ import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization._ -import org.apache.kafka.streams.kstream.{KStream, KTable, Produced, Serialized} import org.apache.kafka.streams._ +import org.apache.kafka.streams.kstream.{KStream, KTable, Produced, Serialized} import org.apache.kafka.test.TestUtils import org.assertj.core.api.Assertions.assertThat import org.junit._ @@ -85,7 +85,7 @@ class StreamToTableJoinScalaIntegrationTest extends AssertionsForJUnit { ("fang", "asia") ) - val expectedClicksPerRegion: Seq[KeyValue[String, Long]] = Seq( + val expectedClicksPerRegion: Map[String, Long] = Map( ("americas", 101L), ("europe", 109L), ("asia", 124L) @@ -108,13 +108,9 @@ class StreamToTableJoinScalaIntegrationTest extends AssertionsForJUnit { val streamsConfiguration: Properties = { val p = new Properties() p.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test") - p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config") p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Use a temporary directory for storing state, which will be automatically removed after the test. 
p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath) p @@ -145,7 +141,7 @@ class StreamToTableJoinScalaIntegrationTest extends AssertionsForJUnit { // // The resulting KTable is continuously being updated as new data records are arriving in the // input KStream `userClicksStream` and input KTable `userRegionsTable`. - val userClicksJoinRegion : KStream[String, (String, Long)] = userClicksStream + val userClicksJoinRegion: KStream[String, (String, Long)] = userClicksStream // Join the stream against the table. // // Null values possible: In general, null values are possible for region (i.e. the value of @@ -154,70 +150,80 @@ class StreamToTableJoinScalaIntegrationTest extends AssertionsForJUnit { // we know, based on the test setup, that all users have appropriate region entries at the // time we perform the join. .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks)) - val clicksByRegion : KStream[String, Long] = userClicksJoinRegion + val clicksByRegion: KStream[String, Long] = userClicksJoinRegion // Change the stream from -> to -> .map((_: String, regionWithClicks: (String, Long)) => new KeyValue[String, Long]( regionWithClicks._1, regionWithClicks._2)) val clicksPerRegion: KTable[String, Long] = clicksByRegion - // Compute the total per region by summing the individual click counts per region. - .groupByKey(Serialized.`with`(stringSerde, longSerde)) - .reduce(_ + _) + // Compute the total per region by summing the individual click counts per region. + .groupByKey(Serialized.`with`(stringSerde, longSerde)) + .reduce(_ + _) // Write the (continuously updating) results to the output topic. clicksPerRegion.toStream().to(outputTopic, Produced.`with`(stringSerde, longSerde)) - val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) - streams.start() + val topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration) - // - // Step 2: Publish user-region information. - // - // To keep this code example simple and easier to understand/reason about, we publish all - // user-region records before any user-click records (cf. step 3). In practice though, - // data records would typically be arriving concurrently in both input streams/topics. - val userRegionsProducerConfig: Properties = { - val p = new Properties() - p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") - p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p - } - import collection.JavaConverters._ - IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions.asJava, userRegionsProducerConfig) + try { + // + // Step 2: Publish user-region information. + // + // To keep this code example simple and easier to understand/reason about, we publish all + // user-region records before any user-click records (cf. step 3). In practice though, + // data records would typically be arriving concurrently in both input streams/topics. 
+ val userRegionsProducerConfig: Properties = { + val p = new Properties() + p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(ProducerConfig.ACKS_CONFIG, "all") + p.put(ProducerConfig.RETRIES_CONFIG, "0") + p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + p + } + import collection.JavaConverters._ + IntegrationTestUtils.produceKeyValuesSynchronously( + userRegionsTopic, + userRegions.asJava, + topologyTestDriver, + new StringSerializer, + new StringSerializer + ) - // - // Step 3: Publish some user click events. - // - val userClicksProducerConfig: Properties = { - val p = new Properties() - p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") - p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer]) - p - } - IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, userClicks.asJava, userClicksProducerConfig) + // + // Step 3: Publish some user click events. + // + IntegrationTestUtils.produceKeyValuesSynchronously( + userClicksTopic, + userClicks.map(kv => new KeyValue(kv.key, kv.value.asInstanceOf[java.lang.Long])).asJava, + topologyTestDriver, + new StringSerializer, + new LongSerializer + ) - // - // Step 4: Verify the application's output data. - // - val consumerConfig = { - val p = new Properties() - p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) - p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer]) - p + // + // Step 4: Verify the application's output data. 
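+      // `drainTableOutput` returns a java.util.Map of the latest value per key, hence the comparison
+      // against `expectedClicksPerRegion.asJava` below.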
+ // + val consumerConfig = { + val p = new Properties() + p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer") + p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer]) + p + } + val actualClicksPerRegion = + IntegrationTestUtils.drainTableOutput( + outputTopic, + topologyTestDriver, + new StringDeserializer, + new LongDeserializer + ) + assertThat(actualClicksPerRegion).isEqualTo(expectedClicksPerRegion.asJava) + } finally { + topologyTestDriver.close() } - val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, - outputTopic, expectedClicksPerRegion.size) - streams.close() - assertThat(actualClicksPerRegion).containsExactlyElementsOf(expectedClicksPerRegion.asJava) } } diff --git a/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala index 6d0e838642..411931194a 100644 --- a/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala +++ b/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala @@ -18,17 +18,17 @@ package io.confluent.examples.streams import java.lang.{Long => JLong} import java.util.Properties -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig +import io.confluent.examples.streams.IntegrationTestUtils.NothingSerde import org.apache.kafka.common.serialization._ +import org.apache.kafka.streams._ import org.apache.kafka.streams.kstream.{KStream, KTable, Produced} -import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig} import org.apache.kafka.test.TestUtils import org.assertj.core.api.Assertions.assertThat import org.junit._ import org.scalatest.junit.AssertionsForJUnit +import scala.collection.JavaConverters._ + /** * End-to-end integration test based on [[WordCountLambdaExample]], using an embedded Kafka cluster. * @@ -44,23 +44,12 @@ import org.scalatest.junit.AssertionsForJUnit */ class WordCountScalaIntegrationTest extends AssertionsForJUnit { - private val privateCluster: EmbeddedSingleNodeKafkaCluster = new EmbeddedSingleNodeKafkaCluster - - @Rule def cluster: EmbeddedSingleNodeKafkaCluster = privateCluster - private val inputTopic = "inputTopic" private val outputTopic = "output-topic" - @Before - def startKafkaCluster() { - cluster.createTopic(inputTopic) - cluster.createTopic(outputTopic) - } - @Test def shouldCountWords() { // To convert between Scala's `Tuple2` and Streams' `KeyValue`. 
- import KeyValueImplicits._ val inputTextLines: Seq[String] = Seq( "Hello Kafka Streams", @@ -68,7 +57,7 @@ class WordCountScalaIntegrationTest extends AssertionsForJUnit { "Join Kafka Summit" ) - val expectedWordCounts: Seq[KeyValue[String, Long]] = Seq( + val expectedWordCounts: Map[String, Long] = Map( ("hello", 1L), ("all", 1L), ("streams", 2L), @@ -85,13 +74,9 @@ class WordCountScalaIntegrationTest extends AssertionsForJUnit { val streamsConfiguration: Properties = { val p = new Properties() p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-scala-integration-test") - p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config") p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) - // The commit interval for flushing records to state stores and downstream must be lower than - // this integration test's timeout (30 secs) to ensure we observe the expected processing results. - p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Use a temporary directory for storing state, which will be automatically removed after the test. p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath) p @@ -111,47 +96,36 @@ class WordCountScalaIntegrationTest extends AssertionsForJUnit { import collection.JavaConverters.asJavaIterableConverter val wordCounts: KTable[String, JLong] = textLines - .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava) - // no need to specify explicit serdes because the resulting key and value types match our default serde settings - .groupBy((_, word) => word) - .count() + .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava) + // no need to specify explicit serdes because the resulting key and value types match our default serde settings + .groupBy((_, word) => word) + .count() wordCounts.toStream.to(outputTopic, Produced.`with`(stringSerde, longSerde)) - val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) - streams.start() - - // - // Step 2: Publish some input text lines. - // - val producerConfig: Properties = { - val p = new Properties() - p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") - p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p - } - import collection.JavaConverters._ - IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputTextLines.asJava, producerConfig) - - // - // Step 3: Verify the application's output data. - // - val consumerConfig = { - val p = new Properties() - p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-scala-integration-test-standard-consumer") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) - p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer]) - p + val topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration) + + try { + // + // Step 2: Publish some input text lines. 
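+      // The input text lines have no keys, so each line is wrapped in a KeyValue with a null key and
+      // the `NothingSerde` serves as the key serializer.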
+ // + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputTextLines.map(v => new KeyValue(null, v)).asJava, + topologyTestDriver, + new NothingSerde[Null], + new StringSerializer + ) + + // + // Step 3: Verify the application's output data. + // + val actualWordCounts = + IntegrationTestUtils.drainTableOutput(outputTopic, topologyTestDriver, new StringDeserializer, new LongDeserializer) + assertThat(actualWordCounts).isEqualTo(expectedWordCounts.asJava) + } finally { + topologyTestDriver.close() } - val actualWordCounts: java.util.List[KeyValue[String, Long]] = - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size) - streams.close() - assertThat(actualWordCounts).containsExactlyElementsOf(expectedWordCounts.asJava) } }