diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f10f3b91e7aa..2463ca682ca2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -101,6 +101,7 @@ @NotThreadSafe @Internal public class StreamingModeExecutionContext extends DataflowExecutionContext { + private static final Logger LOG = LoggerFactory.getLogger(StreamingModeExecutionContext.class); private final String computationId; @@ -191,7 +192,7 @@ public boolean throwExceptionsForLargeOutput() { } public boolean workIsFailed() { - return Optional.ofNullable(work).map(Work::isFailed).orElse(false); + return work != null && work.isFailed(); } public void start( @@ -553,6 +554,7 @@ protected DataflowExecutionState createState( } private static class ScopedReadStateSupplier implements Supplier { + private final ExecutionState readState; private final @Nullable ExecutionStateTracker stateTracker; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java index 274fa3aff026..d5a1b9ea1741 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker; import com.google.auto.value.AutoValue; +import 
java.util.Objects; import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat; @@ -45,4 +46,10 @@ public final String toString() { return String.format( "%s: %s-%d", computationId(), TextFormat.escapeBytes(key()), shardingKey()); } + + @Override + public final int hashCode() { + // Sharding key collisions are unexpected, avoid hashing full key + return Objects.hash(shardingKey(), computationId()); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 502fb605316a..2c4e18d68c4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -21,7 +21,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.function.Consumer; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -33,8 +32,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.ExposedByteArrayInputStream; -import org.apache.beam.sdk.util.ExposedByteArrayOutputStream; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -54,6 
+51,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) class WindmillTimerInternals implements TimerInternals { + private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); @@ -406,36 +404,27 @@ private static boolean useNewTimerTagEncoding(TimerData timerData) { */ public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { String tagString; - ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream(); - try { - if (useNewTimerTagEncoding(timerData)) { - tagString = - new StringBuilder() - .append(prefix.byteString().toStringUtf8()) // this never ends with a slash - .append( - timerData.getNamespace().stringKey()) // this must begin and end with a slash - .append('+') - .append(timerData.getTimerId()) // this is arbitrary; currently unescaped - .append('+') - .append(timerData.getTimerFamilyId()) - .toString(); - out.write(tagString.getBytes(StandardCharsets.UTF_8)); - } else { - // Timers without timerFamily would have timerFamily would be an empty string - tagString = - new StringBuilder() - .append(prefix.byteString().toStringUtf8()) // this never ends with a slash - .append( - timerData.getNamespace().stringKey()) // this must begin and end with a slash - .append('+') - .append(timerData.getTimerId()) // this is arbitrary; currently unescaped - .toString(); - out.write(tagString.getBytes(StandardCharsets.UTF_8)); - } - return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray())); - } catch (IOException e) { - throw new RuntimeException(e); + if (useNewTimerTagEncoding(timerData)) { + tagString = + new StringBuilder() + .append(prefix.byteString().toStringUtf8()) // this never ends with a slash + .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash + .append('+') + .append(timerData.getTimerId()) // this is arbitrary; currently unescaped + .append('+') + 
.append(timerData.getTimerFamilyId()) + .toString(); + } else { + // Timers without a timerFamily have an empty string as their timerFamily + tagString = + new StringBuilder() + .append(prefix.byteString().toStringUtf8()) // this never ends with a slash + .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash + .append('+') + .append(timerData.getTimerId()) // this is arbitrary; currently unescaped + .toString(); } + return ByteString.copyFromUtf8(tagString); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 82b0948b7713..2fd5e5457629 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -30,7 +30,6 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -57,6 +56,7 @@ @ThreadSafe @Internal public final class ActiveWorkState { + private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkState.class); /* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown for observability.*/ @@ -77,14 +77,15 @@ public final class ActiveWorkState { * activated in {@link #activateWorkForKey(ExecutableWork)}, and decremented when work is * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}. 
*/ - private final AtomicReference activeGetWorkBudget; + @GuardedBy("this") + private GetWorkBudget activeGetWorkBudget; private ActiveWorkState( Map> activeWork, WindmillStateCache.ForComputation computationStateCache) { this.activeWork = activeWork; this.computationStateCache = computationStateCache; - this.activeGetWorkBudget = new AtomicReference<>(GetWorkBudget.noBudget()); + this.activeGetWorkBudget = GetWorkBudget.noBudget(); } static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) { @@ -219,14 +220,12 @@ synchronized ImmutableList getRefreshableWork(Instant refreshDe .collect(toImmutableList()); } - private void incrementActiveWorkBudget(Work work) { - activeGetWorkBudget.updateAndGet( - getWorkBudget -> getWorkBudget.apply(1, work.getSerializedWorkItemSize())); + private synchronized void incrementActiveWorkBudget(Work work) { + activeGetWorkBudget = activeGetWorkBudget.apply(1, work.getSerializedWorkItemSize()); } - private void decrementActiveWorkBudget(Work work) { - activeGetWorkBudget.updateAndGet( - getWorkBudget -> getWorkBudget.subtract(1, work.getSerializedWorkItemSize())); + private synchronized void decrementActiveWorkBudget(Work work) { + activeGetWorkBudget = activeGetWorkBudget.subtract(1, work.getSerializedWorkItemSize()); } /** @@ -331,8 +330,8 @@ private synchronized ImmutableMap getStuckCommitsAt( * means that the work is received from Windmill, being processed or queued to be processed in * {@link ActiveWorkState}, and not committed back to Windmill. 
*/ - GetWorkBudget currentActiveWorkBudget() { - return activeGetWorkBudget.get(); + synchronized GetWorkBudget currentActiveWorkBudget() { + return activeGetWorkBudget; } synchronized void printActiveWork(PrintWriter writer, Instant now) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java index 572125462a33..91fc66c72041 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java @@ -35,4 +35,10 @@ public static ShardedKey create(ByteString key, long shardingKey) { public final String toString() { return String.format("%016x", shardingKey()); } + + @Override + public final int hashCode() { + // Sharding key collisions are unexpected, avoid hashing full key + return Long.hashCode(shardingKey()); + } }