Commit

Added stats debug logs in remote store stats publisher and added metric collector in async flow

Signed-off-by: vikasvb90 <[email protected]>
vikasvb90 committed Nov 24, 2023
1 parent 17013ad commit 3ba52e3
Showing 6 changed files with 49 additions and 19 deletions.
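
For orientation, the pattern this commit applies is sketched below. This is an illustrative outline only, with a hypothetical class name (MetricPublisherSketch) and field name (LOGGING_PUBLISHER), not the commit's code: an AWS SDK MetricPublisher logs the MetricCollection at debug level in publish(), and the publisher is attached to individual S3 requests through the request builder's override configuration, as the diff does with statsMetricPublisher.multipartUploadMetricCollector.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

public class MetricPublisherSketch {

    private static final Logger LOGGER = LogManager.getLogger(MetricPublisherSketch.class);

    // Publisher that logs each metric collection the SDK reports for a request.
    static final MetricPublisher LOGGING_PUBLISHER = new MetricPublisher() {
        @Override
        public void publish(MetricCollection metricCollection) {
            LOGGER.debug(() -> "Request metrics: " + metricCollection);
        }

        @Override
        public void close() {}
    };

    // Attach the publisher to a single request through its override configuration,
    // mirroring the .overrideConfiguration(o -> o.addMetricPublisher(...)) calls in the diff.
    static UploadPartRequest.Builder withMetrics(UploadPartRequest.Builder builder) {
        return builder.overrideConfiguration(o -> o.addMetricPublisher(LOGGING_PUBLISHER));
    }
}

Wired this way, every upload-part and complete-multipart-upload call reports its SDK metrics to the same collector that backs the remote store stats.
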
@@ -177,7 +177,7 @@ final class S3ClientSettings {
static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"request_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(10), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(5), Property.NodeScope)
);

/** The connection timeout for connecting to s3. */
@@ -198,14 +198,14 @@ final class S3ClientSettings {
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
PREFIX,
"max_connections",
key -> Setting.intSetting(key, 100, Property.NodeScope)
key -> Setting.intSetting(key, 500, Property.NodeScope)
);

/** Connection acquisition timeout for new connections to S3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
PREFIX,
"connection_acquisition_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(20), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(15), Property.NodeScope)
);

/** The maximum pending connections to S3. */
@@ -99,25 +99,32 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
int halfProc = halfNumberOfProcessors(allocatedProcessors(settings));
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
new ScalingExecutorBuilder(PRIORITY_FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
);
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(
new ScalingExecutorBuilder(STREAM_READER, 1, 3 * allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
new ScalingExecutorBuilder(FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
);
executorBuilders.add(
new ScalingExecutorBuilder(
STREAM_READER,
allocatedProcessors(settings),
4 * allocatedProcessors(settings),
TimeValue.timeValueMinutes(5)
)
);
return executorBuilders;
}

static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
static int halfNumberOfProcessors(int numberOfProcessors) {
return (numberOfProcessors + 1) / 2;
}

S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
@@ -12,6 +12,8 @@
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobStore;

import java.time.Duration;
@@ -21,6 +23,7 @@

public class StatsMetricPublisher {

private static final Logger LOGGER = LogManager.getLogger(StatsMetricPublisher.class);
private final Stats stats = new Stats();

private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
@@ -35,6 +38,7 @@ public class StatsMetricPublisher {
public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "List objects request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
@@ -64,6 +68,7 @@ public void close() {}
public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Delete objects request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
@@ -93,6 +98,7 @@ public void close() {}
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Get object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
@@ -122,6 +128,7 @@ public void close() {}
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Put object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
@@ -151,6 +158,7 @@ public void close() {}
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Multi-part request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
@@ -25,6 +25,7 @@
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
@@ -55,6 +56,7 @@ public class AsyncPartsHandler {
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
* @return list of completable futures
* @param statsMetricPublisher sdk metric publisher
* @throws IOException thrown in case of an IO error
*/
public static List<CompletableFuture<CompletedPart>> uploadParts(
@@ -66,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
StreamContext streamContext,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
@@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
.partNumber(partIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
@@ -146,7 +146,14 @@ private void uploadInParts(
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
} else {
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
doUploadInParts(
s3AsyncClient,
uploadRequest,
streamContext,
returnFuture,
createMultipartUploadResponse.uploadId(),
statsMetricPublisher
);
}
});
}
@@ -156,7 +163,8 @@ private void doUploadInParts(
UploadRequest uploadRequest,
StreamContext streamContext,
CompletableFuture<Void> returnFuture,
String uploadId
String uploadId,
StatsMetricPublisher statsMetricPublisher
) {

// The list of completed parts must be sorted
@@ -174,7 +182,8 @@ private void doUploadInParts(
streamContext,
uploadId,
completedParts,
inputStreamContainers
inputStreamContainers,
statsMetricPublisher
);
} catch (Exception ex) {
try {
@@ -198,7 +207,7 @@ private void doUploadInParts(
}
return null;
})
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))
.handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))
.exceptionally(throwable -> {
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
@@ -245,7 +254,8 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts
AtomicReferenceArray<CompletedPart> completedParts,
StatsMetricPublisher statsMetricPublisher
) {

log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", uploadId));
@@ -254,6 +264,7 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
.build();

@@ -70,10 +70,10 @@ public void testThereIsADefaultClientByDefault() {
assertThat(defaultSettings.protocol, is(Protocol.HTTPS));
assertThat(defaultSettings.proxySettings, is(ProxySettings.NO_PROXY_SETTINGS));
assertThat(defaultSettings.readTimeoutMillis, is(50 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(120 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(5 * 60 * 1000));
assertThat(defaultSettings.connectionTimeoutMillis, is(10 * 1000));
assertThat(defaultSettings.connectionTTLMillis, is(5 * 1000));
assertThat(defaultSettings.maxConnections, is(100));
assertThat(defaultSettings.maxConnections, is(500));
assertThat(defaultSettings.maxRetries, is(3));
assertThat(defaultSettings.throttleRetries, is(true));
}
