From eb69853172a62d0f773dc269a9ad0cfa906ec50e Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Tue, 26 Dec 2023 19:03:47 +0800 Subject: [PATCH] chore(s3stream): metrics of upload wal (#856) Signed-off-by: Ning Yu --- .../java/com/automq/stream/s3/S3Storage.java | 16 +++-- .../s3/metrics/S3StreamMetricsConstant.java | 4 +- .../s3/metrics/S3StreamMetricsManager.java | 22 +++++-- .../s3/metrics/operations/S3ObjectStage.java | 3 + .../stream/s3/metrics/operations/S3Stage.java | 66 +++++++++++++++++++ .../com/automq/stream/s3/wal/BlockImpl.java | 3 +- .../automq/stream/s3/wal/BlockWALService.java | 5 +- .../stream/s3/wal/SlidingWindowService.java | 7 +- 8 files changed, 106 insertions(+), 20 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index df3b3f1af..6d7495cef 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -27,6 +27,7 @@ import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.operations.S3Operation; +import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.model.StreamRecordBatch; import com.automq.stream.s3.objects.ObjectManager; import com.automq.stream.s3.operator.S3Operator; @@ -127,8 +128,8 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana this.streamManager = streamManager; this.objectManager = objectManager; this.s3Operator = s3Operator; - this.drainBackoffTask = this.backgroundExecutor.scheduleWithFixedDelay(this::tryDrainBackoffRecords, 100, 100, TimeUnit.MILLISECONDS); + S3StreamMetricsManager.registerInflightWALUploadTasksCountSupplier(this.inflightWALUploadTasks::size); } @Override @@ -426,14 +427,14 @@ public CompletableFuture forceUpload(long streamId) { List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); // await inflight stream set object upload tasks to group force upload tasks. CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> { - S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL_AWAIT); + S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT_INFLIGHT); uploadDeltaWAL(streamId); FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { callbackSequencer.tryFree(streamId); } }); - cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL)); + cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE)); return cf; } @@ -503,13 +504,13 @@ CompletableFuture uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) { * Upload cache block to S3. The earlier cache block will have smaller objectId and commit first. */ CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) { - TimerUtil timerUtil = new TimerUtil(); + context.timer = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); context.cf = cf; inflightWALUploadTasks.add(cf); backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL")); cf.whenComplete((nil, ex) -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_STORAGE_WAL); + S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE); inflightWALUploadTasks.remove(cf); if (ex != null) { LOGGER.error("upload delta WAL fail", ex); @@ -551,9 +552,10 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) { private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) { context.task.prepare().thenAcceptAsync(nil -> { + S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE); // 1. poll out current task and trigger upload. DeltaWALUploadTaskContext peek = walPrepareQueue.poll(); - Objects.requireNonNull(peek).task.upload(); + Objects.requireNonNull(peek).task.upload().thenAcceptAsync(nil2 -> S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD)); // 2. add task to commit queue. boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty(); walCommitQueue.add(peek); @@ -570,6 +572,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) { private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) { context.task.commit().thenAcceptAsync(nil -> { + S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT); // 1. poll out current task walCommitQueue.poll(); if (context.cache.confirmOffset() != 0) { @@ -796,6 +799,7 @@ public int handle(int memoryRequired) { } public static class DeltaWALUploadTaskContext { + TimerUtil timer; LogCache.LogCacheBlock cache; DeltaWALUploadTask task; CompletableFuture cf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 9edf37463..beb625dbe 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -94,13 +94,13 @@ public class S3StreamMetricsConstant { public static final String AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME = "available_inflight_read_ahead_size"; public static final String AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME = "available_s3_inflight_read_quota"; public static final String AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME = "available_s3_inflight_write_quota"; + public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count"; public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size_total"; public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size_total"; public static final AttributeKey LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type"); public static final AttributeKey LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name"); public static final AttributeKey LABEL_SIZE_NAME = AttributeKey.stringKey("size"); - public static final AttributeKey LABEL_APPEND_WAL_STAGE = AttributeKey.stringKey("stage"); + public static final AttributeKey LABEL_STAGE = AttributeKey.stringKey("stage"); public static final AttributeKey LABEL_STATUS = AttributeKey.stringKey("status"); - public static final AttributeKey LABEL_OBJECT_STAGE = AttributeKey.stringKey("stage"); public static final AttributeKey LABEL_ALLOCATE_BYTE_BUF_SOURCE = AttributeKey.stringKey("source"); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index c34691d82..0ad7636fa 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -17,9 +17,9 @@ package com.automq.stream.s3.metrics; -import com.automq.stream.s3.metrics.operations.S3MetricsType; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import com.automq.stream.s3.metrics.operations.S3Operation; +import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; @@ -56,6 +56,7 @@ public class S3StreamMetricsManager { private static ObservableLongGauge availableInflightReadAheadSize = new NoopObservableLongGauge(); private static ObservableLongGauge availableInflightS3ReadQuota = new NoopObservableLongGauge(); private static ObservableLongGauge availableInflightS3WriteQuota = new NoopObservableLongGauge(); + private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge(); private static LongCounter compactionReadSizeInTotal = new NoopLongCounter(); private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter(); private static Supplier networkInboundAvailableBandwidthSupplier = () -> 0L; @@ -69,6 +70,7 @@ public class S3StreamMetricsManager { private static Supplier blockCacheSizeSupplier = () -> 0L; private static Supplier availableInflightS3ReadQuotaSupplier = () -> 0; private static Supplier availableInflightS3WriteQuotaSupplier = () -> 0; + private static Supplier inflightWALUploadTasksCountSupplier = () -> 0; private static Supplier attributesBuilderSupplier = null; public static void initAttributesBuilder(Supplier attributesBuilderSupplier) { @@ -195,6 +197,10 @@ public static void initMetrics(Meter meter, String prefix) { .setDescription("Available inflight S3 write quota") .ofLongs() .buildWithCallback(result -> result.record((long) availableInflightS3WriteQuotaSupplier.get(), newAttributesBuilder().build())); + inflightWALUploadTasksCount = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME) + .setDescription("Inflight upload WAL tasks count") + .ofLongs() + .buildWithCallback(result -> result.record((long) inflightWALUploadTasksCountSupplier.get(), newAttributesBuilder().build())); compactionReadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.COMPACTION_READ_SIZE_METRIC_NAME) .setDescription("Compaction read size") .setUnit("bytes") @@ -254,6 +260,10 @@ public static void registerInflightReadSizeLimiterSupplier(Supplier ava S3StreamMetricsManager.availableInflightReadAheadSizeSupplier = availableInflightReadAheadSizeSupplier; } + public static void registerInflightWALUploadTasksCountSupplier(Supplier inflightWALUploadTasksCountSupplier) { + S3StreamMetricsManager.inflightWALUploadTasksCountSupplier = inflightWALUploadTasksCountSupplier; + } + public static void recordS3UploadSize(long value) { s3UploadSizeInTotal.add(value, newAttributesBuilder().build()); } @@ -293,11 +303,11 @@ public static void recordOperationLatency(long value, S3Operation operation, lon operationLatency.record(value, attributesBuilder.build()); } - public static void recordAppendWALLatency(long value, String stage) { + public static void recordStageLatency(long value, S3Stage stage) { Attributes attributes = newAttributesBuilder() - .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, S3MetricsType.S3Storage.getName()) - .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, S3Operation.APPEND_STORAGE_WAL.getName()) - .put(S3StreamMetricsConstant.LABEL_APPEND_WAL_STAGE, stage) + .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, stage.getOperation().getType().getName()) + .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, stage.getOperation().getName()) + .put(S3StreamMetricsConstant.LABEL_STAGE, stage.getName()) .build(); operationLatency.record(value, attributes); } @@ -332,7 +342,7 @@ public static void recordObjectNum(long value) { public static void recordObjectStageCost(long value, S3ObjectStage stage) { Attributes attributes = newAttributesBuilder() - .put(S3StreamMetricsConstant.LABEL_OBJECT_STAGE, stage.getName()) + .put(S3StreamMetricsConstant.LABEL_STAGE, stage.getName()) .build(); objectStageCost.record(value, attributes); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java index 4e325ef95..cdde8bfe1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java @@ -17,6 +17,9 @@ package com.automq.stream.s3.metrics.operations; +/** + * TODO: Maybe merge into {@link S3Stage} + */ public enum S3ObjectStage { UPLOAD_PART("upload_part"), READY_CLOSE("ready_close"), diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java new file mode 100644 index 000000000..65a62aef8 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.metrics.operations; + +public enum S3Stage { + + /* Append WAL stages start */ + APPEND_WAL_BEFORE(S3Operation.APPEND_STORAGE_WAL, "before"), + APPEND_WAL_BLOCK_POLLED(S3Operation.APPEND_STORAGE_WAL, "block_polled"), + APPEND_WAL_AWAIT(S3Operation.APPEND_STORAGE_WAL, "await"), + APPEND_WAL_WRITE(S3Operation.APPEND_STORAGE_WAL, "write"), + APPEND_WAL_AFTER(S3Operation.APPEND_STORAGE_WAL, "after"), + APPEND_WAL_COMPLETE(S3Operation.APPEND_STORAGE_WAL, "complete"), + /* Append WAL stages end */ + + /* Force upload WAL start */ + FORCE_UPLOAD_WAL_AWAIT_INFLIGHT(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "await_inflight"), + FORCE_UPLOAD_WAL_COMPLETE(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "complete"), + /* Force upload WAL end */ + + /* Upload WAL start */ + UPLOAD_WAL_PREPARE(S3Operation.UPLOAD_STORAGE_WAL, "prepare"), + UPLOAD_WAL_UPLOAD(S3Operation.UPLOAD_STORAGE_WAL, "upload"), + UPLOAD_WAL_COMMIT(S3Operation.UPLOAD_STORAGE_WAL, "commit"), + UPLOAD_WAL_COMPLETE(S3Operation.UPLOAD_STORAGE_WAL, "complete"); + /* Upload WAL end */ + + private final S3Operation operation; + private final String name; + + S3Stage(S3Operation operation, String name) { + this.operation = operation; + this.name = name; + } + + public S3Operation getOperation() { + return operation; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "S3Stage{" + + "operation=" + operation.getName() + + ", name='" + name + '\'' + + '}'; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java index 60f11910d..6082e3b7e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java @@ -20,6 +20,7 @@ import com.automq.stream.s3.DirectByteBufAlloc; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; +import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.wal.util.WALUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; @@ -125,6 +126,6 @@ public long size() { @Override public void polled() { - S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "block_polled"); + S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 3801ea314..6fe679474 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -22,6 +22,7 @@ import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.operations.S3Operation; +import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.wal.util.WALChannel; import com.automq.stream.s3.wal.util.WALUtil; import com.automq.stream.utils.ThreadUtils; @@ -416,8 +417,8 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException slidingWindowService.tryWriteBlock(); final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture); - appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordAppendWALLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), "complete")); - S3StreamMetricsManager.recordAppendWALLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), "before"); + appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE)); + S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE); return appendResult; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index 4b1671b02..9590e04a0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -20,6 +20,7 @@ import com.automq.stream.s3.DirectByteBufAlloc; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; +import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.wal.util.WALChannel; import com.automq.stream.s3.wal.util.WALUtil; import com.automq.stream.utils.FutureUtil; @@ -342,7 +343,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException { walChannel.write(block.data(), position); } walChannel.flush(); - S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "write"); + S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE); } private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException { @@ -531,7 +532,7 @@ public WriteBlockProcessor(BlockBatch blocks) { @Override public void run() { - S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "await"); + S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT); writeBlock(this.blocks); } @@ -555,7 +556,7 @@ public String toString() { return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}'; } }); - S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "after"); + S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER); } catch (Exception e) { FutureUtil.completeExceptionally(blocks.futures(), e); LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);