Skip to content

Commit

Permalink
atlas: fix race condition for QueryIndex (#1148)
Browse files Browse the repository at this point in the history
Updates can happen concurrently with reads. This can
result in some of the volatile variables being set to
`null` after the `null` check occurs. Now they will get
copied to a local reference before being used for reads.
  • Loading branch information
brharrington authored Aug 2, 2024
1 parent 0fe1beb commit b050f12
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,13 @@ public boolean isEmpty() {
return matches.isEmpty()
&& equalChecks.values().stream().allMatch(QueryIndex::isEmpty)
&& otherChecks.values().stream().allMatch(QueryIndex::isEmpty)
&& (hasKeyIdx == null || hasKeyIdx.isEmpty())
&& (otherKeysIdx == null || otherKeysIdx.isEmpty())
&& (missingKeysIdx == null || missingKeysIdx.isEmpty());
&& isEmpty(hasKeyIdx)
&& isEmpty(otherKeysIdx)
&& isEmpty(missingKeysIdx);
}

private boolean isEmpty(QueryIndex<T> idx) {
return idx == null || idx.isEmpty();
}

/**
Expand Down Expand Up @@ -369,14 +373,15 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
// Matches for this level
matches.forEach(consumer);

if (key != null) {
final String keyRef = key;
if (keyRef != null) {

boolean keyPresent = false;

for (int j = i; j < tags.size(); ++j) {
String k = tags.getKey(j);
String v = tags.getValue(j);
int cmp = compare(k, key);
int cmp = compare(k, keyRef);
if (cmp == 0) {
keyPresent = true;

Expand Down Expand Up @@ -414,8 +419,9 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
}

// Check matches for has key
if (hasKeyIdx != null) {
hasKeyIdx.forEachMatch(tags, i, consumer);
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
hasKeyIdxRef.forEachMatch(tags, i, consumer);
}
}

Expand All @@ -426,13 +432,15 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
}

// Check matches with other keys
if (otherKeysIdx != null) {
otherKeysIdx.forEachMatch(tags, i, consumer);
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
otherKeysIdxRef.forEachMatch(tags, i, consumer);
}

// Check matches with missing keys
if (missingKeysIdx != null && !keyPresent) {
missingKeysIdx.forEachMatch(tags, i, consumer);
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null && !keyPresent) {
missingKeysIdxRef.forEachMatch(tags, i, consumer);
}
}
}
Expand Down Expand Up @@ -468,8 +476,9 @@ public void forEachMatch(Function<String, String> tags, Consumer<T> consumer) {
matches.forEach(consumer);

boolean keyPresent = false;
if (key != null) {
String v = tags.apply(key);
final String keyRef = key;
if (keyRef != null) {
String v = tags.apply(keyRef);
if (v != null) {
keyPresent = true;

Expand Down Expand Up @@ -507,20 +516,23 @@ public void forEachMatch(Function<String, String> tags, Consumer<T> consumer) {
}

// Check matches for has key
if (hasKeyIdx != null) {
hasKeyIdx.forEachMatch(tags, consumer);
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
hasKeyIdxRef.forEachMatch(tags, consumer);
}
}
}

// Check matches with other keys
if (otherKeysIdx != null) {
otherKeysIdx.forEachMatch(tags, consumer);
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
otherKeysIdxRef.forEachMatch(tags, consumer);
}

// Check matches with missing keys
if (missingKeysIdx != null && !keyPresent) {
missingKeysIdx.forEachMatch(tags, consumer);
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null && !keyPresent) {
missingKeysIdxRef.forEachMatch(tags, consumer);
}
}

Expand All @@ -545,8 +557,9 @@ public boolean couldMatch(Function<String, String> tags) {
}

boolean keyPresent = false;
if (key != null) {
String v = tags.apply(key);
final String keyRef = key;
if (keyRef != null) {
String v = tags.apply(keyRef);
if (v != null) {
keyPresent = true;

Expand All @@ -571,14 +584,16 @@ public boolean couldMatch(Function<String, String> tags) {
}

// Check matches for has key
if (hasKeyIdx != null && hasKeyIdx.couldMatch(tags)) {
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null && hasKeyIdxRef.couldMatch(tags)) {
return true;
}
}
}

// Check matches with other keys
if (otherKeysIdx != null && otherKeysIdx.couldMatch(tags)) {
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null && otherKeysIdxRef.couldMatch(tags)) {
return true;
}

Expand Down Expand Up @@ -625,11 +640,12 @@ private void findHotSpots(
Deque<String> path,
BiConsumer<List<String>, List<Query.KeyQuery>> consumer
) {
if (key != null) {
path.addLast("K=" + key);
final String keyRef = key;
if (keyRef != null) {
path.addLast("K=" + keyRef);

equalChecks.forEach((v, idx) -> {
path.addLast(key + "," + v + ",:eq");
path.addLast(keyRef + "," + v + ",:eq");
idx.findHotSpots(threshold, path, consumer);
path.removeLast();
});
Expand All @@ -646,24 +662,27 @@ private void findHotSpots(
});
path.removeLast();

if (hasKeyIdx != null) {
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
path.addLast("has");
hasKeyIdx.findHotSpots(threshold, path, consumer);
hasKeyIdxRef.findHotSpots(threshold, path, consumer);
path.removeLast();
}

path.removeLast();
}

if (otherKeysIdx != null) {
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
path.addLast("other-keys");
otherKeysIdx.findHotSpots(threshold, path, consumer);
otherKeysIdxRef.findHotSpots(threshold, path, consumer);
path.removeLast();
}

if (missingKeysIdx != null) {
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null) {
path.addLast("missing-keys");
missingKeysIdx.findHotSpots(threshold, path, consumer);
missingKeysIdxRef.findHotSpots(threshold, path, consumer);
path.removeLast();
}
}
Expand All @@ -682,8 +701,9 @@ private StringBuilder indent(StringBuilder builder, int n) {
}

private void buildString(StringBuilder builder, int n) {
if (key != null) {
indent(builder, n).append("key: [").append(key).append("]\n");
final String keyRef = key;
if (keyRef != null) {
indent(builder, n).append("key: [").append(keyRef).append("]\n");
}
if (!equalChecks.isEmpty()) {
indent(builder, n).append("equal checks:\n");
Expand All @@ -699,17 +719,20 @@ private void buildString(StringBuilder builder, int n) {
idx.buildString(builder, n + 1);
});
}
if (hasKeyIdx != null) {
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
indent(builder, n).append("has key:\n");
hasKeyIdx.buildString(builder, n + 1);
hasKeyIdxRef.buildString(builder, n + 1);
}
if (otherKeysIdx != null) {
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
indent(builder, n).append("other keys:\n");
otherKeysIdx.buildString(builder, n + 1);
otherKeysIdxRef.buildString(builder, n + 1);
}
if (missingKeysIdx != null) {
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null) {
indent(builder, n).append("missing keys:\n");
missingKeysIdx.buildString(builder, n + 1);
missingKeysIdxRef.buildString(builder, n + 1);
}
if (!matches.isEmpty()) {
indent(builder, n).append("matches:\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class QueryIndexTest {
Expand Down Expand Up @@ -589,4 +590,50 @@ public void couldMatchPartial() {
Assertions.assertFalse(idx.couldMatch(Query.toMap(id("foo2", "id", "bar"))::get));
Assertions.assertFalse(idx.couldMatch(Query.toMap(id("foo", "app", "bar-main"))::get));
}

@Test
public void updateRaceCondition() throws Exception {
Query q1 = Parser.parseQuery("name,test,:eq");
// Will be placed in otherKeysIdx for parent
Query q2 = Parser.parseQuery("foo,bar,:eq");

QueryIndex<Query> idx = QueryIndex.newInstance(new NoopRegistry());
idx.add(q1, q1);

final int N = 1_000_000;
final List<Thread> threads = new ArrayList<>(16);

// Query tasks
final AtomicInteger matches = new AtomicInteger();
final Id id = Id.create("test").withTag("foo", "bar");
for (int i = 0; i < 16; ++i) {
Thread t = new Thread(() -> {
for (int j = 0; j < N; ++j) {
matches.addAndGet(idx.findMatches(id).size());
}
});
t.start();
threads.add(t);
}

// Update by adding and removing q2 to lead to race condition for accessing
// volatile reference
Thread updater = new Thread(() -> {
for (int j = 0; j < N; ++j) {
idx.add(q2, q2);
idx.remove(q2, q2);
}
});
updater.start();
updater.join();

// Wait for read tasks to complete
threads.forEach(t -> {
try {
t.join();
} catch (Exception e) {
Assertions.fail(e);
}
});
}
}

0 comments on commit b050f12

Please sign in to comment.