Merge remote-tracking branch 'upstream/5.0.3-post' into 5.1.0-post
vvcephei committed May 13, 2019
2 parents c0044e7 + c41a9f2 commit 8a35826
Showing 18 changed files with 680 additions and 927 deletions.
ProbabilisticCounter.scala
@@ -8,7 +8,7 @@ import org.apache.kafka.streams.processor.ProcessorContext
* Counts record values (in String format) probabilistically and then outputs the respective count estimate.
*/
class ProbabilisticCounter(val cmsStoreName: String)
extends Transformer[Array[Byte], String, KeyValue[String, Long]] {
extends Transformer[String, String, KeyValue[String, Long]] {

private var cmsState: CMSStore[String] = _
private var processorContext: ProcessorContext = _
@@ -18,7 +18,7 @@ class ProbabilisticCounter(val cmsStoreName: String)
cmsState = this.processorContext.getStateStore(cmsStoreName).asInstanceOf[CMSStore[String]]
}

override def transform(key: Array[Byte], value: String): KeyValue[String, Long] = {
override def transform(key: String, value: String): KeyValue[String, Long] = {
// Count the record value, think: "+ 1"
cmsState.put(value, this.processorContext.timestamp())

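For context on the Array[Byte] -> String change above: a Transformer's declared key and value types must match the KStream it is attached to via transform(). The following is a minimal, hypothetical sketch (not part of this commit) of that wiring, using an exact in-memory count in place of the CMS-backed store; the topic and store names ("text-input", "counts-store", "exact-counts") and the class name are illustrative only.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TransformerWiringSketch {

  static Topology build() {
    final StreamsBuilder builder = new StreamsBuilder();

    // Register the store that the transformer will later look up by name in init().
    builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("counts-store"), Serdes.String(), Serdes.Long()));

    // The stream is a KStream<String, String>, so the attached transformer must be
    // declared Transformer<String, String, ...> -- the same constraint behind the
    // Array[Byte] -> String change to ProbabilisticCounter above.
    final KStream<String, String> input =
        builder.stream("text-input", Consumed.with(Serdes.String(), Serdes.String()));

    final KStream<String, Long> counts = input.transform(
        () -> new Transformer<String, String, KeyValue<String, Long>>() {
          private KeyValueStore<String, Long> store;

          @Override
          @SuppressWarnings("unchecked")
          public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
          }

          @Override
          public KeyValue<String, Long> transform(final String key, final String value) {
            // Exact count per value; the CMS-backed original trades exactness for space.
            final Long previous = store.get(value);
            final long updated = (previous == null ? 0L : previous) + 1L;
            store.put(value, updated);
            return KeyValue.pair(value, updated);
          }

          @Override
          public void close() {}
        },
        "counts-store");

    counts.to("exact-counts", Produced.with(Serdes.String(), Serdes.Long()));
    return builder.build();
  }
}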
EventDeduplicationLambdaIntegrationTest.java
@@ -15,18 +15,13 @@
*/
package io.confluent.examples.streams;

import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Transformer;
@@ -36,66 +31,52 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/**
* End-to-end integration test that demonstrates how to remove duplicate records from an input
* stream.
*
* <p>
* Here, a stateful {@link org.apache.kafka.streams.kstream.Transformer} (from the Processor API)
* detects and discards duplicate input records based on an "event id" that is embedded in each
* input record. This transformer is then included in a topology defined via the DSL.
*
* <p>
* In this simplified example, the values of input records represent the event ID by which
* duplicates will be detected. In practice, record values would typically be a more complex data
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID
* is but one example of how to perform de-duplication in general. The code example below can be
* adapted to other de-duplication approaches.
*
* <p>
* IMPORTANT: Kafka, including its Streams API, has supported exactly-once semantics since version 0.11.
* With this feature available, most use cases will no longer need to worry about duplicate messages
* or duplicate processing. That said, there will still be some use cases where you have your own
* business rules that define when two events are considered to be "the same" and need to be
* de-duplicated (e.g. two events having the same payload but different timestamps). The example
* below demonstrates how to implement your own business rules for event de-duplication.
*
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class EventDeduplicationLambdaIntegrationTest {

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

private static final String inputTopic = "inputTopic";
private static final String outputTopic = "outputTopic";

private static final String storeName = "eventId-store";

@BeforeClass
public static void startKafkaCluster() {
CLUSTER.createTopic(inputTopic);
CLUSTER.createTopic(outputTopic);
}

/**
* Discards duplicate records from the input stream.
*
* <p>
* Duplicate records are detected based on an event ID; in this simplified example, the record
* value is the event ID. The transformer remembers known event IDs in an associated window state
* store, which automatically purges/expires event IDs from the store after a certain amount of
* time has passed to prevent the store from growing indefinitely.
*
* <p>
* Note: This code is for demonstration purposes and was not tested for production usage.
*/
private static class DeduplicationTransformer<K, V, E> implements Transformer<K, V, KeyValue<K, V>> {
@@ -119,9 +100,9 @@ private static class DeduplicationTransformer<K, V, E> implements Transformer<K,
* ID), during the time of which any incoming duplicates of
* the event will be dropped, thereby de-duplicating the
* input.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
*/
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
@@ -159,9 +140,9 @@ public KeyValue<K, V> transform(final K key, final V value) {
private boolean isDuplicate(final E eventId) {
final long eventTime = context.timestamp();
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
@@ -184,7 +165,7 @@ public void close() {
}

@Test
public void shouldRemoveDuplicatesFromTheInput() throws Exception {
public void shouldRemoveDuplicatesFromTheInput() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
@@ -199,13 +180,9 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception {

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// The commit interval for flushing records to state stores and downstream must be lower than
// this integration test's timeout (30 secs) to ensure we observe the expected processing results.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(10));
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

@@ -230,9 +207,11 @@ public void shouldRemoveDuplicatesFromTheInput() {
Serdes.String(),
Serdes.Long());


builder.addStateStore(dedupStoreBuilder);

final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";

final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
@@ -242,36 +221,29 @@ public void shouldRemoveDuplicatesFromTheInput() {
storeName);
deduplicated.to(outputTopic);

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();

//
// Step 2: Produce some input data to the input topic.
//
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig);

//
// Step 3: Verify the application's output data.
//
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "deduplication-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
consumerConfig,
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Produce some input data to the input topic.
//
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()),
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer()
);

//
// Step 3: Verify the application's output data.
//
final List<String> actualValues = IntegrationTestUtils.drainStreamOutput(
outputTopic,
expectedValues.size()
);
streams.close();
assertThat(actualValues).containsExactlyElementsOf(expectedValues);
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValues).containsExactlyElementsOf(expectedValues);
}
}

}
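The substance of the change above is swapping the embedded Kafka cluster for TopologyTestDriver, which runs the topology in-process with no broker. Below is a minimal, hedged sketch of that pattern using the Kafka 2.1-era test API (ConsumerRecordFactory, pipeInput, readOutput); the topic names, the pass-through topology, and the class name are illustrative, and the IntegrationTestUtils methods used in the test above are the repository's own wrappers, not shown here.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TopologyTestDriverSketch {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("sketch-input").to("sketch-output");  // stand-in topology

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
    // The driver never opens a network connection, so the bootstrap servers value is
    // only a placeholder -- which is why the tests above can set it to "dummy config".
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
      final ConsumerRecordFactory<String, String> factory =
          new ConsumerRecordFactory<>("sketch-input", new StringSerializer(), new StringSerializer());

      // Pipe a record through the topology synchronously...
      driver.pipeInput(factory.create("some-key", "some-value"));

      // ...and read it back from the output topic, no broker required.
      final ProducerRecord<String, String> output =
          driver.readOutput("sketch-output", new StringDeserializer(), new StringDeserializer());
      System.out.println(output.key() + " -> " + output.value());
    }
  }
}

The try-with-resources block mirrors the tests above: closing the driver tears down the topology and its state stores.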
FanoutLambdaIntegrationTest.java
@@ -15,20 +15,14 @@
*/
package io.confluent.examples.streams;

import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.Arrays;
@@ -40,7 +34,7 @@

/**
* End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster.
*
* <p>
* This example shows how you can read from one input topic/stream, transform the data (here:
* trivially) in two different ways via two intermediate streams, and then write the respective
* results to two output topics.
@@ -56,27 +50,13 @@
*
* }
* </pre>
*
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class FanoutLambdaIntegrationTest {

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

private static final String inputTopicA = "A";
private static final String outputTopicB = "B";
private static final String outputTopicC = "C";

@BeforeClass
public static void startKafkaCluster() {
CLUSTER.createTopic(inputTopicA);
CLUSTER.createTopic(outputTopicB);
CLUSTER.createTopic(outputTopicC);
}

@Test
public void shouldFanoutTheInput() throws Exception {
public void shouldFanoutTheInput() {
final List<String> inputValues = Arrays.asList("Hello", "World");
final List<String> expectedValuesForB = inputValues.stream().map(String::toUpperCase).collect(Collectors.toList());
final List<String> expectedValuesForC = inputValues.stream().map(String::toLowerCase).collect(Collectors.toList());
@@ -88,62 +68,52 @@ public void shouldFanoutTheInput() throws Exception {

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final String inputTopicA = "A";
final String outputTopicB = "B";
final String outputTopicC = "C";
final KStream<byte[], String> stream1 = builder.stream(inputTopicA);
final KStream<byte[], String> stream2 = stream1.mapValues((v -> v.toUpperCase()));
final KStream<byte[], String> stream3 = stream1.mapValues(v -> v.toLowerCase());
final KStream<byte[], String> stream2 = stream1.mapValues(s -> s.toUpperCase());
final KStream<byte[], String> stream3 = stream1.mapValues(s -> s.toLowerCase());
stream2.to(outputTopicB);
stream3.to(outputTopicC);

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();

//
// Step 2: Produce some input data to the input topic.
//
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopicA, inputValues, producerConfig);

//
// Step 3: Verify the application's output data.
//

// Verify output topic B
final Properties consumerConfigB = new Properties();
consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-lambda-integration-test-standard-consumer-topicB");
consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
consumerConfigB,
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Produce some input data to the input topic.
//
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopicA,
inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()),
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer()
);

//
// Step 3: Verify the application's output data.
//

// Verify output topic B
final List<String> actualValuesForB = IntegrationTestUtils.drainStreamOutput(
outputTopicB,
inputValues.size()
);
assertThat(actualValuesForB).isEqualTo(expectedValuesForB);

// Verify output topic C
final Properties consumerConfigC = new Properties();
consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-lambda-integration-test-standard-consumer-topicC");
consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
consumerConfigC,
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValuesForB).isEqualTo(expectedValuesForB);

// Verify output topic C
final List<String> actualValuesForC = IntegrationTestUtils.drainStreamOutput(
outputTopicC,
inputValues.size()
);
streams.close();
assertThat(actualValuesForC).isEqualTo(expectedValuesForC);
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValuesForC).isEqualTo(expectedValuesForC);
}
}

}
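The assertions above drain everything written to output topics B and C. With the raw TopologyTestDriver API, draining an output topic typically means calling readOutput until it returns null; the helper below is a hypothetical sketch of that loop (the class and method names are illustrative, and the repository's IntegrationTestUtils.drainStreamOutput may be implemented differently).

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.TopologyTestDriver;

public final class DrainSketch {

  private DrainSketch() {}

  // Reads records from `topic` until the driver has nothing more to return.
  public static <K, V> List<V> drainValues(final TopologyTestDriver driver,
                                           final String topic,
                                           final Deserializer<K> keyDeserializer,
                                           final Deserializer<V> valueDeserializer) {
    final List<V> values = new ArrayList<>();
    ProducerRecord<K, V> record;
    while ((record = driver.readOutput(topic, keyDeserializer, valueDeserializer)) != null) {
      values.add(record.value());
    }
    return values;
  }
}

Verifying the fan-out then reduces to comparing the drained values of topics "B" and "C" against the upper- and lower-cased input values, which is what the test above does through its helper methods.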
[Diff truncated; the remaining changed files are not shown here.]
