diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 11048ad1a..c34691d82 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -271,13 +271,22 @@ public static void recordOperationNum(long value, S3Operation operation) { } public static void recordOperationLatency(long value, S3Operation operation) { - recordOperationLatency(value, operation, 0); + recordOperationLatency(value, operation, 0, true); + } + + public static void recordOperationLatency(long value, S3Operation operation, boolean isSuccess) { + recordOperationLatency(value, operation, 0, isSuccess); } public static void recordOperationLatency(long value, S3Operation operation, long size) { + recordOperationLatency(value, operation, size, true); + } + + public static void recordOperationLatency(long value, S3Operation operation, long size, boolean isSuccess) { AttributesBuilder attributesBuilder = newAttributesBuilder() .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName()) - .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName()); + .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName()) + .put(S3StreamMetricsConstant.LABEL_STATUS, isSuccess ? "success" : "failed"); if (operation == S3Operation.GET_OBJECT || operation == S3Operation.PUT_OBJECT || operation == S3Operation.UPLOAD_PART) { attributesBuilder.put(S3StreamMetricsConstant.LABEL_SIZE_NAME, getObjectBucketLabel(size)); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java index 98840cd59..36c8783cb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java @@ -47,21 +47,13 @@ public enum S3Operation { /* S3 request operations start */ GET_OBJECT(S3MetricsType.S3Request, "get_object"), - GET_OBJECT_FAIL(S3MetricsType.S3Request, "get_object_fail"), PUT_OBJECT(S3MetricsType.S3Request, "put_object"), - PUT_OBJECT_FAIL(S3MetricsType.S3Request, "put_object_fail"), DELETE_OBJECT(S3MetricsType.S3Request, "delete_object"), - DELETE_OBJECT_FAIL(S3MetricsType.S3Request, "delete_object_fail"), DELETE_OBJECTS(S3MetricsType.S3Request, "delete_objects"), - DELETE_OBJECTS_FAIL(S3MetricsType.S3Request, "delete_objects_fail"), CREATE_MULTI_PART_UPLOAD(S3MetricsType.S3Request, "create_multi_part_upload"), - CREATE_MULTI_PART_UPLOAD_FAIL(S3MetricsType.S3Request, "create_multi_part_upload_fail"), UPLOAD_PART(S3MetricsType.S3Request, "upload_part"), - UPLOAD_PART_FAIL(S3MetricsType.S3Request, "upload_part_fail"), UPLOAD_PART_COPY(S3MetricsType.S3Request, "upload_part_copy"), - UPLOAD_PART_COPY_FAIL(S3MetricsType.S3Request, "upload_part_copy_fail"), COMPLETE_MULTI_PART_UPLOAD(S3MetricsType.S3Request, "complete_multi_part_upload"), - COMPLETE_MULTI_PART_UPLOAD_FAIL(S3MetricsType.S3Request, "complete_multi_part_upload_fail"), /* S3 request operations end */ /* S3 object operations start */ diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 582e480c6..c4fed3141 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -297,7 +297,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT_FAIL, size); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false); if (isUnrecoverable(ex)) { LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex); cf.completeExceptionally(ex); @@ -344,7 +344,7 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { cf.complete(null); data.release(); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT_FAIL, objectSize); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false); if (isUnrecoverable(ex)) { LOGGER.error("PutObject for object {} fail", path, ex); cf.completeExceptionally(ex); @@ -370,7 +370,7 @@ public CompletableFuture delete(String path) { S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT); LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT_FAIL); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false); LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage()); return null; }); @@ -394,7 +394,7 @@ public CompletableFuture> delete(List objectKeys) { LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList()); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS_FAIL); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false); LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage()); return Collections.emptyList(); }); @@ -418,7 +418,7 @@ void createMultipartUpload0(String path, CompletableFuture cf) { S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD); cf.complete(createMultipartUploadResponse.uploadId()); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false); if (isUnrecoverable(ex)) { LOGGER.error("CreateMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex); @@ -465,7 +465,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_FAIL, size); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false); if (isUnrecoverable(ex)) { LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -499,7 +499,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en .eTag(uploadPartCopyResponse.copyPartResult().eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY_FAIL); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false); if (isUnrecoverable(ex)) { LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -531,7 +531,7 @@ public void completeMultipartUpload0(String path, String uploadId, List { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL); + S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false); if (isUnrecoverable(ex)) { LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex);