diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e0a12f31ed87..a6e428c46824f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569)) - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) +- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 92bcbf9e9f3dd..df7f67c6295a3 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index d09c2963c94ff..139e7b51d2200 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -576,7 +576,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + recoveryTarget.handleFileChunk( + request, + recoveryTarget, + bytesSinceLastPause, + recoverySettings.recoveryRateLimiter(), + listener + ); } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 53b42347aa30d..8f9da6babdd99 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -65,6 +65,16 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Individual speed setting for segment replication, default -1B to reuse the setting of recovery. + */ + public static final Setting INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + "indices.replication.max_bytes_per_sec", + new ByteSizeValue(-1), + Property.Dynamic, + Property.NodeScope + ); + /** * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node. */ @@ -169,11 +179,13 @@ public class RecoverySettings { // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1. public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); - private volatile ByteSizeValue maxBytesPerSec; + private volatile ByteSizeValue recoveryMaxBytesPerSec; + private volatile ByteSizeValue replicationMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; private volatile int maxConcurrentRemoteStoreStreams; - private volatile SimpleRateLimiter rateLimiter; + private volatile SimpleRateLimiter recoveryRateLimiter; + private volatile SimpleRateLimiter replicationRateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; private volatile TimeValue activityTimeout; @@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings); this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); - this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; + this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } + this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); + updateReplicationRateLimiter(); - logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); + logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec); this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer( @@ -227,8 +242,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } - public RateLimiter rateLimiter() { - return rateLimiter; + public RateLimiter recoveryRateLimiter() { + return recoveryRateLimiter; + } + + public RateLimiter replicationRateLimiter() { + return replicationRateLimiter; } public TimeValue retryDelayNetwork() { @@ -294,14 +313,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout this.internalRemoteUploadTimeout = internalRemoteUploadTimeout; } - private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { - this.maxBytesPerSec = maxBytesPerSec; - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; - } else if (rateLimiter != null) { - rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac()); + private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) { + this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec; + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; + } else if (recoveryRateLimiter != null) { + recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } + if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter(); + } + + private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) { + this.replicationMaxBytesPerSec = replicationMaxBytesPerSec; + updateReplicationRateLimiter(); + } + + private void updateReplicationRateLimiter() { + if (replicationMaxBytesPerSec.getBytes() >= 0) { + if (replicationMaxBytesPerSec.getBytes() == 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac()); + } + } else { // when replicationMaxBytesPerSec = -1B, use setting of recovery + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 37227596fdfe7..4d4c20f778ef3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler( shardId, PeerRecoveryTargetService.Actions.FILE_CHUNK, requestSeqNoGenerator, - onSourceThrottle + onSourceThrottle, + recoverySettings::recoveryRateLimiter ); this.remoteStoreEnabled = remoteStoreEnabled; } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index b52fe66816098..179e497565326 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; /** * This class handles sending file chunks over the transport layer to a target shard. @@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { private final AtomicLong requestSeqNoGenerator; private final RetryableTransportClient retryableTransportClient; private final ShardId shardId; - private final RecoverySettings recoverySettings; private final long replicationId; private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final TransportRequestOptions fileChunkRequestOptions; private final Consumer onSourceThrottle; + private final Supplier rateLimiterSupplier; private final String action; public RemoteSegmentFileChunkWriter( @@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter( ShardId shardId, String action, AtomicLong requestSeqNoGenerator, - Consumer onSourceThrottle + Consumer onSourceThrottle, + Supplier rateLimiterSupplier ) { this.replicationId = replicationId; - this.recoverySettings = recoverySettings; this.retryableTransportClient = retryableTransportClient; this.shardId = shardId; this.requestSeqNoGenerator = requestSeqNoGenerator; this.onSourceThrottle = onSourceThrottle; + this.rateLimiterSupplier = rateLimiterSupplier; this.fileChunkRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) @@ -78,7 +80,7 @@ public void writeFileChunk( // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); + final RateLimiter rl = rateLimiterSupplier.get(); if (rl != null) { long bytes = bytesSinceLastPause.addAndGet(content.length()); if (bytes > rl.getMinPauseCheckBytes()) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index a393faabae0ea..3a762200e4a82 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -124,7 +124,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan request.getCheckpoint().getShardId(), SegmentReplicationTargetService.Actions.FILE_CHUNK, new AtomicLong(0), - (throttleTime) -> {} + (throttleTime) -> {}, + recoverySettings::replicationRateLimiter ); final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 4942d39cfa48a..fbd7ab7cea346 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { final SegmentReplicationTarget target = ref.get(); final ActionListener listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener); } } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f6a7dcc68d311..e9241bb8d88a4 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3291,7 +3291,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers( public InputStream maybeRateLimitRestores(InputStream stream) { return maybeRateLimit( maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE ); @@ -3314,7 +3314,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 75639661f539d..2793d446d66c8 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -35,6 +35,8 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.TimeUnit; @@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() { clusterSettings.applySettings( Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); - assertEquals(null, recoverySettings.rateLimiter()); + assertNull(recoverySettings.recoveryRateLimiter()); + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() + ); + assertNull(recoverySettings.replicationRateLimiter()); + } + + public void testSetReplicationMaxBytesPerSec() { + assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) + .build() + ); + assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB)) + .build() + ); + assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); } public void testRetryDelayStateSync() {