From 9a2d27eb1179bddbcda514c86685f796ae01cdb3 Mon Sep 17 00:00:00 2001
From: Tom Stepp
Date: Wed, 15 Jan 2025 00:23:24 +0000
Subject: [PATCH] Kafka source offset-based deduplication.

---
 .../apache/beam/sdk/util}/OrderedCode.java    |  2 +-
 .../sdk/io/kafka/KafkaCheckpointMark.java     | 12 ++++++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 19 ++++++++++
 .../beam/sdk/io/kafka/KafkaIOUtils.java       | 12 ++++++
 .../sdk/io/kafka/KafkaUnboundedReader.java    | 37 +++++++++++++++++++
 .../sdk/io/kafka/KafkaUnboundedSource.java    |  5 +++
 .../io/kafka/upgrade/KafkaIOTranslation.java  |  6 +++
 7 files changed, 92 insertions(+), 1 deletion(-)
 rename {runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker => sdks/java/core/src/main/java/org/apache/beam/sdk/util}/OrderedCode.java (99%)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java
similarity index 99%
rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java
index 89217c72ad19..72d522ac1e46 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;
 
 import java.math.RoundingMode;
 import java.util.ArrayList;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 966363e41f62..a4583b2d6c44 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -66,6 +66,18 @@ public String toString() {
     return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
   }
 
+  @Override
+  public byte[] getOffsetLimit() {
+    // Currently Kafka offset-based deduplication is only supported with a single Kafka partition
+    // per Beam split. Enforce the number of Beam splits with:
+    // "--desiredNumUnboundedSourceSplits=".
+    if (reader.isPresent() && reader.get().offsetDeduplication()) {
+      PartitionMark partition = partitions.get(0);
+      return KafkaIOUtils.getOrderedCode(partition.getNextOffset());
+    }
+    return new byte[0];
+  }
+
   /**
    * A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
    * partition.
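Note on why getOffsetLimit() returns raw bytes: OrderedCode.writeNumIncreasing encodes non-negative longs so that unsigned lexicographic byte order matches numeric order, which lets a runner compare a record's encoded offset against the checkpointed limit without decoding either. A minimal sketch of that property (illustration only, not part of the patch; the demo class name is hypothetical, and Arrays.compareUnsigned requires Java 9+):

    import java.util.Arrays;
    import org.apache.beam.sdk.util.OrderedCode;

    public class OrderedCodeDemo {
      // Same encoding as KafkaIOUtils.getOrderedCode(long) introduced below.
      static byte[] encode(long offset) {
        OrderedCode orderedCode = new OrderedCode();
        orderedCode.writeNumIncreasing(offset); // requires a non-negative value; Kafka offsets are
        return orderedCode.getEncodedBytes();
      }

      public static void main(String[] args) {
        // Encodings compare as unsigned bytes in the same order as the offsets themselves.
        System.out.println(
            Arrays.compareUnsigned(encode(41L), encode(42L)) < 0); // prints: true
      }
    }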
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb7b3020c66a..8d48c38ea1a3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
         .setRedistributed(false)
         .setAllowDuplicates(false)
         .setRedistributeNumKeys(0)
+        .setOffsetDeduplication(false)
         .build();
   }
 
@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
     @Pure
     public abstract int getRedistributeNumKeys();
 
+    @Pure
+    public abstract boolean isOffsetDeduplication();
+
     @Pure
     public abstract @Nullable Duration getWatchTopicPartitionDuration();
 
@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
 
       abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
 
+      abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);
+
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
@@ -892,6 +898,10 @@ static void setupExternalBuilder(
           builder.setRedistributeNumKeys(0);
           builder.setAllowDuplicates(false);
         }
+        // TODO(tomstepp): Auto-enable offset deduplication if redistributed and !allowDuplicates.
+        if (config.offsetDeduplication != null) {
+          builder.setOffsetDeduplication(config.offsetDeduplication);
+        }
       }
 
       private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
       private Integer redistributeNumKeys;
       private Boolean redistribute;
       private Boolean allowDuplicates;
+      private Boolean offsetDeduplication;
 
       public void setConsumerConfig(Map<String, String> consumerConfig) {
         this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
       public void setAllowDuplicates(Boolean allowDuplicates) {
         this.allowDuplicates = allowDuplicates;
       }
+
+      public void setOffsetDeduplication(Boolean offsetDeduplication) {
+        this.offsetDeduplication = offsetDeduplication;
+      }
     }
   }
 
@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
       return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
     }
 
+    public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
+      return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
+    }
+
    /**
     * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
     * from each of the matching topics are read.
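From the pipeline author's side, the new option is just another Read setter. A minimal usage sketch (the broker address, topic name, and demo class are illustrative assumptions, not from the patch; the surrounding KafkaIO calls are the existing public API):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetDedupPipeline {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        pipeline.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("broker-1:9092") // assumption: example broker
                .withTopic("events") // assumption: example topic
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withOffsetDeduplication(true)); // option added by this patch
        pipeline.run();
      }
    }

Per the comment in KafkaCheckpointMark.getOffsetLimit(), the feature currently assumes one Kafka partition per Beam split, so the split count should be pinned accordingly (e.g. via --desiredNumUnboundedSourceSplits).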
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index 748418d16664..28f511e95035 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -19,10 +19,12 @@
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.OrderedCode;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -142,4 +144,14 @@ void update(double quantity) {
       return avg;
     }
   }
+
+  static byte[] getOrderedCode(long offset) {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumIncreasing(offset);
+    return orderedCode.getEncodedBytes();
+  }
+
+  static byte[] getUniqueId(String topic, int partition, long offset) {
+    return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
+  }
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index ab9e26b3b740..358ae027a659 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -214,6 +214,10 @@ public boolean advance() throws IOException {
           curTimestamp =
               pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
           curRecord = record;
+          if (this.offsetDeduplication) {
+            curOffset = KafkaIOUtils.getOrderedCode(offset);
+            curId = KafkaIOUtils.getUniqueId(rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
+          }
 
           int recordSize =
              (rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (curId == null) {
+      if (this.offsetDeduplication) {
+        throw new NoSuchElementException();
+      } else {
+        return new byte[0];
+      }
+    }
+    return curId;
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (curOffset == null) {
+      if (this.offsetDeduplication) {
+        throw new NoSuchElementException();
+      } else {
+        return new byte[0];
+      }
+    }
+    return curOffset;
+  }
+
   @Override
   public long getSplitBacklogBytes() {
     long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
     return backlogBytes;
   }
 
+  public boolean offsetDeduplication() {
+    return offsetDeduplication;
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
   private @Nullable Instant curTimestamp = null;
   private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
 
+  private final boolean offsetDeduplication;
+  private byte[] curOffset = new byte[0];
+  private byte[] curId = new byte[0];
+
   private @Nullable Deserializer<K> keyDeserializerInstance = null;
   private @Nullable Deserializer<V> valueDeserializerInstance = null;
 
@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
       KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
     this.source = source;
     this.name = "Reader-" + source.getId();
+    this.offsetDeduplication = source.offsetDeduplication();
 
     List<TopicPartition> partitions =
         Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 9685d859b0a1..1fe13b8999c4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -177,6 +177,11 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  @Override
+  public boolean offsetDeduplication() {
+    return spec.isOffsetDeduplication();
+  }
+
   @Override
   public Coder<KafkaRecord<K, V>> getOutputCoder() {
     Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 841236969d25..3d10b96c00b5 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
             .addBooleanField("redistribute")
             .addBooleanField("allows_duplicates")
             .addNullableInt32Field("redistribute_num_keys")
+            .addBooleanField("offset_deduplication")
             .addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
             .addByteArrayField("timestamp_policy_factory")
             .addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
         fieldValues.put("redistribute", transform.isRedistributed());
         fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
         fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
+        fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
 
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
           }
         }
       }
+      Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
+      if (offsetDeduplication != null) {
+        transform = transform.withOffsetDeduplication(offsetDeduplication);
+      }
       Duration maxReadTime = configRow.getValue("max_read_time");
       if (maxReadTime != null) {
         transform =
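Taken together, the reader now exposes two byte[] signals per record: getCurrentRecordId(), a stable topic-partition-offset identity, and getCurrentRecordOffset(), the OrderedCode-encoded position that the runner compares against the checkpoint's getOffsetLimit(). A test-style sketch of the identity's stability (illustrative only; it is placed in the Kafka IO package because both helpers are package-private, and the demo class name is hypothetical):

    package org.apache.beam.sdk.io.kafka;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class OffsetDedupIdDemo {
      public static void main(String[] args) {
        // The same (topic, partition, offset) always yields the same id bytes,
        // which is what lets a runner drop replayed records after a retry.
        byte[] id = KafkaIOUtils.getUniqueId("events", 0, 42L);
        System.out.println(new String(id, StandardCharsets.UTF_8)); // events-0-42
        System.out.println(
            Arrays.equals(id, KafkaIOUtils.getUniqueId("events", 0, 42L))); // true
      }
    }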