diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index f60d8a93b6..0f2ffae3c7 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -15,6 +15,7 @@ import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Phaser; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.FileUtils; import org.apache.lucene.analysis.Analyzer; @@ -73,6 +74,10 @@ public class LuceneIndexStoreImpl implements LogStore { private final ReentrantLock indexWriterLock = new ReentrantLock(); + // We use a phaser to allow all originally submitted writes (potentially in other threads) to + // finish before invoking a close on the indexWriter + private final Phaser pendingWritesPhaser = new Phaser(); + // TODO: Set the policy via a lucene config file. public static LuceneIndexStoreImpl makeLogStore( File dataDirectory, KaldbConfigs.LuceneConfig luceneConfig, MeterRegistry metricsRegistry) @@ -255,6 +260,7 @@ private void handleNonFatal(Throwable ex) { @Override public void addMessage(LogMessage message) { + pendingWritesPhaser.register(); try { messagesReceivedCounter.increment(); if (indexWriter.isPresent()) { @@ -270,6 +276,8 @@ public void addMessage(LogMessage message) { // TODO: In future may need to handle this case more gracefully. LOG.error("failed to add document", e); new RuntimeHalterImpl().handleFatal(e); + } finally { + pendingWritesPhaser.arriveAndDeregister(); } } @@ -366,6 +374,8 @@ public void releaseIndexCommit(IndexCommit indexCommit) { */ @Override public void close() { + // Wait for all previously submitted pendingWrites to finish + pendingWritesPhaser.awaitAdvance(0); indexWriterLock.lock(); try { if (indexWriter.isEmpty()) { diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 464a6c4ea3..4e53d4705d 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -38,20 +38,27 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.lucene.index.IndexCommit; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; @SuppressWarnings("unused") public class LuceneIndexStoreImplTest { + private static final Logger LOG = LoggerFactory.getLogger(LuceneIndexStoreImplTest.class); + @BeforeAll public static void beforeClass() { Tracing.newBuilder().build(); @@ -539,4 +546,58 @@ public void testMaxRamBufferCalculations() { assertThat(LuceneIndexStoreImpl.getRAMBufferSizeMB(Long.MAX_VALUE)).isEqualTo(256); assertThat(LuceneIndexStoreImpl.getRAMBufferSizeMB((long) 24e+9)).isEqualTo(2048); } + + @Test + @Disabled("Demo test of phaser logic") + public void phaserDemoTest() { + AtomicInteger registers = new AtomicInteger(0); + AtomicInteger deRegisters = new AtomicInteger(0); + Phaser phaser = new Phaser(); + + Thread.ofVirtual() + .start( + () -> { + try { + Thread.sleep(100); + } catch (Exception e) { + throw new RuntimeException(e); + } + phaser.register(); + registers.incrementAndGet(); + LOG.info("Phase1 enter"); + try { + Thread.sleep(5000); + } catch (Exception e) { + throw new RuntimeException(e); + } + phaser.arriveAndDeregister(); + deRegisters.incrementAndGet(); + LOG.info("Phase1 exit"); + }); + + Thread.ofVirtual() + .start( + () -> { + try { + Thread.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + phaser.register(); + registers.incrementAndGet(); + LOG.info("Phase2 enter"); + try { + Thread.sleep(1000); + } catch (Exception e) { + throw new RuntimeException(e); + } + phaser.arriveAndDeregister(); + deRegisters.incrementAndGet(); + LOG.info("Phase2 exit"); + }); + + LOG.info("Wait for phase close"); + phaser.awaitAdvance(0); + LOG.info("Registers - {}, deRegisters - {}", registers.get(), deRegisters.get()); + } }