Merge branch '5.0.2-post' into 5.0.3-post
vvcephei committed May 13, 2019
2 parents 8c25de0 + c127cb9 commit c41a9f2
Showing 18 changed files with 774 additions and 938 deletions.
ProbabilisticCounter.scala
@@ -8,7 +8,7 @@ import org.apache.kafka.streams.processor.ProcessorContext
* Counts record values (in String format) probabilistically and then outputs the respective count estimate.
*/
class ProbabilisticCounter(val cmsStoreName: String)
extends Transformer[Array[Byte], String, KeyValue[String, Long]] {
extends Transformer[String, String, KeyValue[String, Long]] {

private var cmsState: CMSStore[String] = _
private var processorContext: ProcessorContext = _
@@ -18,7 +18,7 @@ class ProbabilisticCounter(val cmsStoreName: String)
cmsState = this.processorContext.getStateStore(cmsStoreName).asInstanceOf[CMSStore[String]]
}

override def transform(key: Array[Byte], value: String): KeyValue[String, Long] = {
override def transform(key: String, value: String): KeyValue[String, Long] = {
// Count the record value, think: "+ 1"
cmsState.put(value, this.processorContext.timestamp())

EventDeduplicationLambdaIntegrationTest.java
@@ -15,17 +15,13 @@
*/
package io.confluent.examples.streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Transformer;
@@ -35,67 +31,52 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/**
* End-to-end integration test that demonstrates how to remove duplicate records from an input
* stream.
*
* <p>
* Here, a stateful {@link org.apache.kafka.streams.kstream.Transformer} (from the Processor API)
* detects and discards duplicate input records based on an "event id" that is embedded in each
* input record. This transformer is then included in a topology defined via the DSL.
*
* <p>
* In this simplified example, the values of input records represent the event ID by which
* duplicates will be detected. In practice, record values would typically be a more complex data
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID
* is but one example of how to perform de-duplication in general. The code example below can be
* adapted to other de-duplication approaches.
*
* <p>
* IMPORTANT: Kafka, including its Streams API, has supported exactly-once semantics since version 0.11.
* With this feature available, most use cases will no longer need to worry about duplicate messages
* or duplicate processing. That said, there will still be some use cases where you have your own
* business rules that define when two events are considered to be "the same" and need to be
* de-duplicated (e.g. two events having the same payload but different timestamps). The example
* below demonstrates how to implement your own business rules for event de-duplication.
*
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class EventDeduplicationLambdaIntegrationTest {

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

private static String inputTopic = "inputTopic";
private static String outputTopic = "outputTopic";

private static String storeName = "eventId-store";

@BeforeClass
public static void startKafkaCluster() throws Exception {
CLUSTER.createTopic(inputTopic);
CLUSTER.createTopic(outputTopic);
}
private static final String storeName = "eventId-store";

/**
* Discards duplicate records from the input stream.
*
* <p>
* Duplicate records are detected based on an event ID; in this simplified example, the record
* value is the event ID. The transformer remembers known event IDs in an associated window state
* store, which automatically purges/expires event IDs from the store after a certain amount of
* time has passed to prevent the store from growing indefinitely.
*
* <p>
* Note: This code is for demonstration purposes and was not tested for production usage.
*/
private static class DeduplicationTransformer<K, V, E> implements Transformer<K, V, KeyValue<K, V>> {
@@ -119,9 +100,9 @@ private static class DeduplicationTransformer<K, V, E> implements Transformer<K,
* ID), during the time of which any incoming duplicates of
* the event will be dropped, thereby de-duplicating the
* input.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
*/
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
@@ -159,9 +140,9 @@ public KeyValue<K, V> transform(final K key, final V value) {
private boolean isDuplicate(final E eventId) {
final long eventTime = context.timestamp();
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
@@ -184,12 +165,12 @@ public void close() {
}

@Test
public void shouldRemoveDuplicatesFromTheInput() throws Exception {
public void shouldRemoveDuplicatesFromTheInput() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
final List<String> inputValues = Arrays.asList(firstId, secondId, firstId, firstId, secondId, thirdId,
thirdId, firstId, secondId);
thirdId, firstId, secondId);
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId);

//
@@ -199,13 +180,9 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception {

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// The commit interval for flushing records to state stores and downstream must be lower than
// this integration test's timeout (30 secs) to ensure we observe the expected processing results.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(10));
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

@@ -227,54 +204,57 @@ public void shouldRemoveDuplicatesFromTheInput() {
final long retentionPeriod = maintainDurationPerEventInMs;

final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName,
retentionPeriod,
numberOfSegments,
maintainDurationPerEventInMs,
false
),
Serdes.String(),
Serdes.Long());
Stores.persistentWindowStore(storeName,
retentionPeriod,
numberOfSegments,
maintainDurationPerEventInMs,
false
),
Serdes.String(),
Serdes.Long());


builder.addStateStore(dedupStoreBuilder);

final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";

final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
() -> new DeduplicationTransformer<>(maintainDurationPerEventInMs, (key, value) -> value),
storeName);
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
() -> new DeduplicationTransformer<>(maintainDurationPerEventInMs, (key, value) -> value),
storeName);
deduplicated.to(outputTopic);

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();

//
// Step 2: Produce some input data to the input topic.
//
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig);

//
// Step 3: Verify the application's output data.
//
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "deduplication-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
outputTopic, expectedValues.size());
streams.close();
assertThat(actualValues).containsExactlyElementsOf(expectedValues);
final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

try {
//
// Step 2: Produce some input data to the input topic.
//
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()),
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer()
);

//
// Step 3: Verify the application's output data.
//
final List<String> actualValues = IntegrationTestUtils.drainStreamOutput(
outputTopic,
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValues).containsExactlyElementsOf(expectedValues);
} finally {
topologyTestDriver.close();
}
}

}
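
produceKeyValuesSynchronously, drainStreamOutput, and NothingSerde are helpers local to this examples repository. For readers without those helpers, a rough equivalent of the same test-driver pattern can be written directly against the kafka-streams-test-utils API of this Kafka generation (ConsumerRecordFactory plus readOutput); the sketch below is illustrative only, and the class name, topic names, and key choices are assumptions, not part of this commit.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class TopologyTestDriverSketch {

      static String pipeOneRecord(final Topology topology, final Properties config) {
        final ConsumerRecordFactory<String, String> factory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
          // Processing is synchronous: the record is fully processed before pipeInput returns.
          driver.pipeInput(factory.create("inputTopic", "some-key", "some-event-id"));
          // Read one record from the output topic; returns null once the topic is drained.
          final ProducerRecord<String, String> output =
              driver.readOutput("outputTopic", new StringDeserializer(), new StringDeserializer());
          return output == null ? null : output.value();
        }
      }
    }

Note that the test above pipes null keys with a byte-array serde; the String keys in this sketch are only there to keep it compact.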