Skip to content

Commit

Permalink
Some refactoring and name changes
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
Shubh Sahu committed Apr 10, 2024
1 parent f28ef5c commit c5df7d4
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 40 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- [Remote Store] Reallow index & cluster default Refresh Interval to be set as -1 ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception {

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "5")
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
)
)
);
Expand Down
18 changes: 6 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4485,9 +4485,9 @@ public Durability getTranslogDurability() {
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Checks if the shard need to be Refreshed depending on Translog constraints.
* Each Translog type can have it's own decider
* @return {@code true} if the shard should be Refreshed
* Checks if the shard need to be refreshed depending on translog constraints.
* each translog type can have it's own decider
* @return {@code true} if the shard should be refreshed
*/
public boolean shouldRefreshShard() {
final Engine engine = getEngineOrNull();
Expand All @@ -4509,7 +4509,7 @@ private void maybeRefreshShard(String source) {
/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
* Also Schedules a refresh if required, decided by Translog manager
* Also schedules a refresh if required, decided by translog manager
*/
public void afterWriteOperation() {
if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
Expand Down Expand Up @@ -4573,7 +4573,7 @@ public void onAfter() {
}
} else if (shouldRefreshShard()) {
logger.debug("submitting async Refresh request");
final AbstractRunnable _refresh = new AbstractRunnable() {
final AbstractRunnable refreshRunnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("refresh failed after translog manager decided to refresh the shard", e);
Expand All @@ -4583,14 +4583,8 @@ public void onFailure(Exception e) {
protected void doRun() throws Exception {
maybeRefreshShard("Translog manager decided to refresh the shard");
}

@Override
public boolean isForceExecution() {
return true;
}

};
threadPool.executor(ThreadPool.Names.REFRESH).execute(_refresh);
threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshRunnable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,13 +654,13 @@ int availablePermits() {
}

/**
* Checks whether or not the shard should be Refreshed.
* Checks whether or not the shard should be refreshed.
* This checks if number of translog files breaches the threshold count determined by
* {@code cluster.remote_store.max_referenced_translog_files} setting
* @return {@code true} if the shard should be Refreshed
* {@code cluster.remote_store.translog.max_readers} setting
* @return {@code true} if the shard should be refreshed
*/
@Override
public boolean shouldRefreshShard() {
return readers.size() >= translogTransferManager.getMaxRemoteReferencedTranslogFilesSettings();
return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2054,10 +2054,9 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
}

/**
* Tests whether or not the shard should be Refreshed.
* This test is based on the number of Translog files compared to configured number of Translog files threshold
*
* @return {@code true} if the shard should be Refreshed
* Checks whether or not the shard should be refreshed.
* each translog type can have it's own decider
* @return {@code true} if the shard should be refreshed
*/
public boolean shouldRefreshShard() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ public void onFailure(Exception e) {
}
}

public int getMaxRemoteReferencedTranslogFilesSettings() {
return this.remoteStoreSettings.getMaxRemoteReferencedTranslogFiles();
public int getMaxRemoteTranslogReadersSettings() {
return this.remoteStoreSettings.getMaxRemoteTranslogReaders();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public class RemoteStoreSettings {
/**
* Controls the maximum referenced remote translog files. If breached the shard will be Refreshed.
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES = Setting.intSetting(
"cluster.remote_store.max_referenced_translog_files",
public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
"cluster.remote_store.translog.max_readers",
300,
1,
Property.Dynamic,
Expand All @@ -79,7 +79,7 @@ public class RemoteStoreSettings {
private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
private volatile int maxRemoteReferencedTranslogFiles;
private volatile int maxRemoteTranslogReaders;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -100,8 +100,8 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this::setClusterRemoteTranslogTransferTimeout
);

maxRemoteReferencedTranslogFiles = CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES, this::setMaxRemoteReferencedTranslogFiles);
maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand All @@ -128,11 +128,11 @@ private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTran
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}

public int getMaxRemoteReferencedTranslogFiles() {
return maxRemoteReferencedTranslogFiles;
public int getMaxRemoteTranslogReaders() {
return maxRemoteTranslogReaders;
}

private void setMaxRemoteReferencedTranslogFiles(int maxRemoteReferencedTranslogFiles) {
this.maxRemoteReferencedTranslogFiles = maxRemoteReferencedTranslogFiles;
private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) {
this.maxRemoteTranslogReaders = maxRemoteTranslogReaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4974,7 +4974,7 @@ public void testShouldRefreshOnTooManyRemoteTranslogFiles() throws Exception {

final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory());
RemoteStoreSettings remoteStoreSettings = primaryShard.getRemoteStoreSettings();
final long numDocs = remoteStoreSettings.getMaxRemoteReferencedTranslogFiles();
final long numDocs = remoteStoreSettings.getMaxRemoteTranslogReaders();

assertFalse(primaryShard.shouldRefreshShard());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,21 @@ public void testClusterRemoteTranslogTransferTimeout() {

public void testMaxRemoteReferencedTranslogFiles() {
// Test default value
assertEquals(300, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles());
assertEquals(300, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with valid value
clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "100").build()
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100").build()
);
assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles());
assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with value less than minimum
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "0").build()
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "0").build()
)
);
assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles());
assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders());
}
}

0 comments on commit c5df7d4

Please sign in to comment.