Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: optimize S3Utils output #895

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 27 additions & 32 deletions s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand Down Expand Up @@ -60,25 +61,19 @@ public class S3Utils {
* @param context s3 context.
*/
public static void checkS3Access(S3Context context) {
System.out.println("You are using s3 context: " + context);
System.out.println("====== 1/2: object operation task starting ======");
try (ObjectOperationTask task = new ObjectOperationTask(context)) {
task.run();
} catch (Throwable e) {
System.out.println("ERROR: " + ExceptionUtils.getRootCause(e));
System.exit(1);
}
System.out.println("====== 1/2: object operation task passed ======");

System.out.println("====== 2/2: multipart object operation task starting ======");
try (MultipartObjectOperationTask task = new MultipartObjectOperationTask(context)) {
task.run();
} catch (Throwable e) {
System.out.println("ERROR: " + ExceptionUtils.getRootCause(e));
System.exit(1);
}
System.out.println("====== 2/2: multipart object operation task passed ======");

System.out.println("====== Congratulations! You have passed all checks!!! ======");

}

private static String range(long start, long end) {
Expand Down Expand Up @@ -124,11 +119,12 @@ public S3CheckTask(S3Context context, String taskName) {

protected static void showErrorInfo(Exception e) {
if (e.getCause() instanceof S3Exception se) {
System.err.println("get S3 exception: ");
se.printStackTrace();
// Do not use system.err because automq admin tool suppress system.err
System.out.println("get S3 exception: ");
se.printStackTrace(System.out);
} else {
System.err.println("get other exception: ");
e.printStackTrace();
System.out.println("get other exception: ");
e.printStackTrace(System.out);
}
}

Expand Down Expand Up @@ -157,7 +153,6 @@ public void run() {
ByteBuf byteBuf = null;
try {
// Simple write/read/delete
System.out.println("1) Trying to write multipart object " + path + " ...");
String uploadId = createMultipartUpload(client, bucketName, path).get();
List<CompletedPart> parts = new ArrayList<>();
int data1Size = 1024 * 1024 * 5;
Expand All @@ -168,27 +163,28 @@ public void run() {
new Random().nextBytes(randomBytes);
ByteBuf data1 = Unpooled.wrappedBuffer(randomBytes);
writePart(uploadId, path, bucketName, data1, 1).thenAccept(parts::add).get();
System.out.println("writing part 1 passed");

byte[] randomBytes2 = new byte[data2Size];
new Random().nextBytes(randomBytes2);
ByteBuf data2 = Unpooled.wrappedBuffer(randomBytes2);
writePart(uploadId, path, bucketName, data2, 2).thenAccept(parts::add).get();
System.out.println("writing part 2 passed");

System.out.println("[ OK ] Write S3 object");

completeMultipartUpload(client, path, bucketName, uploadId, parts).get();
System.out.println("writing and uploading multipart object passed");
System.out.println("[ OK ] Upload s3 multipart object");

System.out.println("2) Trying to read multipart object " + path + " ...");
CompletableFuture<ByteBuf> readCf = new CompletableFuture<>();
readRange(client, path, readCf, bucketName, 0, -1);
byteBuf = readCf.get();
if (byteBuf == null) {
System.out.println("[ FAILED ] Read s3 multipart object");
throw new RuntimeException("read multipart object " + path + " fail. got null");
} else if (byteBuf.readableBytes() != totalSize) {
System.out.println("[ FAILED ] Read s3 multipart object");
throw new RuntimeException("read multipart object " + path + " fail. expected size " + totalSize + ", actual size " + byteBuf.readableBytes());
}
System.out.println("read passed");
System.out.println("[ OK ] Read s3 multipart object");
} catch (ExecutionException | InterruptedException e) {
showErrorInfo(e);
throw new RuntimeException(e);
Expand All @@ -204,10 +200,9 @@ private CompletableFuture<String> createMultipartUpload(S3AsyncClient writeS3Cli
CompletableFuture<String> cf = new CompletableFuture<>();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucketName).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
System.out.println("created upload id: " + createMultipartUploadResponse.uploadId());
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
System.err.println("failed to create upload id.");
System.out.println("[ FAILED ] Upload s3 multipart object");
cf.completeExceptionally(ex);
return null;
});
Expand All @@ -221,10 +216,9 @@ public CompletableFuture<Void> completeMultipartUpload(S3AsyncClient writeS3Clie
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
System.out.println("completed upload with id " + uploadId);
cf.complete(null);
}).exceptionally(ex -> {
System.err.println("failed to upload with id " + uploadId);
System.out.println("[ FAILED ] Upload s3 multipart object, upload id is " + uploadId);
cf.completeExceptionally(ex);
return null;
});
Expand Down Expand Up @@ -273,26 +267,28 @@ public void run() {
ByteBuf byteBuf = null;
try {
// Simple write/read/delete
System.out.println("1) Trying to write object " + path + " ...");
CompletableFuture<Void> writeCf = new CompletableFuture<>();
writeObject(client, path, ByteBuffer.wrap(content), writeCf, bucketName);
writeCf.get();
System.out.println("[ OK ] Write s3 object");

System.out.println("2) Trying to read object " + path + " ...");
CompletableFuture<ByteBuf> readCf = new CompletableFuture<>();
readRange(client, path, readCf, bucketName, 0, -1);
byteBuf = readCf.get();
if (byteBuf == null) {
System.out.println("[ Failed ] Read s3 object");
throw new RuntimeException("read object " + path + " fail. got null");
} else if (byteBuf.readableBytes() != content.length) {
System.out.println("[ Failed ] Read s3 object");
throw new RuntimeException("read object " + path + " fail. expected size " + content.length + ", actual size " + byteBuf.readableBytes());
}
byte[] readContent = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(readContent);
if (!StringUtils.equals(new String(readContent, StandardCharsets.UTF_8), new String(content, StandardCharsets.UTF_8))) {
System.out.println("[ Failed ] Read s3 object");
throw new RuntimeException("read object " + path + " fail. expected content " + new String(content, StandardCharsets.UTF_8) + ", actual content " + new String(readContent, StandardCharsets.UTF_8));
}
System.out.println("read passed");
System.out.println("[ OK ] Read s3 object");
} catch (ExecutionException | InterruptedException e) {
showErrorInfo(e);
throw new RuntimeException(e);
Expand All @@ -305,14 +301,12 @@ public void run() {

private void writeObject(S3AsyncClient writeS3Client, String path, ByteBuffer data, CompletableFuture<Void> cf,
String bucket) {
int objectSize = data.remaining();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data);
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
System.out.printf("put object %s with size %d%n", path, objectSize);
cf.complete(null);
}).exceptionally(ex -> {
System.err.printf("PutObject for object %s fail with msg %s %n", path, ex.getMessage());
System.out.printf("[ Failed ] Write s3 object. PutObject for object %s fail with msg %s %n", path, ex.getMessage());
cf.completeExceptionally(ex);
return null;
});
Expand Down Expand Up @@ -340,29 +334,30 @@ protected void deleteObject(S3AsyncClient deleteS3Client, String path, Completab
String bucket) {
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
deleteS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
System.out.printf("deleted object %s%n", path);
cf.complete(null);
}).exceptionally(ex -> {
System.err.printf("delete object %s fail with msg %s %n", path, ex.getMessage());
System.out.printf("[ FAILED ] Delete s3 object. Delete object %s fail with msg %s %n", path, ex.getMessage());
cf.completeExceptionally(ex);
return null;
});
}

@Override
public void close() {
System.out.println("3) Trying to delete object " + path + " ...");
try {
CompletableFuture<Void> deleteCf = new CompletableFuture<>();
deleteObject(client, path, deleteCf, bucketName);
deleteCf.get();
} catch (InterruptedException | ExecutionException e) {
System.err.println(" NOTICE: please delete object " + path + " manually!!!");

System.out.println("[ FAILED ] Delete s3 object. NOTICE: please delete object " + path + " manually!!!");
KaimingWan marked this conversation as resolved.
Show resolved Hide resolved
showErrorInfo(e);
throw new RuntimeException(e);
} finally {
super.close();
}
System.out.println("[ OK ] Delete s3 object");

}
}

Expand Down