Skip to content

Commit

Permalink
Other clean up changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Feb 5, 2025
1 parent 8bb0659 commit 75ddebe
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public synchronized BoundedTrieData getCumulative() {
// For delta we take the current value then reset the cell to empty so the next call only see
// delta/updates from last call.
public synchronized BoundedTrieData getAndReset() {
// since we are resetting no need to deep a copy just change the reference
// since we are resetting no need to do a deep copy, just change the reference
BoundedTrieData shallowCopy = this.value;
this.value = new BoundedTrieData(); // create now object should not call reset on existing
this.value = new BoundedTrieData(); // create new object, should not call reset on existing
return shallowCopy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ public synchronized BoundedTrieResult extractResult() {
*/
public synchronized void add(Iterable<String> segments) {
List<String> segmentsParts = ImmutableList.copyOf(segments);
if (segmentsParts.isEmpty()) {
return;
}
if (this.singleton == null && this.root == null) {
// empty case
this.singleton = segmentsParts;
Expand Down Expand Up @@ -342,7 +345,7 @@ static class BoundedTrieNode implements Serializable {
* @param truncated Whether this node is truncated.
* @param size The size of the subtree rooted at this node.
*/
BoundedTrieNode(Map<String, BoundedTrieNode> children, boolean truncated, int size) {
BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
this.children = children;
this.size = size;
this.truncated = truncated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,12 +882,31 @@ public void testClear() {
assertTrue(trie.extractResult().getResult().isEmpty());
}

@Test
public void testIsEmpty() {
BoundedTrieData trie = new BoundedTrieData();
assertTrue(trie.isEmpty());

trie.add(Collections.emptyList());
assertTrue(trie.isEmpty());

trie.add(ImmutableList.of("a", "b"));
assertFalse(trie.isEmpty());

trie.add(ImmutableList.of("c", "d"));
assertFalse(trie.isEmpty());

trie.clear();
assertTrue(trie.isEmpty());
}

@Test
public void testBoundedTrieDataContains() {
BoundedTrieData trie = new BoundedTrieData();
trie.add(ImmutableList.of("a", "b"));
assertTrue(trie.contains(ImmutableList.of("a", "b")));
assertTrue(trie.contains(ImmutableList.of("a")));
// path ab is not same as path a
assertFalse(trie.contains(ImmutableList.of("a")));
assertFalse(trie.contains(ImmutableList.of("a", "c")));
}

Expand Down Expand Up @@ -1000,18 +1019,11 @@ public void testGetCumulativeWithRoot() {
assertFalse(cumulativeTrie.contains(ImmutableList.of("g", "h")));
}

@Test
public void testAddEmptyPath() {
BoundedTrieData trie = new BoundedTrieData();
trie.add(Collections.emptyList());
assertEquals(1, trie.size());
assertTrue(trie.extractResult().getResult().contains(ImmutableList.of("false")));
}

@Test
public void testContainsEmptyPath() {
BoundedTrieData trie = new BoundedTrieData();
trie.add(Collections.emptyList());
assertTrue(trie.contains(Collections.emptyList()));
assertFalse(trie.contains(Collections.emptyList()));
assertTrue(trie.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,34 +122,33 @@ public static CounterUpdate fromBoundedTrie(
MetricKey key, boolean isCumulative, BoundedTrieData boundedTrieData) {
// BoundedTrie uses SET kind metric aggregation which tracks unique strings as a trie.
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);
BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData);
BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData.toProto());

return new CounterUpdate()
.setStructuredNameAndMetadata(name)
.setCumulative(isCumulative)
.setBoundedTrie(counterUpdateTrie);
}

/**
* Converts from org.apache.beam.model.pipeline.v1.BoundedTrie to
* com.google.api.services.dataflow.model.BoundedTrie. This is because even though Dataflow
* CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in it's definition when
* the google-api client is generated the package is renamed.
*
* @param trie org.apache.beam.model.pipeline.v1.BoundedTrie to be converted
* @return converted com.google.api.services.dataflow.model.BoundedTrie.
*/
@VisibleForTesting
static BoundedTrie getBoundedTrie(BoundedTrieData boundedTrieData) {
static BoundedTrie getBoundedTrie(MetricsApi.BoundedTrie trie) {
BoundedTrie counterUpdateTrie = new BoundedTrie();
MetricsApi.BoundedTrie trie = boundedTrieData.toProto();
counterUpdateTrie.setBound(trie.getBound());
counterUpdateTrie.setSingleton(
trie.getSingletonList().isEmpty() ? null : trie.getSingletonList());
counterUpdateTrie.setRoot(getBoundedTrieNode(trie.getRoot()));
counterUpdateTrie.setRoot(trie.hasRoot() ? getBoundedTrieNode(trie.getRoot()) : null);
return counterUpdateTrie;
}

/**
* Converts from org.apache.beam.model.pipeline.v1.BoundedTrieNode to
* com.google.api.services.dataflow.model.BoundedTrieNode. This is because even though Dataflow
* CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in it's definition when
* the google-api client is generated the package is renamed.
*
* @param node org.apache.beam.model.pipeline.v1.BoundedTrieNode to be converted
* @return converted org.apache.beam.model.pipeline.v1.BoundedTrieNode.
*/
private static BoundedTrieNode getBoundedTrieNode(MetricsApi.BoundedTrieNode node) {
BoundedTrieNode boundedTrieNode = new BoundedTrieNode();
boundedTrieNode.setTruncated(node.getTruncated());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void extractMetricUpdatesBoundedTrie() {
.setOriginalStepName("originalName"))
.setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
.setCumulative(false)
.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData));
.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData.toProto()));

assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public void testGetBoundedTrieNodeLevels() {
boundedTrieData.add(ImmutableList.of("ef", "gh"));
boundedTrieData.add(ImmutableList.of("ef", "xy"));

BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData);
BoundedTrie actualTrie =
MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto());

BoundedTrie expectedTrie = new BoundedTrie();
expectedTrie.setBound(100);
Expand All @@ -116,12 +117,13 @@ public void testGetBoundedTrieNodeLevels() {
public void testGetBoundedTrieNodeSingleton() {
BoundedTrieData boundedTrieData = new BoundedTrieData();
boundedTrieData.add(ImmutableList.of("ab"));
BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData);
BoundedTrie actualTrie =
MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto());

BoundedTrie expectedTrie = new BoundedTrie();
expectedTrie.setBound(100);
expectedTrie.setSingleton(ImmutableList.of("ab"));
expectedTrie.setRoot(getEmptyNode());
expectedTrie.setRoot(null);

assertEquals(expectedTrie, actualTrie);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ public void testBoundedTrieUpdateExtraction() {
.setOriginalStepName("s1"))
.setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
.setCumulative(false)
.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1));
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));
Expand All @@ -381,7 +382,8 @@ public void testBoundedTrieUpdateExtraction() {
.setOriginalStepName("s2"))
.setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
.setCumulative(false)
.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2));
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto()));

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name2Update));
Expand All @@ -391,7 +393,8 @@ public void testBoundedTrieUpdateExtraction() {

expectedName1.clear();
expectedName1.add(ImmutableList.of("op"));
name1Update.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1));
name1Update.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));
Expand Down

0 comments on commit 75ddebe

Please sign in to comment.