Skip to content

Commit

Permalink
fix(stream): wrap runnable that is executed in thread pool to catch e…
Browse files Browse the repository at this point in the history
…xception (#923)

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Feb 6, 2024
1 parent da52ecf commit 480eb04
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
private final ScheduledThreadPoolExecutor streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
private final ScheduledExecutorService streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true);
private final Map<Long, S3Stream> openedStreams;
private final StreamManager streamManager;
Expand Down
11 changes: 11 additions & 0 deletions s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/**
* Utilities for working with threads.
Expand Down Expand Up @@ -46,4 +47,14 @@ public Thread newThread(Runnable r) {
}
};
}

public static Runnable wrapRunnable(Runnable runnable, Logger logger) {
return () -> {
try {
runnable.run();
} catch (Throwable throwable) {
logger.error("[FATAL] Uncaught exception in executor thread {}", Thread.currentThread().getName(), throwable);
}
};
}
}
30 changes: 22 additions & 8 deletions s3stream/src/main/java/com/automq/stream/utils/Threads.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -44,25 +46,37 @@ public static ExecutorService newFixedThreadPoolWithMonitor(int nThreads, String
});
}

public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(String name, boolean daemon,
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name, boolean daemon,
Logger logger) {
return newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(name, true), logger, false);
}

public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
Logger logger) {
return newSingleThreadScheduledExecutor(threadFactory, logger, false);
}

public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
Logger logger, boolean removeOnCancelPolicy) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
logger.error("[FATAL] Uncaught exception in executor thread {}", Thread.currentThread().getName(), t);
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
command = ThreadUtils.wrapRunnable(command, logger);
return super.schedule(command, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
command = ThreadUtils.wrapRunnable(command, logger);
return super.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
command = ThreadUtils.wrapRunnable(command, logger);
return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
};
executor.setRemoveOnCancelPolicy(removeOnCancelPolicy);
Expand Down

0 comments on commit 480eb04

Please sign in to comment.