Skip to content

Commit

Permalink
Implement Netty based s3 API in worker
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Implement Netty based s3 API in worker

### Why are the changes needed?

Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, describe the bug.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
  1. change in user-facing APIs
  2. addition or removal of property keys
  3. webui

			pr-link: #17661
			change-id: cid-0f87c923a1d0166325d47fd763262dbc423a0f30
  • Loading branch information
Jackson-Wang-7 authored Aug 3, 2023
1 parent 78379a5 commit dbb6af2
Show file tree
Hide file tree
Showing 65 changed files with 3,055 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void setAttribute(AlluxioURI path, SetAttributePOptions options)
* @param alluxioPath Alluxio based path
* @return UfsBaseFileSystem based full path
*/
private AlluxioURI convertAlluxioPathToUFSPath(AlluxioURI alluxioPath) {
public AlluxioURI convertAlluxioPathToUFSPath(AlluxioURI alluxioPath) {
if (mDelegatedFileSystem instanceof UfsBaseFileSystem) {
UfsBaseFileSystem under = (UfsBaseFileSystem) mDelegatedFileSystem;
AlluxioURI rootUFS = under.getRootUFS();
Expand Down
89 changes: 89 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -5370,6 +5370,77 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_REST_PORT =
intBuilder(Name.WORKER_REST_PORT)
.setDefaultValue(29998)
.setDescription("The port Alluxio worker's rest api runs on.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_S3_REST_ENABLED =
booleanBuilder(Name.WORKER_S3_REST_ENABLED)
.setDefaultValue(false)
.setDescription("Set to true to enable worker netty s3 server.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_S3_AUDIT_LOGGING_ENABLED =
booleanBuilder(Name.WORKER_S3_LOGGING_ENABLED)
.setDefaultValue(false)
.setDescription("Set to true to enable worker netty s3 audit.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_PROCESS_ENABLED =
booleanBuilder(Name.WORKER_S3_ASYNC_PROCESS_ENABLED)
.setDefaultValue(false)
.setDescription("(Experimental) If enabled, handle S3 request "
+ "in async mode in the netty based s3.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER =
intBuilder(Name.WORKER_S3_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER)
.setDefaultValue(8)
.setDescription("Core thread number for async light thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER =
intBuilder(Name.WORKER_S3_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER)
.setDefaultValue(64)
.setDescription("Maximum thread number for async light thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_LIGHT_POOL_QUEUE_SIZE =
intBuilder(Name.WORKER_S3_ASYNC_LIGHT_POOL_QUEUE_SIZE)
.setDefaultValue(64 * 1024)
.setDescription("Queue size for async light thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER =
intBuilder(Name.WORKER_S3_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER)
.setDefaultValue(8)
.setDescription("Core thread number for async heavy thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER =
intBuilder(Name.WORKER_S3_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER)
.setDefaultValue(64)
.setDescription("Maximum thread number for async heavy thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey WORKER_S3_ASYNC_HEAVY_POOL_QUEUE_SIZE =
intBuilder(Name.WORKER_S3_ASYNC_HEAVY_POOL_QUEUE_SIZE)
.setDefaultValue(64 * 1024)
.setDescription("Queue size for async heavy thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey WORKER_UFS_BLOCK_OPEN_TIMEOUT_MS =
durationBuilder(Name.WORKER_UFS_BLOCK_OPEN_TIMEOUT_MS)
.setAlias("alluxio.worker.ufs.block.open.timeout.ms")
Expand Down Expand Up @@ -9017,6 +9088,24 @@ public static final class Name {
public static final String WORKER_WEB_BIND_HOST = "alluxio.worker.web.bind.host";
public static final String WORKER_WEB_HOSTNAME = "alluxio.worker.web.hostname";
public static final String WORKER_WEB_PORT = "alluxio.worker.web.port";
public static final String WORKER_REST_PORT = "alluxio.worker.rest.port";
public static final String WORKER_S3_REST_ENABLED = "alluxio.worker.s3.api.enabled";
public static final String WORKER_S3_LOGGING_ENABLED =
"alluxio.worker.s3.audit.logging.enabled";
public static final String WORKER_S3_ASYNC_PROCESS_ENABLED =
"alluxio.worker.s3.async.processing.enabled";
public static final String WORKER_S3_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER =
"alluxio.worker.s3.async.light.pool.core.thread.number";
public static final String WORKER_S3_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER =
"alluxio.worker.s3.async.light.pool.maximum.thread.number";
public static final String WORKER_S3_ASYNC_LIGHT_POOL_QUEUE_SIZE =
"alluxio.worker.s3.async.light.pool.queue.size";
public static final String WORKER_S3_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER =
"alluxio.worker.s3.async.heavy.pool.core.thread.number";
public static final String WORKER_S3_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER =
"alluxio.worker.s3.async.heavy.pool.maximum.thread.number";
public static final String WORKER_S3_ASYNC_HEAVY_POOL_QUEUE_SIZE =
"alluxio.worker.s3.async.heavy.pool.queue.size";
public static final String WORKER_UFS_BLOCK_OPEN_TIMEOUT_MS =
"alluxio.worker.ufs.block.open.timeout";
public static final String WORKER_UFS_INSTREAM_CACHE_EXPIRATION_TIME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ public enum ServiceType implements ServiceAttributeProvider {
WORKER_DATA("Alluxio Worker data service", PropertyKey.WORKER_DATA_HOSTNAME,
PropertyKey.WORKER_DATA_BIND_HOST, PropertyKey.WORKER_DATA_PORT),

/**
* Worker s3 service (Netty).
*/
WORKER_REST("Alluxio Worker S3 service", PropertyKey.WORKER_WEB_HOSTNAME,
PropertyKey.WORKER_WEB_BIND_HOST, PropertyKey.WORKER_REST_PORT),

/**
* Worker web service (Jetty).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ private void getBindAddress(ServiceType service) throws Exception {
case WORKER_DATA:
mConfiguration.set(PropertyKey.WORKER_DATA_PORT, 20000);
break;
case WORKER_REST:
mConfiguration.set(PropertyKey.WORKER_REST_PORT, 20000);
break;
default:
Assert.fail("Unrecognized service type: " + service.toString());
break;
Expand Down
8 changes: 8 additions & 0 deletions dora/core/server/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerby-util</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
13 changes: 13 additions & 0 deletions dora/core/server/common/src/main/java/alluxio/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.s3.S3Constants;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.user.ServerUserState;
import alluxio.util.SecurityUtils;
Expand All @@ -24,6 +25,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import javax.annotation.Nullable;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -121,6 +125,15 @@ private static Response createResponse(Object object, AlluxioConfiguration allux
return rb.build();
}

/**
* @param epoch the milliseconds from the epoch
* @return the string representation of the epoch in the S3 date format
*/
public static String toS3Date(long epoch) {
final DateFormat s3DateFormat = new SimpleDateFormat(S3Constants.S3_DATE_FORMAT_REGEXP);
return s3DateFormat.format(new Date(epoch));
}

/**
* Error response when {@link RestCallable#call()} throws an exception.
* It will be encoded into a Json string to be returned as an error response for the REST call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.proxy.s3;
package alluxio.s3;

import java.io.FilterInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -38,7 +38,7 @@ public class ChunkedEncodingInputStream extends FilterInputStream {
* @param in the underlying input stream, or <code>null</code> if
* this instance is to be created without an underlying stream.
*/
protected ChunkedEncodingInputStream(InputStream in) {
public ChunkedEncodingInputStream(InputStream in) {
super(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.proxy.s3;
package alluxio.s3;

import alluxio.RestUtils;

import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
Expand All @@ -32,7 +34,7 @@ public class CopyObjectResult {
*/
public CopyObjectResult(String etag, long lastModifiedEpoch) {
mETag = etag;
mLastModified = S3RestUtils.toS3Date(lastModifiedEpoch);
mLastModified = RestUtils.toS3Date(lastModifiedEpoch);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.proxy.s3;
package alluxio.s3;

import alluxio.RestUtils;
import alluxio.client.file.URIStatus;

import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
Expand All @@ -35,7 +36,7 @@ public class ListAllMyBucketsResult {
public ListAllMyBucketsResult(List<URIStatus> names) {
mBuckets =
names.stream().map((uriStatus) -> new Bucket(uriStatus.getName(),
S3RestUtils.toS3Date(uriStatus.getCreationTimeMs())))
RestUtils.toS3Date(uriStatus.getCreationTimeMs())))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.proxy.s3;
package alluxio.s3;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.proxy.s3;
package alluxio.s3;

import alluxio.AlluxioURI;
import alluxio.RestUtils;
import alluxio.client.file.URIStatus;

import com.fasterxml.jackson.annotation.JsonInclude;
Expand Down Expand Up @@ -214,7 +215,7 @@ private void buildListBucketResult(
String path = status.getPath().substring(bucketPrefix.length());
return new Content(
status.isFolder() ? path + AlluxioURI.SEPARATOR : path,
S3RestUtils.toS3Date(status.getLastModificationTimeMs()),
RestUtils.toS3Date(status.getLastModificationTimeMs()),
status.isFolder() ? "0" : String.valueOf(status.getLength())
);
})
Expand Down
Loading

0 comments on commit dbb6af2

Please sign in to comment.