Skip to content

Commit

Permalink
Made ExecutorService configurable in AdvancedRangeCache.Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Sep 8, 2024
1 parent abad2d5 commit 1c2e8a6
Showing 1 changed file with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ public class AdvancedRangeCacheImpl<T>
// Number of items a worker processes in bulk before signalling available data
protected int workerBulkSize;

protected ExecutorService executorService =
MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newCachedThreadPool());
protected ExecutorService executorService;

public AdvancedRangeCacheImpl(
ReadableChannelSource<T> dataSource,
Slice<T> slice,
long requestLimit,
int workerBulkSize,
Duration terminationDelay,
int maxReadAheadItemCount) {
int maxReadAheadItemCount,
ExecutorService executorService) {

this.dataSource = dataSource;

Expand All @@ -64,23 +64,24 @@ public AdvancedRangeCacheImpl(
this.workerBulkSize = workerBulkSize;
this.terminationDelay = terminationDelay;
this.maxReadAheadItemCount = maxReadAheadItemCount;
this.executorService = executorService;
}

@Override
public ArrayOps<T> getArrayOps() {
return slice.getArrayOps();
}

public static <A> AdvancedRangeCacheImpl<A> create(
ReadableChannelSource<A> dataSource,
Slice<A> slice,
long requestLimit,
int workerBulkSize,
Duration terminationDelay,
int maxReadAheadItemCount) {

return new AdvancedRangeCacheImpl<>(dataSource, slice, requestLimit, workerBulkSize, terminationDelay, maxReadAheadItemCount);
}
// public static <A> AdvancedRangeCacheImpl<A> create(
// ReadableChannelSource<A> dataSource,
// Slice<A> slice,
// long requestLimit,
// int workerBulkSize,
// Duration terminationDelay,
// int maxReadAheadItemCount) {
//
// return new AdvancedRangeCacheImpl<>(dataSource, slice, requestLimit, workerBulkSize, terminationDelay, maxReadAheadItemCount);
// }


public ReadableChannelSource<T> getDataSource() {
Expand Down Expand Up @@ -225,6 +226,7 @@ public static class Builder<A> {
protected Duration terminationDelay;

protected int maxReadAheadItemCount;
protected ExecutorService executorService;

public ReadableChannelSource<A> getDataSource() {
return dataSource;
Expand Down Expand Up @@ -280,6 +282,11 @@ public Builder<A> setMaxReadAheadItemCount(int maxReadAheadItemCount) {
return this;
}

public Builder<A> setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}


// public Duration getSyncDelay() {
// return syncDelay;
Expand All @@ -291,7 +298,11 @@ public Builder<A> setMaxReadAheadItemCount(int maxReadAheadItemCount) {
// }

public AdvancedRangeCacheImpl<A> build() {
return AdvancedRangeCacheImpl.create(dataSource, slice, requestLimit, workerBulkSize, terminationDelay, maxReadAheadItemCount);
ExecutorService finalExecutorService = executorService != null
? executorService
: MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newCachedThreadPool());

return new AdvancedRangeCacheImpl<>(dataSource, slice, requestLimit, workerBulkSize, terminationDelay, maxReadAheadItemCount, finalExecutorService);
}
}

Expand Down

0 comments on commit 1c2e8a6

Please sign in to comment.