Skip to content

Commit

Permalink
Dora Fuse HDFS integration test
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Please outline the changes and how this PR fixes the issue.

### Why are the changes needed?

Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, describe the bug.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
  1. change in user-facing APIs
  2. addition or removal of property keys
  3. webui

			pr-link: #17876
			change-id: cid-9f39f7c1630fe1747c34f3b17c48e73c5d55bd2e
  • Loading branch information
elega authored Aug 3, 2023
1 parent dbb6af2 commit 17091b3
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ protected void setupTest() throws IOException {
UnderFileSystem doraUfs = UnderFileSystem.Factory.create(doraUfsRoot, Configuration.global());

// Deletes the ufs dir for this test from to avoid permission problems
// Do not delete the ufs root if the ufs is an object storage.
// In test environment, this means s3 mock is used.
if (!ufs.isObjectStorage()) {
// Do not delete the ufs root if the ufs is not local UFS.
// In some test cases, S3 and HDFS are used.
if (ufs.getUnderFSType().equals("local")) {
UnderFileSystemUtils.deleteDirIfExists(ufs, underfsAddress);
}
if (!doraUfs.isObjectStorage()) {
if (ufs.getUnderFSType().equals("local")) {
UnderFileSystemUtils.deleteDirIfExists(doraUfs, doraUfsRoot);
}

Expand Down
6 changes: 6 additions & 0 deletions dora/tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-underfs-hdfs</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.gaul</groupId>
<artifactId>s3proxy</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import alluxio.AlluxioTestDirectory;
import alluxio.AlluxioURI;
import alluxio.ClientContext;
import alluxio.Constants;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.FileSystemOptions;
Expand All @@ -37,10 +36,6 @@
import alluxio.underfs.UnderFileSystemFactoryRegistry;
import alluxio.underfs.local.LocalUnderFileSystemFactory;
import alluxio.underfs.s3a.S3AUnderFileSystemFactory;
import alluxio.util.CommonUtils;
import alluxio.util.OSUtils;
import alluxio.util.ShellUtils;
import alluxio.util.WaitForOptions;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -50,20 +45,17 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
* Isolation tests for {@link alluxio.fuse.AlluxioJniFuseFileSystem} with local UFS.
* This test covers the basic file system metadata operations.
*/
@Ignore("Failed to unmount because of Permission Denied")
public class FuseEndToEndTest {
private static final int WAIT_TIMEOUT_MS = 60 * Constants.SECOND_MS;
private static final String TEST_S3A_PATH_CONF = "alluxio.test.s3a.path";
private static final String MOUNT_POINT = AlluxioTestDirectory
.createTemporaryDirectory("ufs").toString();
Expand Down Expand Up @@ -93,15 +85,15 @@ public static void beforeClass() throws Exception {
AlluxioJniFuseFileSystem fuseFileSystem = new AlluxioJniFuseFileSystem(context, fileSystem,
FuseOptions.create(Configuration.global(), fileSystemOptions, false));
fuseFileSystem.mount(false, false, new HashSet<>());
if (!waitForFuseMounted()) {
umountFromShellIfMounted();
if (!FuseUtils.waitForFuseMounted(MOUNT_POINT)) {
FuseUtils.umountFromShellIfMounted(MOUNT_POINT);
fail("Could not setup FUSE mount point");
}
}

@AfterClass
public static void afterClass() throws Exception {
umountFromShellIfMounted();
FuseUtils.umountFromShellIfMounted(MOUNT_POINT);
}

@Test
Expand Down Expand Up @@ -176,41 +168,4 @@ public void rename() throws Exception {
new FileOutputStream(srcFile).close();
assertTrue(file.renameTo(new File(dstFile)));
}

private static void umountFromShellIfMounted() throws IOException {
if (fuseMounted()) {
ShellUtils.execCommand("umount", MOUNT_POINT);
}
}

private static boolean fuseMounted() throws IOException {
String result = ShellUtils.execCommand("mount");
return result.contains(MOUNT_POINT);
}

/**
* Waits for the Alluxio-Fuse to be mounted.
*
* @return true if Alluxio-Fuse mounted successfully in the given timeout, false otherwise
*/
private static boolean waitForFuseMounted() {
if (OSUtils.isLinux() || OSUtils.isMacOS()) {
try {
CommonUtils.waitFor("Alluxio-Fuse mounted on local filesystem", () -> {
try {
return fuseMounted();
} catch (IOException e) {
return false;
}
}, WaitForOptions.defaults().setTimeoutMs(WAIT_TIMEOUT_MS));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (TimeoutException te) {
return false;
}
}
return false;
}
}
62 changes: 62 additions & 0 deletions dora/tests/src/test/java/alluxio/client/fuse/dora/FuseUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.fuse.dora;

import alluxio.Constants;
import alluxio.util.CommonUtils;
import alluxio.util.OSUtils;
import alluxio.util.ShellUtils;
import alluxio.util.WaitForOptions;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FuseUtils {
private static final int WAIT_TIMEOUT_MS = 60 * Constants.SECOND_MS;

public static void umountFromShellIfMounted(String mountPoint) throws IOException {
if (fuseMounted(mountPoint)) {
ShellUtils.execCommand("umount", mountPoint);
}
}

public static boolean fuseMounted(String mountPoint) throws IOException {
String result = ShellUtils.execCommand("mount");
return result.contains(mountPoint);
}

/**
* Waits for the Alluxio-Fuse to be mounted.
*
* @return true if Alluxio-Fuse mounted successfully in the given timeout, false otherwise
*/
public static boolean waitForFuseMounted(String mountPoint) {
if (OSUtils.isLinux() || OSUtils.isMacOS()) {
try {
CommonUtils.waitFor("Alluxio-Fuse mounted on local filesystem", () -> {
try {
return fuseMounted(mountPoint);
} catch (IOException e) {
return false;
}
}, WaitForOptions.defaults().setTimeoutMs(WAIT_TIMEOUT_MS));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (TimeoutException te) {
return false;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.fuse.dora.hdfs3;

import static org.junit.Assert.fail;

import alluxio.AlluxioTestDirectory;
import alluxio.Constants;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.FileSystemOptions;
import alluxio.client.file.options.UfsFileSystemOptions;
import alluxio.client.fuse.dora.FuseUtils;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.fuse.AlluxioFuseUtils;
import alluxio.fuse.AlluxioJniFuseFileSystem;
import alluxio.fuse.options.FuseOptions;
import alluxio.jnifuse.LibFuse;
import alluxio.testutils.LocalAlluxioClusterResource;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.util.HashSet;

public class AbstractFuseHdfsIntegrationTest {
@Rule
public TemporaryFolder mTemp = new TemporaryFolder();
protected static final String MOUNT_POINT = AlluxioTestDirectory
.createTemporaryDirectory("fuse_mount").toString();

private static final String PAGING_STORE_DIR = AlluxioTestDirectory
.createTemporaryDirectory("ramdisk").toString();
private AlluxioJniFuseFileSystem mFuseFileSystem;

// Hdfs related
protected MiniDFSCluster mHdfsCluster;
protected final org.apache.hadoop.conf.Configuration
mHdfsConfiguration = new org.apache.hadoop.conf.Configuration();
private static final int HDFS_BLOCK_SIZE = 1024 * 1024;
private static final int HDFS_NAMENODE_PORT = 9870;
protected FileSystem mHdfs;

@Rule
public LocalAlluxioClusterResource mAlluxioClusterResource =
new LocalAlluxioClusterResource.Builder()
.setProperty(PropertyKey.USER_NETTY_DATA_TRANSMISSION_ENABLED, true)
.setProperty(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES, Constants.KB)
.setProperty(PropertyKey.FUSE_MOUNT_POINT, MOUNT_POINT)
.setProperty(PropertyKey.WORKER_PAGE_STORE_DIRS, PAGING_STORE_DIR)
.setProperty(PropertyKey.MASTER_MOUNT_TABLE_ROOT_UFS,
"hdfs://localhost:" + HDFS_NAMENODE_PORT + "/")
.setProperty(PropertyKey.DORA_CLIENT_UFS_ROOT,
"hdfs://localhost:" + HDFS_NAMENODE_PORT + "/")
.setStartCluster(false)
.build();

@BeforeClass
public static void beforeClass() {
LibFuse.loadLibrary(AlluxioFuseUtils.getLibfuseVersion(Configuration.global()));
}

@Before
public void before() throws Exception {
initHdfsMiniCluster();
// Alluxio cluster must start after the hdfs mini cluster is ready.s
mAlluxioClusterResource.start();
mountFuse();
}

@After
public void after() throws Exception {
mFuseFileSystem.umount(true);
if (mHdfsCluster != null) {
mHdfsCluster.shutdown();
}
}

private void initHdfsMiniCluster() throws IOException {
mHdfsConfiguration.set("dfs.name.dir", mTemp.newFolder("nn").getAbsolutePath());
mHdfsConfiguration.set("dfs.data.dir", mTemp.newFolder("dn").getAbsolutePath());
// 1MB block size for testing to save memory
mHdfsConfiguration.setInt("dfs.block.size", HDFS_BLOCK_SIZE);

mHdfsCluster = new MiniDFSCluster.Builder(mHdfsConfiguration)
.enableManagedDfsDirsRedundancy(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.nameNodePort(HDFS_NAMENODE_PORT)
.numDataNodes(1).build();
mHdfs = mHdfsCluster.getFileSystem();
}

private void mountFuse() throws IOException {
UfsFileSystemOptions ufsOptions = new UfsFileSystemOptions("/");
FileSystemContext fsContext = FileSystemContext.create(Configuration.global());
final FileSystemOptions fileSystemOptions =
FileSystemOptions.Builder
.fromConf(Configuration.global())
.setUfsFileSystemOptions(ufsOptions)
.build();
mFuseFileSystem = new AlluxioJniFuseFileSystem(fsContext,
mAlluxioClusterResource.get().getClient(),
FuseOptions.create(Configuration.global(), fileSystemOptions, false));
mFuseFileSystem.mount(false, false, new HashSet<>());
if (!FuseUtils.waitForFuseMounted(MOUNT_POINT)) {
FuseUtils.umountFromShellIfMounted(MOUNT_POINT);
fail("Could not setup FUSE mount point");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.fuse.dora.hdfs3;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.file.Paths;
import java.util.Arrays;

public class FuseHdfsIntegrationTest extends AbstractFuseHdfsIntegrationTest {
@Test
public void createDeleteEmptyFile() throws Exception {
String testFile = Paths.get(MOUNT_POINT, "/createDeleteEmptyFile").toString();
File file = new File(testFile);
assertFalse(file.exists());
new FileOutputStream(testFile).close();
assertTrue(file.exists());
assertTrue(mHdfs.exists(new Path("/createDeleteEmptyFile")));
assertEquals(0, file.length());
assertTrue(file.isFile());
assertTrue(file.delete());
assertFalse(file.exists());
}

@Test
public void writeThenRead() throws Exception {
String testFile = Paths.get(MOUNT_POINT, "/writeThenRead").toString();
byte[] content = "Alluxio Fuse Test File Content".getBytes();
File file = new File(testFile);
assertFalse(file.exists());
try (FileOutputStream outputStream = new FileOutputStream(testFile)) {
outputStream.write(content);
}
assertTrue(file.exists());
assertTrue(file.isFile());
assertEquals(content.length, file.length());
// Verify on fuse
try (FileInputStream inputStream = new FileInputStream(testFile)) {
byte[] res = new byte[content.length];
assertEquals(content.length, inputStream.read(res));
assertEquals(Arrays.toString(content), Arrays.toString(res));
}
// Verify on Hdfs
try (FSDataInputStream inputStream = mHdfs.open(new Path("/writeThenRead"))) {
byte[] res = new byte[content.length];
assertEquals(content.length, inputStream.read(res));
assertEquals(Arrays.toString(content), Arrays.toString(res));
}
assertTrue(file.delete());
assertFalse(file.exists());
}
}

0 comments on commit 17091b3

Please sign in to comment.