Skip to content

Commit

Permalink
Change Remote state read thread pool to Fixed type
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Dec 16, 2024
1 parent b359dd8 commit 1c55c53
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes

if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) {
try {
logger.info(
() -> new ParameterizedMessage(
"Term version checker downloading full cluster state for term {}, version {}",
termVersion.getTerm(),
termVersion.getVersion()
)
);
ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion();
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByTermVersion(
Expand All @@ -454,7 +461,7 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes
return clusterStateFromRemote;
}
} catch (Exception e) {
logger.trace("Error while fetching from remote cluster state", e);
logger.error("Error while fetching from remote cluster state", e);

Check warning on line 464 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L464

Added line #L464 was not covered by tests
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}

if (applyFullState == true) {
logger.debug(
logger.info(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,8 +1473,22 @@ public ClusterState getClusterStateForManifest(
try {
ClusterState stateFromCache = remoteClusterStateCache.getState(clusterName, manifest);
if (stateFromCache != null) {
logger.trace(
() -> new ParameterizedMessage(

Check warning on line 1477 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1476-L1477

Added lines #L1476 - L1477 were not covered by tests
"Found cluster state in cache for term {} and version {}",
manifest.getClusterTerm(),
manifest.getStateVersion()

Check warning on line 1480 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1479-L1480

Added lines #L1479 - L1480 were not covered by tests
)
);
return stateFromCache;
}
logger.info(
() -> new ParameterizedMessage(
"Cluster state not found in cache for term {} and version {}",
manifest.getClusterTerm(),
manifest.getStateVersion()
)
);

final ClusterState clusterState;
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING);
map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING);
map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING);
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING);
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED);
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
Expand Down Expand Up @@ -306,7 +306,7 @@ public ThreadPool(
);
builders.put(
Names.REMOTE_STATE_READ,
new ScalingExecutorBuilder(Names.REMOTE_STATE_READ, 1, boundedBy(4 * allocatedProcessors, 4, 32), TimeValue.timeValueMinutes(5))
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_READ, boundedBy(4 * allocatedProcessors, 4, 32), 120000)
);
builders.put(
Names.INDEX_SEARCHER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_STATE_READ, n -> ThreadPool.boundedBy(4 * n, 4, 32));
return sizes.get(threadPoolName).apply(numberOfProcessors);
}

Expand Down

0 comments on commit 1c55c53

Please sign in to comment.