diff --git a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java index 9a2703980..4004c4605 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java +++ b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java @@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -60,25 +61,19 @@ public class S3Utils { * @param context s3 context. */ public static void checkS3Access(S3Context context) { - System.out.println("You are using s3 context: " + context); - System.out.println("====== 1/2: object operation task starting ======"); try (ObjectOperationTask task = new ObjectOperationTask(context)) { task.run(); } catch (Throwable e) { + System.out.println("ERROR: " + ExceptionUtils.getRootCause(e)); System.exit(1); } - System.out.println("====== 1/2: object operation task passed ======"); - System.out.println("====== 2/2: multipart object operation task starting ======"); try (MultipartObjectOperationTask task = new MultipartObjectOperationTask(context)) { task.run(); } catch (Throwable e) { + System.out.println("ERROR: " + ExceptionUtils.getRootCause(e)); System.exit(1); } - System.out.println("====== 2/2: multipart object operation task passed ======"); - - System.out.println("====== Congratulations! You have passed all checks!!! ======"); - } private static String range(long start, long end) { @@ -124,11 +119,12 @@ public S3CheckTask(S3Context context, String taskName) { protected static void showErrorInfo(Exception e) { if (e.getCause() instanceof S3Exception se) { - System.err.println("get S3 exception: "); - se.printStackTrace(); + // Do not use system.err because automq admin tool suppress system.err + System.out.println("get S3 exception: "); + se.printStackTrace(System.out); } else { - System.err.println("get other exception: "); - e.printStackTrace(); + System.out.println("get other exception: "); + e.printStackTrace(System.out); } } @@ -157,7 +153,6 @@ public void run() { ByteBuf byteBuf = null; try { // Simple write/read/delete - System.out.println("1) Trying to write multipart object " + path + " ..."); String uploadId = createMultipartUpload(client, bucketName, path).get(); List parts = new ArrayList<>(); int data1Size = 1024 * 1024 * 5; @@ -168,27 +163,28 @@ public void run() { new Random().nextBytes(randomBytes); ByteBuf data1 = Unpooled.wrappedBuffer(randomBytes); writePart(uploadId, path, bucketName, data1, 1).thenAccept(parts::add).get(); - System.out.println("writing part 1 passed"); byte[] randomBytes2 = new byte[data2Size]; new Random().nextBytes(randomBytes2); ByteBuf data2 = Unpooled.wrappedBuffer(randomBytes2); writePart(uploadId, path, bucketName, data2, 2).thenAccept(parts::add).get(); - System.out.println("writing part 2 passed"); + + System.out.println("[ OK ] Write S3 object"); completeMultipartUpload(client, path, bucketName, uploadId, parts).get(); - System.out.println("writing and uploading multipart object passed"); + System.out.println("[ OK ] Upload s3 multipart object"); - System.out.println("2) Trying to read multipart object " + path + " ..."); CompletableFuture readCf = new CompletableFuture<>(); readRange(client, path, readCf, bucketName, 0, -1); byteBuf = readCf.get(); if (byteBuf == null) { + System.out.println("[ FAILED ] Read s3 multipart object"); throw new RuntimeException("read multipart object " + path + " fail. got null"); } else if (byteBuf.readableBytes() != totalSize) { + System.out.println("[ FAILED ] Read s3 multipart object"); throw new RuntimeException("read multipart object " + path + " fail. expected size " + totalSize + ", actual size " + byteBuf.readableBytes()); } - System.out.println("read passed"); + System.out.println("[ OK ] Read s3 multipart object"); } catch (ExecutionException | InterruptedException e) { showErrorInfo(e); throw new RuntimeException(e); @@ -204,10 +200,9 @@ private CompletableFuture createMultipartUpload(S3AsyncClient writeS3Cli CompletableFuture cf = new CompletableFuture<>(); CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucketName).key(path).build(); writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { - System.out.println("created upload id: " + createMultipartUploadResponse.uploadId()); cf.complete(createMultipartUploadResponse.uploadId()); }).exceptionally(ex -> { - System.err.println("failed to create upload id."); + System.out.println("[ FAILED ] Upload s3 multipart object"); cf.completeExceptionally(ex); return null; }); @@ -221,10 +216,9 @@ public CompletableFuture completeMultipartUpload(S3AsyncClient writeS3Clie CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build(); writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> { - System.out.println("completed upload with id " + uploadId); cf.complete(null); }).exceptionally(ex -> { - System.err.println("failed to upload with id " + uploadId); + System.out.println("[ FAILED ] Upload s3 multipart object, upload id is " + uploadId); cf.completeExceptionally(ex); return null; }); @@ -273,26 +267,28 @@ public void run() { ByteBuf byteBuf = null; try { // Simple write/read/delete - System.out.println("1) Trying to write object " + path + " ..."); CompletableFuture writeCf = new CompletableFuture<>(); writeObject(client, path, ByteBuffer.wrap(content), writeCf, bucketName); writeCf.get(); + System.out.println("[ OK ] Write s3 object"); - System.out.println("2) Trying to read object " + path + " ..."); CompletableFuture readCf = new CompletableFuture<>(); readRange(client, path, readCf, bucketName, 0, -1); byteBuf = readCf.get(); if (byteBuf == null) { + System.out.println("[ Failed ] Read s3 object"); throw new RuntimeException("read object " + path + " fail. got null"); } else if (byteBuf.readableBytes() != content.length) { + System.out.println("[ Failed ] Read s3 object"); throw new RuntimeException("read object " + path + " fail. expected size " + content.length + ", actual size " + byteBuf.readableBytes()); } byte[] readContent = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(readContent); if (!StringUtils.equals(new String(readContent, StandardCharsets.UTF_8), new String(content, StandardCharsets.UTF_8))) { + System.out.println("[ Failed ] Read s3 object"); throw new RuntimeException("read object " + path + " fail. expected content " + new String(content, StandardCharsets.UTF_8) + ", actual content " + new String(readContent, StandardCharsets.UTF_8)); } - System.out.println("read passed"); + System.out.println("[ OK ] Read s3 object"); } catch (ExecutionException | InterruptedException e) { showErrorInfo(e); throw new RuntimeException(e); @@ -305,14 +301,12 @@ public void run() { private void writeObject(S3AsyncClient writeS3Client, String path, ByteBuffer data, CompletableFuture cf, String bucket) { - int objectSize = data.remaining(); PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build(); AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data); writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> { - System.out.printf("put object %s with size %d%n", path, objectSize); cf.complete(null); }).exceptionally(ex -> { - System.err.printf("PutObject for object %s fail with msg %s %n", path, ex.getMessage()); + System.out.printf("[ Failed ] Write s3 object. PutObject for object %s fail with msg %s %n", path, ex.getMessage()); cf.completeExceptionally(ex); return null; }); @@ -340,10 +334,9 @@ protected void deleteObject(S3AsyncClient deleteS3Client, String path, Completab String bucket) { DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build(); deleteS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> { - System.out.printf("deleted object %s%n", path); cf.complete(null); }).exceptionally(ex -> { - System.err.printf("delete object %s fail with msg %s %n", path, ex.getMessage()); + System.out.printf("[ FAILED ] Delete s3 object. Delete object %s fail with msg %s %n", path, ex.getMessage()); cf.completeExceptionally(ex); return null; }); @@ -351,18 +344,20 @@ protected void deleteObject(S3AsyncClient deleteS3Client, String path, Completab @Override public void close() { - System.out.println("3) Trying to delete object " + path + " ..."); try { CompletableFuture deleteCf = new CompletableFuture<>(); deleteObject(client, path, deleteCf, bucketName); deleteCf.get(); } catch (InterruptedException | ExecutionException e) { - System.err.println(" NOTICE: please delete object " + path + " manually!!!"); + + System.out.println("[ FAILED ] Delete s3 object. NOTICE: please delete object " + path + " manually!!!"); showErrorInfo(e); throw new RuntimeException(e); } finally { super.close(); } + System.out.println("[ OK ] Delete s3 object"); + } }