Skip to content

Commit

Permalink
feat(s3stream): add more metrics (#845)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Dec 20, 2023
1 parent 0cb47d3 commit 35cbeaa
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 87 deletions.
12 changes: 9 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
if (!fromBackoff) {
backoffRecords.offer(request);
}
S3StreamMetricsManager.recordOperationNum(1, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
S3StreamMetricsManager.recordOperationLatency(0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -529,8 +529,14 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
}
rate = maxDataWriteRate;
}
context.task = DeltaWALUploadTask.builder().config(config).streamRecordsMap(context.cache.records())
.objectManager(objectManager).s3Operator(s3Operator).executor(uploadWALExecutor).rate(rate).build();
context.task = DeltaWALUploadTask.builder()
.config(config)
.streamRecordsMap(context.cache.records())
.objectManager(objectManager)
.s3Operator(s3Operator)
.executor(uploadWALExecutor)
.rate(rate)
.build();
boolean walObjectPrepareQueueEmpty = walPrepareQueue.isEmpty();
walPrepareQueue.add(context);
if (!walObjectPrepareQueueEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.cache.DefaultS3BlockCache.ReadAheadRecord;
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class BlockCache implements DirectByteBufAlloc.OOMHandler {
public BlockCache(long maxSize) {
this.maxSize = maxSize;
DirectByteBufAlloc.registerOOMHandlers(this);
S3StreamMetricsManager.registerBlockCacheSizeSupplier(size::get);
}

public void registerListener(CacheEvictListener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE_HIT, isCacheHit);
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public InflightReadThrottle(int maxInflightReadBytes) {
this.maxInflightReadBytes = maxInflightReadBytes;
this.remainingInflightReadBytes = maxInflightReadBytes;
executorService.execute(this);
S3StreamMetricsManager.registerInflightReadSizeLimiterGauge(this::getRemainingInflightReadBytes);
S3StreamMetricsManager.registerInflightReadSizeLimiterSupplier(this::getRemainingInflightReadBytes);
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
this.activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
this.blocks.add(activeBlock);
this.blockFreeListener = blockFreeListener;
S3StreamMetricsManager.registerDeltaWalCacheSizeSupplier(size::get);
}

public LogCache(long capacity, long cacheBlockMaxSize) {
Expand Down Expand Up @@ -136,7 +137,7 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffs

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE_HIT, isCacheHit);
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
return records;
}

Expand Down
22 changes: 0 additions & 22 deletions s3stream/src/main/java/com/automq/stream/s3/metrics/Gauge.java

This file was deleted.

25 changes: 0 additions & 25 deletions s3stream/src/main/java/com/automq/stream/s3/metrics/NoopGauge.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,56 @@

import io.opentelemetry.api.common.AttributeKey;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class S3StreamMetricsConstant {
// value = 16KB * 2^i
public static final String[] OBJECT_SIZE_BUCKET_NAMES = {"16KB",
"32KB",
"64KB",
"128KB",
"256KB",
"512KB",
"1MB",
"2MB",
"4MB",
"8MB",
"16MB",
"32MB",
"64MB",
"128MB",
"inf"};
public static final List<Long> OPERATION_LATENCY_BOUNDARIES = List.of(
TimeUnit.MICROSECONDS.toNanos(1),
TimeUnit.MICROSECONDS.toNanos(10),
TimeUnit.MICROSECONDS.toNanos(100),
TimeUnit.MILLISECONDS.toNanos(1),
TimeUnit.MILLISECONDS.toNanos(3),
TimeUnit.MILLISECONDS.toNanos(5),
TimeUnit.MILLISECONDS.toNanos(7),
TimeUnit.MILLISECONDS.toNanos(10),
TimeUnit.MILLISECONDS.toNanos(20),
TimeUnit.MILLISECONDS.toNanos(30),
TimeUnit.MILLISECONDS.toNanos(40),
TimeUnit.MILLISECONDS.toNanos(50),
TimeUnit.MILLISECONDS.toNanos(60),
TimeUnit.MILLISECONDS.toNanos(70),
TimeUnit.MILLISECONDS.toNanos(80),
TimeUnit.MILLISECONDS.toNanos(90),
TimeUnit.MILLISECONDS.toNanos(100),
TimeUnit.MILLISECONDS.toNanos(200),
TimeUnit.MILLISECONDS.toNanos(500),
TimeUnit.SECONDS.toNanos(1),
TimeUnit.SECONDS.toNanos(3),
TimeUnit.SECONDS.toNanos(5),
TimeUnit.SECONDS.toNanos(10),
TimeUnit.SECONDS.toNanos(30),
TimeUnit.MINUTES.toNanos(1),
TimeUnit.MINUTES.toNanos(3),
TimeUnit.MINUTES.toNanos(5)
);

public static final String UPLOAD_SIZE_METRIC_NAME = "upload_size_total";
public static final String DOWNLOAD_SIZE_METRIC_NAME = "download_size_total";
public static final String OPERATION_COUNT_METRIC_NAME = "operation_count_total";
Expand All @@ -36,11 +85,18 @@ public class S3StreamMetricsConstant {
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_outbound_limiter_queue_size";
public static final String ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME = "allocate_byte_buf_size";
public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
public static final String WAL_START_OFFSET = "wal_start_offset";
public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset";
public static final String DELTA_WAL_CACHE_SIZE = "delta_wal_cache_size";
public static final String BLOCK_CACHE_SIZE = "block_cache_size";
public static final String AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME = "available_inflight_read_ahead_size";
public static final String AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME = "available_s3_inflight_read_quota";
public static final String AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME = "available_s3_inflight_write_quota";
public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size_total";
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size_total";
public static final AttributeKey<String> LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
public static final AttributeKey<String> LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
public static final AttributeKey<String> LABEL_OBJECT_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_APPEND_WAL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_CACHE_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_OBJECT_STAGE = AttributeKey.stringKey("stage");
Expand Down
Loading

0 comments on commit 35cbeaa

Please sign in to comment.