Skip to content

Commit

Permalink
chore(tool): optimize s3 check logic's output for admin tool
Browse files Browse the repository at this point in the history
  • Loading branch information
KaimingWan committed Jan 12, 2024
1 parent 87098f5 commit 8fc81cd
Showing 1 changed file with 22 additions and 28 deletions.
50 changes: 22 additions & 28 deletions s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand Down Expand Up @@ -60,25 +61,19 @@ public class S3Utils {
* @param context s3 context.
*/
public static void checkS3Access(S3Context context) {
System.out.println("You are using s3 context: " + context);
System.out.println("====== 1/2: object operation task starting ======");
try (ObjectOperationTask task = new ObjectOperationTask(context)) {
task.run();
} catch (Throwable e) {
System.out.println("ERROR: " + ExceptionUtils.getRootCause(e));
System.exit(1);
}
System.out.println("====== 1/2: object operation task passed ======");

System.out.println("====== 2/2: multipart object operation task starting ======");
try (MultipartObjectOperationTask task = new MultipartObjectOperationTask(context)) {
task.run();
} catch (Throwable e) {
System.out.println("ERROR: " + ExceptionUtils.getRootCause(e));
System.exit(1);
}
System.out.println("====== 2/2: multipart object operation task passed ======");

System.out.println("====== Congratulations! You have passed all checks!!! ======");

}

private static String range(long start, long end) {
Expand Down Expand Up @@ -157,7 +152,6 @@ public void run() {
ByteBuf byteBuf = null;
try {
// Simple write/read/delete
System.out.println("1) Trying to write multipart object " + path + " ...");
String uploadId = createMultipartUpload(client, bucketName, path).get();
List<CompletedPart> parts = new ArrayList<>();
int data1Size = 1024 * 1024 * 5;
Expand All @@ -168,27 +162,28 @@ public void run() {
new Random().nextBytes(randomBytes);
ByteBuf data1 = Unpooled.wrappedBuffer(randomBytes);
writePart(uploadId, path, bucketName, data1, 1).thenAccept(parts::add).get();
System.out.println("writing part 1 passed");

byte[] randomBytes2 = new byte[data2Size];
new Random().nextBytes(randomBytes2);
ByteBuf data2 = Unpooled.wrappedBuffer(randomBytes2);
writePart(uploadId, path, bucketName, data2, 2).thenAccept(parts::add).get();
System.out.println("writing part 2 passed");

System.out.println("[ OK ] Write S3 object");

completeMultipartUpload(client, path, bucketName, uploadId, parts).get();
System.out.println("writing and uploading multipart object passed");
System.out.println("[ OK ] Upload s3 multipart object");

System.out.println("2) Trying to read multipart object " + path + " ...");
CompletableFuture<ByteBuf> readCf = new CompletableFuture<>();
readRange(client, path, readCf, bucketName, 0, -1);
byteBuf = readCf.get();
if (byteBuf == null) {
System.out.println("[ FAILED ] Read s3 multipart object");
throw new RuntimeException("read multipart object " + path + " fail. got null");
} else if (byteBuf.readableBytes() != totalSize) {
System.out.println("[ FAILED ] Read s3 multipart object");
throw new RuntimeException("read multipart object " + path + " fail. expected size " + totalSize + ", actual size " + byteBuf.readableBytes());
}
System.out.println("read passed");
System.out.println("[ OK ] Read s3 multipart object");
} catch (ExecutionException | InterruptedException e) {
showErrorInfo(e);
throw new RuntimeException(e);
Expand All @@ -204,10 +199,9 @@ private CompletableFuture<String> createMultipartUpload(S3AsyncClient writeS3Cli
CompletableFuture<String> cf = new CompletableFuture<>();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucketName).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
System.out.println("created upload id: " + createMultipartUploadResponse.uploadId());
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
System.err.println("failed to create upload id.");
System.err.println("[ FAILED ] Upload s3 multipart object");
cf.completeExceptionally(ex);
return null;
});
Expand All @@ -221,10 +215,9 @@ public CompletableFuture<Void> completeMultipartUpload(S3AsyncClient writeS3Clie
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
System.out.println("completed upload with id " + uploadId);
cf.complete(null);
}).exceptionally(ex -> {
System.err.println("failed to upload with id " + uploadId);
System.err.println("[ FAILED ] Upload s3 multipart object, upload id is "+uploadId);
cf.completeExceptionally(ex);
return null;
});
Expand Down Expand Up @@ -273,26 +266,28 @@ public void run() {
ByteBuf byteBuf = null;
try {
// Simple write/read/delete
System.out.println("1) Trying to write object " + path + " ...");
CompletableFuture<Void> writeCf = new CompletableFuture<>();
writeObject(client, path, ByteBuffer.wrap(content), writeCf, bucketName);
writeCf.get();
System.out.println("[ OK ] Write s3 object");

System.out.println("2) Trying to read object " + path + " ...");
CompletableFuture<ByteBuf> readCf = new CompletableFuture<>();
readRange(client, path, readCf, bucketName, 0, -1);
byteBuf = readCf.get();
if (byteBuf == null) {
System.err.println("[ Failed ] Read s3 object");
throw new RuntimeException("read object " + path + " fail. got null");
} else if (byteBuf.readableBytes() != content.length) {
System.err.println("[ Failed ] Read s3 object");
throw new RuntimeException("read object " + path + " fail. expected size " + content.length + ", actual size " + byteBuf.readableBytes());
}
byte[] readContent = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(readContent);
if (!StringUtils.equals(new String(readContent, StandardCharsets.UTF_8), new String(content, StandardCharsets.UTF_8))) {
System.err.println("[ Failed ] Read s3 object");
throw new RuntimeException("read object " + path + " fail. expected content " + new String(content, StandardCharsets.UTF_8) + ", actual content " + new String(readContent, StandardCharsets.UTF_8));
}
System.out.println("read passed");
System.out.println("[ OK ] Read s3 object");
} catch (ExecutionException | InterruptedException e) {
showErrorInfo(e);
throw new RuntimeException(e);
Expand All @@ -305,14 +300,12 @@ public void run() {

private void writeObject(S3AsyncClient writeS3Client, String path, ByteBuffer data, CompletableFuture<Void> cf,
String bucket) {
int objectSize = data.remaining();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data);
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
System.out.printf("put object %s with size %d%n", path, objectSize);
cf.complete(null);
}).exceptionally(ex -> {
System.err.printf("PutObject for object %s fail with msg %s %n", path, ex.getMessage());
System.err.printf("[ Failed ] Write s3 object. PutObject for object %s fail with msg %s %n", path, ex.getMessage());
cf.completeExceptionally(ex);
return null;
});
Expand Down Expand Up @@ -340,29 +333,30 @@ protected void deleteObject(S3AsyncClient deleteS3Client, String path, Completab
String bucket) {
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
deleteS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
System.out.printf("deleted object %s%n", path);
cf.complete(null);
}).exceptionally(ex -> {
System.err.printf("delete object %s fail with msg %s %n", path, ex.getMessage());
System.err.printf("[ FAILED ] Delete s3 object. Delete object %s fail with msg %s %n", path, ex.getMessage());
cf.completeExceptionally(ex);
return null;
});
}

@Override
public void close() {
System.out.println("3) Trying to delete object " + path + " ...");
try {
CompletableFuture<Void> deleteCf = new CompletableFuture<>();
deleteObject(client, path, deleteCf, bucketName);
deleteCf.get();
} catch (InterruptedException | ExecutionException e) {
System.err.println(" NOTICE: please delete object " + path + " manually!!!");

System.err.println("[ FAILED ] Delete s3 object. NOTICE: please delete object " + path + " manually!!!");
showErrorInfo(e);
throw new RuntimeException(e);
} finally {
super.close();
}
System.out.println("[ OK ] Delete s3 object");

}
}

Expand Down

0 comments on commit 8fc81cd

Please sign in to comment.