Skip to content

Commit

Permalink
feat(s3stream): add metrics for read ahead throttle time (#917)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Feb 4, 2024
1 parent 4dae71c commit c341448
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(TraceContext trac
totalReserveSize, uuid, streamId, startOffset, endOffset, maxBytes);
}

TimerUtil throttleTimer = new TimerUtil();
CompletableFuture<Void> throttleCf = inflightReadThrottle.acquire(traceContext, uuid, totalReserveSize);
return throttleCf.thenComposeAsync(nil -> {
// concurrently read all data blocks
StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
for (int i = 0; i < streamDataBlocksToRead.size(); i++) {
Pair<ObjectReader, StreamDataBlock> pair = streamDataBlocksToRead.get(i);
ObjectReader objectReader = pair.getLeft();
Expand Down Expand Up @@ -360,7 +362,9 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
reserveResult.reserveSize(), uuid, streamId, startOffset, endOffset, maxBytes);
}
if (reserveResult.reserveSize() > 0) {
TimerUtil throttleTimer = new TimerUtil();
inflightReadThrottle.acquire(TraceContext.DEFAULT, uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> {
StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
// read data block
if (context.taskKeySet.contains(taskKey)) {
setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class S3StreamMetricsConstant {
public static final String DELTA_WAL_CACHE_SIZE = "delta_wal_cache_size";
public static final String BLOCK_CACHE_SIZE = "block_cache_size";
public static final String AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME = "available_inflight_read_ahead_size";
public static final String READ_AHEAD_QUEUE_TIME_METRIC_NAME = "read_ahead_limiter_queue_time";
public static final String AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME = "available_s3_inflight_read_quota";
public static final String AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME = "available_s3_inflight_write_quota";
public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class S3StreamMetricsManager {
private static LongHistogram networkOutboundLimiterQueueTime = new NoopLongHistogram();
private static LongHistogram allocateByteBufSize = new NoopLongHistogram();
private static LongHistogram readAheadSize = new NoopLongHistogram();
private static LongHistogram readAheadLimierQueueTime = new NoopLongHistogram();
private static ObservableLongGauge deltaWalStartOffset = new NoopObservableLongGauge();
private static ObservableLongGauge deltaWalTrimmedOffset = new NoopObservableLongGauge();
private static ObservableLongGauge deltaWalCacheSize = new NoopObservableLongGauge();
Expand Down Expand Up @@ -181,6 +182,12 @@ public static void initMetrics(Meter meter, String prefix) {
.setUnit("bytes")
.ofLongs()
.build();
readAheadLimierQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME)
.setDescription("Read ahead limiter queue time")
.setUnit("nanoseconds")
.ofLongs()
.setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
.build();
deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET)
.setDescription("Delta WAL start offset")
.ofLongs()
Expand Down Expand Up @@ -220,7 +227,7 @@ public static void initMetrics(Meter meter, String prefix) {
.setUnit("bytes")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel())) {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
result.record((long) availableInflightReadAheadSizeSupplier.get(), metricsConfig.getBaseAttributes());
}
});
Expand Down Expand Up @@ -435,6 +442,14 @@ public static HistogramMetric buildReadAheadSizeMetric() {

}

public static HistogramMetric buildReadAheadLimiterQueueTimeMetric() {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadLimierQueueTime);
BASE_ATTRIBUTES_LISTENERS.add(metric);
return metric;
}
}

public static CounterMetric buildCompactionReadSizeMetric() {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
CounterMetric metric = new CounterMetric(metricsConfig, compactionReadSizeInTotal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class StorageOperationStats {
private final HistogramMetric blockCacheReadAheadSyncStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_SYNC);
private final HistogramMetric blockCacheReadAheadAsyncStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_ASYNC);
public final HistogramMetric readAheadSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric();
public final HistogramMetric readAheadLimiterQueueTimeStats = S3StreamMetricsManager.buildReadAheadLimiterQueueTimeMetric();

private StorageOperationStats() {
}
Expand Down

0 comments on commit c341448

Please sign in to comment.