diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java b/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java index 5401f4da1a94..c185e232a034 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java @@ -558,13 +558,16 @@ public ReinitBlockerResource blockReinit() { */ public void reinit(boolean updateClusterConf) throws UnavailableException, IOException { + // inquire the primary master address before entering the critical section of mReinitializer, + // where all RPCs waiting on the monitor object of FileSystemContext (synchronized methods) + // will block until initialization completes + InetSocketAddress masterAddr; + try { + masterAddr = getMasterAddress(); + } catch (IOException e) { + throw new UnavailableException("Failed to get master address during reinitialization", e); + } try (Closeable r = mReinitializer.allow()) { - InetSocketAddress masterAddr; - try { - masterAddr = getMasterAddress(); - } catch (IOException e) { - throw new UnavailableException("Failed to get master address during reinitialization", e); - } try { getClientContext().loadConf(masterAddr); } catch (AlluxioStatusException e) { diff --git a/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java b/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java index 1f57464b579c..9dd2f2269f0c 100644 --- a/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java +++ b/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java @@ -42,6 +42,7 @@ import alluxio.master.MasterInquireClient.Factory; import alluxio.security.CurrentUser; import alluxio.security.authorization.Mode; +import alluxio.util.ModeUtils; import alluxio.wire.BlockLocationInfo; import alluxio.wire.FileBlockInfo; import alluxio.wire.WorkerNetAddress; @@ -167,6 +168,27 @@ public void close()
throws IOException { mFileSystem.close(); } + /** + * Attempts to create a file with default permission. + * Overwrite will not succeed if the path exists and is a folder. + * + * @param path path to create + * @param overwrite overwrite if file exists + * @param bufferSize the size in bytes of the buffer to be used + * @param replication under filesystem replication factor, this is ignored + * @param blockSize block size in bytes + * @param progress queryable progress + * @return an {@link FSDataOutputStream} created at the indicated path of a file + */ + @Override + public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, + long blockSize, Progressable progress) throws IOException { + String confUmask = mAlluxioConf.getString(PropertyKey.SECURITY_AUTHORIZATION_PERMISSION_UMASK); + Mode mode = ModeUtils.applyFileUMask(Mode.defaults(), confUmask); + return this.create(path, new FsPermission(mode.toShort()), overwrite, bufferSize, replication, + blockSize, progress); + } + /** * Attempts to create a file. Overwrite will not succeed if the path exists and is a folder. * @@ -646,6 +668,20 @@ public FileStatus[] listStatus(Path path) throws IOException { return ret; } + /** + * Attempts to create a folder with the specified path with default permission. + * Parent directories will be created. + * + * @param path path to create + * @return true if the indicated folder is created successfully or already exists + */ + @Override + public boolean mkdirs(Path path) throws IOException { + String confUmask = mAlluxioConf.getString(PropertyKey.SECURITY_AUTHORIZATION_PERMISSION_UMASK); + Mode mode = ModeUtils.applyDirectoryUMask(Mode.defaults(), confUmask); + return mkdirs(path, new FsPermission(mode.toShort())); + } + /** * Attempts to create a folder with the specified path. Parent directories will be created. 
* diff --git a/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AlluxioHdfsInputStream.java b/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AlluxioHdfsInputStream.java index 4671e812b21c..86cd96e7d10c 100644 --- a/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AlluxioHdfsInputStream.java +++ b/dora/core/client/hdfs/src/main/java/alluxio/hadoop/AlluxioHdfsInputStream.java @@ -14,6 +14,7 @@ import alluxio.client.file.FileInStream; import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import java.io.IOException; @@ -50,7 +51,30 @@ public int read() throws IOException { @Override public int read(ByteBuffer buf) throws IOException { - return mInput.read(buf); + // @see FSDataInputStream.java + if (mInput.getWrappedStream() instanceof ByteBufferReadable) { + return mInput.read(buf); + } else { + int off = buf.position(); + int len = buf.remaining(); + final int totalBytesRead; + if (buf.hasArray()) { + byte[] byteArray = buf.array(); + totalBytesRead = read(byteArray, buf.arrayOffset() + off, len); + if (totalBytesRead > 0) { + buf.position(off + totalBytesRead); + } + } else { + byte[] byteArray = new byte[len]; + totalBytesRead = read(byteArray); + if (totalBytesRead > 0) { + buf.put(byteArray, 0, totalBytesRead); + } + } + return totalBytesRead; + } } @Override diff --git a/dora/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java b/dora/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java index 707e1fe0a92d..372bc3604033 100644 --- a/dora/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java +++ b/dora/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java @@ -22,12 +22,14 @@ import alluxio.client.file.cache.LocalCacheFileInStream; import alluxio.client.file.cache.filter.CacheFilter; import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.PropertyKey; import 
alluxio.exception.AlluxioException; import alluxio.metrics.MetricsConfig; import alluxio.metrics.MetricsSystem; import alluxio.wire.FileInfo; import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -133,8 +135,15 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { .setGroup(externalFileStatus.getGroup()); // FilePath is a unique identifier for a file, however it can be a long string // hence using md5 hash of the file path as the identifier in the cache. - CacheContext context = CacheContext.defaults().setCacheIdentifier( - md5().hashString(externalFileStatus.getPath().toString(), UTF_8).toString()); + String cacheIdentifier; + if (mAlluxioConf.getBoolean(PropertyKey.USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME)) { + // include mtime to avoid consistency issues if the file may update + cacheIdentifier = md5().hashString(externalFileStatus.getPath().toString() + + externalFileStatus.getModificationTime(), UTF_8).toString(); + } else { + cacheIdentifier = md5().hashString(externalFileStatus.getPath().toString(), UTF_8).toString(); + } + CacheContext context = CacheContext.defaults().setCacheIdentifier(cacheIdentifier); URIStatus status = new URIStatus(info, context); return open(status, bufferSize); } @@ -234,4 +243,22 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException { public FileStatus getFileStatus(Path f) throws IOException { return mExternalFileSystem.getFileStatus(f); } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, + long len) throws IOException { + // Applications use the block information here to schedule/distribute the tasks. 
+ // Return the UFS locations directly instead of the local cache location, + // so the application can schedule the tasks accordingly + return mExternalFileSystem.getFileBlockLocations(file, start, len); + } + + @Override + public BlockLocation[] getFileBlockLocations(Path p, long start, long len) + throws IOException { + // Applications use the block information here to schedule/distribute the tasks. + // Return the UFS locations directly instead of the local cache location, + // so the application can schedule the tasks accordingly + return mExternalFileSystem.getFileBlockLocations(p, start, len); + } } diff --git a/dora/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java b/dora/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java index 624353e1d5f6..5361b101cfaa 100644 --- a/dora/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java +++ b/dora/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java @@ -54,6 +54,8 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsCreateModes; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Before; @@ -732,7 +734,9 @@ public void createWithoutOverwrite() throws Exception { ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(path.toString()))); try (FileSystem alluxioHadoopFs = new FileSystem(alluxioFs)) { - alluxioHadoopFs.create(path, false, 100, (short) 1, 1000); + alluxioHadoopFs.create(path, + FsCreateModes.applyUMask(FsPermission.getFileDefault(), FsPermission.getUMask(getConf())), + false, 100, (short) 1, 1000, null); fail("create() of existing file is expected to fail"); } catch (IOException e) { assertEquals("alluxio.exception.FileAlreadyExistsException: " diff --git 
a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index 28592b3c21f7..6dcd0bbdeab0 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -1919,6 +1919,21 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_OSS_DEFAULT_MODE = + stringBuilder(Name.UNDERFS_OSS_DEFAULT_MODE) + .setAlias("alluxio.underfs.oss.default.mode") + .setDefaultValue("0700") + .setDescription("Mode (in octal notation) for OSS objects if mode cannot be discovered.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING = + stringBuilder(Name.UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING) + .setDescription("Optionally, specify a preset oss canonical id to Alluxio username " + + "static mapping, in the format \"id1=user1;id2=user2\". 
") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey S3A_ACCESS_KEY = stringBuilder(Name.S3A_ACCESS_KEY) .setAlias(Name.AWS_ACCESS_KEY) .setDescription("The access key of S3 bucket.") @@ -5630,6 +5645,14 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); + public static final PropertyKey USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME = + booleanBuilder(Name.USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME) + .setDefaultValue(false) + .setDescription("If this is enabled, client-side cache will include modification time " + + "while calculating the identifier of a file.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.CLIENT) + .build(); public static final PropertyKey USER_CLIENT_REPORT_VERSION_ENABLED = booleanBuilder(Name.USER_CLIENT_REPORT_VERSION_ENABLED) @@ -7394,6 +7417,10 @@ public static final class Name { "alluxio.underfs.oss.multipart.upload.threads"; public static final String UNDERFS_OSS_MULTIPART_UPLOAD_PARTITION_SIZE = "alluxio.underfs.oss.multipart.upload.part.size"; + public static final String UNDERFS_OSS_DEFAULT_MODE = + "alluxio.underfs.oss.default.mode"; + public static final String UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING = + "alluxio.underfs.oss.owner.id.to.username.mapping"; public static final String UNDERFS_S3_BULK_DELETE_ENABLED = "alluxio.underfs.s3.bulk.delete.enabled"; public static final String UNDERFS_S3_DEFAULT_MODE = "alluxio.underfs.s3.default.mode"; @@ -8291,6 +8318,8 @@ public static final class Name { "alluxio.user.client.cache.timeout.duration"; public static final String USER_CLIENT_CACHE_TIMEOUT_THREADS = "alluxio.user.client.cache.timeout.threads"; + public static final String USER_CLIENT_CACHE_IDENTIFIER_INCLUDE_MTIME = + "alluxio.user.client.cache.include.mtime"; public static final String USER_CLIENT_REPORT_VERSION_ENABLED = 
"alluxio.user.client.report.version.enabled"; public static final String USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER = diff --git a/dora/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java b/dora/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java index eb77cc181b5d..87bddb267cf2 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java +++ b/dora/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java @@ -118,5 +118,7 @@ private void setIdAndRegister() throws IOException { } @Override - public void close() {} + public void close() { + mMasterClient.close(); + } } diff --git a/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3Handler.java b/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3Handler.java index 4b8bbe99eb93..78cd55a2d8ef 100644 --- a/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3Handler.java +++ b/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3Handler.java @@ -374,7 +374,7 @@ public S3AuditContext createAuditContext(String command, @Nullable String object) { // Audit log may be enabled during runtime AsyncUserAccessAuditLogWriter auditLogWriter = null; - if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) { + if (Configuration.getBoolean(PropertyKey.PROXY_AUDIT_LOGGING_ENABLED)) { auditLogWriter = mAsyncAuditLogWriter; } S3AuditContext auditContext = new S3AuditContext(auditLogWriter); diff --git a/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3RestServiceHandler.java b/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3RestServiceHandler.java index 0073ea525771..ce534bc8130c 100644 --- a/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3RestServiceHandler.java +++ b/dora/core/server/proxy/src/main/java/alluxio/proxy/s3/S3RestServiceHandler.java @@ -1532,7 +1532,7 @@ private S3AuditContext createAuditContext(String command, String user, @Nullable String bucket, @Nullable String 
object) { // Audit log may be enabled during runtime AsyncUserAccessAuditLogWriter auditLogWriter = null; - if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) { + if (Configuration.getBoolean(PropertyKey.PROXY_AUDIT_LOGGING_ENABLED)) { auditLogWriter = mAsyncAuditLogWriter; } S3AuditContext auditContext = new S3AuditContext(auditLogWriter); diff --git a/dora/tests/integration/src/test/java/alluxio/client/hadoop/FileSystemAclIntegrationTest.java b/dora/tests/integration/src/test/java/alluxio/client/hadoop/FileSystemAclIntegrationTest.java index 726e272265c7..049e34cef7e4 100644 --- a/dora/tests/integration/src/test/java/alluxio/client/hadoop/FileSystemAclIntegrationTest.java +++ b/dora/tests/integration/src/test/java/alluxio/client/hadoop/FileSystemAclIntegrationTest.java @@ -115,6 +115,12 @@ public void cleanupTFS() throws Exception { comment = "Bring back but not passed, need to fix.") @Test public void createFileWithPermission() throws Exception { + Path defaultFile = new Path("/createfile-default"); + FSDataOutputStream stream = sTFS.create(defaultFile, false /* ignored */, 10 /* ignored */, + (short) 1 /* ignored */, 512 /* ignored */, null /* ignored */); + stream.close(); + FileStatus fileStatus = sTFS.getFileStatus(defaultFile); + Assert.assertEquals((short) 0644, fileStatus.getPermission().toShort()); List<Integer> permissionValues = Lists.newArrayList(0111, 0222, 0333, 0444, 0555, 0666, 0777, 0755, 0733, 0644, 0533, 0511); for (int value : permissionValues) { @@ -132,6 +138,10 @@ @Test @Ignore public void mkdirsWithPermission() throws Exception { + Path defaultDir = new Path("/createDir-default"); + sTFS.mkdirs(defaultDir); + FileStatus fileStatus = sTFS.getFileStatus(defaultDir); + Assert.assertEquals((short) 0755, fileStatus.getPermission().toShort()); List<Integer> permissionValues = Lists.newArrayList(0111, 0222, 0333, 0444, 0555, 0666, 0777, 0755, 0733, 0644, 0533, 0511); for (int value :
permissionValues) { diff --git a/dora/underfs/oss/src/main/java/alluxio/underfs/oss/OSSUnderFileSystem.java b/dora/underfs/oss/src/main/java/alluxio/underfs/oss/OSSUnderFileSystem.java index 2536fcaf5c09..42cc8e0538a4 100644 --- a/dora/underfs/oss/src/main/java/alluxio/underfs/oss/OSSUnderFileSystem.java +++ b/dora/underfs/oss/src/main/java/alluxio/underfs/oss/OSSUnderFileSystem.java @@ -22,6 +22,8 @@ import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystemConfiguration; import alluxio.underfs.options.OpenOptions; +import alluxio.util.CommonUtils; +import alluxio.util.ModeUtils; import alluxio.util.UnderFileSystemUtils; import alluxio.util.executor.ExecutorServiceFactories; import alluxio.util.io.PathUtils; @@ -33,6 +35,7 @@ import com.aliyun.oss.ServiceException; import com.aliyun.oss.common.comm.Protocol; import com.aliyun.oss.model.AbortMultipartUploadRequest; +import com.aliyun.oss.model.BucketInfo; import com.aliyun.oss.model.DeleteObjectsRequest; import com.aliyun.oss.model.DeleteObjectsResult; import com.aliyun.oss.model.ListMultipartUploadsRequest; @@ -42,6 +45,7 @@ import com.aliyun.oss.model.OSSObjectSummary; import com.aliyun.oss.model.ObjectListing; import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.Owner; import com.aliyun.oss.model.SetObjectTaggingRequest; import com.aliyun.oss.model.TagSet; import com.google.common.base.Preconditions; @@ -76,6 +80,9 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem { private static final String NO_SUCH_KEY = "NoSuchKey"; + /** Default owner of objects if owner cannot be determined. */ + private static final String DEFAULT_OWNER = ""; + /** Aliyun OSS client. */ private final OSS mClient; @@ -90,6 +97,10 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem { private StsOssClientProvider mClientProvider; + /** The permissions associated with the bucket. Fetched once and assumed to be immutable. 
*/ + private final Supplier<ObjectPermissions> mPermissions + = CommonUtils.memoize(this::getPermissionsInternal); + /** * Constructs a new instance of {@link OSSUnderFileSystem}. * @@ -400,7 +411,38 @@ protected ObjectStatus getObjectStatus(String key) { // No ACL integration currently, returns default empty value @Override protected ObjectPermissions getPermissions() { - return new ObjectPermissions("", "", Constants.DEFAULT_FILE_SYSTEM_MODE); + return mPermissions.get(); + } + + /** + * Since there is no group in OSS, the owner is reused as the group. This method calls the + * OSS API and requires additional permissions aside from just read only. This method is best + * effort and will continue with default permissions (no owner, no group, 0700). + * + * @return the permissions associated with this under storage system + */ + private ObjectPermissions getPermissionsInternal() { + short bucketMode = + ModeUtils.getUMask(mUfsConf.getString(PropertyKey.UNDERFS_OSS_DEFAULT_MODE)).toShort(); + String accountOwner = DEFAULT_OWNER; + + try { + BucketInfo bucketInfo = mClient.getBucketInfo(mBucketName); + Owner owner = bucketInfo.getBucket().getOwner(); + if (mUfsConf.isSet(PropertyKey.UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING)) { + // Here accountOwner can be null if there is no mapping set for this owner id + accountOwner = CommonUtils.getValueFromStaticMapping( + mUfsConf.getString(PropertyKey.UNDERFS_OSS_OWNER_ID_TO_USERNAME_MAPPING), + owner.getId()); + } + if (accountOwner == null || accountOwner.equals(DEFAULT_OWNER)) { + // If there is no user-defined mapping, use display name or id. + accountOwner = owner.getDisplayName() != null ? owner.getDisplayName() : owner.getId(); + } + } catch (ServiceException e) { + LOG.warn("Failed to get bucket owner, proceeding with defaults. {}", e.toString()); + } + return new ObjectPermissions(accountOwner, accountOwner, bucketMode); } @Override