
Kafka source offset-based deduplication.
tomstepp committed Jan 15, 2025
1 parent d204f6a commit 829cc76
Showing 8 changed files with 93 additions and 2 deletions.
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;

import java.math.RoundingMode;
import java.util.ArrayList;
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -66,6 +66,18 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
// Currently Kafka offset-based deduplication is only supported with a single Kafka partition
// per Beam split. Enforce the number of Beam splits with:
// "--desiredNumUnboundedSourceSplits=<NumKafkaPartitions>".
if (reader.isPresent() && reader.get().offsetDeduplication()) {
PartitionMark partition = partitions.get(0);
return KafkaIOUtils.getOrderedCode(partition.getNextOffset());
}
return new byte[0];
}

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.
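The comment in getOffsetLimit() doubles as a how-to: offset-based deduplication currently assumes exactly one Kafka partition per Beam split, so the pipeline must be launched with as many unbounded-source splits as the topic has partitions. A minimal launch sketch, assuming a topic with 4 partitions and the Dataflow runner; the flag values are illustrative, and whether "--desiredNumUnboundedSourceSplits" is accepted at parse time depends on the options interfaces registered by the runner on the classpath:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchWithOffsetDedup {
  public static void main(String[] ignored) {
    // Illustrative only: the split count must equal the topic's Kafka partition count (4 here),
    // per the comment in getOffsetLimit() above.
    String[] args = {"--runner=DataflowRunner", "--desiredNumUnboundedSourceSplits=4"};
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    // ... build the KafkaIO read (see the KafkaIO.java changes below) and run.
    pipeline.run();
  }
}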
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
.setRedistributed(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
.setOffsetDeduplication(false)
.build();
}

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract boolean isOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -892,6 +898,10 @@ static <K, V> void setupExternalBuilder(
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
// TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
if (config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
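Put together with the builder change above, enabling the new behavior from user code is a single call on the read transform. A minimal sketch, assuming a broker at localhost:9092 and a topic named "events" (both placeholders); withOffsetDeduplication(true) is the setter added in this commit:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaOffsetDedupRead {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Offset-based deduplication is opt-in and, per the checkpoint-mark comment,
    // expects one Kafka partition per Beam split on the unbounded-source read path.
    PCollection<KafkaRecord<Long, String>> records =
        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092") // placeholder broker
                .withTopic("events") // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withOffsetDeduplication(true));

    pipeline.run().waitUntilFinish();
  }
}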
@@ -19,10 +19,12 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.OrderedCode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -142,4 +144,14 @@ void update(double quantity) {
return avg;
}
}

static byte[] getOrderedCode(long offset) {
OrderedCode orderedCode = new OrderedCode();
orderedCode.writeNumIncreasing(offset);
return orderedCode.getEncodedBytes();
}

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
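These two helpers carry the deduplication identifiers: getUniqueId builds a stable per-record id from topic, partition, and offset, while getOrderedCode encodes an offset with OrderedCode.writeNumIncreasing, whose encodings sort in the same order as the underlying longs when compared as unsigned bytes. A small sketch of that ordering property, written as if it sat next to the new helpers inside KafkaIOUtils; the comparator below is illustrative and not part of the class:

  // Illustrative comparator: unsigned lexicographic order over encoded offsets.
  static int compareEncodedOffsets(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  static void demonstrateOrdering() {
    byte[] lower = getOrderedCode(41L);
    byte[] higher = getOrderedCode(42L);
    // Byte-wise order of the encodings matches numeric order of the offsets,
    // which is what allows offset limits to be compared without decoding them.
    assert compareEncodedOffsets(lower, higher) < 0;
  }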
@@ -214,6 +214,10 @@ public boolean advance() throws IOException {
curTimestamp =
pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
curRecord = record;
if (this.offsetDeduplication) {
curOffset = KafkaIOUtils.getOrderedCode(offset);
curId = KafkaIOUtils.getUniqueId(rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
}

int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (curId == null) {
if (this.offsetDeduplication) {
throw new NoSuchElementException();
} else {
return new byte[0];
}
}
return curId;
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (curOffset == null) {
if (this.offsetDeduplication) {
throw new NoSuchElementException();
} else {
return new byte[0];
}
}
return curOffset;
}

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetDeduplication() {
return offsetDeduplication;
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
private @Nullable Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private final boolean offsetDeduplication;
private byte[] curOffset = new byte[0];
private byte[] curId = new byte[0];

private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;

@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
this.offsetDeduplication = source.offsetDeduplication();

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
@@ -177,6 +177,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetDeduplication() {
return spec.isOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
.addBooleanField("redistribute")
.addBooleanField("allows_duplicates")
.addNullableInt32Field("redistribute_num_keys")
.addBooleanField("offset_deduplication")
.addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
.addByteArrayField("timestamp_policy_factory")
.addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
fieldValues.put("redistribute", transform.isRedistributed());
fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}

@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
}
}
}
Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
if (offsetDeduplication != null) {
transform = transform.withOffsetDeduplication(offsetDeduplication);
}
Duration maxReadTime = configRow.getValue("max_read_time");
if (maxReadTime != null) {
transform =
