From 75ddebe09a71bee06a0ab98ecfffefb3c1bb3cd4 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Tue, 4 Feb 2025 21:47:27 -0800 Subject: [PATCH] Other clean up changes --- .../runners/core/metrics/BoundedTrieCell.java | 4 +-- .../runners/core/metrics/BoundedTrieData.java | 5 ++- .../core/metrics/BoundedTrieNodeTest.java | 32 +++++++++++++------ .../MetricsToCounterUpdateConverter.java | 25 +++++++-------- .../worker/BatchModeExecutionContextTest.java | 2 +- .../MetricsToCounterUpdateConverterTest.java | 8 +++-- .../StreamingStepMetricsContainerTest.java | 9 ++++-- 7 files changed, 52 insertions(+), 33 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java index 06160a3b8c11..0e33881d0e37 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java @@ -79,9 +79,9 @@ public synchronized BoundedTrieData getCumulative() { // For delta we take the current value then reset the cell to empty so the next call only see // delta/updates from last call. 
public synchronized BoundedTrieData getAndReset() { - // since we are resetting no need to deep a copy just change the reference + // since we are resetting no need to do a deep copy, just change the reference BoundedTrieData shallowCopy = this.value; - this.value = new BoundedTrieData(); // create now object should not call reset on existing + this.value = new BoundedTrieData(); // create new object, should not call reset on existing return shallowCopy; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index 0cd2185d76fc..63fb289d3eec 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -180,6 +180,9 @@ public synchronized BoundedTrieResult extractResult() { */ public synchronized void add(Iterable segments) { List segmentsParts = ImmutableList.copyOf(segments); + if (segmentsParts.isEmpty()) { + return; + } if (this.singleton == null && this.root == null) { // empty case this.singleton = segmentsParts; @@ -342,7 +345,7 @@ static class BoundedTrieNode implements Serializable { * @param truncated Whether this node is truncated. * @param size The size of the subtree rooted at this node. 
*/ - BoundedTrieNode(Map children, boolean truncated, int size) { + BoundedTrieNode(@Nonnull Map children, boolean truncated, int size) { this.children = children; this.size = size; this.truncated = truncated; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java index ca5421e6f77f..b51e14f6a23a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java @@ -882,12 +882,31 @@ public void testClear() { assertTrue(trie.extractResult().getResult().isEmpty()); } + @Test + public void testIsEmpty() { + BoundedTrieData trie = new BoundedTrieData(); + assertTrue(trie.isEmpty()); + + trie.add(Collections.emptyList()); + assertTrue(trie.isEmpty()); + + trie.add(ImmutableList.of("a", "b")); + assertFalse(trie.isEmpty()); + + trie.add(ImmutableList.of("c", "d")); + assertFalse(trie.isEmpty()); + + trie.clear(); + assertTrue(trie.isEmpty()); + } + @Test public void testBoundedTrieDataContains() { BoundedTrieData trie = new BoundedTrieData(); trie.add(ImmutableList.of("a", "b")); assertTrue(trie.contains(ImmutableList.of("a", "b"))); - assertTrue(trie.contains(ImmutableList.of("a"))); + // path ab is not same as path a + assertFalse(trie.contains(ImmutableList.of("a"))); assertFalse(trie.contains(ImmutableList.of("a", "c"))); } @@ -1000,18 +1019,11 @@ public void testGetCumulativeWithRoot() { assertFalse(cumulativeTrie.contains(ImmutableList.of("g", "h"))); } - @Test - public void testAddEmptyPath() { - BoundedTrieData trie = new BoundedTrieData(); - trie.add(Collections.emptyList()); - assertEquals(1, trie.size()); - assertTrue(trie.extractResult().getResult().contains(ImmutableList.of("false"))); - } - @Test public void testContainsEmptyPath() { BoundedTrieData trie = new 
BoundedTrieData(); trie.add(Collections.emptyList()); - assertTrue(trie.contains(Collections.emptyList())); + assertFalse(trie.contains(Collections.emptyList())); + assertTrue(trie.isEmpty()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 13fe051e0e3f..2ba881dca3ce 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -122,7 +122,7 @@ public static CounterUpdate fromBoundedTrie( MetricKey key, boolean isCumulative, BoundedTrieData boundedTrieData) { // BoundedTrie uses SET kind metric aggregation which tracks unique strings as a trie. CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); - BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData); + BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData.toProto()); return new CounterUpdate() .setStructuredNameAndMetadata(name) @@ -130,26 +130,25 @@ public static CounterUpdate fromBoundedTrie( .setBoundedTrie(counterUpdateTrie); } + /** + * Converts from org.apache.beam.model.pipeline.v1.BoundedTrie to + * com.google.api.services.dataflow.model.BoundedTrie. This is because even though Dataflow + * CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in its definition when + * the google-api client is generated the package is renamed. + * + * @param trie org.apache.beam.model.pipeline.v1.BoundedTrie to be converted + * @return converted com.google.api.services.dataflow.model.BoundedTrie. 
+ */ @VisibleForTesting - static BoundedTrie getBoundedTrie(BoundedTrieData boundedTrieData) { + static BoundedTrie getBoundedTrie(MetricsApi.BoundedTrie trie) { BoundedTrie counterUpdateTrie = new BoundedTrie(); - MetricsApi.BoundedTrie trie = boundedTrieData.toProto(); counterUpdateTrie.setBound(trie.getBound()); counterUpdateTrie.setSingleton( trie.getSingletonList().isEmpty() ? null : trie.getSingletonList()); - counterUpdateTrie.setRoot(getBoundedTrieNode(trie.getRoot())); + counterUpdateTrie.setRoot(trie.hasRoot() ? getBoundedTrieNode(trie.getRoot()) : null); return counterUpdateTrie; } - /** - * Converts from org.apache.beam.model.pipeline.v1.BoundedTrieNode to - * com.google.api.services.dataflow.model.BoundedTrieNode. This is because even though Dataflow - * CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in it's definition when - * the google-api client is generated the package is renamed. - * - * @param node org.apache.beam.model.pipeline.v1.BoundedTrieNode to be converted - * @return converted org.apache.beam.model.pipeline.v1.BoundedTrieNode. 
- */ private static BoundedTrieNode getBoundedTrieNode(MetricsApi.BoundedTrieNode node) { BoundedTrieNode boundedTrieNode = new BoundedTrieNode(); boundedTrieNode.setTruncated(node.getTruncated()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index ef968972a892..1d12295795d3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -226,7 +226,7 @@ public void extractMetricUpdatesBoundedTrie() { .setOriginalStepName("originalName")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData)); + .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData.toProto())); assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java index 069cfe7f01f2..3de9678425e3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java @@ -96,7 +96,8 @@ public void testGetBoundedTrieNodeLevels() { boundedTrieData.add(ImmutableList.of("ef", "gh")); 
boundedTrieData.add(ImmutableList.of("ef", "xy")); - BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData); + BoundedTrie actualTrie = + MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto()); BoundedTrie expectedTrie = new BoundedTrie(); expectedTrie.setBound(100); @@ -116,12 +117,13 @@ public void testGetBoundedTrieNodeLevels() { public void testGetBoundedTrieNodeSingleton() { BoundedTrieData boundedTrieData = new BoundedTrieData(); boundedTrieData.add(ImmutableList.of("ab")); - BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData); + BoundedTrie actualTrie = + MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto()); BoundedTrie expectedTrie = new BoundedTrie(); expectedTrie.setBound(100); expectedTrie.setSingleton(ImmutableList.of("ab")); - expectedTrie.setRoot(getEmptyNode()); + expectedTrie.setRoot(null); assertEquals(expectedTrie, actualTrie); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 340d94761b6e..3bc16bf2d38c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -354,7 +354,8 @@ public void testBoundedTrieUpdateExtraction() { .setOriginalStepName("s1")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); + .setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); Iterable updates = 
StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -381,7 +382,8 @@ public void testBoundedTrieUpdateExtraction() { .setOriginalStepName("s2")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2)); + .setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name2Update)); @@ -391,7 +393,8 @@ public void testBoundedTrieUpdateExtraction() { expectedName1.clear(); expectedName1.add(ImmutableList.of("op")); - name1Update.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); + name1Update.setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update));