Skip to content

Commit

Permalink
Use ForkJoinPool as executor service.
Browse files Browse the repository at this point in the history
  • Loading branch information
mstyura committed Aug 16, 2022
1 parent 7d5f6ca commit 6865c52
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 37 deletions.
5 changes: 4 additions & 1 deletion objectbox-java/src/main/java/io/objectbox/BoxStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -227,7 +228,7 @@ public static boolean isSyncServerAvailable() {
private final int[] allEntityTypeIds;
private final Map<Class<?>, Box<?>> boxes = new ConcurrentHashMap<>();
private final Set<Transaction> transactions = Collections.newSetFromMap(new WeakHashMap<>());
private final ExecutorService threadPool = new ObjectBoxThreadPool(this);
private final ExecutorService threadPool;
private final ObjectClassPublisher objectClassPublisher;
final boolean debugTxRead;
final boolean debugTxWrite;
Expand Down Expand Up @@ -257,6 +258,8 @@ public static boolean isSyncServerAvailable() {
private SyncClient syncClient;

BoxStore(BoxStoreBuilder builder) {
threadPool = Executors.unconfigurableExecutorService(
new ObjectBoxThreadPool(this, builder.executorServiceParallelism));
context = builder.context;
relinker = builder.relinker;
NativeLibraryLoader.ensureLoaded();
Expand Down
12 changes: 12 additions & 0 deletions objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class BoxStoreBuilder {
int maxReaders;
boolean noReaderThreadLocals;

int executorServiceParallelism = ForkJoinPool.getCommonPoolParallelism();

int queryAttempts;

/** For DebugCursor. */
Expand Down Expand Up @@ -319,6 +322,15 @@ public BoxStoreBuilder noReaderThreadLocals() {
return this;
}

/**
* Sets the maximum allowed level of parallelism allowed by executor service
* used by BoxStore. The default value is equal to {@ref ForkJoinPool#getCommonPoolParallelism())}
*/
public BoxStoreBuilder executorServiceParallelism(int parallelism) {
this.executorServiceParallelism = parallelism;
return this;
}

@Internal
public void entity(EntityInfo<?> entityInfo) {
entityInfoList.add(entityInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,84 @@
package io.objectbox.internal;

import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.objectbox.BoxStore;
import io.objectbox.annotation.apihint.Internal;

/**
* Custom thread pool similar to {@link Executors#newCachedThreadPool()} with the following adjustments:
* Custom executor service similar to {@link Executors#newWorkStealingPool()} with the following adjustments:
* <ul>
* <li>Release thread local resources ({@link BoxStore#closeThreadResources()})</li>
* <li>Reduce keep-alive time for threads to 20 seconds</li>
* <li>Uses a ThreadFactory to name threads like "ObjectBox-1-Thread-1"</li>
* <li>Release thread local resources ({@link BoxStore#closeThreadResources()}) after task execution</li>
* <li>Uses a custom thread factory to name threads like "ObjectBox-ForkJoinPool-1-Thread-1"</li>
* </ul>
*
*/
@Internal
public class ObjectBoxThreadPool extends ThreadPoolExecutor {
public final class ObjectBoxThreadPool extends AbstractExecutorService {
private final BoxStore boxStore;
private final ExecutorService executorImpl;

public ObjectBoxThreadPool(BoxStore boxStore) {
super(0, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ObjectBoxThreadFactory());
public ObjectBoxThreadPool(BoxStore boxStore, int parallelism) {
this.boxStore = boxStore;
this.executorImpl = Executors.unconfigurableExecutorService(
new ForkJoinPool(
parallelism,
pool -> {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
// Priority and daemon status are inherited from calling thread; ensure to reset if required
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
if (thread.isDaemon()) {
thread.setDaemon(false);
}
thread.setName("ObjectBox-" + thread.getName());
return thread;
},
null,
false));
}


@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
boxStore.closeThreadResources();
public void shutdown() {
executorImpl.shutdown();
}

static class ObjectBoxThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_COUNT = new AtomicInteger();
@Override
public List<Runnable> shutdownNow() {
return executorImpl.shutdownNow();
}

private final ThreadGroup group;
private final String namePrefix = "ObjectBox-" + POOL_COUNT.incrementAndGet() + "-Thread-";
private final AtomicInteger threadCount = new AtomicInteger();
@Override
public boolean isShutdown() {
return executorImpl.isShutdown();
}

ObjectBoxThreadFactory() {
SecurityManager securityManager = System.getSecurityManager();
group = (securityManager != null) ? securityManager.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
@Override
public boolean isTerminated() {
return executorImpl.isTerminated();
}

public Thread newThread(Runnable runnable) {
String name = namePrefix + threadCount.incrementAndGet();
Thread thread = new Thread(group, runnable, name);
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorImpl.awaitTermination(timeout, unit);
}

// Priority and daemon status are inherited from calling thread; ensure to reset if required
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
if (thread.isDaemon()) {
thread.setDaemon(false);
@Override
public void execute(Runnable command) {
executorImpl.execute(() -> {
try {
command.run();
} finally {
boxStore.closeThreadResources();
}
return thread;
}
});
}
}

0 comments on commit 6865c52

Please sign in to comment.