Skip to content

Commit

Permalink
Add Phaser barrier for preventing Lucene close race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 16, 2024
1 parent bfa3a76 commit e60ca0f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -73,6 +74,10 @@ public class LuceneIndexStoreImpl implements LogStore<LogMessage> {

private final ReentrantLock indexWriterLock = new ReentrantLock();

// We use a phaser to allow all originally submitted writes (potentially in other threads) to
// finish before invoking a close on the indexWriter
private final Phaser pendingWritesPhaser = new Phaser();

// TODO: Set the policy via a lucene config file.
public static LuceneIndexStoreImpl makeLogStore(
File dataDirectory, KaldbConfigs.LuceneConfig luceneConfig, MeterRegistry metricsRegistry)
Expand Down Expand Up @@ -255,6 +260,7 @@ private void handleNonFatal(Throwable ex) {

@Override
public void addMessage(LogMessage message) {
pendingWritesPhaser.register();
try {
messagesReceivedCounter.increment();
if (indexWriter.isPresent()) {
Expand All @@ -270,6 +276,8 @@ public void addMessage(LogMessage message) {
// TODO: In future may need to handle this case more gracefully.
LOG.error("failed to add document", e);
new RuntimeHalterImpl().handleFatal(e);
} finally {
pendingWritesPhaser.arriveAndDeregister();
}
}

Expand Down Expand Up @@ -366,6 +374,8 @@ public void releaseIndexCommit(IndexCommit indexCommit) {
*/
@Override
public void close() {
// Wait for all previously submitted pendingWrites to finish
pendingWritesPhaser.awaitAdvance(0);
indexWriterLock.lock();
try {
if (indexWriter.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,27 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.lucene.index.IndexCommit;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

@SuppressWarnings("unused")
public class LuceneIndexStoreImplTest {
private static final Logger LOG = LoggerFactory.getLogger(LuceneIndexStoreImplTest.class);

@BeforeAll
public static void beforeClass() {
Tracing.newBuilder().build();
Expand Down Expand Up @@ -539,4 +546,58 @@ public void testMaxRamBufferCalculations() {
assertThat(LuceneIndexStoreImpl.getRAMBufferSizeMB(Long.MAX_VALUE)).isEqualTo(256);
assertThat(LuceneIndexStoreImpl.getRAMBufferSizeMB((long) 24e+9)).isEqualTo(2048);
}

@Test
@Disabled("Demo test of phaser logic")
public void phaserDemoTest() {
AtomicInteger registers = new AtomicInteger(0);
AtomicInteger deRegisters = new AtomicInteger(0);
Phaser phaser = new Phaser();

Thread.ofVirtual()
.start(
() -> {
try {
Thread.sleep(100);
} catch (Exception e) {
throw new RuntimeException(e);
}
phaser.register();
registers.incrementAndGet();
LOG.info("Phase1 enter");
try {
Thread.sleep(5000);
} catch (Exception e) {
throw new RuntimeException(e);
}
phaser.arriveAndDeregister();
deRegisters.incrementAndGet();
LOG.info("Phase1 exit");
});

Thread.ofVirtual()
.start(
() -> {
try {
Thread.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
phaser.register();
registers.incrementAndGet();
LOG.info("Phase2 enter");
try {
Thread.sleep(1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
phaser.arriveAndDeregister();
deRegisters.incrementAndGet();
LOG.info("Phase2 exit");
});

LOG.info("Wait for phase close");
phaser.awaitAdvance(0);
LOG.info("Registers - {}, deRegisters - {}", registers.get(), deRegisters.get());
}
}

0 comments on commit e60ca0f

Please sign in to comment.