Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SST files not being cleaned up in the locations folder #4555

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public interface BookKeeperServerStats {
String THREAD_RUNTIME = "THREAD_RUNTIME";
String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL";
String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL";
String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL";
String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL";
String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL";
String GC_LEDGER_RUNTIME = "GC_LEDGER_RUNTIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class GarbageCollectionStatus {

private long lastMajorCompactionTime;
private long lastMinorCompactionTime;
private long lastEntryLocationCompactionTime;
private long majorCompactionCounter;
private long minorCompactionCounter;
private long entryLocationCompactionCounter;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -87,6 +88,10 @@ public class GarbageCollectorThread implements Runnable {
long majorCompactionMaxTimeMillis;
long lastMajorCompactionTime;

final long entryLocationCompactionInterval;
long randomCompactionDelay;
long lastEntryLocationCompactionTime;

@Getter
final boolean isForceGCAllowWhenNoSpace;

Expand Down Expand Up @@ -211,6 +216,8 @@ public GarbageCollectorThread(ServerConfiguration conf,
isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND;
// ThreadLocalRandom.nextLong(bound) throws IllegalArgumentException when bound <= 0, and the
// configured interval defaults to -1 (entry location compaction disabled). Only draw a random
// delay when the feature is enabled; otherwise the delay is never consulted, so use 0.
randomCompactionDelay = entryLocationCompactionInterval > 0
        ? ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval)
        : 0L;

boolean isForceAllowCompaction = conf.isForceAllowCompaction();

Expand Down Expand Up @@ -277,12 +284,21 @@ public void removeEntryLog(long logToRemove) {
}
}

if (entryLocationCompactionInterval > 0) {
if (entryLocationCompactionInterval < gcWaitTime) {
throw new IOException(
"Too short entry location compaction interval : " + entryLocationCompactionInterval);
}
}

LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
+ minorCompactionThreshold + ", interval=" + minorCompactionInterval);
LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
+ majorCompactionThreshold + ", interval=" + majorCompactionInterval);
LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay="
+ randomCompactionDelay);

lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
lastMinorCompactionTime = lastMajorCompactionTime = lastEntryLocationCompactionTime = System.currentTimeMillis();
}

private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException {
Expand Down Expand Up @@ -470,6 +486,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
gcStats.getMajorCompactionCounter().inc();
majorCompacting.set(false);
}

} else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
&& (!suspendMinor)) {
Expand All @@ -489,6 +506,20 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
minorCompacting.set(false);
}
}
if (entryLocationCompactionInterval > 0 && (curTime - lastEntryLocationCompactionTime > (
entryLocationCompactionInterval + randomCompactionDelay))) {
// enter entry location compaction
LOG.info(
"Enter entry location compaction, entryLocationCompactionInterval {}, randomCompactionDelay "
+ "{}, lastEntryLocationCompactionTime {}",
entryLocationCompactionInterval, randomCompactionDelay, lastEntryLocationCompactionTime);
ledgerStorage.entryLocationCompact();
lastEntryLocationCompactionTime = System.currentTimeMillis();
randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
LOG.info("Next entry location compaction interval {}",
entryLocationCompactionInterval + randomCompactionDelay);
gcStats.getEntryLocationCompactionCounter().inc();
}
gcStats.getCompactRuntime()
.registerSuccessfulEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS);
gcStats.getGcThreadRuntime().registerSuccessfulEvent(
Expand Down Expand Up @@ -855,8 +886,10 @@ public GarbageCollectionStatus getGarbageCollectionStatus() {
.minorCompacting(minorCompacting.get())
.lastMajorCompactionTime(lastMajorCompactionTime)
.lastMinorCompactionTime(lastMinorCompactionTime)
.lastEntryLocationCompactionTime(lastEntryLocationCompactionTime)
.majorCompactionCounter(gcStats.getMajorCompactionCounter().get())
.minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
.entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.COMPACT_RUNTIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_COMPACT_RATIO;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_SPACE_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.EXTRACT_META_RUNTIME;
Expand Down Expand Up @@ -67,6 +68,11 @@ public class GarbageCollectorStats {
help = "Number of major compactions"
)
private final Counter majorCompactionCounter;
@StatsDoc(
name = ENTRY_LOCATION_COMPACTION_COUNT,
help = "Number of entry location compactions"
)
private final Counter entryLocationCompactionCounter;
@StatsDoc(
name = RECLAIMED_DELETION_SPACE_BYTES,
help = "Number of disk space bytes reclaimed via deleting entry log files"
Expand Down Expand Up @@ -147,6 +153,7 @@ public GarbageCollectorStats(StatsLogger statsLogger,

this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT);
this.majorCompactionCounter = statsLogger.getCounter(MAJOR_COMPACTION_COUNT);
this.entryLocationCompactionCounter = statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT);
this.reclaimedSpaceViaCompaction = statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES);
this.reclaimedSpaceViaDeletes = statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES);
this.reclaimFailedToDelete = statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ public boolean isMinorGcSuspended() {
public void entryLocationCompact() {
if (entryLocationIndex.isCompacting()) {
// RocksDB already running compact.
log.info("Compacting directory {}, skipping this entryLocationCompaction this time.",
entryLocationIndex.getEntryLocationDBPath());
return;
}
cleanupExecutor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String COMPACTION_RATE = "compactionRate";
protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
protected static final String ENTRY_LOCATION_COMPACTION_INTERVAL = "entryLocationCompactionInterval";

// Gc Parameters
protected static final String GC_WAIT_TIME = "gcWaitTime";
Expand Down Expand Up @@ -2974,6 +2975,31 @@ public ServerConfiguration setCompactionRateByBytes(int rate) {
return this;
}

/**
* Get interval to run entry location compaction, in seconds.
*
* <p>If it is set to zero or less, the entry location compaction is disabled.
* Defaults to -1 when the setting is not configured.
*
* @return interval to run entry location compaction, in seconds.
*/
public long getEntryLocationCompactionInterval() {
return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1);
}

/**
* Set interval to run entry location compaction, in seconds.
*
* @see #getEntryLocationCompactionInterval()
*
* @param interval
* Interval to run entry location compaction, in seconds
* @return server configuration
*/
public ServerConfiguration setEntryLocationCompactionInterval(long interval) {
setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval);
return this;
}

/**
* Should we remove pages from page cache after force write.
*
Expand Down Expand Up @@ -3214,6 +3240,9 @@ public void validate() throws ConfigurationException {
if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * SECOND < getGcWaitTime()) {
throw new ConfigurationException("majorCompactionInterval should be >= gcWaitTime.");
}
if (getEntryLocationCompactionInterval() > 0 && getEntryLocationCompactionInterval() * SECOND < getGcWaitTime()) {
throw new ConfigurationException("entryLocationCompactionInterval should be >= gcWaitTime.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void testEntryLogSizeLimit() throws ConfigurationException {
public void testCompactionSettings() throws ConfigurationException {
ServerConfiguration conf = new ServerConfiguration();
long major, minor;
long entryLocationCompactionInterval;

// Default Values
major = conf.getMajorCompactionMaxTimeMillis();
Expand Down Expand Up @@ -239,5 +240,24 @@ public void testCompactionSettings() throws ConfigurationException {
minorThreshold = conf.getMinorCompactionThreshold();
Assert.assertEquals(0.6, majorThreshold, 0.00001);
Assert.assertEquals(0.3, minorThreshold, 0.00001);

// Default Values
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval();
Assert.assertEquals(-1, entryLocationCompactionInterval);

// Set entry location compaction
conf.setEntryLocationCompactionInterval(3600);
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval();
Assert.assertEquals(3600, entryLocationCompactionInterval);

conf.setEntryLocationCompactionInterval(550);
try {
conf.validate();
fail();
} catch (ConfigurationException ignore) {
}

conf.setEntryLocationCompactionInterval(650);
conf.validate();
}
}
5 changes: 5 additions & 0 deletions conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data
# Set the rate at which compaction will readd entries. The unit is bytes added per second.
# compactionRateByBytes=1000000

# Interval to run entry location compaction, in seconds.
# If it is set to zero or less, the entry location compaction is disabled.
# Note: when enabled, it must be greater than or equal to gcWaitTime (compared in the same time unit).
# entryLocationCompactionInterval=-1

# Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction,
# which it will use new entry log files to store compacted entries during compaction; if it is set to false,
# it will use normal compaction, which it shares same entry log file with normal add operations.
Expand Down
Loading