Offset-based deduplication for Kafka source. #33596

Open · wants to merge 1 commit into base: master
@@ -198,7 +198,8 @@ public static void verifyDeterministic(Coder<?> target, String message, Iterable
}
}

public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value) throws Exception {
public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value)
throws Exception {
return target.getEncodedElementByteSize(value);
}
/**
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;

public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
if (!reader.isPresent()) {
throw new RuntimeException(
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
}
if (!reader.get().offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected call to getOffsetLimit(): KafkaUnboundedReader is not configured for offset deduplication.");
}

// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
PartitionMark partition = partitions.get(/* index= */ 0);
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
}

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.
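Note (reviewer sketch, not part of the diff): getOffsetLimit() encodes the next offset of the checkpoint's single partition with Longs.toByteArray, so a runner can recover it with the matching fromByteArray call. The snippet below assumes plain Guava's Longs for brevity; the PR itself uses Beam's vendored copy of the same class.

```java
import com.google.common.primitives.Longs;

public class OffsetLimitRoundTrip {
  public static void main(String[] args) {
    long nextOffset = 12345L;
    // What getOffsetLimit() returns for a partition whose next offset is 12345.
    byte[] limit = Longs.toByteArray(nextOffset);
    // How a runner could read the limit back: big-endian, fixed 8 bytes.
    long decoded = Longs.fromByteArray(limit);
    System.out.println(decoded == nextOffset); // true
  }
}
```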
@@ -717,6 +717,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract @Nullable Boolean getOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +785,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(Boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -886,11 +891,16 @@ static <K, V> void setupExternalBuilder(
if (config.allowDuplicates != null) {
builder.setAllowDuplicates(config.allowDuplicates);
}

if (config.redistribute
&& (config.allowDuplicates == null || !config.allowDuplicates)
&& config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
} else {
builder.setRedistributed(false);
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
builder.setOffsetDeduplication(false);
}
}

@@ -959,6 +969,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1066,26 +1081,21 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
* Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
*/
public Read<K, V> withRedistribute() {
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
}
return toBuilder().setRedistributed(true).build();
}

public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
if (!isAllowDuplicates()) {
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
}
return toBuilder().setAllowDuplicates(allowDuplicates).build();
}

public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
checkState(
isRedistributed(),
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
@@ -1541,6 +1551,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
}

checkRedistributeConfiguration();

warnAboutUnsafeConfigurations(input);

// Infer key/value coders if not specified explicitly
@@ -1573,6 +1586,27 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}

private void checkRedistributeConfiguration() {
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
LOG.warn(
"withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
}
if (isAllowDuplicates()) {
checkState(
isRedistributed(), "withAllowDuplicates without withRedistribute will have no effect.");
}
if (getRedistributeNumKeys() > 0) {
checkState(
isRedistributed(),
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
}
if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
checkState(
isRedistributed() && !isAllowDuplicates(),
"withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
}
}

private void warnAboutUnsafeConfigurations(PBegin input) {
Long checkpointingInterval =
input
@@ -1776,6 +1810,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
if (kafkaRead.getRedistributeNumKeys() > 0) {
readTransform = readTransform.withRedistributeNumKeys(kafkaRead.getRedistributeNumKeys());
}
// if (kafkaRead.getOffsetDeduplication() != null) {
// readTransform = readTransform.withOffsetDeduplication();
// }
PCollection<KafkaSourceDescriptor> output;
if (kafkaRead.isDynamicRead()) {
Set<String> topics = new HashSet<>();
@@ -2221,6 +2258,9 @@ public abstract static class ReadSourceDescriptors<K, V>
@Pure
abstract int getRedistributeNumKeys();

// @Pure
// abstract @Nullable Boolean getOffsetDeduplication();

@Pure
abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

@@ -2293,6 +2333,9 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(

abstract ReadSourceDescriptors.Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

// abstract ReadSourceDescriptors.Builder<K, V> setOffsetDeduplication(
// Boolean offsetDeduplication);

abstract ReadSourceDescriptors<K, V> build();
}

@@ -2308,6 +2351,7 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
.setRedistribute(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
// .setOffsetDeduplication(false)
.build()
.withProcessingTime()
.withMonotonicallyIncreasingWatermarkEstimator();
@@ -2480,6 +2524,10 @@ public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKe
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

// public ReadSourceDescriptors<K, V> withOffsetDeduplication() {
// return toBuilder().setOffsetDeduplication(true).build();
// }

/** Use the creation time of {@link KafkaRecord} as the output timestamp. */
public ReadSourceDescriptors<K, V> withCreateTime() {
return withExtractOutputTimestampFn(
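Note (reviewer sketch, not part of the diff): given the constraints enforced by checkRedistributeConfiguration(), a pipeline opting into the new flag would look roughly like the fragment below. The bootstrap servers, topic, and deserializer choices are placeholders, not values taken from this change.

```java
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withRedistribute()              // required: offset deduplication needs redistribute
        .withAllowDuplicates(false)      // required: duplicates must not be allowed
        .withOffsetDeduplication(true);  // the new option added by this change
```

Per the compatibility-table change below (OFFSET_DEDUPLICATION(LEGACY)), the flag is only honored by the legacy UnboundedSource read path in this revision; the corresponding ReadSourceDescriptors additions remain commented out.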
@@ -137,6 +137,7 @@ Object getDefaultValue() {
return false;
}
},
OFFSET_DEDUPLICATION(LEGACY),
;

private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
@@ -19,11 +19,13 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
return avg;
}
}

static final class OffsetBasedDeduplication {

static byte[] encodeOffset(long offset) {
return Longs.toByteArray(offset);
}

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
}
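Note (reviewer sketch): concrete values produced by the two helpers, using an assumed topic "orders", partition 3, offset 42.

```java
// Record ID used for deduplication: the UTF-8 bytes of "orders-3-42",
// unique per (topic, partition, offset).
byte[] recordId = KafkaIOUtils.OffsetBasedDeduplication.getUniqueId("orders", 3, 42L);

// Encoded offset: Longs.toByteArray(42L), i.e. the big-endian bytes
// {0, 0, 0, 0, 0, 0, 0, 42}; unsigned byte-wise comparison preserves
// offset order for non-negative Kafka offsets.
byte[] encodedOffset = KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(42L);
```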
@@ -66,6 +66,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

private static final byte[] EMPTY_RECORD_ID = new byte[0];

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
// BoundedReadFromUnboundedSource and tests may call getCurrentRecordId() even when offset-based
// deduplication is not in use, so the Kafka reader must not throw when offset deduplication
// is disabled. Instead, an empty record ID is provided.
return EMPTY_RECORD_ID;
}
if (curRecord != null) {
return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
}
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curRecord != null) {
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
}
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +343,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetBasedDeduplicationSupported() {
return this.offsetBasedDeduplicationSupported;
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -332,10 +365,12 @@ public long getSplitBacklogBytes() {
private final String name;
private @Nullable Consumer<byte[], byte[]> consumer = null;
private final List<PartitionState<K, V>> partitionStates;
private @Nullable KafkaRecord<K, V> curRecord = null;
private @Nullable Instant curTimestamp = null;
private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
private @MonotonicNonNull Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private final boolean offsetBasedDeduplicationSupported;

private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;

@@ -507,6 +542,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
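Note (reviewer sketch): the reader's contract under the new flag, summarized as an illustrative fragment rather than an actual test from this PR.

```java
if (reader.offsetBasedDeduplicationSupported()) {
  // Unique per (topic, partition, offset), e.g. the UTF-8 bytes of "topic-3-42".
  byte[] recordId = reader.getCurrentRecordId();
  // Big-endian encoding of the current record's offset.
  byte[] recordOffset = reader.getCurrentRecordOffset();
} else {
  // BoundedReadFromUnboundedSource and tests may still ask for an ID,
  // so the reader returns EMPTY_RECORD_ID instead of throwing.
  byte[] recordId = reader.getCurrentRecordId();
  // reader.getCurrentRecordOffset() would throw a RuntimeException in this branch.
}
```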
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
// XXX make all splits have the same # of partitions
while (partitions.size() % numSplits > 0) {
++numSplits;
int numSplits;
if (offsetBasedDeduplicationSupported()) {
// Enforce 1:1 split to partition ratio for offset deduplication.
numSplits = partitions.size();
LOG.info(
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
+ "Forcing the number of splits to equal the number of total partitions: {}.",
numSplits);
} else {
numSplits = Math.min(desiredNumSplits, partitions.size());
// Make all splits have the same # of partitions.
while (partitions.size() % numSplits > 0) {
++numSplits;
}
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetBasedDeduplicationSupported() {
return spec.getOffsetDeduplication() != null && spec.getOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
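Note (reviewer sketch): how the split count changes for an assumed topic with 6 partitions when the runner asks for 4 splits.

```java
int partitionCount = 6;
int desiredNumSplits = 4;

// Legacy path: start at min(desired, partitions) and grow until it divides evenly.
int numSplits = Math.min(desiredNumSplits, partitionCount); // 4
while (partitionCount % numSplits > 0) {
  ++numSplits; // 4 -> 5 -> 6
}

// Offset-deduplication path: one partition per split, unconditionally.
// This is what makes the single-PartitionMark checkState in getOffsetLimit() safe.
int dedupNumSplits = partitionCount; // 6
```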
@@ -145,7 +145,6 @@ public void testConstructKafkaRead() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();
System.out.println("xxx : " + result.toString());
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));