Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge master-2.x commits 2023/07/01~2023/11/08 into main #18397

Merged
merged 12 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -558,13 +558,16 @@ public ReinitBlockerResource blockReinit() {
*/
public void reinit(boolean updateClusterConf)
throws UnavailableException, IOException {
// inquiry primary master address before entering the critical session of mReinitializer,
// where all RPCs wait for the monitor object of FileSystemContext (synchronized methods)
// will block until initialization completes
InetSocketAddress masterAddr;
try {
masterAddr = getMasterAddress();
} catch (IOException e) {
throw new UnavailableException("Failed to get master address during reinitialization", e);
}
try (Closeable r = mReinitializer.allow()) {
InetSocketAddress masterAddr;
try {
masterAddr = getMasterAddress();
} catch (IOException e) {
throw new UnavailableException("Failed to get master address during reinitialization", e);
}
try {
getClientContext().loadConf(masterAddr);
} catch (AlluxioStatusException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import alluxio.master.MasterInquireClient.Factory;
import alluxio.security.CurrentUser;
import alluxio.security.authorization.Mode;
import alluxio.util.ModeUtils;
import alluxio.wire.BlockLocationInfo;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
Expand Down Expand Up @@ -167,6 +168,27 @@ public void close() throws IOException {
mFileSystem.close();
}

/**
* Attempts to create a file with default permission.
* Overwrite will not succeed if the path exists and is a folder.
*
* @param path path to create
* @param overwrite overwrite if file exists
* @param bufferSize the size in bytes of the buffer to be used
* @param replication under filesystem replication factor, this is ignored
* @param blockSize block size in bytes
* @param progress queryable progress
* @return an {@link FSDataOutputStream} created at the indicated path of a file
*/
@Override
public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
String confUmask = mAlluxioConf.getString(PropertyKey.SECURITY_AUTHORIZATION_PERMISSION_UMASK);
Mode mode = ModeUtils.applyFileUMask(Mode.defaults(), confUmask);
return this.create(path, new FsPermission(mode.toShort()), overwrite, bufferSize, replication,
blockSize, progress);
}

/**
* Attempts to create a file. Overwrite will not succeed if the path exists and is a folder.
*
Expand Down Expand Up @@ -646,6 +668,20 @@ public FileStatus[] listStatus(Path path) throws IOException {
return ret;
}

/**
* Attempts to create a folder with the specified path with default permission.
* Parent directories will be created.
*
* @param path path to create
* @return true if the indicated folder is created successfully or already exists
*/
@Override
public boolean mkdirs(Path path) throws IOException {
String confUmask = mAlluxioConf.getString(PropertyKey.SECURITY_AUTHORIZATION_PERMISSION_UMASK);
Mode mode = ModeUtils.applyDirectoryUMask(Mode.defaults(), confUmask);
return mkdirs(path, new FsPermission(mode.toShort()));
}

/**
* Attempts to create a folder with the specified path. Parent directories will be created.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.client.file.FileInStream;

import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;

import java.io.IOException;
Expand Down Expand Up @@ -50,7 +51,30 @@ public int read() throws IOException {

@Override
public int read(ByteBuffer buf) throws IOException {
return mInput.read(buf);
// @see <a href="https://github.com/apache/hadoop/blob/rel/release-3.3.6/
// * hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/
// * fs/FSDataInputStream.java#L154">FSDataInputStream.java</a>
if (mInput.getWrappedStream() instanceof ByteBufferReadable) {
return mInput.read(buf);
} else {
int off = buf.position();
int len = buf.remaining();
final int totalBytesRead;
if (buf.hasArray()) {
byte[] byteArray = buf.array();
totalBytesRead = read(byteArray, buf.arrayOffset() + off, len);
if (totalBytesRead > 0) {
buf.position(off + totalBytesRead);
}
} else {
byte[] byteArray = new byte[len];
totalBytesRead = read(byteArray);
if (totalBytesRead > 0) {
buf.put(byteArray, 0, totalBytesRead);
}
}
return totalBytesRead;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import alluxio.client.file.cache.LocalCacheFileInStream;
import alluxio.client.file.cache.filter.CacheFilter;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.metrics.MetricsConfig;
import alluxio.metrics.MetricsSystem;
import alluxio.wire.FileInfo;

import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -133,8 +135,15 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
.setGroup(externalFileStatus.getGroup());
// FilePath is a unique identifier for a file, however it can be a long string
// hence using md5 hash of the file path as the identifier in the cache.
CacheContext context = CacheContext.defaults().setCacheIdentifier(
md5().hashString(externalFileStatus.getPath().toString(), UTF_8).toString());
String cacheIdentifier;
if (mAlluxioConf.getBoolean(PropertyKey.USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME)) {
// include mtime to avoid consistency issues if the file may update
cacheIdentifier = md5().hashString(externalFileStatus.getPath().toString()
+ externalFileStatus.getModificationTime(), UTF_8).toString();
} else {
cacheIdentifier = md5().hashString(externalFileStatus.getPath().toString(), UTF_8).toString();
}
CacheContext context = CacheContext.defaults().setCacheIdentifier(cacheIdentifier);
URIStatus status = new URIStatus(info, context);
return open(status, bufferSize);
}
Expand Down Expand Up @@ -234,4 +243,22 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
public FileStatus getFileStatus(Path f) throws IOException {
return mExternalFileSystem.getFileStatus(f);
}

@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
long len) throws IOException {
// Applications use the block information here to schedule/distribute the tasks.
// Return the UFS locations directly instead of the local cache location,
// so the application can schedule the tasks accordingly
return mExternalFileSystem.getFileBlockLocations(file, start, len);
}

@Override
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
// Applications use the block information here to schedule/distribute the tasks.
// Return the UFS locations directly instead of the local cache location,
// so the application can schedule the tasks accordingly
return mExternalFileSystem.getFileBlockLocations(p, start, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -732,7 +734,9 @@ public void createWithoutOverwrite() throws Exception {
ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(path.toString())));

try (FileSystem alluxioHadoopFs = new FileSystem(alluxioFs)) {
alluxioHadoopFs.create(path, false, 100, (short) 1, 1000);
alluxioHadoopFs.create(path,
FsCreateModes.applyUMask(FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
false, 100, (short) 1, 1000, null);
fail("create() of existing file is expected to fail");
} catch (IOException e) {
assertEquals("alluxio.exception.FileAlreadyExistsException: "
Expand Down
29 changes: 29 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,21 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_DEFAULT_MODE =
stringBuilder(Name.UNDERFS_OSS_DEFAULT_MODE)
.setAlias("alluxio.underfs.oss.default.mode")
.setDefaultValue("0700")
.setDescription("Mode (in octal notation) for OSS objects if mode cannot be discovered.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING =
stringBuilder(Name.UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING)
.setDescription("Optionally, specify a preset oss canonical id to Alluxio username "
+ "static mapping, in the format \"id1=user1;id2=user2\". ")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey S3A_ACCESS_KEY = stringBuilder(Name.S3A_ACCESS_KEY)
.setAlias(Name.AWS_ACCESS_KEY)
.setDescription("The access key of S3 bucket.")
Expand Down Expand Up @@ -5630,6 +5645,14 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME =
booleanBuilder(Name.USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME)
.setDefaultValue(false)
.setDescription("If this is enabled, client-side cache will include modification time "
+ "while calculating the identifier of a file.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();

public static final PropertyKey USER_CLIENT_REPORT_VERSION_ENABLED =
booleanBuilder(Name.USER_CLIENT_REPORT_VERSION_ENABLED)
Expand Down Expand Up @@ -7394,6 +7417,10 @@ public static final class Name {
"alluxio.underfs.oss.multipart.upload.threads";
public static final String UNDERFS_OSS_MULTIPART_UPLOAD_PARTITION_SIZE =
"alluxio.underfs.oss.multipart.upload.part.size";
public static final String UNDERFS_OSS_DEFAULT_MODE =
"alluxio.underfs.oss.default.mode";
public static final String UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING =
"alluxio.underfs.oss.owner.id.to.username.mapping";
public static final String UNDERFS_S3_BULK_DELETE_ENABLED =
"alluxio.underfs.s3.bulk.delete.enabled";
public static final String UNDERFS_S3_DEFAULT_MODE = "alluxio.underfs.s3.default.mode";
Expand Down Expand Up @@ -8291,6 +8318,8 @@ public static final class Name {
"alluxio.user.client.cache.timeout.duration";
public static final String USER_CLIENT_CACHE_TIMEOUT_THREADS =
"alluxio.user.client.cache.timeout.threads";
public static final String USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME =
"alluxio.user.client.cache.include.mtime";
public static final String USER_CLIENT_REPORT_VERSION_ENABLED =
"alluxio.user.client.report.version.enabled";
public static final String USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,7 @@ private void setIdAndRegister() throws IOException {
}

@Override
public void close() {}
public void close() {
mMasterClient.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public S3AuditContext createAuditContext(String command,
@Nullable String object) {
// Audit log may be enabled during runtime
AsyncUserAccessAuditLogWriter auditLogWriter = null;
if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) {
if (Configuration.getBoolean(PropertyKey.PROXY_AUDIT_LOGGING_ENABLED)) {
auditLogWriter = mAsyncAuditLogWriter;
}
S3AuditContext auditContext = new S3AuditContext(auditLogWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ private S3AuditContext createAuditContext(String command, String user,
@Nullable String bucket, @Nullable String object) {
// Audit log may be enabled during runtime
AsyncUserAccessAuditLogWriter auditLogWriter = null;
if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) {
if (Configuration.getBoolean(PropertyKey.PROXY_AUDIT_LOGGING_ENABLED)) {
auditLogWriter = mAsyncAuditLogWriter;
}
S3AuditContext auditContext = new S3AuditContext(auditLogWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public void cleanupTFS() throws Exception {
comment = "Bring back but not passed, need to fix.")
@Test
public void createFileWithPermission() throws Exception {
Path defaultFile = new Path("/createfile-default");
FSDataOutputStream stream = sTFS.create(defaultFile, false /* ignored */, 10 /* ignored */,
(short) 1 /* ignored */, 512 /* ignored */, null /* ignored */);
stream.close();
FileStatus fileStatus = sTFS.getFileStatus(defaultFile);
Assert.assertEquals((short) 0644, fileStatus.getPermission().toShort());
List<Integer> permissionValues =
Lists.newArrayList(0111, 0222, 0333, 0444, 0555, 0666, 0777, 0755, 0733, 0644, 0533, 0511);
for (int value : permissionValues) {
Expand All @@ -132,6 +138,10 @@ public void createFileWithPermission() throws Exception {
@Test
@Ignore
public void mkdirsWithPermission() throws Exception {
Path defaultDir = new Path("/createDir-default");
sTFS.mkdirs(defaultDir);
FileStatus fileStatus = sTFS.getFileStatus(defaultDir);
Assert.assertEquals((short) 0755, fileStatus.getPermission().toShort());
List<Integer> permissionValues =
Lists.newArrayList(0111, 0222, 0333, 0444, 0555, 0666, 0777, 0755, 0733, 0644, 0533, 0511);
for (int value : permissionValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.CommonUtils;
import alluxio.util.ModeUtils;
import alluxio.util.UnderFileSystemUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.util.io.PathUtils;
Expand All @@ -33,6 +35,7 @@
import com.aliyun.oss.ServiceException;
import com.aliyun.oss.common.comm.Protocol;
import com.aliyun.oss.model.AbortMultipartUploadRequest;
import com.aliyun.oss.model.BucketInfo;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.DeleteObjectsResult;
import com.aliyun.oss.model.ListMultipartUploadsRequest;
Expand All @@ -42,6 +45,7 @@
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.Owner;
import com.aliyun.oss.model.SetObjectTaggingRequest;
import com.aliyun.oss.model.TagSet;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -76,6 +80,9 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem {

private static final String NO_SUCH_KEY = "NoSuchKey";

/** Default owner of objects if owner cannot be determined. */
private static final String DEFAULT_OWNER = "";

/** Aliyun OSS client. */
private final OSS mClient;

Expand All @@ -90,6 +97,10 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem {

private StsOssClientProvider mClientProvider;

/** The permissions associated with the bucket. Fetched once and assumed to be immutable. */
private final Supplier<ObjectPermissions> mPermissions
= CommonUtils.memoize(this::getPermissionsInternal);

/**
* Constructs a new instance of {@link OSSUnderFileSystem}.
*
Expand Down Expand Up @@ -400,7 +411,38 @@ protected ObjectStatus getObjectStatus(String key) {
// No ACL integration currently, returns default empty value
@Override
protected ObjectPermissions getPermissions() {
return new ObjectPermissions("", "", Constants.DEFAULT_FILE_SYSTEM_MODE);
return mPermissions.get();
}

/**
* Since there is no group in OSS, the owner is reused as the group. This method calls the
* OSS API and requires additional permissions aside from just read only. This method is best
* effort and will continue with default permissions (no owner, no group, 0700).
*
* @return the permissions associated with this under storage system
*/
private ObjectPermissions getPermissionsInternal() {
short bucketMode =
ModeUtils.getUMask(mUfsConf.getString(PropertyKey.UNDERFS_OSS_DEFAULT_MODE)).toShort();
String accountOwner = DEFAULT_OWNER;

try {
BucketInfo bucketInfo = mClient.getBucketInfo(mBucketName);
Owner owner = bucketInfo.getBucket().getOwner();
if (mUfsConf.isSet(PropertyKey.UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING)) {
// Here accountOwner can be null if there is no mapping set for this owner id
accountOwner = CommonUtils.getValueFromStaticMapping(
mUfsConf.getString(PropertyKey.UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING),
owner.getId());
}
if (accountOwner == null || accountOwner.equals(DEFAULT_OWNER)) {
// If there is no user-defined mapping, use display name or id.
accountOwner = owner.getDisplayName() != null ? owner.getDisplayName() : owner.getId();
}
} catch (ServiceException e) {
LOG.warn("Failed to get bucket owner, proceeding with defaults. {}", e.toString());
}
return new ObjectPermissions(accountOwner, accountOwner, bucketMode);
}

@Override
Expand Down
Loading