diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index cc1d3a4ad60..8cb515b1e64 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -153,6 +153,7 @@ public interface BookKeeperServerStats { String THREAD_RUNTIME = "THREAD_RUNTIME"; String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL"; String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL"; + String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL"; String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL"; String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL"; String GC_LEDGER_RUNTIME = "GC_LEDGER_RUNTIME"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java index 3f872092f01..4ad450a64f1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java @@ -42,6 +42,8 @@ public class GarbageCollectionStatus { private long lastMajorCompactionTime; private long lastMinorCompactionTime; + private long lastEntryLocationCompactionTime; private long majorCompactionCounter; private long minorCompactionCounter; + private long entryLocationCompactionCounter; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index f9bdef9d565..2ac156a91ac 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -87,6 +88,10 @@ public class GarbageCollectorThread implements Runnable { long majorCompactionMaxTimeMillis; long lastMajorCompactionTime; + final long entryLocationCompactionInterval; + long randomCompactionDelay; + long lastEntryLocationCompactionTime; + @Getter final boolean isForceGCAllowWhenNoSpace; @@ -211,6 +216,8 @@ public GarbageCollectorThread(ServerConfiguration conf, isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis(); minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis(); + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND; + randomCompactionDelay= ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval); boolean isForceAllowCompaction = conf.isForceAllowCompaction(); @@ -277,12 +284,21 @@ public void removeEntryLog(long logToRemove) { } } + if (entryLocationCompactionInterval > 0) { + if (entryLocationCompactionInterval < gcWaitTime) { + throw new IOException( + "Too short entry location compaction interval : " + entryLocationCompactionInterval); + } + } + LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold=" + minorCompactionThreshold + ", interval=" + minorCompactionInterval); LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold=" + majorCompactionThreshold + ", interval=" + majorCompactionInterval); + LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay=" + + randomCompactionDelay); - lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis(); + lastMinorCompactionTime = lastMajorCompactionTime = lastEntryLocationCompactionTime = System.currentTimeMillis(); } private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException { @@ -470,6 +486,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin gcStats.getMajorCompactionCounter().inc(); majorCompacting.set(false); } + } else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction && (force || curTime - lastMinorCompactionTime > minorCompactionInterval))) && (!suspendMinor)) { @@ -489,6 +506,20 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin minorCompacting.set(false); } } + if (entryLocationCompactionInterval > 0 && (curTime - lastEntryLocationCompactionTime > ( + entryLocationCompactionInterval + randomCompactionDelay))) { + // enter entry location compaction + LOG.info( + "Enter entry location compaction, entryLocationCompactionInterval {}, randomCompactionDelay " + + "{}, lastEntryLocationCompactionTime {}", + entryLocationCompactionInterval, randomCompactionDelay, lastEntryLocationCompactionTime); + ledgerStorage.entryLocationCompact(); + lastEntryLocationCompactionTime = System.currentTimeMillis(); + randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval); + LOG.info("Next entry location compaction interval {}", + entryLocationCompactionInterval + randomCompactionDelay); + gcStats.getEntryLocationCompactionCounter().inc(); + } gcStats.getCompactRuntime() .registerSuccessfulEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS); gcStats.getGcThreadRuntime().registerSuccessfulEvent( @@ -855,8 +886,10 @@ public GarbageCollectionStatus getGarbageCollectionStatus() { .minorCompacting(minorCompacting.get()) .lastMajorCompactionTime(lastMajorCompactionTime) .lastMinorCompactionTime(lastMinorCompactionTime) + .lastEntryLocationCompactionTime(lastEntryLocationCompactionTime) .majorCompactionCounter(gcStats.getMajorCompactionCounter().get()) .minorCompactionCounter(gcStats.getMinorCompactionCounter().get()) + .entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get()) .build(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java index f579036df08..a9ecd180ddf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java @@ -26,6 +26,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.COMPACT_RUNTIME; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_COMPACT_RATIO; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_SPACE_BYTES; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.EXTRACT_META_RUNTIME; @@ -67,6 +68,11 @@ public class GarbageCollectorStats { help = "Number of major compactions" ) private final Counter majorCompactionCounter; + @StatsDoc( + name = ENTRY_LOCATION_COMPACTION_COUNT, + help = "Number of entry location compactions" + ) + private final Counter entryLocationCompactionCounter; @StatsDoc( name = RECLAIMED_DELETION_SPACE_BYTES, help = "Number of disk space bytes reclaimed via deleting entry log files" @@ -147,6 +153,7 @@ public GarbageCollectorStats(StatsLogger statsLogger, this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT); this.majorCompactionCounter = statsLogger.getCounter(MAJOR_COMPACTION_COUNT); + this.entryLocationCompactionCounter = statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT); this.reclaimedSpaceViaCompaction = statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES); this.reclaimedSpaceViaDeletes = statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES); this.reclaimFailedToDelete = statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 6ce2d4b4f54..1a1e92dd304 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -313,6 +313,8 @@ public boolean isMinorGcSuspended() { public void entryLocationCompact() { if (entryLocationIndex.isCompacting()) { // RocksDB already running compact. + log.info("Compacting directory {}, skipping this entryLocationCompaction this time.", + entryLocationIndex.getEntryLocationDBPath()); return; } cleanupExecutor.execute(() -> { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 01848882c55..e4d19edd7e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -106,6 +106,7 @@ public class ServerConfiguration extends AbstractConfigurationIf it is set to less than zero, the entry location compaction is disabled. + * + * @return high water mark. + */ + public long getEntryLocationCompactionInterval() { + return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1); + } + + /** + * Set interval to run entry location compaction. + * + * @see #getMajorCompactionInterval() + * + * @param interval + * Interval to run entry location compaction + * @return server configuration + */ + public ServerConfiguration setEntryLocationCompactionInterval(long interval) { + setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval); + return this; + } + /** * Should we remove pages from page cache after force write. * @@ -3214,6 +3240,9 @@ public void validate() throws ConfigurationException { if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * SECOND < getGcWaitTime()) { throw new ConfigurationException("majorCompactionInterval should be >= gcWaitTime."); } + if (getEntryLocationCompactionInterval() > 0 && getEntryLocationCompactionInterval() * SECOND < getGcWaitTime()) { + throw new ConfigurationException("entryLocationCompactionInterval should be >= gcWaitTime."); + } } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java index 04ac87818f7..d8aa62d0d23 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java @@ -158,6 +158,7 @@ public void testEntryLogSizeLimit() throws ConfigurationException { public void testCompactionSettings() throws ConfigurationException { ServerConfiguration conf = new ServerConfiguration(); long major, minor; + long entryLocationCompactionInterval; // Default Values major = conf.getMajorCompactionMaxTimeMillis(); @@ -239,5 +240,24 @@ public void testCompactionSettings() throws ConfigurationException { minorThreshold = conf.getMinorCompactionThreshold(); Assert.assertEquals(0.6, majorThreshold, 0.00001); Assert.assertEquals(0.3, minorThreshold, 0.00001); + + // Default Values + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval(); + Assert.assertEquals(-1, entryLocationCompactionInterval); + + // Set entry location compaction + conf.setEntryLocationCompactionInterval(3600); + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval(); + Assert.assertEquals(3600, entryLocationCompactionInterval); + + conf.setEntryLocationCompactionInterval(550); + try { + conf.validate(); + fail(); + } catch (ConfigurationException ignore) { + } + + conf.setEntryLocationCompactionInterval(650); + conf.validate(); } } diff --git a/conf/bk_server.conf b/conf/bk_server.conf index 1dfb8215245..d96472a1eac 100644 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data # Set the rate at which compaction will readd entries. The unit is bytes added per second. # compactionRateByBytes=1000000 +# Interval to run entry location compaction, in seconds +# If it is set to less than zero, the entry location compaction is disabled. +# Note: should be greater than gcWaitTime. +# entryLocationCompactionInterval=-1 + # Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction, # which it will use new entry log files to store compacted entries during compaction; if it is set to false, # it will use normal compaction, which it shares same entry log file with normal add operations.