From cb6e451eb9cce5522d87a453b6242bd50affaa97 Mon Sep 17 00:00:00 2001
From: Tom Stepp
Date: Tue, 14 Jan 2025 22:06:52 +0000
Subject: [PATCH] Offset-based deduplication for Unbounded Source and Dataflow
 Runner

---
 .../dataflow/internal/CustomSources.java      |  8 ++++++
 .../worker/StreamingModeExecutionContext.java | 15 +++++++++++
 .../runners/dataflow/worker/WindmillSink.java | 11 +++++++-
 .../windmill/src/main/proto/windmill.proto    |  2 ++
 .../apache/beam/sdk/io/UnboundedSource.java   | 26 +++++++++++++++++++
 5 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index fcfe3fe3ce05..647bdca06861 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.internal;
 
 import static com.google.api.client.util.Base64.encodeBase64String;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
 import static org.apache.beam.runners.dataflow.util.Structs.addString;
 import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
@@ -45,6 +46,9 @@ public class CustomSources {
   private static final String SERIALIZED_SOURCE = "serialized_source";
   @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
 
+  @VisibleForTesting
+  static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION =
+      "serialized_offset_based_deduplication";
 
   private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
 
@@ -93,6 +97,10 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSour
       }
       checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
       addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
+      addBoolean(
+          cloudSource.getSpec(),
+          SERIALIZED_OFFSET_BASED_DEDUPLICATION,
+          unboundedSource.isOffsetDeduplication());
     } else {
       throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index f10f3b91e7aa..03f9463eb36b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -102,6 +102,7 @@
 @Internal
 public class StreamingModeExecutionContext extends DataflowExecutionContext<StepContext> {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingModeExecutionContext.class);
+  private static final byte[] NO_RECORD_OFFSET = new byte[0];
 
   private final String computationId;
   private final ImmutableMap<String, String> stateNameMap;
@@ -194,6 +195,13 @@ public boolean workIsFailed() {
     return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
   }
 
+  public byte[] getCurrentRecordOffset() {
+    if (activeReader == null) {
+      return NO_RECORD_OFFSET;
+    }
+    return activeReader.getCurrentRecordOffset();
+  }
+
   public void start(
       @Nullable Object key,
       Work work,
@@ -439,6 +447,13 @@ public Map<Long, Runnable> flushState() {
         throw new RuntimeException("Exception while encoding checkpoint", e);
       }
       sourceStateBuilder.setState(stream.toByteString());
+      if (activeReader.getCurrentSource().isOffsetDeduplication()) {
+        byte[] offsetLimit = checkpointMark.getOffsetLimit();
+        if (offsetLimit.length == 0) {
+          throw new RuntimeException("Checkpoint offset limit must be non-empty.");
+        }
+        sourceStateBuilder.setOffsetLimit(ByteString.copyFrom(offsetLimit));
+      }
     }
 
     outputBuilder.setSourceWatermark(WindmillTimeUtils.harnessToWindmillTimestamp(watermark));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index f83c68ab3c90..6ed8caab4454 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -215,7 +215,16 @@ public long add(WindowedValue<T> data) throws IOException {
             .setMetadata(metadata);
     keyedOutput.addMessages(builder.build());
     keyedOutput.addMessagesIds(id);
-    return (long) key.size() + value.size() + metadata.size() + id.size();
+
+    byte[] rawOffset = context.getCurrentRecordOffset();
+    long offsetSize = 0;
+    if (rawOffset.length > 0) {
+      ByteString offset = ByteString.copyFrom(rawOffset);
+      offsetSize = offset.size();
+      keyedOutput.addMessageOffsets(offset);
+    }
+
+    return (long) key.size() + value.size() + metadata.size() + id.size() + offsetSize;
   }
 
   @Override
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 3b3348dbc3fa..5e5de232c872 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -56,6 +56,7 @@ message KeyedMessageBundle {
   optional fixed64 sharding_key = 4;
   repeated Message messages = 2;
   repeated bytes messages_ids = 3;
+  repeated bytes message_offsets = 5;
 }
 
 message LatencyAttribution {
@@ -410,6 +411,7 @@ message SourceState {
   optional bytes state = 1;
   repeated fixed64 finalize_ids = 2;
   optional bool only_finalize = 3;
+  optional bytes offset_limit = 4;
 }
 
 message WatermarkHold {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index 840e4910e2a2..2d5bd7f8a6e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -93,6 +93,21 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  /**
+   * If {@code isOffsetDeduplication} returns {@code true}, then the {@code UnboundedSource} must
+   * provide the following:
+   *
+   * <ul>
+   *   <li>An {@code UnboundedReader} that provides offsets which are unique for each element and
+   *       lexicographically ordered.
+   *   <li>A {@code CheckpointMark} that provides an offset limit greater than the offsets of all
+   *       elements read and less than or equal to the next offset that will be read.
+   * </ul>
+   */
+  public boolean isOffsetDeduplication() {
+    return false;
+  }
+
   /**
    * A marker representing the progress and state of an {@link
    * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader}.
@@ -139,6 +154,12 @@ public void finalizeCheckpoint() throws IOException {
         // nothing to do
       }
     }
+
+    /** Returns the offset limit for an unbounded source split checkpoint. */
+    default byte[] getOffsetLimit() {
+      throw new RuntimeException(
+          "CheckpointMark must override getOffsetLimit() if UnboundedSource supports offset deduplication.");
+    }
   }
 
   /**
@@ -203,6 +224,11 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
      return EMPTY;
    }
 
+    /** Returns the offset for the current record of this unbounded reader. */
+    public byte[] getCurrentRecordOffset() {
+      return EMPTY;
+    }
+
    /**
     * Returns a timestamp before or at the timestamps of all future elements read by this reader.
     *
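---

Not part of the patch: a minimal usage sketch of the contract described in the new
isOffsetDeduplication() javadoc, assuming the SDK changes above are applied. All names
below (OffsetDedupExample, Mark, Reader, encodeOffset, nextOffsetToRead, currentOffset)
are hypothetical. The sketch encodes a monotonically increasing long offset as
big-endian bytes, so that lexicographic byte order agrees with numeric order for
non-negative offsets.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.beam.sdk.io.UnboundedSource;

public abstract class OffsetDedupExample
    extends UnboundedSource<String, OffsetDedupExample.Mark> {

  @Override
  public boolean isOffsetDeduplication() {
    return true; // Opt in to offset-based deduplication.
  }

  // Big-endian bytes: lexicographic order matches numeric order for
  // non-negative offsets.
  static byte[] encodeOffset(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  public static class Mark implements UnboundedSource.CheckpointMark {
    private final long nextOffsetToRead;

    public Mark(long nextOffsetToRead) {
      this.nextOffsetToRead = nextOffsetToRead;
    }

    @Override
    public byte[] getOffsetLimit() {
      // Greater than every offset already read, and at most the next offset
      // this split will read after restoring from the checkpoint.
      return encodeOffset(nextOffsetToRead);
    }

    @Override
    public void finalizeCheckpoint() throws IOException {
      // Nothing to finalize in this sketch.
    }
  }

  public abstract static class Reader extends UnboundedSource.UnboundedReader<String> {
    protected long currentOffset;

    @Override
    public byte[] getCurrentRecordOffset() {
      // Unique per element and lexicographically ordered, as the contract requires.
      return encodeOffset(currentOffset);
    }
  }
}

Big-endian encoding is the natural choice here because variable-width encodings such as
decimal strings would not sort correctly ("10" sorts before "9" lexicographically).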
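A standalone check of that ordering assumption (again hypothetical, not part of the
patch): comparing big-endian encodings byte by byte as unsigned values agrees with
numeric order, including across carry boundaries such as 255 to 256.

import java.nio.ByteBuffer;

public class OffsetOrderCheck {
  static byte[] encode(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  // Byte-wise comparison over unsigned byte values, i.e. lexicographic order
  // on the raw offset bytes.
  static int compareBytes(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    System.out.println(compareBytes(encode(41), encode(42)) < 0);            // true
    System.out.println(compareBytes(encode(255), encode(256)) < 0);          // true
    System.out.println(compareBytes(encode(0), encode(Long.MAX_VALUE)) < 0); // true
  }
}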