Offset-based deduplication for Unbounded Source and Dataflow Runner
tomstepp committed Jan 21, 2025
commit cb6e451 (1 parent: f1f079b)
Showing 5 changed files with 61 additions and 1 deletion.
CustomSources.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.internal;
 
 import static com.google.api.client.util.Base64.encodeBase64String;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
 import static org.apache.beam.runners.dataflow.util.Structs.addString;
 import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
@@ -45,6 +46,9 @@
 public class CustomSources {
   private static final String SERIALIZED_SOURCE = "serialized_source";
   @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
+  @VisibleForTesting
+  static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION =
+      "serialized_offset_based_deduplication";
 
   private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
 
@@ -93,6 +97,10 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSource(
       }
       checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
       addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
+      addBoolean(
+          cloudSource.getSpec(),
+          SERIALIZED_OFFSET_BASED_DEDUPLICATION,
+          unboundedSource.isOffsetDeduplication());
     } else {
       throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
     }
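
With this change, serializeToCloudSource records each unbounded source's offset-deduplication preference in the cloud source spec, next to the serialized splits. A minimal sketch of the round trip through the spec map, assuming Structs also exposes a matching getBoolean reader (the worker-side lookup is not part of this commit, so that call is an assumption):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.util.Structs;

public class OffsetDedupFlagRoundTrip {
  public static void main(String[] args) throws Exception {
    Map<String, Object> spec = new HashMap<>();
    // Writer side: what serializeToCloudSource now does for unbounded sources.
    Structs.addBoolean(spec, "serialized_offset_based_deduplication", true);
    // Reader side: assumed getBoolean accessor; the actual worker-side lookup
    // is not shown in this commit.
    boolean enabled = Structs.getBoolean(spec, "serialized_offset_based_deduplication");
    System.out.println("offset-based dedup enabled: " + enabled);
  }
}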
StreamingModeExecutionContext.java
@@ -102,6 +102,7 @@
 @Internal
 public class StreamingModeExecutionContext extends DataflowExecutionContext<StepContext> {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingModeExecutionContext.class);
+  private static final byte[] NO_RECORD_OFFSET = new byte[0];
 
   private final String computationId;
   private final ImmutableMap<String, String> stateNameMap;
@@ -194,6 +195,13 @@ public boolean workIsFailed() {
     return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
   }
 
+  public byte[] getCurrentRecordOffset() {
+    if (activeReader == null) {
+      return NO_RECORD_OFFSET;
+    }
+    return activeReader.getCurrentRecordOffset();
+  }
+
   public void start(
       @Nullable Object key,
       Work work,
@@ -439,6 +447,13 @@ public Map<Long, Runnable> flushState() {
         throw new RuntimeException("Exception while encoding checkpoint", e);
       }
       sourceStateBuilder.setState(stream.toByteString());
+      if (activeReader.getCurrentSource().isOffsetDeduplication()) {
+        byte[] offsetLimit = checkpointMark.getOffsetLimit();
+        if (offsetLimit.length == 0) {
+          throw new RuntimeException("Checkpoint offset limit must be non-empty.");
+        }
+        sourceStateBuilder.setOffsetLimit(ByteString.copyFrom(offsetLimit));
+      }
     }
     outputBuilder.setSourceWatermark(WindmillTimeUtils.harnessToWindmillTimestamp(watermark));
 
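
flushState now rejects an empty offset limit whenever the active source enables offset-based deduplication. A minimal sketch of a checkpoint mark that satisfies that contract, assuming records carry non-negative long offsets; the class and field names here are illustrative, not part of this commit:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.beam.sdk.io.UnboundedSource;

// Hypothetical checkpoint mark for a source whose records carry
// non-negative long offsets.
class LongOffsetCheckpointMark implements UnboundedSource.CheckpointMark {
  // Greater than every offset already read, and at most the next offset to read.
  private final long nextOffset;

  LongOffsetCheckpointMark(long nextOffset) {
    this.nextOffset = nextOffset;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Nothing to finalize in this sketch.
  }

  @Override
  public byte[] getOffsetLimit() {
    // 8-byte big-endian encoding: never empty, and unsigned lexicographic
    // byte order matches numeric order for non-negative offsets.
    return ByteBuffer.allocate(Long.BYTES).putLong(nextOffset).array();
  }
}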
WindmillSink.java
@@ -215,7 +215,16 @@ public long add(WindowedValue<T> data) throws IOException {
           .setMetadata(metadata);
       keyedOutput.addMessages(builder.build());
       keyedOutput.addMessagesIds(id);
-      return (long) key.size() + value.size() + metadata.size() + id.size();
+
+      byte[] rawOffset = context.getCurrentRecordOffset();
+      long offsetSize = 0;
+      if (rawOffset.length > 0) {
+        ByteString offset = ByteString.copyFrom(context.getCurrentRecordOffset());
+        offsetSize = offset.size();
+        keyedOutput.addMessageOffsets(offset);
+      }
+
+      return (long) key.size() + value.size() + metadata.size() + id.size() + offsetSize;
     }
 
     @Override
windmill.proto
@@ -56,6 +56,7 @@ message KeyedMessageBundle {
   optional fixed64 sharding_key = 4;
   repeated Message messages = 2;
   repeated bytes messages_ids = 3;
+  repeated bytes message_offsets = 5;
 }
 
 message LatencyAttribution {
@@ -410,6 +411,7 @@ message SourceState {
   optional bytes state = 1;
   repeated fixed64 finalize_ids = 2;
   optional bool only_finalize = 3;
+  optional bytes offset_limit = 4;
 }
 
 message WatermarkHold {
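
The new repeated message_offsets field runs parallel to messages and messages_ids: entry i carries the dedup offset for message i, and the field stays empty for sources that do not enable offset-based deduplication. A sketch of populating it through the generated builder, mirroring the sink change above; the Windmill class location and the non-vendored ByteString import are assumptions (Beam's worker actually uses a vendored protobuf):

import com.google.protobuf.ByteString; // assumption: the real code uses a vendored ByteString
import org.apache.beam.runners.dataflow.worker.windmill.Windmill; // assumed location

public class MessageOffsetsExample {
  public static void main(String[] args) {
    Windmill.KeyedMessageBundle bundle =
        Windmill.KeyedMessageBundle.newBuilder()
            .addMessages(
                Windmill.Message.newBuilder()
                    .setMetadata(ByteString.copyFromUtf8("window-metadata"))
                    .build())
            // Record id consumed by id-based deduplication, as before.
            .addMessagesIds(ByteString.copyFromUtf8("unique-record-id"))
            // New: per-message offset consumed by offset-based deduplication.
            .addMessageOffsets(ByteString.copyFrom(new byte[] {0, 0, 0, 0, 0, 0, 0, 42}))
            .build();
    System.out.println(bundle.getMessageOffsetsCount()); // prints 1
  }
}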
UnboundedSource.java
@@ -93,6 +93,21 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  /**
+   * If isOffsetDeduplication returns true, then the UnboundedSource must provide
+   * the following:
+   *
+   * <ul>
+   *   <li>An UnboundedReader that provides offsets that are unique for each element
+   *       and lexicographically ordered.</li>
+   *   <li>A CheckpointMark that provides an offset greater than all elements read
+   *       and less than or equal to the next offset that will be read.</li>
+   * </ul>
+   */
+  public boolean isOffsetDeduplication() {
+    return false;
+  }
+
   /**
    * A marker representing the progress and state of an {@link
    * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader}.
@@ -139,6 +154,12 @@ public void finalizeCheckpoint() throws IOException {
         // nothing to do
       }
     }
+
+    /** Returns the offset limit for this unbounded source split's checkpoint. */
+    default byte[] getOffsetLimit() {
+      throw new RuntimeException(
+          "CheckpointMark must override getOffsetLimit() if UnboundedSource supports offset deduplication.");
+    }
   }
 
   /**
@@ -203,6 +224,11 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
       return EMPTY;
     }
 
+    /** Returns the offset for the current record of this unbounded reader. */
+    public byte[] getCurrentRecordOffset() {
+      return EMPTY;
+    }
+
     /**
      * Returns a timestamp before or at the timestamps of all future elements read by this reader.
      *
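
The ordering requirement in the isOffsetDeduplication() contract is byte-wise, so numeric offsets must be encoded in an order-preserving way before being returned from getCurrentRecordOffset() or getOffsetLimit(). A self-contained check (Java 9+ for Arrays.compareUnsigned), assuming non-negative long sequence numbers encoded big-endian; the helper name is illustrative:

import java.nio.ByteBuffer;
import java.util.Arrays;

public class OffsetOrderingCheck {
  // Encode a non-negative sequence number so that lexicographic byte order
  // matches numeric order, as the isOffsetDeduplication() contract requires.
  static byte[] encodeOffset(long sequenceNumber) {
    return ByteBuffer.allocate(Long.BYTES).putLong(sequenceNumber).array();
  }

  public static void main(String[] args) {
    byte[] a = encodeOffset(255); // 0x00...00FF
    byte[] b = encodeOffset(256); // 0x00...0100
    // Unsigned lexicographic comparison preserves numeric order.
    System.out.println(Arrays.compareUnsigned(a, b) < 0); // prints true
  }
}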
