Skip to content

Commit

Permalink
Copy thread safety fix from parent repo
Browse files Browse the repository at this point in the history
Tests now running successfully and repeatedly

opensearch-project@e34d607
  • Loading branch information
bhallett-atlassian committed Oct 10, 2024
1 parent 44b9a42 commit a3ad0a2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 27 deletions.
5 changes: 1 addition & 4 deletions src/main/java/com/o19s/es/ltr/query/RankerQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ public boolean isCacheable(LeafReaderContext ctx) {
}

List<Weight> weights = new ArrayList<>(queries.size());
// XXX: this is not thread safe and may run into extremely weird issues
// if the searcher uses the parallel collector
// Hopefully elastic never runs
MutableSupplier<LtrRanker.FeatureVector> vectorSupplier = new Suppliers.FeatureVectorSupplier();
MutableSupplier<LtrRanker.FeatureVector> vectorSupplier = new Suppliers.MutableSupplier<>();
FVLtrRankerWrapper ltrRankerWrapper = new FVLtrRankerWrapper(ranker, vectorSupplier);
LtrRewriteContext context = new LtrRewriteContext(ranker, vectorSupplier);
for (Query q : queries) {
Expand Down
30 changes: 7 additions & 23 deletions src/main/java/com/o19s/es/ltr/utils/Suppliers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
import com.o19s.es.ltr.ranker.LtrRanker;
import org.opensearch.core.Assertions;

import java.util.concurrent.atomic.AtomicReference;
import java.util.Objects;
import java.util.function.Supplier;

public final class Suppliers {
/**
* Utility class
*/
private Suppliers() {}
private Suppliers() {
}

/**
* @param supplier the original supplier to store
* @param <E> the supplied type
* @param <E> the supplied type
* @return a supplier storing and returning the same instance
*/
public static <E> Supplier<E> memoize(Supplier<E> supplier) {
Expand Down Expand Up @@ -66,33 +68,15 @@ public E get() {
* A mutable supplier
*/
public static class MutableSupplier<T> implements Supplier<T> {
T obj;
private final AtomicReference<T> ref = new AtomicReference<>();

@Override
public T get() {
return obj;
return ref.get();
}

public void set(T obj) {
this.obj = obj;
}
}

/**
* Simple wrapper to make sure we run on the same thread
*/
public static class FeatureVectorSupplier extends MutableSupplier<LtrRanker.FeatureVector> {
private final long threadId = Assertions.ENABLED ? Thread.currentThread().getId() : 0;

public LtrRanker.FeatureVector get() {
assert threadId == Thread.currentThread().getId();
return super.get();
}

@Override
public void set(LtrRanker.FeatureVector obj) {
assert threadId == Thread.currentThread().getId();
super.set(obj);
this.ref.set(obj);
}
}
}

0 comments on commit a3ad0a2

Please sign in to comment.