Add Manual Compaction to HaloDB (yahoo#45)
  Add the 'forceCompaction' method to HaloDB,
  which takes a compactionThreshold value and
  will trigger compaction on all files that have
  stale data above the threshold.

  Additionally clean up how pauseCompaction()
  works so that it awaits pending compactions.
Scott Carey committed Dec 3, 2019
1 parent 4e617b4 commit 281ea00
Showing 6 changed files with 158 additions and 115 deletions.
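For context, a minimal usage sketch (not part of the change itself) of the API this commit adds, forceCompaction(float) and the pauseCompaction(boolean) overload, assuming a database opened at a hypothetical path with default options:

import com.oath.halodb.HaloDB;
import com.oath.halodb.HaloDBException;
import com.oath.halodb.HaloDBOptions;

public class ManualCompactionExample {
    public static void main(String[] args) throws HaloDBException {
        HaloDBOptions options = new HaloDBOptions();              // defaults; tune as needed
        HaloDB db = HaloDB.open("/tmp/halodb-example", options);  // hypothetical path

        // Compact every data file whose stale data is at least 10% of the file size.
        db.forceCompaction(0.1f);

        // Pause compaction, waiting for all queued compaction tasks to finish first.
        db.pauseCompaction(true);

        // ... take a snapshot, back up files, etc. ...

        db.resumeCompaction();
        db.close();
    }
}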
102 changes: 54 additions & 48 deletions src/main/java/com/oath/halodb/CompactionManager.java
@@ -5,18 +5,18 @@

package com.oath.halodb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.RateLimiter;

class CompactionManager {
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);

@@ -41,7 +41,11 @@ class CompactionManager {
private volatile long totalSizeOfRecordsCopied = 0;
private volatile long compactionStartTime = System.currentTimeMillis();

private static final int STOP_SIGNAL = -10101;
// These are purposely 'newed' up because we use reference equality to check the signals; the values themselves do not matter.
// Signal for the compactor thread to stop after finishing the tasks already in the queue.
private static final Integer STOP_SIGNAL = new Integer(-1);
// Signal for the compactor thread to stop after finishing any active task, without taking more tasks from the queue.
private static final Integer WAKE_SIGNAL = new Integer(-1);

private final ReentrantLock startStopLock = new ReentrantLock();
private volatile boolean stopInProgress = false;
@@ -53,16 +57,21 @@ class CompactionManager {
}

// If a file is being compacted we wait for it to complete before stopping.
boolean stopCompactionThread(boolean closeCurrentWriteFile) throws IOException {
boolean stopCompactionThread(boolean closeCurrentWriteFile, boolean awaitPending) throws IOException {
stopInProgress = true;
startStopLock.lock();
try {
isRunning = false;
if (isCompactionRunning()) {
// We don't want to call interrupt on compaction thread as it
// may interrupt IO operations and leave files in an inconsistent state.
// instead we use -10101 as a stop signal.
compactionQueue.put(STOP_SIGNAL);
if (awaitPending) {
// we send a stop signal that will stop the thread after existing items in the queue complete
compactionQueue.put(STOP_SIGNAL);
} else {
// set the running flag to false, then send the wake signal. If the queue is empty it will immediately
// consume the signal to wake up the thread and stop.
// if the queue is not empty, then after the current task completes the 'isRunning' flag will stop it
isRunning = false;
compactionQueue.put(WAKE_SIGNAL);
}
compactionThread.join();
if (closeCurrentWriteFile && currentWriteFile != null) {
currentWriteFile.flushToDisk();
@@ -95,9 +104,14 @@ void startCompactionThread() {
}
}

void pauseCompactionThread() throws IOException {
/**
* Stop the compaction thread, blocking until it has stopped.
* If awaitPending is true, stops after all outstanding compaction tasks in the queue
* have completed. Otherwise, stops after the current task completes.
**/
void pauseCompactionThread(boolean awaitPending) throws IOException {
logger.info("Pausing compaction thread ...");
stopCompactionThread(false);
stopCompactionThread(false, awaitPending);
}

void resumeCompaction() {
@@ -109,7 +123,7 @@ int getCurrentWriteFileId() {
return currentWriteFile != null ? currentWriteFile.getFileId() : -1;
}

boolean submitFileForCompaction(int fileId) {
boolean submitFileForCompaction(Integer fileId) {
return compactionQueue.offer(fileId);
}

@@ -178,39 +192,53 @@ private class CompactionThread extends Thread {
startStopLock.lock();
try {
compactionThread = null;
startCompactionThread();
if (isRunning) {
startCompactionThread();
}
} finally {
startStopLock.unlock();
}
}
else {
logger.info("Not restarting thread as the lock is held by stop compaction method.");
}

});
}

@Override
public void run() {
logger.info("Starting compaction thread ...");
int fileToCompact = -1;

while (isRunning) {
Integer fileToCompact = null;
try {
fileToCompact = compactionQueue.take();
if (fileToCompact == STOP_SIGNAL) {
fileToCompact = compactionQueue.poll(1, TimeUnit.SECONDS);
if (fileToCompact == STOP_SIGNAL) { // reference equality on purpose; these are sentinel objects
logger.debug("Received a stop signal.");
// skip rest of the steps and check status of isRunning flag.
// while pausing/stopping compaction isRunning flag must be set to false.
// in this case, isRunning was not set to false already. The signal had to work its way through the
// queue behind the other tasks. So set 'isRunning' to false and break out of the loop to halt.
isRunning = false;
break;
}
if (fileToCompact == WAKE_SIGNAL || fileToCompact == null) {
// scenario: the queue has a long list of files to compact. We add this signal to the queue after
// setting 'isRunning' to false, so all we need to do is loop back to the isRunning check and the
// thread will shut down without processing more tasks.
// If we do break out of this loop with tasks in the queue, then this signal may still be in the queue
// behind those tasks.
// If the thread is resumed later, the signal will be processed after resuming the compactor.
// If we were to set 'isRunning' to false here, that would shut down the
// recently resumed thread when this signal arrived.
continue;
}
logger.debug("Compacting {} ...", fileToCompact);
copyFreshRecordsToNewFile(fileToCompact);
logger.debug("Completed compacting {} to {}", fileToCompact, getCurrentWriteFileId());
dbInternal.markFileAsCompacted(fileToCompact);
dbInternal.deleteHaloDBFile(fileToCompact);
}
catch (Exception e) {
fileToCompact = null;
} catch (InterruptedException ie) {
break;
} catch (Exception e) {
logger.error(String.format("Error while compacting file %d to %d", fileToCompact, getCurrentWriteFileId()), e);
}
}
@@ -322,26 +350,4 @@ void forceRolloverCurrentWriteFile() throws IOException {
dbInternal.getDbDirectory().syncMetaData();
currentWriteFileOffset = 0;
}

// Used only for tests. to be called only after all writes in the test have been performed.
@VisibleForTesting
synchronized boolean isCompactionComplete() {

if (!isCompactionRunning())
return true;

if (compactionQueue.isEmpty()) {
try {
isRunning = false;
submitFileForCompaction(STOP_SIGNAL);
compactionThread.join();
} catch (InterruptedException e) {
logger.error("Error in isCompactionComplete", e);
}

return true;
}

return false;
}
}
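As an aside, here is a self-contained sketch of the sentinel-object pattern the new CompactionManager queue relies on; the class and method names below are illustrative, not HaloDB code. Distinct Integer instances compared by reference can never be confused with a real boxed file id, even one with the same numeric value:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative worker, not HaloDB code: sentinel Integers compared by reference (==)
// can share a queue with real boxed task ids without ever colliding with them.
class SentinelQueueSketch {
    // 'new Integer(...)' guarantees distinct objects (deprecated since Java 9, as in the
    // patch above), so '==' identifies a sentinel even if a task id has the same value.
    static final Integer STOP = new Integer(-1);
    static final Integer WAKE = new Integer(-1);

    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    void submit(int taskId) { queue.offer(taskId); }

    void runLoop() throws InterruptedException {
        while (running) {
            Integer task = queue.poll(1, TimeUnit.SECONDS);
            if (task == STOP) {            // stop after draining the queued work
                running = false;
                break;
            }
            if (task == WAKE || task == null) {
                continue;                  // just re-check the 'running' flag
            }
            System.out.println("processing task " + task);
        }
    }
}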
28 changes: 17 additions & 11 deletions src/main/java/com/oath/halodb/HaloDB.java
@@ -5,11 +5,10 @@

package com.oath.halodb;

import com.google.common.annotations.VisibleForTesting;

import java.io.File;
import java.io.IOException;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;

public final class HaloDB {

@@ -80,14 +79,27 @@ public HaloDBIterator newIterator() throws HaloDBException {
return new HaloDBIterator(dbInternal);
}

public void pauseCompaction() throws HaloDBException {
/**
* Force a compaction of every data file whose ratio of stale data to file size meets or exceeds the provided threshold.
* A compactionThreshold of 0 forces every file that has any stale data to compact;
* 0.1 forces compaction of files where at least 10% of the space is stale.
**/
public void forceCompaction(float compactionThreshold) {
dbInternal.forceCompaction(compactionThreshold);
}

public void pauseCompaction(boolean awaitPending) throws HaloDBException {
try {
dbInternal.pauseCompaction();
dbInternal.pauseCompaction(awaitPending);
} catch (IOException e) {
throw new HaloDBException("Error while trying to pause compaction thread", e);
}
}

public void pauseCompaction() throws HaloDBException {
pauseCompaction(false);
}

public boolean snapshot() {
return dbInternal.takeSnapshot();
}
@@ -105,12 +117,6 @@ public void resumeCompaction() {
}

// methods used in tests.

@VisibleForTesting
boolean isCompactionComplete() {
return dbInternal.isCompactionComplete();
}

@VisibleForTesting
boolean isTombstoneFilesMerging() {
return dbInternal.isTombstoneFilesMerging();
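With isCompactionComplete() removed, tests that previously polled it can block on the new overload instead. A hedged sketch of a hypothetical helper, not code from this commit:

// Hypothetical test helper (not part of HaloDB): drain all queued compaction work
// by pausing with awaitPending=true, then restart the compaction thread.
static void awaitCompactionQuiescence(HaloDB db) throws HaloDBException {
    db.pauseCompaction(true);   // blocks until queued compaction tasks complete
    db.resumeCompaction();      // compaction thread restarts for future work
}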
24 changes: 14 additions & 10 deletions src/main/java/com/oath/halodb/HaloDBInternal.java
@@ -51,7 +51,7 @@ class HaloDBInternal {

private volatile Thread tombstoneMergeThread;

private Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();
private final Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();

HaloDBOptions options;

@@ -167,7 +167,7 @@ synchronized void close() throws IOException {
isClosing = true;

try {
if(!compactionManager.stopCompactionThread(true))
if(!compactionManager.stopCompactionThread(true, false))
setIOErrorFlag();
} catch (IOException e) {
logger.error("Error while stopping compaction thread. Setting IOError flag", e);
@@ -314,7 +314,7 @@ synchronized boolean takeSnapshot() {

try {
final int currentWriteFileId;
compactionManager.pauseCompactionThread();
compactionManager.pauseCompactionThread(false);

// Only support one snapshot now
// TODO: support multiple snapshots if needed
@@ -417,8 +417,8 @@ void setIOErrorFlag() throws IOException {
metaData.storeToFile();
}

void pauseCompaction() throws IOException {
compactionManager.pauseCompactionThread();
void pauseCompaction(boolean awaitPending) throws IOException {
compactionManager.pauseCompactionThread(awaitPending);
}

void resumeCompaction() {
@@ -481,6 +481,15 @@ private void markPreviousVersionAsStale(byte[] key, InMemoryIndexMetaData record
addFileToCompactionQueueIfThresholdCrossed(recordMetaData.getFileId(), staleRecordSize);
}

void forceCompaction(float compactionThreshold) {
staleDataPerFileMap.forEach((fileId, staleData) -> {
HaloDBFile file = readFileMap.get(fileId);
if (staleData > 0 && staleData >= file.getSize() * compactionThreshold) {
compactionManager.submitFileForCompaction(fileId);
}
});
}

void addFileToCompactionQueueIfThresholdCrossed(int fileId, int staleRecordSize) {
HaloDBFile file = readFileMap.get(fileId);
if (file == null)
@@ -955,11 +964,6 @@ private Map<Integer, Double> computeStaleDataMapForStats() {
}

// Used only in tests.
@VisibleForTesting
boolean isCompactionComplete() {
return compactionManager.isCompactionComplete();
}

@VisibleForTesting
boolean isTombstoneFilesMerging() {
return isTombstoneFilesMerging;
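A hedged restatement of the per-file check that the new forceCompaction applies; the helper name is illustrative. A file is queued when it has any stale data and the stale bytes are at least compactionThreshold times the file size:

// Illustrative predicate mirroring the check in HaloDBInternal.forceCompaction above.
static boolean shouldCompact(long staleBytes, long fileSize, float compactionThreshold) {
    return staleBytes > 0 && staleBytes >= fileSize * compactionThreshold;
}

// e.g. shouldCompact(150, 1000, 0.1f) -> true   (15% stale >= 10% threshold)
//      shouldCompact( 50, 1000, 0.1f) -> false  ( 5% stale <  10% threshold)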