Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix S3 CompleteMultipartUpload and add related UTs #17874

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,28 +405,43 @@ public CompleteMultipartUploadRequest parseCompleteMultipartUploadRequest(String
public List<URIStatus> validateParts(CompleteMultipartUploadRequest request,
String objectPath,
AlluxioURI multipartTemporaryDir)
throws S3Exception, IOException, AlluxioException {
List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
uploadedParts.sort(new S3RestUtils.URIStatusNameComparator());
if (uploadedParts.size() < request.getParts().size()) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
Map<Integer, URIStatus> uploadedPartsMap = uploadedParts.stream().collect(Collectors.toMap(
throws S3Exception, IOException, AlluxioException {
final List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
final List<CompleteMultipartUploadRequest.Part> requestParts = request.getParts();
final Map<Integer, URIStatus> uploadedPartsMap =
uploadedParts.stream().collect(Collectors.toMap(
status -> Integer.parseInt(status.getName()),
status -> status
));
int lastPartNum = request.getParts().get(request.getParts().size() - 1).getPartNumber();
for (CompleteMultipartUploadRequest.Part part : request.getParts()) {
));

if (requestParts == null || requestParts.isEmpty()) {
throw new S3Exception(objectPath, S3ErrorCode.MALFORMED_XML);
}
if (uploadedParts.size() < requestParts.size()) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
for (CompleteMultipartUploadRequest.Part part : requestParts) {
if (!uploadedPartsMap.containsKey(part.getPartNumber())) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
if (part.getPartNumber() != lastPartNum // size requirement not applicable to last part
&& uploadedPartsMap.get(part.getPartNumber()).getLength() < Configuration.getBytes(
PropertyKey.PROXY_S3_COMPLETE_MULTIPART_UPLOAD_MIN_PART_SIZE)) {
}
int prevPartNum = requestParts.get(0).getPartNumber();
for (CompleteMultipartUploadRequest.Part part :
requestParts.subList(1, requestParts.size())) {
if (prevPartNum >= part.getPartNumber()) {
throw new S3Exception(S3ErrorCode.INVALID_PART_ORDER);
}
if (uploadedPartsMap.get(prevPartNum).getLength() < Configuration.getBytes(
PropertyKey.PROXY_S3_COMPLETE_MULTIPART_UPLOAD_MIN_PART_SIZE)) {
throw new S3Exception(objectPath, S3ErrorCode.ENTITY_TOO_SMALL);
}
prevPartNum = part.getPartNumber();
}
return uploadedParts;

List<URIStatus> validParts =
requestParts.stream().map(part -> uploadedPartsMap.get(part.getPartNumber()))
.collect(Collectors.toList());
return validParts;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

package alluxio.proxy.s3;

import alluxio.s3.S3ErrorCode;
import alluxio.s3.S3Exception;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
Expand Down Expand Up @@ -49,7 +46,7 @@ public CompleteMultipartUploadRequest() {}
* @param parts the list of Part objects
*/
public CompleteMultipartUploadRequest(List<Part> parts) {
this(parts, false);
setParts(parts);
}

/**
Expand All @@ -58,7 +55,9 @@ public CompleteMultipartUploadRequest(List<Part> parts) {
*
* @param parts the list of Part objects
* @param ignoreValidation flag to skip Part validation
* @deprecated always ignore valdateion
*/
@Deprecated
public CompleteMultipartUploadRequest(List<Part> parts, boolean ignoreValidation) {
if (ignoreValidation) {
mParts = parts;
Expand All @@ -82,25 +81,6 @@ public List<Part> getParts() {
@JacksonXmlProperty(localName = "Part")
public void setParts(List<Part> parts) {
mParts = parts;
validateParts();
007DXR marked this conversation as resolved.
Show resolved Hide resolved
}

private void validateParts() {
if (mParts.size() <= 1) { return; }
try {
int prevPartNum = mParts.get(0).getPartNumber();
for (Part part : mParts.subList(1, mParts.size())) {
if (prevPartNum + 1 != part.getPartNumber()) {
throw new S3Exception(S3ErrorCode.INVALID_PART_ORDER);
}
prevPartNum = part.getPartNumber();
}
} catch (S3Exception e) {
// IllegalArgumentException will be consumed by IOException from the
// jersey library when parsing the XML into this object
// - the underlying S3Exception will be the throwable cause for the IOException
throw new IllegalArgumentException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package alluxio.proxy.s3;

import alluxio.client.file.URIStatus;
import alluxio.s3.S3Constants;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
Expand Down Expand Up @@ -48,6 +47,12 @@ public class ListMultipartUploadsResult {
public static ListMultipartUploadsResult buildFromStatuses(String bucket,
List<URIStatus> children) {
List<ListMultipartUploadsResult.Upload> uploads = children.stream()
.map(status -> new Upload(status.getName(), status.getName(),
007DXR marked this conversation as resolved.
Show resolved Hide resolved
S3RestUtils.toS3Date(status.getLastModificationTimeMs())
))
.collect(Collectors.toList());
/*
TODO(pkuweblab): 3.x haven't supported XAttr yet, so can't mark Upload.key as object name
.filter(status -> {
if (status.getXAttr() == null
|| !status.getXAttr().containsKey(S3Constants.UPLOADS_BUCKET_XATTR_KEY)
Expand All @@ -64,7 +69,7 @@ public static ListMultipartUploadsResult buildFromStatuses(String bucket,
status.getName(),
S3RestUtils.toS3Date(status.getLastModificationTimeMs())
))
.collect(Collectors.toList());
.collect(Collectors.toList());*/
return new ListMultipartUploadsResult(bucket, uploads);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public Response continueTask() {
}

final List<URIStatus> buckets = objects.stream()
.filter((uri) -> uri.getOwner().equals(user))
// debatable (?) potentially breaks backcompat(?)
.filter(URIStatus::isFolder)
.collect(Collectors.toList());
Expand Down Expand Up @@ -212,10 +211,7 @@ public Response continueTask() {
try {
List<URIStatus> children = mHandler.getMetaFS().listStatus(new AlluxioURI(
S3RestUtils.MULTIPART_UPLOADS_METADATA_DIR));
final List<URIStatus> uploadIds = children.stream()
.filter((uri) -> uri.getOwner().equals(user))
.collect(Collectors.toList());
return ListMultipartUploadsResult.buildFromStatuses(bucket, uploadIds);
return ListMultipartUploadsResult.buildFromStatuses(bucket, children);
} catch (Exception e) {
throw S3RestUtils.toBucketS3Exception(e, bucket, auditContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,27 +1234,42 @@ public List<URIStatus> validateParts(CompleteMultipartUploadRequest request,
String objectPath,
AlluxioURI multipartTemporaryDir)
throws S3Exception, IOException, AlluxioException {
List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
uploadedParts.sort(new S3RestUtils.URIStatusNameComparator());
if (uploadedParts.size() < request.getParts().size()) {
final List<URIStatus> uploadedParts = mUserFs.listStatus(multipartTemporaryDir);
final List<CompleteMultipartUploadRequest.Part> requestParts = request.getParts();
final Map<Integer, URIStatus> uploadedPartsMap =
uploadedParts.stream().collect(Collectors.toMap(
status -> Integer.parseInt(status.getName()),
status -> status
));

if (requestParts == null || requestParts.isEmpty()) {
throw new S3Exception(objectPath, S3ErrorCode.MALFORMED_XML);
}
if (uploadedParts.size() < requestParts.size()) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
Map<Integer, URIStatus> uploadedPartsMap = uploadedParts.stream().collect(Collectors.toMap(
status -> Integer.parseInt(status.getName()),
status -> status
));
int lastPartNum = request.getParts().get(request.getParts().size() - 1).getPartNumber();
for (CompleteMultipartUploadRequest.Part part : request.getParts()) {
for (CompleteMultipartUploadRequest.Part part : requestParts) {
if (!uploadedPartsMap.containsKey(part.getPartNumber())) {
throw new S3Exception(objectPath, S3ErrorCode.INVALID_PART);
}
if (part.getPartNumber() != lastPartNum // size requirement not applicable to last part
&& uploadedPartsMap.get(part.getPartNumber()).getLength() < Configuration.getBytes(
}
int prevPartNum = requestParts.get(0).getPartNumber();
for (CompleteMultipartUploadRequest.Part part :
requestParts.subList(1, requestParts.size())) {
if (prevPartNum >= part.getPartNumber()) {
throw new S3Exception(S3ErrorCode.INVALID_PART_ORDER);
}
if (uploadedPartsMap.get(prevPartNum).getLength() < Configuration.getBytes(
PropertyKey.PROXY_S3_COMPLETE_MULTIPART_UPLOAD_MIN_PART_SIZE)) {
throw new S3Exception(objectPath, S3ErrorCode.ENTITY_TOO_SMALL);
}
prevPartNum = part.getPartNumber();
}
return uploadedParts;

List<URIStatus> validParts =
requestParts.stream().map(part -> uploadedPartsMap.get(part.getPartNumber()))
.collect(Collectors.toList());
return validParts;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -359,17 +358,17 @@ public static List<URIStatus> checkStatusesForUploadId(
final AlluxioURI metaUri = new AlluxioURI(
S3RestUtils.getMultipartMetaFilepathForUploadId(uploadId));
URIStatus metaStatus = metaFs.getStatus(metaUri);
/* TODO(pkuweblab): 3.x haven't supported XAttr yet
if (metaStatus.getXAttr() == null
|| !metaStatus.getXAttr().containsKey(S3Constants.UPLOADS_FILE_ID_XATTR_KEY)) {
//TODO(czhu): determine intended behavior in this edge-case
throw new RuntimeException(
"Alluxio is missing multipart-upload metadata for upload ID: " + uploadId);
}
if (Longs.fromByteArray(metaStatus.getXAttr().get(S3Constants.UPLOADS_FILE_ID_XATTR_KEY))
!= multipartTempDirStatus.getFileId()) {
throw new RuntimeException(
"Alluxio mismatched file ID for multipart-upload with upload ID: " + uploadId);
}
}*/
return new ArrayList<>(Arrays.asList(multipartTempDirStatus, metaStatus));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,23 @@
import alluxio.proxy.s3.S3RestUtils;
import alluxio.s3.ListBucketOptions;
import alluxio.s3.ListBucketResult;
import alluxio.s3.S3Error;
import alluxio.s3.S3ErrorCode;
import alluxio.security.authentication.AuthType;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.authorization.Mode;
import alluxio.security.authorization.ModeParser;
import alluxio.testutils.LocalAlluxioClusterResource;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;

import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;

public class ListStatusTest extends RestApiTest {
Expand Down Expand Up @@ -924,15 +919,8 @@ public void headAndListNonExistentBucket() throws Exception {
headTestCase(bucketName).checkResponseCode(Response.Status.NOT_FOUND.getStatusCode());

// Lists objects in a non-existent bucket.
HttpURLConnection connection2 = new TestCase(mHostname, mPort, mBaseUri,
bucketName, NO_PARAMS, HttpMethod.GET,
getDefaultOptionsWithAuth())
.execute();
// Verify 404 HTTP status & NoSuchBucket S3 error code
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), connection2.getResponseCode());
S3Error response =
new XmlMapper().readerFor(S3Error.class).readValue(connection2.getErrorStream());
Assert.assertEquals(bucketName, response.getResource());
Assert.assertEquals(S3ErrorCode.Name.NO_SUCH_BUCKET, response.getCode());
listTestCase(bucketName, NO_PARAMS)
.checkResponseCode(Response.Status.NOT_FOUND.getStatusCode())
.checkErrorCode(S3ErrorCode.Name.NO_SUCH_BUCKET);
}
}
Loading
Loading