From 17091b35d90a395b4e67d3de710c17cc10b43ab2 Mon Sep 17 00:00:00 2001
From: elega <445092967@qq.com>
Date: Thu, 3 Aug 2023 15:31:15 +0800
Subject: [PATCH] Dora Fuse HDFS integration test
### What changes are proposed in this pull request?
Please outline the changes and how this PR fixes the issue.
### Why are the changes needed?
Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, describe the bug.
### Does this PR introduce any user facing changes?
Please list the user-facing changes introduced by your change, including
1. change in user-facing APIs
2. addition or removal of property keys
3. webui
pr-link: Alluxio/alluxio#17876
change-id: cid-9f39f7c1630fe1747c34f3b17c48e73c5d55bd2e
---
.../master/AbstractLocalAlluxioCluster.java | 8 +-
dora/tests/pom.xml | 6 +
.../client/fuse/dora/FuseEndToEndTest.java | 51 +------
.../alluxio/client/fuse/dora/FuseUtils.java | 62 +++++++++
.../AbstractFuseHdfsIntegrationTest.java | 126 ++++++++++++++++++
.../dora/hdfs3/FuseHdfsIntegrationTest.java | 70 ++++++++++
6 files changed, 271 insertions(+), 52 deletions(-)
create mode 100644 dora/tests/src/test/java/alluxio/client/fuse/dora/FuseUtils.java
create mode 100644 dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/AbstractFuseHdfsIntegrationTest.java
create mode 100644 dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/FuseHdfsIntegrationTest.java
diff --git a/dora/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java b/dora/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java
index 21348cecaddd..eb3ed292a335 100644
--- a/dora/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java
+++ b/dora/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java
@@ -231,12 +231,12 @@ protected void setupTest() throws IOException {
UnderFileSystem doraUfs = UnderFileSystem.Factory.create(doraUfsRoot, Configuration.global());
// Deletes the ufs dir for this test from to avoid permission problems
- // Do not delete the ufs root if the ufs is an object storage.
- // In test environment, this means s3 mock is used.
- if (!ufs.isObjectStorage()) {
+ // Do not delete the ufs root if the ufs is not local UFS.
+ // In some test cases, S3 and HDFS are used.
+ if (ufs.getUnderFSType().equals("local")) {
UnderFileSystemUtils.deleteDirIfExists(ufs, underfsAddress);
}
- if (!doraUfs.isObjectStorage()) {
+ if (ufs.getUnderFSType().equals("local")) {
UnderFileSystemUtils.deleteDirIfExists(doraUfs, doraUfsRoot);
}
diff --git a/dora/tests/pom.xml b/dora/tests/pom.xml
index 22478640ed0b..49c4b53dc31f 100644
--- a/dora/tests/pom.xml
+++ b/dora/tests/pom.xml
@@ -222,6 +222,12 @@
${project.version}
test
+
+ org.alluxio
+ alluxio-underfs-hdfs
+ ${project.version}
+ test
+
org.gaul
s3proxy
diff --git a/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseEndToEndTest.java b/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseEndToEndTest.java
index 2ff63635be38..0ca946fae1b9 100644
--- a/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseEndToEndTest.java
+++ b/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseEndToEndTest.java
@@ -20,7 +20,6 @@
import alluxio.AlluxioTestDirectory;
import alluxio.AlluxioURI;
import alluxio.ClientContext;
-import alluxio.Constants;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.FileSystemOptions;
@@ -37,10 +36,6 @@
import alluxio.underfs.UnderFileSystemFactoryRegistry;
import alluxio.underfs.local.LocalUnderFileSystemFactory;
import alluxio.underfs.s3a.S3AUnderFileSystemFactory;
-import alluxio.util.CommonUtils;
-import alluxio.util.OSUtils;
-import alluxio.util.ShellUtils;
-import alluxio.util.WaitForOptions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -50,12 +45,10 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
-import java.util.concurrent.TimeoutException;
/**
* Isolation tests for {@link alluxio.fuse.AlluxioJniFuseFileSystem} with local UFS.
@@ -63,7 +56,6 @@
*/
@Ignore("Failed to unmount because of Permission Denied")
public class FuseEndToEndTest {
- private static final int WAIT_TIMEOUT_MS = 60 * Constants.SECOND_MS;
private static final String TEST_S3A_PATH_CONF = "alluxio.test.s3a.path";
private static final String MOUNT_POINT = AlluxioTestDirectory
.createTemporaryDirectory("ufs").toString();
@@ -93,15 +85,15 @@ public static void beforeClass() throws Exception {
AlluxioJniFuseFileSystem fuseFileSystem = new AlluxioJniFuseFileSystem(context, fileSystem,
FuseOptions.create(Configuration.global(), fileSystemOptions, false));
fuseFileSystem.mount(false, false, new HashSet<>());
- if (!waitForFuseMounted()) {
- umountFromShellIfMounted();
+ if (!FuseUtils.waitForFuseMounted(MOUNT_POINT)) {
+ FuseUtils.umountFromShellIfMounted(MOUNT_POINT);
fail("Could not setup FUSE mount point");
}
}
@AfterClass
public static void afterClass() throws Exception {
- umountFromShellIfMounted();
+ FuseUtils.umountFromShellIfMounted(MOUNT_POINT);
}
@Test
@@ -176,41 +168,4 @@ public void rename() throws Exception {
new FileOutputStream(srcFile).close();
assertTrue(file.renameTo(new File(dstFile)));
}
-
- private static void umountFromShellIfMounted() throws IOException {
- if (fuseMounted()) {
- ShellUtils.execCommand("umount", MOUNT_POINT);
- }
- }
-
- private static boolean fuseMounted() throws IOException {
- String result = ShellUtils.execCommand("mount");
- return result.contains(MOUNT_POINT);
- }
-
- /**
- * Waits for the Alluxio-Fuse to be mounted.
- *
- * @return true if Alluxio-Fuse mounted successfully in the given timeout, false otherwise
- */
- private static boolean waitForFuseMounted() {
- if (OSUtils.isLinux() || OSUtils.isMacOS()) {
- try {
- CommonUtils.waitFor("Alluxio-Fuse mounted on local filesystem", () -> {
- try {
- return fuseMounted();
- } catch (IOException e) {
- return false;
- }
- }, WaitForOptions.defaults().setTimeoutMs(WAIT_TIMEOUT_MS));
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- } catch (TimeoutException te) {
- return false;
- }
- }
- return false;
- }
}
diff --git a/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseUtils.java b/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseUtils.java
new file mode 100644
index 000000000000..ce77bf1cdf15
--- /dev/null
+++ b/dora/tests/src/test/java/alluxio/client/fuse/dora/FuseUtils.java
@@ -0,0 +1,62 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.client.fuse.dora;
+
+import alluxio.Constants;
+import alluxio.util.CommonUtils;
+import alluxio.util.OSUtils;
+import alluxio.util.ShellUtils;
+import alluxio.util.WaitForOptions;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class FuseUtils {
+ private static final int WAIT_TIMEOUT_MS = 60 * Constants.SECOND_MS;
+
+ public static void umountFromShellIfMounted(String mountPoint) throws IOException {
+ if (fuseMounted(mountPoint)) {
+ ShellUtils.execCommand("umount", mountPoint);
+ }
+ }
+
+ public static boolean fuseMounted(String mountPoint) throws IOException {
+ String result = ShellUtils.execCommand("mount");
+ return result.contains(mountPoint);
+ }
+
+ /**
+ * Waits for the Alluxio-Fuse to be mounted.
+ *
+ * @return true if Alluxio-Fuse mounted successfully in the given timeout, false otherwise
+ */
+ public static boolean waitForFuseMounted(String mountPoint) {
+ if (OSUtils.isLinux() || OSUtils.isMacOS()) {
+ try {
+ CommonUtils.waitFor("Alluxio-Fuse mounted on local filesystem", () -> {
+ try {
+ return fuseMounted(mountPoint);
+ } catch (IOException e) {
+ return false;
+ }
+ }, WaitForOptions.defaults().setTimeoutMs(WAIT_TIMEOUT_MS));
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (TimeoutException te) {
+ return false;
+ }
+ }
+ return false;
+ }
+}
diff --git a/dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/AbstractFuseHdfsIntegrationTest.java b/dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/AbstractFuseHdfsIntegrationTest.java
new file mode 100644
index 000000000000..8ab2f6670e1e
--- /dev/null
+++ b/dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/AbstractFuseHdfsIntegrationTest.java
@@ -0,0 +1,126 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.client.fuse.dora.hdfs3;
+
+import static org.junit.Assert.fail;
+
+import alluxio.AlluxioTestDirectory;
+import alluxio.Constants;
+import alluxio.client.file.FileSystemContext;
+import alluxio.client.file.options.FileSystemOptions;
+import alluxio.client.file.options.UfsFileSystemOptions;
+import alluxio.client.fuse.dora.FuseUtils;
+import alluxio.conf.Configuration;
+import alluxio.conf.PropertyKey;
+import alluxio.fuse.AlluxioFuseUtils;
+import alluxio.fuse.AlluxioJniFuseFileSystem;
+import alluxio.fuse.options.FuseOptions;
+import alluxio.jnifuse.LibFuse;
+import alluxio.testutils.LocalAlluxioClusterResource;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+public class AbstractFuseHdfsIntegrationTest {
+ @Rule
+ public TemporaryFolder mTemp = new TemporaryFolder();
+ protected static final String MOUNT_POINT = AlluxioTestDirectory
+ .createTemporaryDirectory("fuse_mount").toString();
+
+ private static final String PAGING_STORE_DIR = AlluxioTestDirectory
+ .createTemporaryDirectory("ramdisk").toString();
+ private AlluxioJniFuseFileSystem mFuseFileSystem;
+
+ // Hdfs related
+ protected MiniDFSCluster mHdfsCluster;
+ protected final org.apache.hadoop.conf.Configuration
+ mHdfsConfiguration = new org.apache.hadoop.conf.Configuration();
+ private static final int HDFS_BLOCK_SIZE = 1024 * 1024;
+ private static final int HDFS_NAMENODE_PORT = 9870;
+ protected FileSystem mHdfs;
+
+ @Rule
+ public LocalAlluxioClusterResource mAlluxioClusterResource =
+ new LocalAlluxioClusterResource.Builder()
+ .setProperty(PropertyKey.USER_NETTY_DATA_TRANSMISSION_ENABLED, true)
+ .setProperty(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES, Constants.KB)
+ .setProperty(PropertyKey.FUSE_MOUNT_POINT, MOUNT_POINT)
+ .setProperty(PropertyKey.WORKER_PAGE_STORE_DIRS, PAGING_STORE_DIR)
+ .setProperty(PropertyKey.MASTER_MOUNT_TABLE_ROOT_UFS,
+ "hdfs://localhost:" + HDFS_NAMENODE_PORT + "/")
+ .setProperty(PropertyKey.DORA_CLIENT_UFS_ROOT,
+ "hdfs://localhost:" + HDFS_NAMENODE_PORT + "/")
+ .setStartCluster(false)
+ .build();
+
+ @BeforeClass
+ public static void beforeClass() {
+ LibFuse.loadLibrary(AlluxioFuseUtils.getLibfuseVersion(Configuration.global()));
+ }
+
+ @Before
+ public void before() throws Exception {
+ initHdfsMiniCluster();
+ // Alluxio cluster must start after the hdfs mini cluster is ready.s
+ mAlluxioClusterResource.start();
+ mountFuse();
+ }
+
+ @After
+ public void after() throws Exception {
+ mFuseFileSystem.umount(true);
+ if (mHdfsCluster != null) {
+ mHdfsCluster.shutdown();
+ }
+ }
+
+ private void initHdfsMiniCluster() throws IOException {
+ mHdfsConfiguration.set("dfs.name.dir", mTemp.newFolder("nn").getAbsolutePath());
+ mHdfsConfiguration.set("dfs.data.dir", mTemp.newFolder("dn").getAbsolutePath());
+ // 1MB block size for testing to save memory
+ mHdfsConfiguration.setInt("dfs.block.size", HDFS_BLOCK_SIZE);
+
+ mHdfsCluster = new MiniDFSCluster.Builder(mHdfsConfiguration)
+ .enableManagedDfsDirsRedundancy(false)
+ .manageDataDfsDirs(false)
+ .manageNameDfsDirs(false)
+ .nameNodePort(HDFS_NAMENODE_PORT)
+ .numDataNodes(1).build();
+ mHdfs = mHdfsCluster.getFileSystem();
+ }
+
+ private void mountFuse() throws IOException {
+ UfsFileSystemOptions ufsOptions = new UfsFileSystemOptions("/");
+ FileSystemContext fsContext = FileSystemContext.create(Configuration.global());
+ final FileSystemOptions fileSystemOptions =
+ FileSystemOptions.Builder
+ .fromConf(Configuration.global())
+ .setUfsFileSystemOptions(ufsOptions)
+ .build();
+ mFuseFileSystem = new AlluxioJniFuseFileSystem(fsContext,
+ mAlluxioClusterResource.get().getClient(),
+ FuseOptions.create(Configuration.global(), fileSystemOptions, false));
+ mFuseFileSystem.mount(false, false, new HashSet<>());
+ if (!FuseUtils.waitForFuseMounted(MOUNT_POINT)) {
+ FuseUtils.umountFromShellIfMounted(MOUNT_POINT);
+ fail("Could not setup FUSE mount point");
+ }
+ }
+}
diff --git a/dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/FuseHdfsIntegrationTest.java b/dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/FuseHdfsIntegrationTest.java
new file mode 100644
index 000000000000..50de2ec7b7f8
--- /dev/null
+++ b/dora/tests/src/test/java/alluxio/client/fuse/dora/hdfs3/FuseHdfsIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.client.fuse.dora.hdfs3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.file.Paths;
+import java.util.Arrays;
+
+public class FuseHdfsIntegrationTest extends AbstractFuseHdfsIntegrationTest {
+ @Test
+ public void createDeleteEmptyFile() throws Exception {
+ String testFile = Paths.get(MOUNT_POINT, "/createDeleteEmptyFile").toString();
+ File file = new File(testFile);
+ assertFalse(file.exists());
+ new FileOutputStream(testFile).close();
+ assertTrue(file.exists());
+ assertTrue(mHdfs.exists(new Path("/createDeleteEmptyFile")));
+ assertEquals(0, file.length());
+ assertTrue(file.isFile());
+ assertTrue(file.delete());
+ assertFalse(file.exists());
+ }
+
+ @Test
+ public void writeThenRead() throws Exception {
+ String testFile = Paths.get(MOUNT_POINT, "/writeThenRead").toString();
+ byte[] content = "Alluxio Fuse Test File Content".getBytes();
+ File file = new File(testFile);
+ assertFalse(file.exists());
+ try (FileOutputStream outputStream = new FileOutputStream(testFile)) {
+ outputStream.write(content);
+ }
+ assertTrue(file.exists());
+ assertTrue(file.isFile());
+ assertEquals(content.length, file.length());
+ // Verify on fuse
+ try (FileInputStream inputStream = new FileInputStream(testFile)) {
+ byte[] res = new byte[content.length];
+ assertEquals(content.length, inputStream.read(res));
+ assertEquals(Arrays.toString(content), Arrays.toString(res));
+ }
+ // Verify on Hdfs
+ try (FSDataInputStream inputStream = mHdfs.open(new Path("/writeThenRead"))) {
+ byte[] res = new byte[content.length];
+ assertEquals(content.length, inputStream.read(res));
+ assertEquals(Arrays.toString(content), Arrays.toString(res));
+ }
+ assertTrue(file.delete());
+ assertFalse(file.exists());
+ }
+}