Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Code micro optimizations (1/N) #33580

Merged
merged 13 commits into from
Jan 22, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
@NotThreadSafe
@Internal
public class StreamingModeExecutionContext extends DataflowExecutionContext<StepContext> {

private static final Logger LOG = LoggerFactory.getLogger(StreamingModeExecutionContext.class);

private final String computationId;
Expand Down Expand Up @@ -191,7 +192,7 @@ public boolean throwExceptionsForLargeOutput() {
}

public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
return work != null && work.isFailed();
}

public void start(
Expand Down Expand Up @@ -553,6 +554,7 @@ protected DataflowExecutionState createState(
}

private static class ScopedReadStateSupplier implements Supplier<Closeable> {

private final ExecutionState readState;
private final @Nullable ExecutionStateTracker stateTracker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoValue;
import java.util.Objects;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
Expand Down Expand Up @@ -45,4 +46,10 @@ public final String toString() {
return String.format(
"%s: %s-%d", computationId(), TextFormat.escapeBytes(key()), shardingKey());
}

@Override
public final int hashCode() {
// Sharding key collisions are unexpected, avoid hashing full key
return Objects.hash(shardingKey(), computationId());
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
Expand All @@ -33,8 +32,6 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -54,6 +51,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class WindmillTimerInternals implements TimerInternals {

private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));

Expand Down Expand Up @@ -406,36 +404,27 @@ private static boolean useNewTimerTagEncoding(TimerData timerData) {
*/
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
String tagString;
ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
try {
if (useNewTimerTagEncoding(timerData)) {
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(
timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.append('+')
.append(timerData.getTimerFamilyId())
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
} else {
// Timers without timerFamily would have timerFamily would be an empty string
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(
timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
}
return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray()));
} catch (IOException e) {
throw new RuntimeException(e);
if (useNewTimerTagEncoding(timerData)) {
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.append('+')
.append(timerData.getTimerFamilyId())
.toString();
} else {
// Timers without timerFamily would have timerFamily would be an empty string
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.toString();
}
return ByteString.copyFromUtf8(tagString);
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand All @@ -57,6 +56,7 @@
@ThreadSafe
@Internal
public final class ActiveWorkState {

private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkState.class);

/* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown for observability.*/
Expand All @@ -77,14 +77,15 @@ public final class ActiveWorkState {
* activated in {@link #activateWorkForKey(ExecutableWork)}, and decremented when work is
* completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}.
*/
private final AtomicReference<GetWorkBudget> activeGetWorkBudget;
@GuardedBy("this")
private GetWorkBudget activeGetWorkBudget;

private ActiveWorkState(
Map<ShardedKey, Deque<ExecutableWork>> activeWork,
WindmillStateCache.ForComputation computationStateCache) {
this.activeWork = activeWork;
this.computationStateCache = computationStateCache;
this.activeGetWorkBudget = new AtomicReference<>(GetWorkBudget.noBudget());
this.activeGetWorkBudget = GetWorkBudget.noBudget();
}

static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
Expand Down Expand Up @@ -219,14 +220,12 @@ synchronized ImmutableList<RefreshableWork> getRefreshableWork(Instant refreshDe
.collect(toImmutableList());
}

private void incrementActiveWorkBudget(Work work) {
activeGetWorkBudget.updateAndGet(
getWorkBudget -> getWorkBudget.apply(1, work.getSerializedWorkItemSize()));
private synchronized void incrementActiveWorkBudget(Work work) {
activeGetWorkBudget = activeGetWorkBudget.apply(1, work.getSerializedWorkItemSize());
}

private void decrementActiveWorkBudget(Work work) {
activeGetWorkBudget.updateAndGet(
getWorkBudget -> getWorkBudget.subtract(1, work.getSerializedWorkItemSize()));
private synchronized void decrementActiveWorkBudget(Work work) {
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
activeGetWorkBudget = activeGetWorkBudget.subtract(1, work.getSerializedWorkItemSize());
}

/**
Expand Down Expand Up @@ -331,8 +330,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
* means that the work is received from Windmill, being processed or queued to be processed in
* {@link ActiveWorkState}, and not committed back to Windmill.
*/
GetWorkBudget currentActiveWorkBudget() {
return activeGetWorkBudget.get();
synchronized GetWorkBudget currentActiveWorkBudget() {
return activeGetWorkBudget;
}

synchronized void printActiveWork(PrintWriter writer, Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public static ShardedKey create(ByteString key, long shardingKey) {
public final String toString() {
return String.format("%016x", shardingKey());
}

@Override
public final int hashCode() {
// Sharding key collisions are unexpected, avoid hashing full key
return Long.hashCode(shardingKey());
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading