Commit 6bfb8ff

Kafka source offset-based deduplication.

tomstepp committed Jan 28, 2025
1 parent 3cb1440 commit 6bfb8ff
Showing 11 changed files with 219 additions and 36 deletions.
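
For context before the per-file diffs, a minimal usage sketch of the option this commit introduces (illustration only, not part of the commit). The bootstrap-server, topic, and deserializer calls are the existing KafkaIO.Read surface and are assumed here; only withRedistribute(), withAllowDuplicates(false), and withOffsetDeduplication(true) relate to this change, and the last requires the first two, as enforced below in checkRedistributeConfiguration().

// Illustration only (not in this commit): enabling offset-based deduplication on a Kafka read.
PCollection<KafkaRecord<Long, String>> records =
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("my_topic")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withRedistribute()
            .withAllowDuplicates(false)
            .withOffsetDeduplication(true));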

@@ -198,7 +198,8 @@ public static void verifyDeterministic(Coder<?> target, String message, Iterable
}
}

public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value) throws Exception {
public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value)
throws Exception {
return target.getEncodedElementByteSize(value);
}
/**

@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;

public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
if (!reader.isPresent()) {
throw new RuntimeException(
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
}
if (!reader.get().offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
}

// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
PartitionMark partition = partitions.get(/* index= */ 0);
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
}
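
getOffsetLimit() above relies on the 1:1 partition-to-split mapping enforced in KafkaUnboundedSource.split() further down, and returns the partition's next offset as eight big-endian bytes. A sketch of how a caller could decode it (illustration only, not part of this commit; assumes the same Longs utility used by encodeOffset):

// Illustration only: decode the limit produced by getOffsetLimit(). Valid only when the
// mark was created by a reader configured for offset-based deduplication.
static long decodeOffsetLimit(KafkaCheckpointMark mark) {
  return Longs.fromByteArray(mark.getOffsetLimit());
}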

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.

@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
.setRedistributed(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
.setOffsetDeduplication(false)
.build();
}

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract boolean isOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -886,11 +892,16 @@ static <K, V> void setupExternalBuilder(
if (config.allowDuplicates != null) {
builder.setAllowDuplicates(config.allowDuplicates);
}

if (config.redistribute
&& (config.allowDuplicates == null || !config.allowDuplicates)
&& config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
} else {
builder.setRedistributed(false);
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
builder.setOffsetDeduplication(false);
}
}

@@ -959,6 +970,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1027,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1066,26 +1082,21 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
* Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
*/
public Read<K, V> withRedistribute() {
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
}
return toBuilder().setRedistributed(true).build();
}

public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
if (!isAllowDuplicates()) {
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
}
return toBuilder().setAllowDuplicates(allowDuplicates).build();
}

public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
checkState(
isRedistributed(),
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
@@ -1541,6 +1552,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
}

checkRedistributeConfiguration();

warnAboutUnsafeConfigurations(input);

// Infer key/value coders if not specified explicitly
@@ -1573,6 +1587,27 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}

private void checkRedistributeConfiguration() {
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
LOG.warn(
"withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
}
if (isAllowDuplicates()) {
checkState(
isRedistributed(), "withAllowDuplicates without withRedistribute will have no effect.");
}
if (getRedistributeNumKeys() > 0) {
checkState(
isRedistributed(),
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
}
if (isOffsetDeduplication()) {
checkState(
isRedistributed() && !isAllowDuplicates(),
"withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
}
}
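
With these checks, an inconsistent combination of the redistribute options now fails fast when the transform is applied. Illustration only (not in this commit); the setup mirrors the sketch near the top of this page:

// Illustration only: this fails at pipeline-construction time, because
// checkRedistributeConfiguration() requires withRedistribute() and withAllowDuplicates(false)
// whenever offset deduplication is enabled.
pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withOffsetDeduplication(true)); // no withRedistribute() -> checkState throws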

private void warnAboutUnsafeConfigurations(PBegin input) {
Long checkpointingInterval =
input

@@ -137,7 +137,12 @@ Object getDefaultValue() {
return false;
}
},
;
OFFSET_DEDUPLICATION(LEGACY) {
@Override
Object getDefaultValue() {
return false;
}
};

private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;


@@ -19,11 +19,13 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
return avg;
}
}

static final class OffsetBasedDeduplication {

static byte[] encodeOffset(long offset) {
return Longs.toByteArray(offset);
}

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
}
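
A sketch of a unit test for the two helpers above (not part of this commit). It assumes JUnit 4 and placement in the org.apache.beam.sdk.io.kafka package, since OffsetBasedDeduplication is package-private, and reuses the Longs and StandardCharsets imports already present in KafkaIOUtils:

@Test
public void testOffsetBasedDeduplicationEncoding() {
  // encodeOffset() is Guava's big-endian long encoding.
  assertArrayEquals(
      Longs.toByteArray(42L), KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(42L));
  // getUniqueId() is the UTF-8 encoding of "topic-partition-offset".
  assertArrayEquals(
      "my_topic-3-42".getBytes(StandardCharsets.UTF_8),
      KafkaIOUtils.OffsetBasedDeduplication.getUniqueId("my_topic", 3, 42L));
}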

@@ -66,6 +66,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

private static final byte[] EMPTY_RECORD_ID = new byte[0];

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
// BoundedReadFromUnboundedSource and tests may call getCurrentRecordId(), even when offset-based
// deduplication is not enabled. Therefore, the Kafka reader cannot throw an exception when offset
// deduplication is disabled; instead, an empty record ID is provided.
return EMPTY_RECORD_ID;
}
if (curRecord != null) {
return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
}
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curRecord != null) {
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
}
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}
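
The two accessors above expose each record's "topic-partition-offset" identity and its raw offset bytes. A sketch of the duplicate check these values enable (illustration only, not part of this commit; the actual filtering is performed by the runner):

// Illustration only: a replayed record is a duplicate when its offset is below the
// offset limit stored with its split's checkpoint (both are big-endian encoded longs).
static boolean isDuplicate(byte[] currentRecordOffset, byte[] checkpointOffsetLimit) {
  return Longs.fromByteArray(currentRecordOffset) < Longs.fromByteArray(checkpointOffsetLimit);
}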

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +343,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetBasedDeduplicationSupported() {
return this.offsetBasedDeduplicationSupported;
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -332,10 +365,12 @@ public long getSplitBacklogBytes() {
private final String name;
private @Nullable Consumer<byte[], byte[]> consumer = null;
private final List<PartitionState<K, V>> partitionStates;
private @Nullable KafkaRecord<K, V> curRecord = null;
private @Nullable Instant curTimestamp = null;
private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
private @MonotonicNonNull Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private final boolean offsetBasedDeduplicationSupported;

private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;

@@ -507,6 +542,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
// XXX make all splits have the same # of partitions
while (partitions.size() % numSplits > 0) {
++numSplits;
int numSplits;
if (offsetBasedDeduplicationSupported()) {
// Enforce 1:1 split to partition ratio for offset deduplication.
numSplits = partitions.size();
LOG.info(
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
+ "Forcing the number of splits to equal the number of total partitions: {}.",
numSplits);
} else {
numSplits = Math.min(desiredNumSplits, partitions.size());
// Make all splits have the same # of partitions.
while (partitions.size() % numSplits > 0) {
++numSplits;
}
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
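
For context, a worked example of the split-count logic above (illustration only, not in this commit): without offset deduplication the split count is rounded up to the next divisor of the partition count so every split owns the same number of partitions, while offset deduplication pins it to the partition count itself.

// Sketch of the non-deduplication branch: 6 partitions with desiredNumSplits = 4 yield
// numSplits = 6 (4 -> 5 -> 6). With offset deduplication, split() instead uses
// numSplits = partitions.size() so each split owns exactly one partition.
static int computeNumSplits(int desiredNumSplits, int partitionCount) {
  int numSplits = Math.min(desiredNumSplits, partitionCount);
  while (partitionCount % numSplits > 0) {
    ++numSplits;
  }
  return numSplits;
}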

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetBasedDeduplicationSupported() {
return spec.isOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());

@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
Field.of("consumer_polling_timeout", FieldType.INT64),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
Expand All @@ -123,6 +124,7 @@ public void testConstructKafkaRead() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -145,7 +147,7 @@ public void testConstructKafkaRead() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();
System.out.println("xxx : " + result.toString());
System.out.println("Expansion result: " + result.toString());
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
Field.of("timestamp_policy", FieldType.STRING),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
Expand All @@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
Expand All @@ -281,6 +285,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();

System.out.println("Expansion result: " + result.toString());
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));

(Diffs for the remaining changed files did not load and are not shown.)
