From 9adab8b4c41c7e132323a02e2244a9300cc038b1 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Sat, 12 Aug 2023 21:14:15 +0530 Subject: [PATCH] Update tests and rebase onto latest main Signed-off-by: Bhumika Saini --- .../remotestore/stats/RemoteStoreStats.java | 26 ++- .../TransportRemoteStoreStatsAction.java | 2 - .../remote/RemoteSegmentTransferTracker.java | 2 +- .../remote/RemoteStorePressureService.java | 14 +- .../index/remote/RemoteTranslogTracker.java | 2 +- .../index/translog/RemoteFsTranslog.java | 41 ++-- .../transfer/TranslogTransferManager.java | 7 +- .../opensearch/indices/IndicesService.java | 8 +- .../stats/RemoteStoreStatsResponseTests.java | 8 +- .../stats/RemoteStoreStatsTestHelper.java | 212 ++++++++++------- .../stats/RemoteStoreStatsTests.java | 19 +- .../RemoteStorePressureServiceTests.java | 4 + .../RemoteStorePressureSettingsTests.java | 79 +++++++ .../remote/RemoteTranslogTrackerTests.java | 215 ++++++++++++++---- .../index/translog/RemoteFSTranslogTests.java | 63 ++++- .../TranslogTransferManagerTests.java | 51 ++++- .../indices/IndicesServiceTests.java | 12 + 17 files changed, 583 insertions(+), 182 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index f413d404c3c2f..eb1a22658143c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -86,10 +86,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(Fields.TRANSLOG); builder.startObject(SubFields.UPLOAD); - buildTranslogUploadStats(builder); + // Ensuring that we are not showing 0 metrics to the user + if (remoteTranslogShardStats.totalUploadsStarted > 0) { + buildTranslogUploadStats(builder); + } builder.endObject(); // translog.upload builder.startObject(SubFields.DOWNLOAD); - buildTranslogDownloadStats(builder); + // Ensuring that we are not showing 0 metrics to the user + if (remoteTranslogShardStats.totalDownloadsSucceeded > 0) { + buildTranslogDownloadStats(builder); + } builder.endObject(); // translog.download builder.endObject(); // translog @@ -291,7 +297,7 @@ static final class UploadStatsFields { static final String LAST_SUCCESSFUL_UPLOAD_TIMESTAMP = "last_successful_upload_timestamp"; /** - * Number of total uploads to remote store + * Count of files uploaded to remote store */ static final String TOTAL_UPLOADS = "total_uploads"; @@ -323,7 +329,7 @@ static final class UploadStatsFields { static final class DownloadStatsFields { /** - * Epoch timestamp of the last successful download session + * Epoch timestamp of the last successful download */ public static final String LAST_SUCCESSFUL_DOWNLOAD_TIMESTAMP = "last_successful_download_timestamp"; @@ -333,7 +339,7 @@ static final class DownloadStatsFields { static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp"; /** - * Count of total remote store download sessions + * Count of files downloaded from remote store */ public static final String TOTAL_DOWNLOADS = "total_downloads"; @@ -353,18 +359,18 @@ static final class DownloadStatsFields { static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes"; /** - * Average speed (in bytes/sec) of a remote store download session + * Average speed (in bytes/sec) of a remote store download */ static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec"; /** - * Average time spent on a remote store download session + * Average time spent on a remote store download */ public static final String DOWNLOAD_TIME_IN_MILLIS = "download_time_in_millis"; } /** - * Reusable sub fields for {@link Fields} + * Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields} */ static final class SubFields { static final String STARTED = "started"; @@ -375,12 +381,12 @@ static final class SubFields { static final String UPLOAD = "upload"; /** - * Moving avg over last N values stat for a {@link Fields} + * Moving avg over last N values stat */ static final String MOVING_AVG = "moving_avg"; /** - * Most recent successful attempt stat for a {@link Fields} + * Most recent successful attempt stat */ static final String LAST_SUCCESSFUL = "last_successful"; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index fb03f263125c5..fc9372b37ab90 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -96,7 +96,6 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque || (shardRouting.currentNodeId() == null || shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId())) ) - .filter(ShardRouting::primary) .filter( shardRouting -> Boolean.parseBoolean( clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) @@ -158,7 +157,6 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard indexShard.shardId() ); assert Objects.nonNull(remoteSegmentTransferTracker); - RemoteTranslogTracker remoteTranslogTracker = remoteStorePressureService.getRemoteTranslogTracker(indexShard.shardId()); assert Objects.nonNull(remoteTranslogTracker); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index d89374e5e6b26..81da57924d534 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -24,8 +24,8 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 5759c06fe7bc8..70f78e9cf4c95 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -72,10 +72,10 @@ public RemoteSegmentTransferTracker getRemoteRefreshSegmentTracker(ShardId shard } /** - * Get {@link RemoteTranslogTracker} only if the underlying Index has remote translog store enabled. + * Get {@link RemoteTranslogTracker} for the shard * * @param shardId shard id - * @return the tracker if index is remote translog store-backed, else null. + * @return The tracker if index is Remote Translog Store-backed, else null. */ public RemoteTranslogTracker getRemoteTranslogTracker(ShardId shardId) { return trackerMapRemoteTranslogStore.get(shardId); @@ -135,16 +135,6 @@ public boolean isSegmentsUploadBackpressureEnabled() { return pressureSettings.isRemoteRefreshSegmentPressureEnabled(); } - /** - * Check if remote translog backpressure is enabled. - * - * @return true if enabled, else false. - */ - public boolean isTranslogUploadBackpressureEnabled() { - // Note: This is not yet implemented. - return false; - } - /** * Validates if segments are lagging more than the limits. If yes, it would lead to rejections of the requests. * diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java index e75246866666a..35b698718d3b4 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java @@ -343,7 +343,7 @@ public long getTotalDownloadsSucceeded() { return totalDownloadsSucceeded.get(); } - public void addTotalDownloadsSucceeded(long count) { + public void addDownloadsSucceeded(long count) { totalDownloadsSucceeded.addAndGet(count); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index a2a8b8962ac7a..b28f0b543379e 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -131,6 +131,11 @@ public RemoteFsTranslog( } } + // visible for testing + public RemoteTranslogTracker getRemoteTranslogTracker() { + return remoteTranslogTracker; + } + public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location, Logger logger) throws IOException { assert repository instanceof BlobStoreRepository : String.format( @@ -156,12 +161,11 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { logger.trace("Downloading translog files from remote"); + RemoteTranslogTracker statsTracker = translogTransferManager.getRemoteTranslogTracker(); + long bytesBefore = statsTracker.getDownloadBytesSucceeded(); + long downloadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime(); TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { - RemoteTranslogTracker statsTracker = translogTransferManager.getRemoteTranslogTracker(); - statsTracker.addTotalDownloadsSucceeded(1); - long bytesBefore = statsTracker.getDownloadBytesSucceeded(); - long downloadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime(); if (Files.notExists(location)) { Files.createDirectories(location); } @@ -181,7 +185,11 @@ public static void download(TranslogTransferManager translogTransferManager, Pat long downloadEndTimeMs = System.currentTimeMillis(); long durationInMillis = (downloadEndTime - downloadStartTime) / 1_000_000L; long bytesDownloaded = statsTracker.getDownloadBytesSucceeded() - bytesBefore; + statsTracker.setLastSuccessfulDownloadTimestamp(downloadEndTimeMs); + // We update the duration at the end of successfully downloading all of metadata, .tlog, .ckp + // files because this is not a file-level metric but a sync-level metric. + // This also ensures the bytes per sec moving average can be correlated. statsTracker.addDownloadTimeInMillis(durationInMillis); statsTracker.updateDownloadBytesMovingAverage(bytesDownloaded); statsTracker.updateDownloadTimeMovingAverage(durationInMillis); @@ -525,17 +533,20 @@ public RemoteFsTranslogTransferListener(Releasable transferReleasable, Long gene @Override public void beforeUpload(TransferSnapshot transferSnapshot) throws IOException { toUpload = RemoteStoreUtils.getUploadBlobsFromSnapshot(transferSnapshot, fileTransferTracker); - uploadBytes = RemoteStoreUtils.getTotalBytes(toUpload); - uploadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime(); - - captureStatsBeforeUpload(); + if (toUpload.size() > 0) { + uploadBytes = RemoteStoreUtils.getTotalBytes(toUpload); + captureStatsBeforeUpload(); + uploadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime(); + } } @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { - uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime(); - uploadEndTimeMs = System.currentTimeMillis(); - captureStatsOnUploadSuccess(); + if (toUpload != null && toUpload.size() > 0) { + uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime(); + uploadEndTimeMs = System.currentTimeMillis(); + captureStatsOnUploadSuccess(); + } transferReleasable.close(); closeFilesIfNoPendingRetentionLocks(); @@ -546,13 +557,13 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { - uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime(); - uploadEndTimeMs = System.currentTimeMillis(); - captureStatsOnUploadFailure(); + if (toUpload != null && toUpload.size() > 0) { + uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime(); + captureStatsOnUploadFailure(); + } transferReleasable.close(); closeFilesIfNoPendingRetentionLocks(); - if (ex instanceof IOException) { throw (IOException) ex; } else { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 77a113eb3dc0a..39fb4848d9e43 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -108,6 +108,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans try { if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); + translogTransferListener.onUploadComplete(transferSnapshot); return true; } @@ -200,7 +201,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync fileTransferTracker.add(fileName, true); remoteTranslogTrackerSetOnce.get().addDownloadBytesSucceeded(bytesToRead); - remoteTranslogTrackerSetOnce.get().addTotalDownloadsSucceeded(1); + remoteTranslogTrackerSetOnce.get().addDownloadsSucceeded(1); } public TranslogTransferMetadata readMetadata() throws IOException { @@ -212,9 +213,11 @@ public TranslogTransferMetadata readMetadata() throws IOException { if (blobMetadataList.isEmpty()) return; String filename = blobMetadataList.get(0).name(); try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { + long bytesToRead = inputStream.available(); IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput)); - remoteTranslogTrackerSetOnce.get().addDownloadBytesSucceeded(indexInput.length()); + remoteTranslogTrackerSetOnce.get().addDownloadBytesSucceeded(bytesToRead); + remoteTranslogTrackerSetOnce.get().addDownloadsSucceeded(1); } catch (IOException e) { logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); exceptionSetOnce.set(e); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 96d57308ca4c7..df21ad137a89a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -83,7 +83,6 @@ import org.opensearch.common.util.iterable.Iterables; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.util.io.IOUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -953,7 +952,7 @@ public IndexShard createShard( .indices() .preparePutMapping() .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid - .setSource(mapping.source().string(), XContentType.JSON) + .setSource(mapping.source().string(), MediaTypeRegistry.JSON) .get(); }, this); return indexShard; @@ -1866,4 +1865,9 @@ public boolean allPendingDanglingIndicesWritten() { public void setPressureService(RemoteStorePressureService pressureService) { pressureServiceSetOnce.trySet(pressureService); } + + // visible for testing + public RemoteStorePressureService getPressureService() { + return pressureServiceSetOnce.get(); + } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java index 6bc29c9400ea8..beb94d1e8cecc 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java @@ -25,7 +25,7 @@ import java.util.Map; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerTranslogStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createTranslogStats; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createShardRouting; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewPrimary; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewReplica; @@ -51,7 +51,7 @@ public void tearDown() throws Exception { public void testSerializationForPrimary() throws Exception { RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForNewPrimary(shardId); - RemoteTranslogTracker.Stats mockTranslogTrackerStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats mockTranslogTrackerStats = createTranslogStats(shardId); ShardRouting primaryShardRouting = createShardRouting(shardId, true); RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, mockTranslogTrackerStats, primaryShardRouting); RemoteStoreStatsResponse statsResponse = new RemoteStoreStatsResponse( @@ -82,7 +82,7 @@ public void testSerializationForPrimary() throws Exception { public void testSerializationForBothPrimaryAndReplica() throws Exception { RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForNewPrimary(shardId); RemoteSegmentTransferTracker.Stats mockReplicaTrackerStats = createStatsForNewReplica(shardId); - RemoteTranslogTracker.Stats mockTranslogTrackerStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats mockTranslogTrackerStats = createTranslogStats(shardId); ShardRouting primaryShardRouting = createShardRouting(shardId, true); ShardRouting replicaShardRouting = createShardRouting(shardId, false); RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, mockTranslogTrackerStats, primaryShardRouting); @@ -123,7 +123,7 @@ public void testSerializationForBothPrimaryAndReplica() throws Exception { public void testSerializationForBothRemoteStoreRestoredPrimaryAndReplica() throws Exception { RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForRemoteStoreRestoredPrimary(shardId); RemoteSegmentTransferTracker.Stats mockReplicaTrackerStats = createStatsForNewReplica(shardId); - RemoteTranslogTracker.Stats mockTranslogTrackerStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats mockTranslogTrackerStats = createTranslogStats(shardId); ShardRouting primaryShardRouting = createShardRouting(shardId, true); ShardRouting replicaShardRouting = createShardRouting(shardId, false); RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, mockTranslogTrackerStats, primaryShardRouting); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 3fdfe3b6e4774..86bfc49474966 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -18,6 +18,7 @@ import java.util.Map; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.opensearch.test.OpenSearchTestCase.assertEquals; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; @@ -113,10 +114,14 @@ static ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED); } - static RemoteTranslogTracker.Stats createPressureTrackerTranslogStats(ShardId shardId) { + static RemoteTranslogTracker.Stats createTranslogStats(ShardId shardId) { return new RemoteTranslogTracker.Stats(shardId, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9D, 10D, 11D, 1L, 2L, 3L, 4L, 9D, 10D, 11D); } + static RemoteTranslogTracker.Stats createEmptyTranslogStats(ShardId shardId) { + return new RemoteTranslogTracker.Stats(shardId, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0D, 0D, 0D, 0L, 0L, 0L, 0L, 0D, 0D, 0D); + } + static void compareStatsResponse( Map statsObject, RemoteSegmentTransferTracker.Stats pressureTrackerSegmentStats, @@ -276,85 +281,138 @@ static void compareStatsResponse( } // Compare Remote Translog Store stats - Map tlogStatsObj = (Map) statsObject.get("translog"); - Map tlogUploadStatsObj = (Map) tlogStatsObj.get("upload"); - assertEquals( - pressureTrackerTranslogStats.lastSuccessfulUploadTimestamp, - Long.parseLong(tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.LAST_SUCCESSFUL_UPLOAD_TIMESTAMP).toString()) - ); + Map tlogStatsObj = (Map) statsObject.get(RemoteStoreStats.Fields.TRANSLOG); + Map tlogUploadStatsObj = (Map) tlogStatsObj.get(RemoteStoreStats.SubFields.UPLOAD); + if (pressureTrackerTranslogStats.totalUploadsStarted > 0) { + assertEquals( + pressureTrackerTranslogStats.lastSuccessfulUploadTimestamp, + Long.parseLong(tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.LAST_SUCCESSFUL_UPLOAD_TIMESTAMP).toString()) + ); - assertEquals( - pressureTrackerTranslogStats.totalUploadsStarted, - Long.parseLong( - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS)).get( - RemoteStoreStats.SubFields.STARTED - ).toString() - ) - ); - assertEquals( - pressureTrackerTranslogStats.totalUploadsSucceeded, - Long.parseLong( - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS)).get( - RemoteStoreStats.SubFields.SUCCEEDED - ).toString() - ) - ); - assertEquals( - pressureTrackerTranslogStats.totalUploadsFailed, - Long.parseLong( - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS)).get( - RemoteStoreStats.SubFields.FAILED - ).toString() - ) - ); + assertEquals( + pressureTrackerTranslogStats.totalUploadsStarted, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS)).get( + RemoteStoreStats.SubFields.STARTED + ).toString() + ) + ); + assertEquals( + pressureTrackerTranslogStats.totalUploadsSucceeded, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ).toString() + ) + ); + assertEquals( + pressureTrackerTranslogStats.totalUploadsFailed, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS)).get( + RemoteStoreStats.SubFields.FAILED + ).toString() + ) + ); - assertEquals( - pressureTrackerTranslogStats.uploadBytesStarted, - Long.parseLong( - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( - RemoteStoreStats.SubFields.STARTED - ).toString() - ) - ); - assertEquals( - pressureTrackerTranslogStats.uploadBytesSucceeded, - Long.parseLong( - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( - RemoteStoreStats.SubFields.SUCCEEDED - ).toString() - ) - ); - assertEquals( - pressureTrackerTranslogStats.uploadBytesFailed, - Long.parseLong( - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( - RemoteStoreStats.SubFields.FAILED - ).toString() - ) - ); + assertEquals( + pressureTrackerTranslogStats.uploadBytesStarted, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.STARTED + ).toString() + ) + ); + assertEquals( + pressureTrackerTranslogStats.uploadBytesSucceeded, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ).toString() + ) + ); + assertEquals( + pressureTrackerTranslogStats.uploadBytesFailed, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.FAILED + ).toString() + ) + ); - assertEquals( - pressureTrackerTranslogStats.totalUploadTimeInMillis, - Long.parseLong(tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOAD_TIME_IN_MILLIS).toString()) - ); + assertEquals( + pressureTrackerTranslogStats.totalUploadTimeInMillis, + Long.parseLong(tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOAD_TIME_IN_MILLIS).toString()) + ); - assertEquals( - pressureTrackerTranslogStats.uploadBytesMovingAverage, - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.UPLOAD_SIZE_IN_BYTES)).get( - RemoteStoreStats.SubFields.MOVING_AVG - ) - ); - assertEquals( - pressureTrackerTranslogStats.uploadBytesPerSecMovingAverage, - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.UPLOAD_SPEED_IN_BYTES_PER_SEC)).get( - RemoteStoreStats.SubFields.MOVING_AVG - ) - ); - assertEquals( - pressureTrackerTranslogStats.uploadTimeMovingAverage, - ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.UPLOAD_TIME_IN_MILLIS)).get( - RemoteStoreStats.SubFields.MOVING_AVG - ) - ); + assertEquals( + pressureTrackerTranslogStats.uploadBytesMovingAverage, + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.UPLOAD_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + assertEquals( + pressureTrackerTranslogStats.uploadBytesPerSecMovingAverage, + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.UPLOAD_SPEED_IN_BYTES_PER_SEC)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + assertEquals( + pressureTrackerTranslogStats.uploadTimeMovingAverage, + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.UPLOAD_TIME_IN_MILLIS)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + } else { + assertNull(((Map) tlogUploadStatsObj.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS))); + } + + Map tlogDownloadStatsObj = (Map) tlogStatsObj.get(RemoteStoreStats.SubFields.DOWNLOAD); + if (pressureTrackerTranslogStats.totalDownloadsSucceeded > 0) { + assertEquals( + pressureTrackerTranslogStats.lastSuccessfulDownloadTimestamp, + Long.parseLong(tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.LAST_SUCCESSFUL_DOWNLOAD_TIMESTAMP).toString()) + ); + assertEquals( + pressureTrackerTranslogStats.totalDownloadsSucceeded, + Long.parseLong( + ((Map) tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ).toString() + ) + ); + assertEquals( + pressureTrackerTranslogStats.downloadBytesSucceeded, + Long.parseLong( + ((Map) tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ).toString() + ) + ); + assertEquals( + pressureTrackerTranslogStats.totalDownloadTimeInMillis, + Long.parseLong(tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOAD_TIME_IN_MILLIS).toString()) + ); + + assertEquals( + pressureTrackerTranslogStats.downloadBytesMovingAverage, + ((Map) tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + assertEquals( + pressureTrackerTranslogStats.downloadBytesPerSecMovingAverage, + ((Map) tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + assertEquals( + pressureTrackerTranslogStats.downloadTimeMovingAverage, + ((Map) tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_TIME_IN_MILLIS)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + } else { + assertNull(((Map) tlogDownloadStatsObj.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES))); + } } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java index 8e089ece01774..329189c27376d 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java @@ -25,12 +25,13 @@ import java.io.IOException; import java.util.Map; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerTranslogStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForRemoteStoreRestoredPrimary; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createEmptyTranslogStats; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewReplica; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createShardRouting; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewPrimary; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForRemoteStoreRestoredPrimary; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createTranslogStats; import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; public class RemoteStoreStatsTests extends OpenSearchTestCase { @@ -52,7 +53,7 @@ public void tearDown() throws Exception { public void testXContentBuilderWithPrimaryShard() throws IOException { RemoteSegmentTransferTracker.Stats pressureTrackerSegmentStats = createStatsForNewPrimary(shardId); - RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createTranslogStats(shardId); ShardRouting routing = createShardRouting(shardId, true); RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerSegmentStats, pressureTrackerTranslogStats, routing); @@ -64,7 +65,7 @@ public void testXContentBuilderWithPrimaryShard() throws IOException { public void testXContentBuilderWithReplicaShard() throws IOException { RemoteSegmentTransferTracker.Stats pressureTrackerSegmentStats = createStatsForNewReplica(shardId); - RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createEmptyTranslogStats(shardId); ShardRouting routing = createShardRouting(shardId, false); RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerSegmentStats, pressureTrackerTranslogStats, routing); @@ -76,7 +77,7 @@ public void testXContentBuilderWithReplicaShard() throws IOException { public void testXContentBuilderWithRemoteStoreRestoredShard() throws IOException { RemoteSegmentTransferTracker.Stats pressureTrackerSegmentStats = createStatsForRemoteStoreRestoredPrimary(shardId); - RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createTranslogStats(shardId); ShardRouting routing = createShardRouting(shardId, true); RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerSegmentStats, pressureTrackerTranslogStats, routing); @@ -88,7 +89,7 @@ public void testXContentBuilderWithRemoteStoreRestoredShard() throws IOException public void testSerializationForPrimaryShard() throws Exception { RemoteSegmentTransferTracker.Stats pressureTrackerSegmentStats = createStatsForNewPrimary(shardId); - RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createTranslogStats(shardId); RemoteStoreStats stats = new RemoteStoreStats( pressureTrackerSegmentStats, pressureTrackerTranslogStats, @@ -106,7 +107,7 @@ public void testSerializationForPrimaryShard() throws Exception { public void testSerializationForReplicaShard() throws Exception { RemoteSegmentTransferTracker.Stats replicaShardStats = createStatsForNewReplica(shardId); - RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createEmptyTranslogStats(shardId); RemoteStoreStats stats = new RemoteStoreStats(replicaShardStats, pressureTrackerTranslogStats, createShardRouting(shardId, false)); try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); @@ -120,7 +121,7 @@ public void testSerializationForReplicaShard() throws Exception { public void testSerializationForRemoteStoreRestoredPrimaryShard() throws Exception { RemoteSegmentTransferTracker.Stats primaryShardStats = createStatsForRemoteStoreRestoredPrimary(shardId); - RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createTranslogStats(shardId); RemoteStoreStats stats = new RemoteStoreStats(primaryShardStats, pressureTrackerTranslogStats, createShardRouting(shardId, true)); try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index d79e5ae99b696..0eb008b757a1a 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -76,6 +76,7 @@ public void testAfterIndexShardCreatedForRemoteBackedIndex() { pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); + assertNotNull(pressureService.getRemoteTranslogTracker(indexShard.shardId())); } public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { @@ -83,6 +84,7 @@ public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); + assertNull(pressureService.getRemoteTranslogTracker(indexShard.shardId())); } public void testAfterIndexShardClosed() { @@ -90,9 +92,11 @@ public void testAfterIndexShardClosed() { pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); + assertNotNull(pressureService.getRemoteTranslogTracker(shardId)); pressureService.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings()); assertNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); + assertNull(pressureService.getRemoteTranslogTracker(shardId)); } public void testValidateSegmentUploadLag() { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index 9c5ec69cf6be9..96cbecdf1636d 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -71,6 +71,15 @@ public void testGetDefaultSettings() { // Check upload time moving average window size default value assertEquals(20, pressureSettings.getUploadTimeMovingAverageWindowSize()); + + // Check download bytes moving average window size default value + assertEquals(20, pressureSettings.getDownloadBytesMovingAverageWindowSize()); + + // Check download bytes per sec moving average window size default value + assertEquals(20, pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize()); + + // Check download time moving average window size default value + assertEquals(20, pressureSettings.getDownloadTimeMovingAverageWindowSize()); } public void testGetConfiguredSettings() { @@ -82,6 +91,9 @@ public void testGetConfiguredSettings() { .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 111) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 222) + .put(RemoteStorePressureSettings.DOWNLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 333) .build(); RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, @@ -109,6 +121,15 @@ public void testGetConfiguredSettings() { // Check upload time moving average window size configured value assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); + + // Check download bytes moving average window size configured value + assertEquals(111, pressureSettings.getDownloadBytesMovingAverageWindowSize()); + + // Check download bytes per sec moving average window size configured value + assertEquals(222, pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize()); + + // Check download time moving average window size configured value + assertEquals(333, pressureSettings.getDownloadTimeMovingAverageWindowSize()); } public void testUpdateAfterGetDefaultSettings() { @@ -126,6 +147,9 @@ public void testUpdateAfterGetDefaultSettings() { .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 111) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 222) + .put(RemoteStorePressureSettings.DOWNLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 333) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -149,6 +173,15 @@ public void testUpdateAfterGetDefaultSettings() { // Check upload time moving average window size updated assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); + + // Check download bytes moving average window size configured value + assertEquals(111, pressureSettings.getDownloadBytesMovingAverageWindowSize()); + + // Check download bytes per sec moving average window size configured value + assertEquals(222, pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize()); + + // Check download time moving average window size configured value + assertEquals(333, pressureSettings.getDownloadTimeMovingAverageWindowSize()); } public void testUpdateAfterGetConfiguredSettings() { @@ -160,6 +193,9 @@ public void testUpdateAfterGetConfiguredSettings() { .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 111) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 222) + .put(RemoteStorePressureSettings.DOWNLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 333) .build(); RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, @@ -174,6 +210,9 @@ public void testUpdateAfterGetConfiguredSettings() { .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 222) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 444) + .put(RemoteStorePressureSettings.DOWNLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 666) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -198,15 +237,28 @@ public void testUpdateAfterGetConfiguredSettings() { // Check upload time moving average window size updated assertEquals(114, pressureSettings.getUploadTimeMovingAverageWindowSize()); + + // Check download bytes moving average window size configured value + assertEquals(222, pressureSettings.getDownloadBytesMovingAverageWindowSize()); + + // Check download bytes per sec moving average window size configured value + assertEquals(444, pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize()); + + // Check download time moving average window size configured value + assertEquals(666, pressureSettings.getDownloadTimeMovingAverageWindowSize()); } public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { int toUpdateVal1 = 1121, toUpdateVal2 = 1123, toUpdateVal3 = 1125; + int toUpdateVal4 = 1127, toUpdateVal5 = 1129, toUpdateVal6 = 1131; AtomicInteger updatedUploadBytesWindowSize = new AtomicInteger(); AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); + AtomicInteger updatedDownloadBytesWindowSize = new AtomicInteger(); + AtomicInteger updatedDownloadBytesPerSecWindowSize = new AtomicInteger(); + AtomicInteger updatedDownloadTimeWindowSize = new AtomicInteger(); RemoteStorePressureService pressureService = mock(RemoteStorePressureService.class); @@ -228,11 +280,32 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { return null; }).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt()); + // Download bytes + doAnswer(invocation -> { + updatedDownloadBytesWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateDownloadBytesMovingAverageWindowSize(anyInt()); + + // Download bytes per sec + doAnswer(invocation -> { + updatedDownloadBytesPerSecWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateDownloadBytesPerSecMovingAverageWindowSize(anyInt()); + + // Download time + doAnswer(invocation -> { + updatedDownloadTimeWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateDownloadTimeMsMovingAverageWindowSize(anyInt()); + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, pressureService); Settings newSettings = Settings.builder() .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal4) + .put(RemoteStorePressureSettings.DOWNLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal5) + .put(RemoteStorePressureSettings.DOWNLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal6) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -243,5 +316,11 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { assertEquals(toUpdateVal2, updatedUploadBytesPerSecWindowSize.get()); assertEquals(toUpdateVal3, pressureSettings.getUploadTimeMovingAverageWindowSize()); assertEquals(toUpdateVal3, updatedUploadTimeWindowSize.get()); + assertEquals(toUpdateVal4, pressureSettings.getDownloadBytesMovingAverageWindowSize()); + assertEquals(toUpdateVal4, updatedDownloadBytesWindowSize.get()); + assertEquals(toUpdateVal5, pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize()); + assertEquals(toUpdateVal5, updatedDownloadBytesPerSecWindowSize.get()); + assertEquals(toUpdateVal6, pressureSettings.getDownloadTimeMovingAverageWindowSize()); + assertEquals(toUpdateVal6, updatedDownloadTimeWindowSize.get()); } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java index f68408854ea30..efa25e53128e2 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java @@ -70,39 +70,47 @@ public void testGetShardId() { assertEquals(shardId, tracker.getShardId()); } - public void testIncrementUploadsStarted() { + public void testAddUploadsStarted() { populateUploadsStarted(); } - public void testIncrementUploadsFailed() { + public void testAddUploadsFailed() { populateUploadsStarted(); - populateUploadsFailed(); + assertEquals(0L, tracker.getTotalUploadsFailed()); + tracker.addUploadsFailed(1); + assertEquals(1L, tracker.getTotalUploadsFailed()); + tracker.addUploadsFailed(5); + assertEquals(6L, tracker.getTotalUploadsFailed()); } - public void testInvalidIncrementUploadsFailed() { + public void testInvalidAddUploadsFailed() { populateUploadsStarted(); - populateUploadsSucceeded(); + tracker.addUploadsSucceeded(tracker.getTotalUploadsStarted()); AssertionError error = assertThrows(AssertionError.class, () -> tracker.addUploadsFailed(1)); assertTrue(error.getMessage().contains("Sum of failure count (")); } - public void testIncrementUploadsSucceeded() { + public void testAddUploadsSucceeded() { populateUploadsStarted(); - populateUploadsSucceeded(); + assertEquals(0L, tracker.getTotalUploadsSucceeded()); + tracker.addUploadsSucceeded(4); + assertEquals(4L, tracker.getTotalUploadsSucceeded()); + tracker.addUploadsSucceeded(2); + assertEquals(6L, tracker.getTotalUploadsSucceeded()); } - public void testInvalidIncrementUploadsSucceeded() { + public void testInvalidAddUploadsSucceeded() { populateUploadsStarted(); - populateUploadsFailed(); - AssertionError error = assertThrows(AssertionError.class, this::populateUploadsSucceeded); + tracker.addUploadsFailed(tracker.getTotalUploadsStarted()); + AssertionError error = assertThrows(AssertionError.class, () -> tracker.addUploadsSucceeded(1)); assertTrue(error.getMessage().contains("Sum of failure count (")); } - public void testSetUploadBytesStarted() { + public void testAddUploadBytesStarted() { populateUploadBytesStarted(); } - public void testSetUploadBytesFailed() { + public void testAddUploadBytesFailed() { populateUploadBytesStarted(); assertEquals(0L, tracker.getUploadBytesFailed()); long count1 = randomIntBetween(1, (int) tracker.getUploadBytesStarted() / 4); @@ -113,14 +121,14 @@ public void testSetUploadBytesFailed() { assertEquals(count1 + count2, tracker.getUploadBytesFailed()); } - public void testInvalidSetUploadBytesFailed() { + public void testInvalidAddUploadBytesFailed() { populateUploadBytesStarted(); tracker.addUploadBytesSucceeded(tracker.getUploadBytesStarted()); AssertionError error = assertThrows(AssertionError.class, () -> tracker.addUploadBytesFailed(1L)); assertTrue(error.getMessage().contains("Sum of failure count (")); } - public void testSetUploadBytesSucceeded() { + public void testAddUploadBytesSucceeded() { populateUploadBytesStarted(); assertEquals(0L, tracker.getUploadBytesSucceeded()); long count1 = randomIntBetween(1, (int) tracker.getUploadBytesStarted() / 4); @@ -131,7 +139,7 @@ public void testSetUploadBytesSucceeded() { assertEquals(count1 + count2, tracker.getUploadBytesSucceeded()); } - public void testInvalidSetUploadBytesSucceeded() { + public void testInvalidAddUploadBytesSucceeded() { populateUploadBytesStarted(); tracker.addUploadBytesFailed(tracker.getUploadBytesStarted()); AssertionError error = assertThrows(AssertionError.class, () -> tracker.addUploadBytesSucceeded(1L)); @@ -148,8 +156,9 @@ public void testAddUploadTimeInMillis() { assertEquals(duration1 + duration2, tracker.getTotalUploadTimeInMillis()); } - public void testSetLastUploadTimestamp() { - long lastUploadTimestamp = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100); + public void testSetLastSuccessfulUploadTimestamp() { + assertEquals(0, tracker.getLastSuccessfulUploadTimestamp()); + long lastUploadTimestamp = System.currentTimeMillis() + randomIntBetween(10, 100); tracker.setLastSuccessfulUploadTimestamp(lastUploadTimestamp); assertEquals(lastUploadTimestamp, tracker.getLastSuccessfulUploadTimestamp()); } @@ -247,6 +256,134 @@ public void testUpdateUploadTimeMovingAverage() { assertEquals((double) sum / uploadTimeMovingAverageWindowSize, tracker.getUploadTimeMovingAverage(), 0.0d); } + public void testAddDownloadsSucceeded() { + assertEquals(0L, tracker.getTotalDownloadsSucceeded()); + tracker.addDownloadsSucceeded(4); + assertEquals(4L, tracker.getTotalDownloadsSucceeded()); + tracker.addDownloadsSucceeded(2); + assertEquals(6L, tracker.getTotalDownloadsSucceeded()); + } + + public void testAddDownloadBytesSucceeded() { + assertEquals(0L, tracker.getDownloadBytesSucceeded()); + long count1 = randomIntBetween(1, 500); + tracker.addDownloadBytesSucceeded(count1); + assertEquals(count1, tracker.getDownloadBytesSucceeded()); + long count2 = randomIntBetween(1, 500); + tracker.addDownloadBytesSucceeded(count2); + assertEquals(count1 + count2, tracker.getDownloadBytesSucceeded()); + } + + public void testAddDownloadTimeInMillis() { + assertEquals(0L, tracker.getTotalDownloadTimeInMillis()); + int duration1 = randomIntBetween(10, 50); + tracker.addDownloadTimeInMillis(duration1); + assertEquals(duration1, tracker.getTotalDownloadTimeInMillis()); + int duration2 = randomIntBetween(10, 50); + tracker.addDownloadTimeInMillis(duration2); + assertEquals(duration1 + duration2, tracker.getTotalDownloadTimeInMillis()); + } + + public void testSetLastSuccessfulDownloadTimestamp() { + assertEquals(0, tracker.getLastSuccessfulDownloadTimestamp()); + long lastSuccessfulDownloadTimestamp = System.currentTimeMillis() + randomIntBetween(10, 100); + tracker.setLastSuccessfulDownloadTimestamp(lastSuccessfulDownloadTimestamp); + assertEquals(lastSuccessfulDownloadTimestamp, tracker.getLastSuccessfulDownloadTimestamp()); + } + + public void testUpdateDowmloadBytesMovingAverage() { + int downloadBytesMovingAverageWindowSize = 20; + tracker = new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize(), + downloadBytesMovingAverageWindowSize, + pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getDownloadTimeMovingAverageWindowSize() + ); + assertFalse(tracker.isDownloadBytesMovingAverageReady()); + + long sum = 0; + for (int i = 1; i < downloadBytesMovingAverageWindowSize; i++) { + tracker.updateDownloadBytesMovingAverage(i); + sum += i; + assertFalse(tracker.isDownloadBytesMovingAverageReady()); + assertEquals((double) sum / i, tracker.getDownloadBytesMovingAverage(), 0.0d); + } + + tracker.updateDownloadBytesMovingAverage(downloadBytesMovingAverageWindowSize); + sum += downloadBytesMovingAverageWindowSize; + assertTrue(tracker.isDownloadBytesMovingAverageReady()); + assertEquals((double) sum / downloadBytesMovingAverageWindowSize, tracker.getDownloadBytesMovingAverage(), 0.0d); + + tracker.updateDownloadBytesMovingAverage(100); + sum = sum + 100 - 1; + assertEquals((double) sum / downloadBytesMovingAverageWindowSize, tracker.getDownloadBytesMovingAverage(), 0.0d); + } + + public void testUpdateDownloadBytesPerSecMovingAverage() { + int downloadBytesPerSecMovingAverageWindowSize = 20; + tracker = new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize(), + pressureSettings.getDownloadBytesMovingAverageWindowSize(), + downloadBytesPerSecMovingAverageWindowSize, + pressureSettings.getDownloadTimeMovingAverageWindowSize() + ); + assertFalse(tracker.isDownloadBytesPerSecMovingAverageReady()); + + long sum = 0; + for (int i = 1; i < downloadBytesPerSecMovingAverageWindowSize; i++) { + tracker.updateDownloadBytesPerSecMovingAverage(i); + sum += i; + assertFalse(tracker.isDownloadBytesPerSecMovingAverageReady()); + assertEquals((double) sum / i, tracker.getDownloadBytesPerSecMovingAverage(), 0.0d); + } + + tracker.updateDownloadBytesPerSecMovingAverage(downloadBytesPerSecMovingAverageWindowSize); + sum += downloadBytesPerSecMovingAverageWindowSize; + assertTrue(tracker.isDownloadBytesPerSecMovingAverageReady()); + assertEquals((double) sum / downloadBytesPerSecMovingAverageWindowSize, tracker.getDownloadBytesPerSecMovingAverage(), 0.0d); + + tracker.updateDownloadBytesPerSecMovingAverage(100); + sum = sum + 100 - 1; + assertEquals((double) sum / downloadBytesPerSecMovingAverageWindowSize, tracker.getDownloadBytesPerSecMovingAverage(), 0.0d); + } + + public void testUpdateDownloadTimeMovingAverage() { + int downloadTimeMovingAverageWindowSize = 20; + tracker = new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize(), + pressureSettings.getDownloadBytesMovingAverageWindowSize(), + pressureSettings.getDownloadBytesPerSecMovingAverageWindowSize(), + downloadTimeMovingAverageWindowSize + ); + assertFalse(tracker.isDownloadTimeMovingAverageReady()); + + long sum = 0; + for (int i = 1; i < downloadTimeMovingAverageWindowSize; i++) { + tracker.updateDownloadTimeMovingAverage(i); + sum += i; + assertFalse(tracker.isDownloadTimeMovingAverageReady()); + assertEquals((double) sum / i, tracker.getDownloadTimeMovingAverage(), 0.0d); + } + + tracker.updateDownloadTimeMovingAverage(downloadTimeMovingAverageWindowSize); + sum += downloadTimeMovingAverageWindowSize; + assertTrue(tracker.isDownloadTimeMovingAverageReady()); + assertEquals((double) sum / downloadTimeMovingAverageWindowSize, tracker.getDownloadTimeMovingAverage(), 0.0d); + + tracker.updateDownloadTimeMovingAverage(100); + sum = sum + 100 - 1; + assertEquals((double) sum / downloadTimeMovingAverageWindowSize, tracker.getDownloadTimeMovingAverage(), 0.0d); + } + public void testStatsObjectCreation() { populateDummyStats(); RemoteTranslogTracker.Stats actualStats = tracker.stats(); @@ -269,24 +406,8 @@ private void populateUploadsStarted() { assertEquals(0L, tracker.getTotalUploadsStarted()); tracker.addUploadsStarted(1); assertEquals(1L, tracker.getTotalUploadsStarted()); - tracker.addUploadsStarted(2); - assertEquals(3L, tracker.getTotalUploadsStarted()); - } - - private void populateUploadsFailed() { - assertEquals(0L, tracker.getTotalUploadsFailed()); - tracker.addUploadsFailed(1); - assertEquals(1L, tracker.getTotalUploadsFailed()); - tracker.addUploadsFailed(3); - assertEquals(4L, tracker.getTotalUploadsFailed()); - } - - private void populateUploadsSucceeded() { - assertEquals(0L, tracker.getTotalUploadsSucceeded()); - tracker.addUploadsSucceeded(4); - assertEquals(4L, tracker.getTotalUploadsSucceeded()); - tracker.addUploadsSucceeded(3); - assertEquals(7L, tracker.getTotalUploadsSucceeded()); + tracker.addUploadsStarted(5); + assertEquals(6L, tracker.getTotalUploadsStarted()); } private void populateUploadBytesStarted() { @@ -300,16 +421,20 @@ private void populateUploadBytesStarted() { } private void populateDummyStats() { - tracker.setLastSuccessfulUploadTimestamp(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); - tracker.addUploadsStarted(12); - tracker.addUploadsFailed(2); - tracker.addUploadsSucceeded(10); - int startedBytes = randomIntBetween(10, 100); - int failedBytes = randomIntBetween(1, startedBytes / 2); - int succeededBytes = randomIntBetween(1, startedBytes / 2); - tracker.addUploadBytesStarted(startedBytes); - tracker.addUploadBytesFailed(failedBytes); - tracker.addUploadBytesSucceeded(succeededBytes); + int startedBytesUpload = randomIntBetween(10, 100); + + tracker.addUploadBytesStarted(startedBytesUpload); + tracker.addUploadBytesFailed(randomIntBetween(1, startedBytesUpload / 2)); + tracker.addUploadBytesSucceeded(randomIntBetween(1, startedBytesUpload / 2)); tracker.addUploadTimeInMillis(randomIntBetween(10, 100)); + tracker.setLastSuccessfulUploadTimestamp(System.currentTimeMillis() + randomIntBetween(10, 100)); + tracker.addUploadsStarted(randomIntBetween(6, 10)); + tracker.addUploadsFailed(randomIntBetween(1, 5)); + tracker.addUploadsSucceeded(randomIntBetween(1, 5)); + + tracker.addDownloadBytesSucceeded(randomIntBetween(10, 100)); + tracker.addDownloadTimeInMillis(randomIntBetween(10, 100)); + tracker.setLastSuccessfulDownloadTimestamp(System.currentTimeMillis() + randomIntBetween(10, 100)); + tracker.addDownloadsSucceeded(randomIntBetween(1, 5)); } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 539dd6bea4760..5f9b6c7af8269 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -258,6 +258,42 @@ private Translog.Location addToTranslogAndListAndUpload(Translog translog, List< return loc; } + private static void assertUploadStatsNoFailures(RemoteTranslogTracker statsTracker) { + assertTrue(statsTracker.getUploadBytesStarted() > 0); + assertEquals(0, statsTracker.getUploadBytesFailed()); + assertTrue(statsTracker.getUploadBytesSucceeded() > 0); + assertTrue(statsTracker.getTotalUploadsStarted() > 0); + assertEquals(0, statsTracker.getTotalUploadsFailed()); + assertTrue(statsTracker.getTotalUploadsSucceeded() > 0); + assertTrue(statsTracker.getTotalUploadTimeInMillis() > 0); + assertTrue(statsTracker.getLastSuccessfulUploadTimestamp() > 0); + } + + private static void assertUploadStatsNoUploads(RemoteTranslogTracker statsTracker) { + assertEquals(0, statsTracker.getUploadBytesStarted()); + assertEquals(0, statsTracker.getUploadBytesFailed()); + assertEquals(0, statsTracker.getUploadBytesSucceeded()); + assertEquals(0, statsTracker.getTotalUploadsStarted()); + assertEquals(0, statsTracker.getTotalUploadsFailed()); + assertEquals(0, statsTracker.getTotalUploadsSucceeded()); + assertEquals(0, statsTracker.getTotalUploadTimeInMillis()); + assertEquals(0, statsTracker.getLastSuccessfulUploadTimestamp()); + } + + private static void assertDownloadStatsPopulated(RemoteTranslogTracker statsTracker) { + assertTrue(statsTracker.getDownloadBytesSucceeded() > 0); + assertTrue(statsTracker.getTotalDownloadsSucceeded() > 0); + assertTrue(statsTracker.getTotalDownloadTimeInMillis() > 0); + assertTrue(statsTracker.getLastSuccessfulDownloadTimestamp() > 0); + } + + private static void assertDownloadStatsNoDownloads(RemoteTranslogTracker statsTracker) { + assertEquals(0, statsTracker.getDownloadBytesSucceeded()); + assertEquals(0, statsTracker.getTotalDownloadsSucceeded()); + assertEquals(0, statsTracker.getTotalDownloadTimeInMillis()); + assertEquals(0, statsTracker.getLastSuccessfulDownloadTimestamp()); + } + public void testUploadWithPrimaryModeFalse() { // Test setup primaryMode.set(false); @@ -271,6 +307,9 @@ public void testUploadWithPrimaryModeFalse() { throw new RuntimeException(e); } assertTrue(translog.syncNeeded()); + RemoteTranslogTracker statsTracker = translog.getRemoteTranslogTracker(); + assertUploadStatsNoUploads(statsTracker); + assertDownloadStatsNoDownloads(statsTracker); } public void testUploadWithPrimaryModeTrue() { @@ -283,6 +322,9 @@ public void testUploadWithPrimaryModeTrue() { throw new RuntimeException(e); } assertFalse(translog.syncNeeded()); + RemoteTranslogTracker statsTracker = translog.getRemoteTranslogTracker(); + assertUploadStatsNoFailures(statsTracker); + assertDownloadStatsNoDownloads(statsTracker); } public void testSimpleOperations() throws IOException { @@ -332,6 +374,9 @@ public void testReadLocation() throws IOException { assertEquals(op, translog.readOperation(locs.get(i++))); } assertNull(translog.readOperation(new Translog.Location(100, 0, 0))); + RemoteTranslogTracker statsTracker = translog.getRemoteTranslogTracker(); + assertUploadStatsNoFailures(statsTracker); + assertDownloadStatsNoDownloads(statsTracker); } public void testReadLocationDownload() throws IOException { @@ -360,11 +405,16 @@ public void testReadLocationDownload() throws IOException { } // Creating RemoteFsTranslog with the same location - Translog newTranslog = create(translogDir, repository, translogUUID); + RemoteFsTranslog newTranslog = create(translogDir, repository, translogUUID); i = 0; for (Translog.Operation op : ops) { assertEquals(op, newTranslog.readOperation(locs.get(i++))); } + + RemoteTranslogTracker statsTracker = newTranslog.getRemoteTranslogTracker(); + assertUploadStatsNoUploads(statsTracker); + assertDownloadStatsPopulated(statsTracker); + try { newTranslog.close(); } catch (Exception e) { @@ -978,6 +1028,9 @@ public void testSyncUpTo() throws IOException { if (randomBoolean()) { translog.sync(); assertFalse("translog has been synced already", translog.ensureSynced(location)); + RemoteTranslogTracker statsTracker = translog.getRemoteTranslogTracker(); + assertUploadStatsNoFailures(statsTracker); + assertDownloadStatsNoDownloads(statsTracker); } } } @@ -1018,6 +1071,9 @@ public void testSyncUpFailure() throws IOException { assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); } + RemoteTranslogTracker statsTracker = translog.getRemoteTranslogTracker(); + assertUploadStatsNoFailures(statsTracker); + assertDownloadStatsNoDownloads(statsTracker); } public void testSyncUpToStream() throws IOException { @@ -1050,6 +1106,11 @@ public void testSyncUpToStream() throws IOException { translog.sync(); assertFalse("translog has been synced already", translog.ensureSynced(locations.stream())); } + + RemoteTranslogTracker statsTracker = translog.getRemoteTranslogTracker(); + assertUploadStatsNoFailures(statsTracker); + assertDownloadStatsNoDownloads(statsTracker); + for (Translog.Location location : locations) { assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 8b4e56b95dc6d..17d0f8251dc61 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -20,6 +20,7 @@ import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -208,6 +209,7 @@ public void testReadMetadataNoFile() throws IOException { remoteBaseTransferPath, null ); + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); @@ -217,6 +219,8 @@ public void testReadMetadataNoFile() throws IOException { .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); assertNull(translogTransferManager.readMetadata()); + assertEquals(0, remoteTranslogTracker.getDownloadBytesSucceeded()); + assertEquals(0, remoteTranslogTracker.getTotalDownloadsSucceeded()); } // This should happen most of the time - Just a single metadata file @@ -227,6 +231,7 @@ public void testReadMetadataSingleFile() throws IOException { remoteBaseTransferPath, null ); + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); doAnswer(invocation -> { @@ -244,6 +249,8 @@ public void testReadMetadataSingleFile() throws IOException { ); assertEquals(metadata, translogTransferManager.readMetadata()); + assertEquals(translogTransferManager.getMetadataBytes(metadata).length, remoteTranslogTracker.getDownloadBytesSucceeded()); + assertEquals(1, remoteTranslogTracker.getTotalDownloadsSucceeded()); } public void testReadMetadataReadException() throws IOException { @@ -253,7 +260,7 @@ public void testReadMetadataReadException() throws IOException { remoteBaseTransferPath, null ); - + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); @@ -269,6 +276,8 @@ public void testReadMetadataReadException() throws IOException { when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); + assertEquals(0, remoteTranslogTracker.getDownloadBytesSucceeded()); + assertEquals(0, remoteTranslogTracker.getTotalDownloadsSucceeded()); } public void testMetadataFileNameOrder() throws IOException { @@ -288,6 +297,7 @@ public void testReadMetadataListException() throws IOException { remoteBaseTransferPath, null ); + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); @@ -299,6 +309,8 @@ public void testReadMetadataListException() throws IOException { when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); + assertEquals(0, remoteTranslogTracker.getDownloadBytesSucceeded()); + assertEquals(0, remoteTranslogTracker.getTotalDownloadsSucceeded()); } public void testDownloadTranslog() throws IOException { @@ -309,6 +321,7 @@ public void testDownloadTranslog() throws IOException { remoteBaseTransferPath, new FileTransferTracker(new ShardId("index", "indexUuid", 0)) ); + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) @@ -323,6 +336,8 @@ public void testDownloadTranslog() throws IOException { translogTransferManager.downloadTranslog("12", "23", location); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + assertTrue(remoteTranslogTracker.getDownloadBytesSucceeded() > 0); + assertEquals(2, remoteTranslogTracker.getTotalDownloadsSucceeded()); } public void testDownloadTranslogAlreadyExists() throws IOException { @@ -337,6 +352,7 @@ public void testDownloadTranslogAlreadyExists() throws IOException { remoteBaseTransferPath, tracker ); + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) @@ -351,6 +367,8 @@ public void testDownloadTranslogAlreadyExists() throws IOException { verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + assertTrue(remoteTranslogTracker.getDownloadBytesSucceeded() > 0); + assertEquals(2, remoteTranslogTracker.getTotalDownloadsSucceeded()); } public void testDownloadTranslogWithTrackerUpdated() throws IOException { @@ -366,6 +384,7 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { remoteBaseTransferPath, tracker ); + RemoteTranslogTracker remoteTranslogTracker = setStatsTracker(translogTransferManager); when(transferService.downloadBlob(any(BlobPath.class), eq(translogFile))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) @@ -388,6 +407,9 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { // Since the tracker already holds the files with success state, adding them with success state is allowed tracker.add(translogFile, true); tracker.add(checkpointFile, true); + + assertTrue(remoteTranslogTracker.getDownloadBytesSucceeded() > 0); + assertEquals(2, remoteTranslogTracker.getTotalDownloadsSucceeded()); } public void testDeleteTranslogSuccess() throws Exception { @@ -478,4 +500,31 @@ public void testDeleteTranslogFailure() throws Exception { translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); assertEquals(2, tracker.allUploaded().size()); } + + public void testSetRemoteTranslogTracker() { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + + assertEquals(null, translogTransferManager.getRemoteTranslogTracker()); + RemoteTranslogTracker remoteTranslogTracker1 = mock(RemoteTranslogTracker.class); + translogTransferManager.setRemoteTranslogTracker(remoteTranslogTracker1); + assertEquals(remoteTranslogTracker1, translogTransferManager.getRemoteTranslogTracker()); + RemoteTranslogTracker remoteTranslogTracker2 = mock(RemoteTranslogTracker.class); + translogTransferManager.setRemoteTranslogTracker(remoteTranslogTracker2); + assertEquals(remoteTranslogTracker1, translogTransferManager.getRemoteTranslogTracker()); + } + + private RemoteTranslogTracker setStatsTracker(TranslogTransferManager translogTransferManager) { + RemoteTranslogTracker remoteTranslogTracker = new RemoteTranslogTracker(shardId, 20, 20, 20, 20, 20, 20); + translogTransferManager.setRemoteTranslogTracker(remoteTranslogTracker); + + return remoteTranslogTracker; + } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index 1c38e087e8da9..6f59a18195700 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -66,6 +66,7 @@ import org.opensearch.index.mapper.KeywordFieldMapper; import org.opensearch.index.mapper.Mapper; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.shard.IllegalIndexShardStateException; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -619,4 +620,15 @@ public void testConflictingEngineFactories() { ".*multiple engine factories provided for \\[foobar/.*\\]: \\[.*FooEngineFactory\\],\\[.*BarEngineFactory\\].*"; assertThat(e, hasToString(new RegexMatcher(pattern))); } + + public void testSetPressureService() { + IndicesService indicesService = getIndicesService(); + RemoteStorePressureService remoteStorePressureService1 = mock(RemoteStorePressureService.class); + RemoteStorePressureService remoteStorePressureService2 = mock(RemoteStorePressureService.class); + assertEquals(null, indicesService.getPressureService()); + indicesService.setPressureService(remoteStorePressureService1); + assertEquals(remoteStorePressureService1, indicesService.getPressureService()); + indicesService.setPressureService(remoteStorePressureService2); + assertEquals(remoteStorePressureService1, indicesService.getPressureService()); + } }