From 97a38dadd0163177bf5f6076f4cdc018964140eb Mon Sep 17 00:00:00 2001 From: houbonan Date: Thu, 20 Feb 2025 16:55:58 +0800 Subject: [PATCH 1/3] fix entry location compaction --- .../bookie/GarbageCollectorThread.java | 7 ++++++ .../bookkeeper/conf/ServerConfiguration.java | 23 +++++++++++++++++++ .../conf/TestServerConfiguration.java | 8 +++++++ conf/bk_server.conf | 3 +++ 4 files changed, 41 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index f9bdef9d565..16f2080240c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -87,6 +87,8 @@ public class GarbageCollectorThread implements Runnable { long majorCompactionMaxTimeMillis; long lastMajorCompactionTime; + boolean entryLocationCompaction = false; + @Getter final boolean isForceGCAllowWhenNoSpace; @@ -211,6 +213,7 @@ public GarbageCollectorThread(ServerConfiguration conf, isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis(); minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis(); + entryLocationCompaction = conf.getEntryLocationCompactionEnabled(); boolean isForceAllowCompaction = conf.isForceAllowCompaction(); @@ -470,6 +473,10 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin gcStats.getMajorCompactionCounter().inc(); majorCompacting.set(false); } + if (entryLocationCompaction) { + // submit entryLocation compaction task + ledgerStorage.entryLocationCompact(); + } } else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction && (force || curTime - lastMinorCompactionTime > minorCompactionInterval))) && (!suspendMinor)) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 01848882c55..a7fa2fcdf7c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -106,6 +106,7 @@ public class ServerConfiguration extends AbstractConfiguration Date: Thu, 6 Mar 2025 14:59:33 +0800 Subject: [PATCH 2/3] replace entryLocationCompactionEnable with entryLocationCompactionInterval --- .../bookie/BookKeeperServerStats.java | 1 + .../bookie/GarbageCollectionStatus.java | 2 ++ .../bookie/GarbageCollectorThread.java | 36 +++++++++++++++---- .../bookie/stats/GarbageCollectorStats.java | 7 ++++ .../ldb/SingleDirectoryDbLedgerStorage.java | 2 ++ .../bookkeeper/conf/ServerConfiguration.java | 30 +++++++++------- .../conf/TestServerConfiguration.java | 28 ++++++++++----- conf/bk_server.conf | 6 ++-- 8 files changed, 83 insertions(+), 29 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index cc1d3a4ad60..8cb515b1e64 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -153,6 +153,7 @@ public interface BookKeeperServerStats { String THREAD_RUNTIME = "THREAD_RUNTIME"; String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL"; String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL"; + String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL"; String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL"; String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL"; String GC_LEDGER_RUNTIME = "GC_LEDGER_RUNTIME"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java index 3f872092f01..4ad450a64f1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java @@ -42,6 +42,8 @@ public class GarbageCollectionStatus { private long lastMajorCompactionTime; private long lastMinorCompactionTime; + private long lastEntryLocationCompactionTime; private long majorCompactionCounter; private long minorCompactionCounter; + private long entryLocationCompactionCounter; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 16f2080240c..8f7635dca28 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -87,7 +87,9 @@ public class GarbageCollectorThread implements Runnable { long majorCompactionMaxTimeMillis; long lastMajorCompactionTime; - boolean entryLocationCompaction = false; + boolean enableEntryLocationCompaction = false; + final long entryLocationCompactionInterval; + long lastEntryLocationCompactionTime; @Getter final boolean isForceGCAllowWhenNoSpace; @@ -213,7 +215,7 @@ public GarbageCollectorThread(ServerConfiguration conf, isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis(); minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis(); - entryLocationCompaction = conf.getEntryLocationCompactionEnabled(); + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND; boolean isForceAllowCompaction = conf.isForceAllowCompaction(); @@ -280,12 +282,22 @@ public void removeEntryLog(long logToRemove) { } } + if (entryLocationCompactionInterval > 0) { + if (entryLocationCompactionInterval < gcWaitTime) { + throw new IOException( + "Too short entry location compaction interval : " + entryLocationCompactionInterval); + } + enableEntryLocationCompaction = true; + } + LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold=" + minorCompactionThreshold + ", interval=" + minorCompactionInterval); LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold=" + majorCompactionThreshold + ", interval=" + majorCompactionInterval); + LOG.info("Entry Location Compaction : enabled=" + enableEntryLocationCompaction + ", interval=" + + entryLocationCompactionInterval); - lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis(); + lastMinorCompactionTime = lastMajorCompactionTime = lastEntryLocationCompactionTime = System.currentTimeMillis(); } private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException { @@ -473,10 +485,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin gcStats.getMajorCompactionCounter().inc(); majorCompacting.set(false); } - if (entryLocationCompaction) { - // submit entryLocation compaction task - ledgerStorage.entryLocationCompact(); - } + } else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction && (force || curTime - lastMinorCompactionTime > minorCompactionInterval))) && (!suspendMinor)) { @@ -496,6 +505,17 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin minorCompacting.set(false); } } + if (enableEntryLocationCompaction && (curTime - lastEntryLocationCompactionTime + > entryLocationCompactionInterval)) { + // enter entry location compaction + LOG.info( + "Enter entry location compaction, entryLocationCompactionInterval {}, " + + "lastEntryLocationCompactionTime {}", + entryLocationCompactionInterval, lastEntryLocationCompactionTime); + ledgerStorage.entryLocationCompact(); + lastEntryLocationCompactionTime = System.currentTimeMillis(); + gcStats.getEntryLocationCompactionCounter().inc(); + } gcStats.getCompactRuntime() .registerSuccessfulEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS); gcStats.getGcThreadRuntime().registerSuccessfulEvent( @@ -862,8 +882,10 @@ public GarbageCollectionStatus getGarbageCollectionStatus() { .minorCompacting(minorCompacting.get()) .lastMajorCompactionTime(lastMajorCompactionTime) .lastMinorCompactionTime(lastMinorCompactionTime) + .lastEntryLocationCompactionTime(lastEntryLocationCompactionTime) .majorCompactionCounter(gcStats.getMajorCompactionCounter().get()) .minorCompactionCounter(gcStats.getMinorCompactionCounter().get()) + .entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get()) .build(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java index f579036df08..a9ecd180ddf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java @@ -26,6 +26,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.COMPACT_RUNTIME; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_COMPACT_RATIO; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_SPACE_BYTES; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.EXTRACT_META_RUNTIME; @@ -67,6 +68,11 @@ public class GarbageCollectorStats { help = "Number of major compactions" ) private final Counter majorCompactionCounter; + @StatsDoc( + name = ENTRY_LOCATION_COMPACTION_COUNT, + help = "Number of entry location compactions" + ) + private final Counter entryLocationCompactionCounter; @StatsDoc( name = RECLAIMED_DELETION_SPACE_BYTES, help = "Number of disk space bytes reclaimed via deleting entry log files" @@ -147,6 +153,7 @@ public GarbageCollectorStats(StatsLogger statsLogger, this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT); this.majorCompactionCounter = statsLogger.getCounter(MAJOR_COMPACTION_COUNT); + this.entryLocationCompactionCounter = statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT); this.reclaimedSpaceViaCompaction = statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES); this.reclaimedSpaceViaDeletes = statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES); this.reclaimFailedToDelete = statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 6ce2d4b4f54..1a1e92dd304 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -313,6 +313,8 @@ public boolean isMinorGcSuspended() { public void entryLocationCompact() { if (entryLocationIndex.isCompacting()) { // RocksDB already running compact. + log.info("Compacting directory {}, skipping this entryLocationCompaction this time.", + entryLocationIndex.getEntryLocationDBPath()); return; } cleanupExecutor.execute(() -> { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index a7fa2fcdf7c..e4d19edd7e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -106,7 +106,7 @@ public class ServerConfiguration extends AbstractConfigurationIf it is set to less than zero, the entry location compaction is disabled. + * + * @return high water mark. */ - public boolean getEntryLocationCompactionEnabled() { - return getBoolean(ENTRY_LOCATION_COMPACTION_ENABLED, false); + public long getEntryLocationCompactionInterval() { + return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1); } /** - * Sets that whether to enable entry location compaction with major compaction. + * Set interval to run entry location compaction. * - * @param enabled - * - true if entry location compaction should be enabled with major compaction. Otherwise false. - * false. - * @return ServerConfiguration + * @see #getMajorCompactionInterval() + * + * @param interval + * Interval to run entry location compaction + * @return server configuration */ - public ServerConfiguration setEntryLocationCompactionEnabled(boolean enabled) { - setProperty(ENTRY_LOCATION_COMPACTION_ENABLED, enabled); + public ServerConfiguration setEntryLocationCompactionInterval(long interval) { + setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval); return this; } @@ -3237,6 +3240,9 @@ public void validate() throws ConfigurationException { if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * SECOND < getGcWaitTime()) { throw new ConfigurationException("majorCompactionInterval should be >= gcWaitTime."); } + if (getEntryLocationCompactionInterval() > 0 && getEntryLocationCompactionInterval() * SECOND < getGcWaitTime()) { + throw new ConfigurationException("entryLocationCompactionInterval should be >= gcWaitTime."); + } } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java index 48d30eb3e05..d8aa62d0d23 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java @@ -158,20 +158,13 @@ public void testEntryLogSizeLimit() throws ConfigurationException { public void testCompactionSettings() throws ConfigurationException { ServerConfiguration conf = new ServerConfiguration(); long major, minor; - boolean entryLocationCompaction; + long entryLocationCompactionInterval; // Default Values major = conf.getMajorCompactionMaxTimeMillis(); minor = conf.getMinorCompactionMaxTimeMillis(); - entryLocationCompaction=conf.getEntryLocationCompactionEnabled(); Assert.assertEquals(-1, major); Assert.assertEquals(-1, minor); - Assert.assertFalse(entryLocationCompaction); - - //Set entryLocationCompaction enable - conf.setEntryLocationCompactionEnabled(true); - entryLocationCompaction=conf.getEntryLocationCompactionEnabled(); - Assert.assertTrue(entryLocationCompaction); // Set values major then minor conf.setMajorCompactionMaxTimeMillis(500).setMinorCompactionMaxTimeMillis(250); @@ -247,5 +240,24 @@ public void testCompactionSettings() throws ConfigurationException { minorThreshold = conf.getMinorCompactionThreshold(); Assert.assertEquals(0.6, majorThreshold, 0.00001); Assert.assertEquals(0.3, minorThreshold, 0.00001); + + // Default Values + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval(); + Assert.assertEquals(-1, entryLocationCompactionInterval); + + // Set entry location compaction + conf.setEntryLocationCompactionInterval(3600); + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval(); + Assert.assertEquals(3600, entryLocationCompactionInterval); + + conf.setEntryLocationCompactionInterval(550); + try { + conf.validate(); + fail(); + } catch (ConfigurationException ignore) { + } + + conf.setEntryLocationCompactionInterval(650); + conf.validate(); } } diff --git a/conf/bk_server.conf b/conf/bk_server.conf index 67c6ba033c0..d96472a1eac 100644 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -581,8 +581,10 @@ ledgerDirectories=/tmp/bk-data # Set the rate at which compaction will readd entries. The unit is bytes added per second. # compactionRateByBytes=1000000 -# Sets that whether to enable entry location compaction with major compaction. -# entryLocationCompactionEnabled=true +# Interval to run entry location compaction, in seconds +# If it is set to less than zero, the entry location compaction is disabled. +# Note: should be greater than gcWaitTime. +# entryLocationCompactionInterval=-1 # Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction, # which it will use new entry log files to store compacted entries during compaction; if it is set to false, From ea8f55e3e1fc97d36f85c38410fca0d89ea4cb18 Mon Sep 17 00:00:00 2001 From: houbonan Date: Mon, 10 Mar 2025 14:54:54 +0800 Subject: [PATCH 3/3] Add randomCompactionDelay to avoid all the bookies triggering compaction simultaneously --- .../bookie/GarbageCollectorThread.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 8f7635dca28..2ac156a91ac 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -87,8 +88,8 @@ public class GarbageCollectorThread implements Runnable { long majorCompactionMaxTimeMillis; long lastMajorCompactionTime; - boolean enableEntryLocationCompaction = false; final long entryLocationCompactionInterval; + long randomCompactionDelay; long lastEntryLocationCompactionTime; @Getter @@ -216,6 +217,7 @@ public GarbageCollectorThread(ServerConfiguration conf, majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis(); minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis(); entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND; + randomCompactionDelay= ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval); boolean isForceAllowCompaction = conf.isForceAllowCompaction(); @@ -287,15 +289,14 @@ public void removeEntryLog(long logToRemove) { throw new IOException( "Too short entry location compaction interval : " + entryLocationCompactionInterval); } - enableEntryLocationCompaction = true; } LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold=" + minorCompactionThreshold + ", interval=" + minorCompactionInterval); LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold=" + majorCompactionThreshold + ", interval=" + majorCompactionInterval); - LOG.info("Entry Location Compaction : enabled=" + enableEntryLocationCompaction + ", interval=" - + entryLocationCompactionInterval); + LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay=" + + randomCompactionDelay); lastMinorCompactionTime = lastMajorCompactionTime = lastEntryLocationCompactionTime = System.currentTimeMillis(); } @@ -505,15 +506,18 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin minorCompacting.set(false); } } - if (enableEntryLocationCompaction && (curTime - lastEntryLocationCompactionTime - > entryLocationCompactionInterval)) { + if (entryLocationCompactionInterval > 0 && (curTime - lastEntryLocationCompactionTime > ( + entryLocationCompactionInterval + randomCompactionDelay))) { // enter entry location compaction LOG.info( - "Enter entry location compaction, entryLocationCompactionInterval {}, " - + "lastEntryLocationCompactionTime {}", - entryLocationCompactionInterval, lastEntryLocationCompactionTime); + "Enter entry location compaction, entryLocationCompactionInterval {}, randomCompactionDelay " + + "{}, lastEntryLocationCompactionTime {}", + entryLocationCompactionInterval, randomCompactionDelay, lastEntryLocationCompactionTime); ledgerStorage.entryLocationCompact(); lastEntryLocationCompactionTime = System.currentTimeMillis(); + randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval); + LOG.info("Next entry location compaction interval {}", + entryLocationCompactionInterval + randomCompactionDelay); gcStats.getEntryLocationCompactionCounter().inc(); } gcStats.getCompactRuntime()