chore(s3stream): metrics of upload wal (#856)
Signed-off-by: Ning Yu <[email protected]>
Chillax-0v0 authored Dec 26, 2023
1 parent 59716a7 commit eb69853
Showing 8 changed files with 106 additions and 20 deletions.
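One part of this change is a new gauge for the number of WAL upload tasks currently in flight. The snippet below is a minimal sketch of how that gauge is fed, using only the registration API added in this diff; the wrapper class and its field are hypothetical stand-ins for S3Storage and its task list.

import com.automq.stream.s3.metrics.S3StreamMetricsManager;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class InflightWalUploadGaugeSketch {
    // Hypothetical stand-in for S3Storage's inflightWALUploadTasks list.
    private final List<CompletableFuture<Void>> inflightWALUploadTasks = new CopyOnWriteArrayList<>();

    InflightWalUploadGaugeSketch() {
        // The gauge callback registered in S3StreamMetricsManager.initMetrics() reads this
        // supplier and reports it as inflight_wal_upload_tasks_count.
        S3StreamMetricsManager.registerInflightWALUploadTasksCountSupplier(this.inflightWALUploadTasks::size);
    }

    void track(CompletableFuture<Void> uploadCf) {
        inflightWALUploadTasks.add(uploadCf);
        uploadCf.whenComplete((nil, ex) -> inflightWALUploadTasks.remove(uploadCf));
    }
}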
16 changes: 10 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -27,6 +27,7 @@
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -127,8 +128,8 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana
this.streamManager = streamManager;
this.objectManager = objectManager;
this.s3Operator = s3Operator;

this.drainBackoffTask = this.backgroundExecutor.scheduleWithFixedDelay(this::tryDrainBackoffRecords, 100, 100, TimeUnit.MILLISECONDS);
S3StreamMetricsManager.registerInflightWALUploadTasksCountSupplier(this.inflightWALUploadTasks::size);
}

@Override
@@ -426,14 +427,14 @@ public CompletableFuture<Void> forceUpload(long streamId) {
List<CompletableFuture<Void>> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks);
// await inflight stream set object upload tasks to group force upload tasks.
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> {
S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL_AWAIT);
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT_INFLIGHT);
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
if (LogCache.MATCH_ALL_STREAMS != streamId) {
callbackSequencer.tryFree(streamId);
}
});
cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL));
cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
return cf;
}

@@ -503,13 +504,13 @@ CompletableFuture<Void> uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) {
* Upload cache block to S3. The earlier cache block will have smaller objectId and commit first.
*/
CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
TimerUtil timerUtil = new TimerUtil();
context.timer = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
context.cf = cf;
inflightWALUploadTasks.add(cf);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_STORAGE_WAL);
S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
inflightWALUploadTasks.remove(cf);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
@@ -551,9 +552,10 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {

private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.prepare().thenAcceptAsync(nil -> {
S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
// 1. poll out current task and trigger upload.
DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
Objects.requireNonNull(peek).task.upload();
Objects.requireNonNull(peek).task.upload().thenAcceptAsync(nil2 -> S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
// 2. add task to commit queue.
boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
walCommitQueue.add(peek);
@@ -570,6 +572,7 @@

private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.commit().thenAcceptAsync(nil -> {
S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
// 1. poll out current task
walCommitQueue.poll();
if (context.cache.confirmOffset() != 0) {
@@ -796,6 +799,7 @@ public int handle(int memoryRequired) {
}

public static class DeltaWALUploadTaskContext {
TimerUtil timer;
LogCache.LogCacheBlock cache;
DeltaWALUploadTask task;
CompletableFuture<Void> cf;
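The upload path above stores a TimerUtil in DeltaWALUploadTaskContext and records a latency datapoint at each pipeline stage. A condensed sketch of that call order follows, with a simplified task interface standing in for DeltaWALUploadTask (the real code also runs the prepare and commit queues shown above). Since the same TimerUtil instance is reused for every call, whether each datapoint is cumulative since task creation or a per-stage delta depends on TimerUtil.elapsedAs, which is not part of this diff.

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Stage;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class UploadStageTimingSketch {
    // Hypothetical, simplified stand-in for DeltaWALUploadTask.
    interface UploadTask {
        CompletableFuture<Void> prepare();
        CompletableFuture<Void> upload();
        CompletableFuture<Void> commit();
    }

    CompletableFuture<Void> upload(UploadTask task) {
        TimerUtil timer = new TimerUtil(); // context.timer in uploadDeltaWAL(context)
        return task.prepare()
                .thenCompose(nil -> {
                    S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
                    return task.upload();
                })
                .thenCompose(nil -> {
                    S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD);
                    return task.commit();
                })
                .thenApply(nil -> {
                    S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
                    return nil;
                })
                .whenComplete((nil, ex) ->
                        S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE));
    }
}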
@@ -94,13 +94,13 @@ public class S3StreamMetricsConstant
public static final String AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME = "available_inflight_read_ahead_size";
public static final String AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME = "available_s3_inflight_read_quota";
public static final String AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME = "available_s3_inflight_write_quota";
public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count";
public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size_total";
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size_total";
public static final AttributeKey<String> LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
public static final AttributeKey<String> LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
public static final AttributeKey<String> LABEL_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_APPEND_WAL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_OBJECT_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_ALLOCATE_BYTE_BUF_SOURCE = AttributeKey.stringKey("source");
}
@@ -17,9 +17,9 @@

package com.automq.stream.s3.metrics;

import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
@@ -56,6 +56,7 @@ public class S3StreamMetricsManager {
private static ObservableLongGauge availableInflightReadAheadSize = new NoopObservableLongGauge();
private static ObservableLongGauge availableInflightS3ReadQuota = new NoopObservableLongGauge();
private static ObservableLongGauge availableInflightS3WriteQuota = new NoopObservableLongGauge();
private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge();
private static LongCounter compactionReadSizeInTotal = new NoopLongCounter();
private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter();
private static Supplier<Long> networkInboundAvailableBandwidthSupplier = () -> 0L;
@@ -69,6 +70,7 @@ public class S3StreamMetricsManager {
private static Supplier<Long> blockCacheSizeSupplier = () -> 0L;
private static Supplier<Integer> availableInflightS3ReadQuotaSupplier = () -> 0;
private static Supplier<Integer> availableInflightS3WriteQuotaSupplier = () -> 0;
private static Supplier<Integer> inflightWALUploadTasksCountSupplier = () -> 0;
private static Supplier<AttributesBuilder> attributesBuilderSupplier = null;

public static void initAttributesBuilder(Supplier<AttributesBuilder> attributesBuilderSupplier) {
@@ -195,6 +197,10 @@ public static void initMetrics(Meter meter, String prefix) {
.setDescription("Available inflight S3 write quota")
.ofLongs()
.buildWithCallback(result -> result.record((long) availableInflightS3WriteQuotaSupplier.get(), newAttributesBuilder().build()));
inflightWALUploadTasksCount = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME)
.setDescription("Inflight upload WAL tasks count")
.ofLongs()
.buildWithCallback(result -> result.record((long) inflightWALUploadTasksCountSupplier.get(), newAttributesBuilder().build()));
compactionReadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.COMPACTION_READ_SIZE_METRIC_NAME)
.setDescription("Compaction read size")
.setUnit("bytes")
@@ -254,6 +260,10 @@ public static void registerInflightReadSizeLimiterSupplier(Supplier<Integer> ava
S3StreamMetricsManager.availableInflightReadAheadSizeSupplier = availableInflightReadAheadSizeSupplier;
}

public static void registerInflightWALUploadTasksCountSupplier(Supplier<Integer> inflightWALUploadTasksCountSupplier) {
S3StreamMetricsManager.inflightWALUploadTasksCountSupplier = inflightWALUploadTasksCountSupplier;
}

public static void recordS3UploadSize(long value) {
s3UploadSizeInTotal.add(value, newAttributesBuilder().build());
}
@@ -293,11 +303,11 @@ public static void recordOperationLatency(long value, S3Operation operation, lon
operationLatency.record(value, attributesBuilder.build());
}

public static void recordAppendWALLatency(long value, String stage) {
public static void recordStageLatency(long value, S3Stage stage) {
Attributes attributes = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, S3MetricsType.S3Storage.getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, S3Operation.APPEND_STORAGE_WAL.getName())
.put(S3StreamMetricsConstant.LABEL_APPEND_WAL_STAGE, stage)
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, stage.getOperation().getType().getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, stage.getOperation().getName())
.put(S3StreamMetricsConstant.LABEL_STAGE, stage.getName())
.build();
operationLatency.record(value, attributes);
}
@@ -332,7 +342,7 @@ public static void recordObjectNum(long value) {

public static void recordObjectStageCost(long value, S3ObjectStage stage) {
Attributes attributes = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_OBJECT_STAGE, stage.getName())
.put(S3StreamMetricsConstant.LABEL_STAGE, stage.getName())
.build();
objectStageCost.record(value, attributes);
}
@@ -17,6 +17,9 @@

package com.automq.stream.s3.metrics.operations;

/**
* TODO: Maybe merge into {@link S3Stage}
*/
public enum S3ObjectStage {
UPLOAD_PART("upload_part"),
READY_CLOSE("ready_close"),
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.metrics.operations;

public enum S3Stage {

/* Append WAL stages start */
APPEND_WAL_BEFORE(S3Operation.APPEND_STORAGE_WAL, "before"),
APPEND_WAL_BLOCK_POLLED(S3Operation.APPEND_STORAGE_WAL, "block_polled"),
APPEND_WAL_AWAIT(S3Operation.APPEND_STORAGE_WAL, "await"),
APPEND_WAL_WRITE(S3Operation.APPEND_STORAGE_WAL, "write"),
APPEND_WAL_AFTER(S3Operation.APPEND_STORAGE_WAL, "after"),
APPEND_WAL_COMPLETE(S3Operation.APPEND_STORAGE_WAL, "complete"),
/* Append WAL stages end */

/* Force upload WAL start */
FORCE_UPLOAD_WAL_AWAIT_INFLIGHT(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "await_inflight"),
FORCE_UPLOAD_WAL_COMPLETE(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "complete"),
/* Force upload WAL end */

/* Upload WAL start */
UPLOAD_WAL_PREPARE(S3Operation.UPLOAD_STORAGE_WAL, "prepare"),
UPLOAD_WAL_UPLOAD(S3Operation.UPLOAD_STORAGE_WAL, "upload"),
UPLOAD_WAL_COMMIT(S3Operation.UPLOAD_STORAGE_WAL, "commit"),
UPLOAD_WAL_COMPLETE(S3Operation.UPLOAD_STORAGE_WAL, "complete");
/* Upload WAL end */

private final S3Operation operation;
private final String name;

S3Stage(S3Operation operation, String name) {
this.operation = operation;
this.name = name;
}

public S3Operation getOperation() {
return operation;
}

public String getName() {
return name;
}

@Override
public String toString() {
return "S3Stage{" +
"operation=" + operation.getName() +
", name='" + name + '\'' +
'}';
}
}
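Each constant above pairs a stage label with its parent S3Operation, so a single recordStageLatency call can attach operation type, operation name, and stage labels to one datapoint. A small usage example, with an illustrative duration value:

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.operations.S3Stage;

import java.util.concurrent.TimeUnit;

class S3StageUsageExample {
    static void record() {
        S3Stage stage = S3Stage.UPLOAD_WAL_COMMIT;
        assert stage.getOperation() == S3Operation.UPLOAD_STORAGE_WAL; // parent operation
        assert "commit".equals(stage.getName());                       // stage label

        // Record roughly 2 ms elapsed at this stage (the value is illustrative).
        S3StreamMetricsManager.recordStageLatency(TimeUnit.MILLISECONDS.toNanos(2), stage);
    }
}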
@@ -20,6 +20,7 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@@ -125,6 +126,6 @@ public long size() {

@Override
public void polled() {
S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "block_polled");
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED);
}
}
@@ -22,6 +22,7 @@
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.s3.wal.util.WALUtil;
import com.automq.stream.utils.ThreadUtils;
@@ -416,8 +417,8 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
slidingWindowService.tryWriteBlock();

final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordAppendWALLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), "complete"));
S3StreamMetricsManager.recordAppendWALLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), "before");
appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE));
S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE);
return appendResult;
}

@@ -20,6 +20,7 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.s3.wal.util.WALUtil;
import com.automq.stream.utils.FutureUtil;
@@ -342,7 +343,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
walChannel.write(block.data(), position);
}
walChannel.flush();
S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "write");
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);
}

private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
@@ -531,7 +532,7 @@ public WriteBlockProcessor(BlockBatch blocks) {

@Override
public void run() {
S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "await");
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT);
writeBlock(this.blocks);
}

@@ -555,7 +556,7 @@ public String toString() {
return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
}
});
S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "after");
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER);
} catch (Exception e) {
FutureUtil.completeExceptionally(blocks.futures(), e);
LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);
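For the append path, the string stages previously passed to recordAppendWALLatency map one-to-one onto the new APPEND_WAL_* constants. The sketch below lists the record sites in the order the enum declares the stages; it is a simplification in that the real code spreads these calls across several classes and uses a separate TimerUtil per append request, block, and block batch.

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Stage;

import java.util.concurrent.TimeUnit;

class AppendWalStagesSketch {
    static void recordAll() {
        TimerUtil t = new TimerUtil(); // single timer here only for brevity
        S3StreamMetricsManager.recordStageLatency(t.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE);       // append0: append request accepted
        S3StreamMetricsManager.recordStageLatency(t.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED); // polled(): block polled for writing
        S3StreamMetricsManager.recordStageLatency(t.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT);        // WriteBlockProcessor.run(): write task picked up
        S3StreamMetricsManager.recordStageLatency(t.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);        // writeBlockData(): data flushed to the WAL channel
        S3StreamMetricsManager.recordStageLatency(t.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER);        // after the write callbacks are handled
        S3StreamMetricsManager.recordStageLatency(t.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE);     // append future completed
    }
}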
