Skip to content

Commit

Permalink
atlas: support dropping data in rollup policy (#1133)
Browse files Browse the repository at this point in the history
Add an operation to the rule that can be used to indicate
data should be dropped entirely.
  • Loading branch information
brharrington authored Apr 12, 2024
1 parent dda652e commit 6c6d0a7
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,22 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<Rule> rules)
return Rollups.fromRules(commonTags, rules);
}

/** Operation associated with a rule. */
enum Operation {
/** Rollup data by removing specified dimensions. */
ROLLUP,

/** Drop the data that matches the query. */
DROP
}

/**
* Rule for matching a set of measurements and removing specified dimensions.
*/
final class Rule {
private final String query;
private final List<String> rollup;
private final Operation operation;

/**
* Create a new instance.
Expand All @@ -86,10 +96,25 @@ final class Rule {
* Atlas query expression that indicates the set of measurements matching this rule.
* @param rollup
* Set of dimensions to remove from the matching measurements.
* @param operation
* Operation to perform if there is a match to the query.
*/
public Rule(String query, List<String> rollup) {
public Rule(String query, List<String> rollup, Operation operation) {
this.query = Preconditions.checkNotNull(query, "query");
this.rollup = Preconditions.checkNotNull(rollup, "rollup");
this.operation = Preconditions.checkNotNull(operation, "operation");
}

/**
* Create a new instance.
*
* @param query
* Atlas query expression that indicates the set of measurements matching this rule.
* @param rollup
* Set of dimensions to remove from the matching measurements.
*/
public Rule(String query, List<String> rollup) {
this(query, rollup, Operation.ROLLUP);
}

/** Return the query expression string. */
Expand All @@ -102,18 +127,24 @@ public List<String> rollup() {
return rollup;
}

/** Return the operation to perform if the query matches. */
public Operation operation() {
return operation;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Rule)) return false;
Rule rule = (Rule) o;
return query.equals(rule.query)
&& rollup.equals(rule.rollup);
&& rollup.equals(rule.rollup)
&& operation == rule.operation;
}

@Override
public int hashCode() {
return Objects.hash(query, rollup);
return Objects.hash(query, rollup, operation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.
for (Measurement m : ms) {
List<RollupPolicy.Rule> matches = index.findMatches(m.id());
if (matches.isEmpty()) {
// No matches for the id, but we sill need to treat as an aggregate because
// No matches for the id, but we still need to treat as an aggregate because
// rollup on another id could cause a collision
Map<Id, Aggregator> idMap = aggregates.computeIfAbsent(commonTags, k -> new HashMap<>());
updateAggregate(idMap, m.id(), m);
} else {
// Skip measurement if one of the rules indicates it should be dropped
if (shouldDrop(matches)) {
continue;
}

// For matching rules, find dimensions from common tags and others that are part
// of the id
Set<String> commonDimensions = new HashSet<>();
Expand All @@ -77,7 +82,7 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.
}
}

// Peform rollup by removing the dimensions
// Perform rollup by removing the dimensions
Map<String, String> tags = commonDimensions.isEmpty()
? commonTags
: rollup(commonTags, commonDimensions);
Expand All @@ -98,6 +103,15 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.
};
}

private static boolean shouldDrop(List<RollupPolicy.Rule> rules) {
for (RollupPolicy.Rule rule : rules) {
if (rule.operation() == RollupPolicy.Operation.DROP) {
return true;
}
}
return false;
}

private static Map<String, String> rollup(Map<String, String> tags, Set<String> dimensions) {
Map<String, String> tmp = new HashMap<>(tags);
for (String dimension : dimensions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,29 @@ public void fromRulesMulti() {
}
}
}

@Test
public void fromRulesDrop() {
registry.counter("notDropped").increment();
for (int i = 0; i < 10; ++i) {
registry.counter("test", "i", "" + i).increment();
}
clock.setWallTime(5000);
List<Measurement> input = registry.measurements().collect(Collectors.toList());
List<RollupPolicy.Rule> rules = new ArrayList<>();
rules.add(new RollupPolicy.Rule("i,:has", list(), RollupPolicy.Operation.DROP));
RollupPolicy policy = Rollups.fromRules(map("app", "foo", "node", "i-123"), rules);

List<RollupPolicy.Result> results = policy.apply(input);
Assertions.assertEquals(1, results.size());
for (RollupPolicy.Result result : results) {
Assertions.assertEquals(1, result.measurements().size());
String name = result.measurements().get(0).id().name();
if ("notDropped".equals(name)) {
Assertions.assertEquals(map("app", "foo", "node", "i-123"), result.commonTags());
} else {
Assertions.fail("data not dropped");
}
}
}
}

0 comments on commit 6c6d0a7

Please sign in to comment.