diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 6bcdea0c0ab6..0a3650ca133b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -198,7 +198,8 @@ public static void verifyDeterministic(Coder<?> target, String message, Iterable
     }
   }
 
-  public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value) throws Exception {
+  public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value)
+      throws Exception {
     return target.getEncodedElementByteSize(value);
   }
 
   /**
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 966363e41f62..4271d6f72a03 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
   @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
   private KafkaCheckpointMark() {} // for Avro
 
+  private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
+
   public KafkaCheckpointMark(
       List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
     this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
     return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
   }
 
+  @Override
+  public byte[] getOffsetLimit() {
+    if (!reader.isPresent()) {
+      throw new RuntimeException(
+          "KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
+    }
+    if (!reader.get().offsetBasedDeduplicationSupported()) {
+      throw new RuntimeException(
+          "Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
+    }
+
+    // KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
+    checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
+    PartitionMark partition = partitions.get(/* index= */ 0);
+    return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
+  }
+
   /**
    * A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
   * partition.
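// Illustration (not part of the patch): how the checkpoint's offset limit relates to the
// per-record offsets the reader exposes. A minimal sketch under the assumption that a runner
// decodes both byte[] values with Longs.fromByteArray and treats a record whose offset falls
// below the finalized limit (the partition's next offset) as already processed; the actual
// runner-side deduplication logic is not part of this diff.
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;

class OffsetLimitComparisonSketch {
  // `offsetLimit` as produced by KafkaCheckpointMark.getOffsetLimit(); `recordOffset` as
  // produced by KafkaUnboundedReader.getCurrentRecordOffset(). Both are 8-byte big-endian longs.
  static boolean isBeforeCheckpoint(byte[] offsetLimit, byte[] recordOffset) {
    return Longs.fromByteArray(recordOffset) < Longs.fromByteArray(offsetLimit);
  }
}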
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb7b3020c66a..f3127ad3b113 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
         .setRedistributed(false)
         .setAllowDuplicates(false)
         .setRedistributeNumKeys(0)
+        .setOffsetDeduplication(false)
         .build();
   }
 
@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
     @Pure
     public abstract int getRedistributeNumKeys();
 
+    @Pure
+    public abstract boolean isOffsetDeduplication();
+
     @Pure
     public abstract @Nullable Duration getWatchTopicPartitionDuration();
 
@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
 
       abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
 
+      abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);
+
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
@@ -886,11 +892,20 @@ static <K, V> void setupExternalBuilder(
         if (config.allowDuplicates != null) {
           builder.setAllowDuplicates(config.allowDuplicates);
         }
-
+        /*
+         * TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
+         * Until then, enforce explicit enablement only with redistributed without duplicates.
+         */
+        if (config.redistribute
+            && (config.allowDuplicates == null || !config.allowDuplicates)
+            && config.offsetDeduplication != null) {
+          builder.setOffsetDeduplication(config.offsetDeduplication);
+        }
       } else {
         builder.setRedistributed(false);
         builder.setRedistributeNumKeys(0);
         builder.setAllowDuplicates(false);
+        builder.setOffsetDeduplication(false);
       }
     }
 
@@ -959,6 +974,7 @@ public static class Configuration {
      private Integer redistributeNumKeys;
      private Boolean redistribute;
      private Boolean allowDuplicates;
+     private Boolean offsetDeduplication;
 
      public void setConsumerConfig(Map<String, String> consumerConfig) {
        this.consumerConfig = consumerConfig;
      }
@@ -1015,6 +1031,10 @@ public void setRedistribute(Boolean redistribute) {
      public void setAllowDuplicates(Boolean allowDuplicates) {
        this.allowDuplicates = allowDuplicates;
      }
+
+     public void setOffsetDeduplication(Boolean offsetDeduplication) {
+       this.offsetDeduplication = offsetDeduplication;
+     }
    }
  }
 
@@ -1073,7 +1093,7 @@ public Read<K, V> withRedistribute() {
     }
 
     public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
-      if (!isAllowDuplicates()) {
+      if (!isRedistributed()) {
         LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
       }
       return toBuilder().setAllowDuplicates(allowDuplicates).build();
@@ -1086,6 +1106,17 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
       return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
     }
 
+    public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
+      /*
+       * TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
+       * Until then, enforce explicit enablement only with redistributed without duplicates.
+       */
+      checkState(
+          isRedistributed() && !isAllowDuplicates(),
+          "withOffsetDeduplication is currently only supported with: withRedistribute() and withAllowDuplicates(false).");
+      return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
+    }
+
     /**
      * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
      * from each of the matching topics are read.
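// Illustration (not part of the patch): how a pipeline author would opt in to the new flag.
// A minimal sketch; the bootstrap servers, topic name, and deserializers are placeholders.
// Per the checkState above, withOffsetDeduplication(true) must be called after
// withRedistribute() and withAllowDuplicates(false).
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetDedupUsageSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker:9092") // placeholder
            .withTopic("my-topic") // placeholder
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withRedistribute()
            .withAllowDuplicates(false)
            .withOffsetDeduplication(true));
    // p.run();
  }
}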
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 457e0003705e..6702380c0897 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -137,7 +137,12 @@ Object getDefaultValue() {
         return false;
       }
     },
-    ;
+    OFFSET_DEDUPLICATION(LEGACY) {
+      @Override
+      Object getDefaultValue() {
+        return false;
+      }
+    };
 
     private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index 748418d16664..95f95000a58f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -19,11 +19,13 @@
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
       return avg;
     }
   }
+
+  static final class OffsetBasedDeduplication {
+
+    static byte[] encodeOffset(long offset) {
+      return Longs.toByteArray(offset);
+    }
+
+    static byte[] getUniqueId(String topic, int partition, long offset) {
+      return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
+    }
+  }
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index ab9e26b3b740..36a024d41a50 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -66,6 +66,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -299,6 +300,29 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
+    }
+    if (curRecord != null) {
+      return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
+          curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
+    }
+    throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
+    }
+    if (curRecord != null) {
+      return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
+    }
+    throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
+  }
+
   @Override
   public long getSplitBacklogBytes() {
     long backlogBytes = 0;
@@ -314,6 +338,10 @@ public long getSplitBacklogBytes() {
     return backlogBytes;
   }
 
+  public boolean offsetBasedDeduplicationSupported() {
+    return this.offsetBasedDeduplicationSupported;
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -332,10 +360,12 @@ public long getSplitBacklogBytes() {
   private final String name;
   private @Nullable Consumer<byte[], byte[]> consumer = null;
   private final List<PartitionState<K, V>> partitionStates;
-  private @Nullable KafkaRecord<K, V> curRecord = null;
+  private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
   private @Nullable Instant curTimestamp = null;
   private Iterator<ConsumerRecord<byte[], byte[]>> curBatch = Collections.emptyIterator();
 
+  private final boolean offsetBasedDeduplicationSupported;
+
   private @Nullable Deserializer<K> keyDeserializerInstance = null;
   private @Nullable Deserializer<V> valueDeserializerInstance = null;
 
@@ -507,6 +537,7 @@ Instant updateAndGetWatermark() {
       KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
     this.source = source;
     this.name = "Reader-" + source.getId();
+    this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();
 
     List<TopicPartition> partitions =
         Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 9685d859b0a1..dc59de133cda 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
         partitions.size() > 0,
         "Could not find any partitions. Please check Kafka configuration and topic names");
 
-    int numSplits = Math.min(desiredNumSplits, partitions.size());
-    // XXX make all splits have the same # of partitions
-    while (partitions.size() % numSplits > 0) {
-      ++numSplits;
+    int numSplits;
+    if (offsetBasedDeduplicationSupported()) {
+      // Enforce 1:1 split to partition ratio for offset deduplication.
+      numSplits = partitions.size();
+      LOG.info(
+          "Offset-based deduplication is enabled for KafkaUnboundedSource. "
+              + "Forcing the number of splits to equal the number of total partitions: {}.",
+          numSplits);
+    } else {
+      numSplits = Math.min(desiredNumSplits, partitions.size());
+      // Make all splits have the same # of partitions.
+      while (partitions.size() % numSplits > 0) {
+        ++numSplits;
+      }
     }
 
     List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  @Override
+  public boolean offsetBasedDeduplicationSupported() {
+    return spec.isOffsetDeduplication();
+  }
+
   @Override
   public Coder<KafkaRecord<K, V>> getOutputCoder() {
     Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index f021789a912c..482476536577 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
                         Field.of("consumer_polling_timeout", FieldType.INT64),
                         Field.of("redistribute_num_keys", FieldType.INT32),
                         Field.of("redistribute", FieldType.BOOLEAN),
-                        Field.of("allow_duplicates", FieldType.BOOLEAN)))
+                        Field.of("allow_duplicates", FieldType.BOOLEAN),
+                        Field.of("offset_deduplication", FieldType.BOOLEAN)))
                 .withFieldValue("topics", topics)
                 .withFieldValue("consumer_config", consumerConfig)
                 .withFieldValue("key_deserializer", keyDeserializer)
@@ -123,6 +124,7 @@ public void testConstructKafkaRead() throws Exception {
                 .withFieldValue("redistribute_num_keys", 0)
                 .withFieldValue("redistribute", false)
                 .withFieldValue("allow_duplicates", false)
+                .withFieldValue("offset_deduplication", false)
                 .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
                         Field.of("timestamp_policy", FieldType.STRING),
                         Field.of("redistribute_num_keys", FieldType.INT32),
                         Field.of("redistribute", FieldType.BOOLEAN),
-                        Field.of("allow_duplicates", FieldType.BOOLEAN)))
+                        Field.of("allow_duplicates", FieldType.BOOLEAN),
+                        Field.of("offset_deduplication", FieldType.BOOLEAN)))
                 .withFieldValue("topics", topics)
                 .withFieldValue("consumer_config", consumerConfig)
                 .withFieldValue("key_deserializer", keyDeserializer)
@@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
                 .withFieldValue("redistribute_num_keys", 0)
                 .withFieldValue("redistribute", false)
                 .withFieldValue("allow_duplicates", false)
+                .withFieldValue("offset_deduplication", false)
                 .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 841236969d25..3d10b96c00b5 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
             .addBooleanField("redistribute")
             .addBooleanField("allows_duplicates")
             .addNullableInt32Field("redistribute_num_keys")
+            .addBooleanField("offset_deduplication")
             .addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
             .addByteArrayField("timestamp_policy_factory")
             .addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
       fieldValues.put("redistribute", transform.isRedistributed());
       fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
       fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
+      fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
 
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
           }
         }
       }
+      Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
+      if (offsetDeduplication != null) {
+        transform = transform.withOffsetDeduplication(offsetDeduplication);
+      }
       Duration maxReadTime = configRow.getValue("max_read_time");
       if (maxReadTime != null) {
         transform =
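// Illustration (not part of the patch): the two encodings introduced in
// KafkaIOUtils.OffsetBasedDeduplication, shown with made-up topic/partition/offset values.
// A minimal sketch; note that Longs.toByteArray yields 8 big-endian bytes, so for non-negative
// Kafka offsets the unsigned lexicographic order of the encoded arrays matches numeric order.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;

class OffsetDedupEncodingSketch {
  public static void main(String[] args) {
    // Shape of getUniqueId("orders", 3, 42L): the string "orders-3-42" as UTF-8 bytes.
    byte[] recordId = "orders-3-42".getBytes(StandardCharsets.UTF_8);
    // Shape of encodeOffset(42L), used by getCurrentRecordOffset() and getOffsetLimit().
    byte[] encodedOffset = Longs.toByteArray(42L);
    System.out.println(Arrays.toString(recordId));
    System.out.println(Arrays.toString(encodedOffset)); // [0, 0, 0, 0, 0, 0, 0, 42]
  }
}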