Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement overwrite in PageDoraWorker.rename() #18005

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c5918d8
fix bug
007DXR Aug 2, 2023
a371f3a
fix bug
007DXR Aug 2, 2023
25846ca
fix bug and add unit tests to validate
007DXR Aug 2, 2023
c8fcd1e
delete unused package
007DXR Aug 2, 2023
795fe45
//
007DXR Aug 2, 2023
0cafdb2
//
007DXR Aug 2, 2023
8b5c520
//
007DXR Aug 2, 2023
520eea5
Merge branch 'Alluxio:main' into multipartupload
007DXR Aug 2, 2023
63b7154
Merge remote-tracking branch 'origin/multipartupload' into multipartu…
007DXR Aug 3, 2023
d12491e
update to the latest
007DXR Aug 3, 2023
15ec130
update to the latest
007DXR Aug 3, 2023
b29c186
//
007DXR Aug 3, 2023
99b4b40
add MPU UTs
007DXR Aug 10, 2023
a670e6f
revert S3BucketTask to the former
007DXR Aug 10, 2023
fa79483
validate parts like AWS
007DXR Aug 10, 2023
d542f9a
fix code style error
007DXR Aug 10, 2023
f0d8f8b
implement overwrite in pageDoraWorker.rename()
pkuweblab Aug 15, 2023
fcd6b38
add TODO. modify validateParts()
pkuweblab Aug 15, 2023
76f919b
extract execute from the constructor method
pkuweblab Aug 15, 2023
ff8f90a
ignore execute in S3ClientRestApiTest
pkuweblab Aug 15, 2023
282319e
ignore execute in S3ClientRestApiTest
pkuweblab Aug 15, 2023
d4051ec
retest
pkuweblab Aug 15, 2023
9947816
Merge remote-tracking branch 'origin/multipartupload' into multipartu…
pkuweblab Aug 16, 2023
4ab4812
move MultipartUploadTest.java location
pkuweblab Aug 16, 2023
0523e68
Merge branch 'multipartupload' of github.com:007DXR/alluxio into rena…
pkuweblab Aug 17, 2023
3ce4ce6
implement overwrite in pageDoraWorker.rename()
pkuweblab Aug 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,28 +405,43 @@ public CompleteMultipartUploadRequest parseCompleteMultipartUploadRequest(String
public List<URIStatus> validateParts(CompleteMultipartUploadRequest request,
String objectPath,
AlluxioURI multipartTemporaryDir)
throws S3Exception, IOException, AlluxioException {
List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
uploadedParts.sort(new S3RestUtils.URIStatusNameComparator());
if (uploadedParts.size() < request.getParts().size()) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
Map<Integer, URIStatus> uploadedPartsMap = uploadedParts.stream().collect(Collectors.toMap(
throws S3Exception, IOException, AlluxioException {
final List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
final List<CompleteMultipartUploadRequest.Part> requestParts = request.getParts();
final Map<Integer, URIStatus> uploadedPartsMap =
uploadedParts.stream().collect(Collectors.toMap(
status -> Integer.parseInt(status.getName()),
status -> status
));
int lastPartNum = request.getParts().get(request.getParts().size() - 1).getPartNumber();
for (CompleteMultipartUploadRequest.Part part : request.getParts()) {
));

if (requestParts == null || requestParts.isEmpty()) {
throw new S3Exception(objectPath, S3ErrorCode.MALFORMED_XML);
}
if (uploadedParts.size() < requestParts.size()) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
for (CompleteMultipartUploadRequest.Part part : requestParts) {
if (!uploadedPartsMap.containsKey(part.getPartNumber())) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
if (part.getPartNumber() != lastPartNum // size requirement not applicable to last part
&& uploadedPartsMap.get(part.getPartNumber()).getLength() < Configuration.getBytes(
PropertyKey.PROXY_S3_COMPLETE_MULTIPART_UPLOAD_MIN_PART_SIZE)) {
}
int prevPartNum = requestParts.get(0).getPartNumber();
for (CompleteMultipartUploadRequest.Part part :
requestParts.subList(1, requestParts.size())) {
if (prevPartNum >= part.getPartNumber()) {
throw new S3Exception(S3ErrorCode.INVALID_PART_ORDER);
}
if (uploadedPartsMap.get(prevPartNum).getLength() < Configuration.getBytes(
PropertyKey.PROXY_S3_COMPLETE_MULTIPART_UPLOAD_MIN_PART_SIZE)) {
throw new S3Exception(objectPath, S3ErrorCode.ENTITY_TOO_SMALL);
}
prevPartNum = part.getPartNumber();
}
return uploadedParts;

List<URIStatus> validParts =
requestParts.stream().map(part -> uploadedPartsMap.get(part.getPartNumber()))
.collect(Collectors.toList());
return validParts;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

package alluxio.proxy.s3;

import alluxio.s3.S3ErrorCode;
import alluxio.s3.S3Exception;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
Expand Down Expand Up @@ -49,7 +46,7 @@ public CompleteMultipartUploadRequest() {}
* @param parts the list of Part objects
*/
public CompleteMultipartUploadRequest(List<Part> parts) {
this(parts, false);
setParts(parts);
}

/**
Expand All @@ -58,7 +55,9 @@ public CompleteMultipartUploadRequest(List<Part> parts) {
*
* @param parts the list of Part objects
* @param ignoreValidation flag to skip Part validation
* @deprecated always ignore valdateion
*/
@Deprecated
public CompleteMultipartUploadRequest(List<Part> parts, boolean ignoreValidation) {
if (ignoreValidation) {
mParts = parts;
Expand All @@ -82,25 +81,6 @@ public List<Part> getParts() {
@JacksonXmlProperty(localName = "Part")
public void setParts(List<Part> parts) {
mParts = parts;
validateParts();
}

private void validateParts() {
if (mParts.size() <= 1) { return; }
try {
int prevPartNum = mParts.get(0).getPartNumber();
for (Part part : mParts.subList(1, mParts.size())) {
if (prevPartNum + 1 != part.getPartNumber()) {
throw new S3Exception(S3ErrorCode.INVALID_PART_ORDER);
}
prevPartNum = part.getPartNumber();
}
} catch (S3Exception e) {
// IllegalArgumentException will be consumed by IOException from the
// jersey library when parsing the XML into this object
// - the underlying S3Exception will be the throwable cause for the IOException
throw new IllegalArgumentException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package alluxio.proxy.s3;

import alluxio.client.file.URIStatus;
import alluxio.s3.S3Constants;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
Expand Down Expand Up @@ -48,6 +47,12 @@ public class ListMultipartUploadsResult {
public static ListMultipartUploadsResult buildFromStatuses(String bucket,
List<URIStatus> children) {
List<ListMultipartUploadsResult.Upload> uploads = children.stream()
.map(status -> new Upload(status.getName(), status.getName(),
S3RestUtils.toS3Date(status.getLastModificationTimeMs())
))
.collect(Collectors.toList());
/*
TODO(pkuweblab): 3.x haven't supported XAttr yet
.filter(status -> {
if (status.getXAttr() == null
|| !status.getXAttr().containsKey(S3Constants.UPLOADS_BUCKET_XATTR_KEY)
Expand All @@ -64,7 +69,7 @@ public static ListMultipartUploadsResult buildFromStatuses(String bucket,
status.getName(),
S3RestUtils.toS3Date(status.getLastModificationTimeMs())
))
.collect(Collectors.toList());
.collect(Collectors.toList());*/
return new ListMultipartUploadsResult(bucket, uploads);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,27 +1234,42 @@ public List<URIStatus> validateParts(CompleteMultipartUploadRequest request,
String objectPath,
AlluxioURI multipartTemporaryDir)
throws S3Exception, IOException, AlluxioException {
List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
uploadedParts.sort(new S3RestUtils.URIStatusNameComparator());
if (uploadedParts.size() < request.getParts().size()) {
final List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
final List<CompleteMultipartUploadRequest.Part> requestParts = request.getParts();
final Map<Integer, URIStatus> uploadedPartsMap =
uploadedParts.stream().collect(Collectors.toMap(
status -> Integer.parseInt(status.getName()),
status -> status
));

if (requestParts == null || requestParts.isEmpty()) {
throw new S3Exception(objectPath, S3ErrorCode.MALFORMED_XML);
}
if (uploadedParts.size() < requestParts.size()) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
Map<Integer, URIStatus> uploadedPartsMap = uploadedParts.stream().collect(Collectors.toMap(
status -> Integer.parseInt(status.getName()),
status -> status
));
int lastPartNum = request.getParts().get(request.getParts().size() - 1).getPartNumber();
for (CompleteMultipartUploadRequest.Part part : request.getParts()) {
for (CompleteMultipartUploadRequest.Part part : requestParts) {
if (!uploadedPartsMap.containsKey(part.getPartNumber())) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
if (part.getPartNumber() != lastPartNum // size requirement not applicable to last part
&& uploadedPartsMap.get(part.getPartNumber()).getLength() < Configuration.getBytes(
}
int prevPartNum = requestParts.get(0).getPartNumber();
for (CompleteMultipartUploadRequest.Part part :
requestParts.subList(1, requestParts.size())) {
if (prevPartNum >= part.getPartNumber()) {
throw new S3Exception(S3ErrorCode.INVALID_PART_ORDER);
}
if (uploadedPartsMap.get(prevPartNum).getLength() < Configuration.getBytes(
PropertyKey.PROXY_S3_COMPLETE_MULTIPART_UPLOAD_MIN_PART_SIZE)) {
throw new S3Exception(objectPath, S3ErrorCode.ENTITY_TOO_SMALL);
}
prevPartNum = part.getPartNumber();
}
return uploadedParts;

List<URIStatus> validParts =
requestParts.stream().map(part -> uploadedPartsMap.get(part.getPartNumber()))
.collect(Collectors.toList());
return validParts;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -359,17 +358,17 @@ public static List<URIStatus> checkStatusesForUploadId(
final AlluxioURI metaUri = new AlluxioURI(
S3RestUtils.getMultipartMetaFilepathForUploadId(uploadId));
URIStatus metaStatus = metaFs.getStatus(metaUri);
/* TODO(pkuweblab): 3.x haven't supported XAttr yet
if (metaStatus.getXAttr() == null
|| !metaStatus.getXAttr().containsKey(S3Constants.UPLOADS_FILE_ID_XATTR_KEY)) {
//TODO(czhu): determine intended behavior in this edge-case
throw new RuntimeException(
"Alluxio is missing multipart-upload metadata for upload ID: " + uploadId);
}
if (Longs.fromByteArray(metaStatus.getXAttr().get(S3Constants.UPLOADS_FILE_ID_XATTR_KEY))
!= multipartTempDirStatus.getFileId()) {
throw new RuntimeException(
"Alluxio mismatched file ID for multipart-upload with upload ID: " + uploadId);
}
}*/
return new ArrayList<>(Arrays.asList(multipartTempDirStatus, metaStatus));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,21 @@ public void delete(String path, DeletePOptions options) throws IOException,
public void rename(String src, String dst, RenamePOptions options)
throws IOException, AccessControlException {
try {
boolean overWrite = options.getS3SyntaxOptions().getOverwrite();
if (mUfs.exists(dst)) {
if (!overWrite) {
throw new RuntimeException(
new FileAlreadyExistsException("File already exists but no overwrite flag"));
} else {
mMetaManager.removeFromMetaStore(dst);
if (mUfs.getStatus(dst).isFile()) {
mUfs.deleteFile(dst);
} else {
mUfs.deleteDirectory(dst, DeleteOptions.RECURSIVE);
}
}
}

UfsStatus status = mUfs.getStatus(src);
if (status.isFile()) {
mUfs.renameFile(src, dst);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,23 @@
import alluxio.proxy.s3.S3RestUtils;
import alluxio.s3.ListBucketOptions;
import alluxio.s3.ListBucketResult;
import alluxio.s3.S3Error;
import alluxio.s3.S3ErrorCode;
import alluxio.security.authentication.AuthType;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.authorization.Mode;
import alluxio.security.authorization.ModeParser;
import alluxio.testutils.LocalAlluxioClusterResource;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;

import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;

public class ListStatusTest extends RestApiTest {
Expand Down Expand Up @@ -924,15 +919,8 @@ public void headAndListNonExistentBucket() throws Exception {
headTestCase(bucketName).checkResponseCode(Response.Status.NOT_FOUND.getStatusCode());

// Lists objects in a non-existent bucket.
HttpURLConnection connection2 = new TestCase(mHostname, mPort, mBaseUri,
bucketName, NO_PARAMS, HttpMethod.GET,
getDefaultOptionsWithAuth())
.execute();
// Verify 404 HTTP status & NoSuchBucket S3 error code
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), connection2.getResponseCode());
S3Error response =
new XmlMapper().readerFor(S3Error.class).readValue(connection2.getErrorStream());
Assert.assertEquals(bucketName, response.getResource());
Assert.assertEquals(S3ErrorCode.Name.NO_SUCH_BUCKET, response.getCode());
listTestCase(bucketName, NO_PARAMS)
.checkResponseCode(Response.Status.NOT_FOUND.getStatusCode())
.checkErrorCode(S3ErrorCode.Name.NO_SUCH_BUCKET);
}
}
Loading
Loading