[#6131] feat (gvfs-fuse): Add integration test framework of gvfs-fuse (#6160)

### What changes were proposed in this pull request?

- Add an integration test framework for gvfs-fuse.
- Integrate LocalStack into the gvfs-fuse integration tests.
- Add a CI pipeline for the integration tests.

### Why are the changes needed?

Fix: #6131 

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Integration tests (IT): the new `make test-s3` and `make test-fuse-it` targets, run in the `gvfs-fuse-build-test.yml` CI workflow.
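
For reference, a sketch of how the new targets can be exercised locally, pieced together from the workflow and Makefile changes below; it assumes a Linux host with FUSE, Docker (for LocalStack), and the S3 configuration expected by the `tests/bin/*.sh` scripts, none of which are shown in this diff:

```bash
# Build the gvfs-fuse client (run from the repository root).
./gradlew :clients:filesystem-fuse:build -PenableFuse=true

# The FUSE integration tests run against a packaged Gravitino server,
# so compile the distribution first (mirrors the CI workflow).
./gradlew compileDistribution -x :clients:client-python:build -x test -x web

# Run the new Makefile targets added by this PR.
cd clients/filesystem-fuse
make test-s3       # S3-backed unit tests, driven by tests/bin/run_s3fs_testers.sh
make test-fuse-it  # FUSE integration tests, driven by tests/bin/run_fuse_testers.sh
```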
diqiu50 authored Jan 14, 2025
1 parent 63f9ae6 commit c6476b8
Showing 17 changed files with 696 additions and 80 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/gvfs-fuse-build-test.yml
@@ -71,10 +71,18 @@ jobs:
run: |
dev/ci/check_commands.sh
- name: Build and test Gravitino
- name: Build Gvfs-fuse
run: |
./gradlew :clients:filesystem-fuse:build -PenableFuse=true
- name: Integration test
run: |
./gradlew build -x :clients:client-python:build -x test -x web -PjdkVersion=${{ matrix.java-version }}
./gradlew compileDistribution -x :clients:client-python:build -x test -x web -PjdkVersion=${{ matrix.java-version }}
cd clients/filesystem-fuse
make test-s3
make test-fuse-it
- name: Free up disk space
run: |
dev/ci/util_free_space.sh
@@ -85,5 +93,7 @@
with:
name: Gvfs-fuse integrate-test-reports-${{ matrix.java-version }}
path: |
clients/filesystem-fuse/build/test/log/*.log
clients/filesystem-fuse/target/debug/fuse.log
distribution/package/logs/gravitino-server.out
distribution/package/logs/gravitino-server.log
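
The `make test-s3` and `make test-fuse-it` steps above need an S3-compatible endpoint; per the PR description this is LocalStack, which the `tests/bin` scripts manage in CI. A rough local equivalent is sketched below — the image name and edge port are LocalStack defaults, and the bucket name and dummy credentials are placeholders, none of them taken from this diff:

```bash
# Start LocalStack's S3 emulation on its default edge port.
docker run -d --name gvfs-localstack -p 4566:4566 localstack/localstack

# Create a bucket for the tests; LocalStack accepts dummy credentials.
AWS_ACCESS_KEY_ID=test AWS_SECRET_ACCESS_KEY=test \
  aws --endpoint-url=http://localhost:4566 s3 mb s3://gvfs-test-bucket
```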
6 changes: 6 additions & 0 deletions clients/filesystem-fuse/Makefile
@@ -62,6 +62,12 @@ doc-test:
unit-test: doc-test
cargo test --no-fail-fast --lib --all-features --workspace

test-fuse-it:
@bash ./tests/bin/run_fuse_testers.sh test

test-s3:
@bash ./tests/bin/run_s3fs_testers.sh test

test: doc-test
cargo test --no-fail-fast --all-targets --all-features --workspace

15 changes: 12 additions & 3 deletions clients/filesystem-fuse/src/default_raw_filesystem.rs
@@ -334,13 +334,22 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
file.flush().await
}

async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> {
async fn close_file(&self, file_id: u64, fh: u64) -> Result<()> {
let file_entry = self.get_file_entry(file_id).await;

let opened_file = self
.opened_file_manager
.remove(fh)
.ok_or(Errno::from(libc::EBADF))?;
let mut file = opened_file.lock().await;
file.close().await

// TODO: handle the race condition and corner cases when the file has been deleted.
if file_entry.is_ok() {
let mut file = opened_file.lock().await;
file.close().await
} else {
// If the file has been deleted, it does not cause a leak even if it has not been closed.
Ok(())
}
}

async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> {
26 changes: 25 additions & 1 deletion clients/filesystem-fuse/src/gravitino_client.rs
@@ -199,10 +199,34 @@ impl GravitinoClient {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use mockito::mock;

pub(crate) fn create_test_catalog(
name: &str,
provider: &str,
properties: HashMap<String, String>,
) -> Catalog {
Catalog {
name: name.to_string(),
catalog_type: "fileset".to_string(),
provider: provider.to_string(),
comment: "".to_string(),
properties: properties,
}
}

pub(crate) fn create_test_fileset(name: &str, storage_location: &str) -> Fileset {
Fileset {
name: name.to_string(),
fileset_type: "managed".to_string(),
comment: "".to_string(),
storage_location: storage_location.to_string(),
properties: HashMap::default(),
}
}

#[tokio::test]
async fn test_get_fileset_success() {
let fileset_response = r#"
81 changes: 78 additions & 3 deletions clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
@@ -140,16 +140,27 @@ impl PathFileSystem for GravitinoFilesetFileSystem {

#[cfg(test)]
mod tests {
use crate::config::GravitinoConfig;
use crate::config::{AppConfig, GravitinoConfig};
use crate::default_raw_filesystem::DefaultRawFileSystem;
use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
use crate::filesystem::{FileSystemContext, PathFileSystem, RawFileSystem};
use crate::gravitino_client::tests::{create_test_catalog, create_test_fileset};
use crate::gravitino_client::GravitinoClient;
use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
use crate::gvfs_creator::create_fs_with_fileset;
use crate::memory_filesystem::MemoryFileSystem;
use crate::s3_filesystem::extract_s3_config;
use crate::s3_filesystem::tests::{cleanup_s3_fs, s3_test_config};
use crate::test_enable_with;
use crate::RUN_TEST_WITH_S3;
use std::collections::HashMap;
use std::path::Path;

#[tokio::test]
async fn test_map_fileset_path_to_raw_path() {
let fs = GravitinoFilesetFileSystem {
physical_fs: Box::new(MemoryFileSystem::new().await),
client: super::GravitinoClient::new(&GravitinoConfig::default()),
client: GravitinoClient::new(&GravitinoConfig::default()),
location: "/c1/fileset1".into(),
};
let path = fs.gvfs_path_to_raw_path(Path::new("/a"));
@@ -162,7 +173,7 @@
async fn test_map_raw_path_to_fileset_path() {
let fs = GravitinoFilesetFileSystem {
physical_fs: Box::new(MemoryFileSystem::new().await),
client: super::GravitinoClient::new(&GravitinoConfig::default()),
client: GravitinoClient::new(&GravitinoConfig::default()),
location: "/c1/fileset1".into(),
};
let path = fs
@@ -172,4 +183,68 @@
let path = fs.raw_path_to_gvfs_path(Path::new("/c1/fileset1")).unwrap();
assert_eq!(path, Path::new("/"));
}

async fn create_fileset_fs(path: &Path, config: &AppConfig) -> GravitinoFilesetFileSystem {
let opendal_config = extract_s3_config(config);

cleanup_s3_fs(path, &opendal_config).await;

let bucket = opendal_config.get("bucket").expect("Bucket must exist");
let endpoint = opendal_config.get("endpoint").expect("Endpoint must exist");

let catalog = create_test_catalog(
"c1",
"s3",
vec![
("location".to_string(), format!("s3a://{}", bucket)),
("s3-endpoint".to_string(), endpoint.to_string()),
]
.into_iter()
.collect::<HashMap<String, String>>(),
);
let file_set_location = format!("s3a://{}{}", bucket, path.to_string_lossy());
let file_set = create_test_fileset("fileset1", &file_set_location);

let fs_context = FileSystemContext::default();
let inner_fs = create_fs_with_fileset(&catalog, &file_set, config, &fs_context)
.await
.unwrap();
GravitinoFilesetFileSystem::new(
inner_fs,
path,
GravitinoClient::new(&config.gravitino),
config,
&fs_context,
)
.await
}

#[tokio::test]
async fn s3_ut_test_fileset_file_system() {
test_enable_with!(RUN_TEST_WITH_S3);

let config = s3_test_config();
let cwd = Path::new("/gvfs_test3");
let fs = create_fileset_fs(cwd, &config).await;
let _ = fs.init().await;
let mut tester = TestPathFileSystem::new(Path::new("/"), fs);
tester.test_path_file_system().await;
}

#[tokio::test]
async fn s3_ut_test_fileset_with_raw_file_system() {
test_enable_with!(RUN_TEST_WITH_S3);

let config = s3_test_config();
let cwd = Path::new("/gvfs_test4");
let fileset_fs = create_fileset_fs(cwd, &config).await;
let raw_fs = DefaultRawFileSystem::new(
fileset_fs,
&AppConfig::default(),
&FileSystemContext::default(),
);
let _ = raw_fs.init().await;
let mut tester = TestRawFileSystem::new(Path::new("/"), raw_fs);
tester.test_raw_file_system().await;
}
}
10 changes: 5 additions & 5 deletions clients/filesystem-fuse/src/gvfs_creator.rs
@@ -87,15 +87,15 @@ pub async fn create_gvfs_filesystem(
.get_fileset(&catalog_name, &schema_name, &fileset_name)
.await?;

let inner_fs = create_fs_with_fileset(&catalog, &fileset, config, fs_context)?;
let inner_fs = create_fs_with_fileset(&catalog, &fileset, config, fs_context).await?;

let target_path = extract_root_path(fileset.storage_location.as_str())?;
let fs =
GravitinoFilesetFileSystem::new(inner_fs, &target_path, client, config, fs_context).await;
Ok(CreateFileSystemResult::Gvfs(fs))
}

fn create_fs_with_fileset(
pub(crate) async fn create_fs_with_fileset(
catalog: &Catalog,
fileset: &Fileset,
config: &AppConfig,
@@ -104,9 +104,9 @@ fn create_fs_with_fileset(
let schema = extract_filesystem_scheme(&fileset.storage_location)?;

match schema {
FileSystemSchema::S3 => Ok(Box::new(S3FileSystem::new(
catalog, fileset, config, fs_context,
)?)),
FileSystemSchema::S3 => Ok(Box::new(
S3FileSystem::new(catalog, fileset, config, fs_context).await?,
)),
}
}

13 changes: 13 additions & 0 deletions clients/filesystem-fuse/src/lib.rs
@@ -36,6 +36,19 @@ mod opened_file_manager;
mod s3_filesystem;
mod utils;

#[macro_export]
macro_rules! test_enable_with {
($env_var:expr) => {
if std::env::var($env_var).is_err() {
println!("Test skipped because {} is not set", $env_var);
return;
}
};
}

pub const RUN_TEST_WITH_S3: &str = "RUN_TEST_WITH_S3";
pub const RUN_TEST_WITH_FUSE: &str = "RUN_TEST_WITH_FUSE";

pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> GvfsResult<()> {
gvfs_fuse::mount(mount_to, mount_from, config).await
}
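
The `test_enable_with!` macro above skips S3- and FUSE-dependent tests unless the corresponding environment variable is set, so a plain `cargo test` stays green without external services. A sketch of running the gated S3 tests by hand — the name filter relies on the `s3_ut_` prefix used in this PR, and the endpoint/credential configuration consumed by `s3_test_config()` still has to be in place (in CI the `make test-s3` target and `tests/bin/run_s3fs_testers.sh` take care of that):

```bash
# Opt in to the S3-gated unit tests; without the variable they print a
# skip message and return early.
cd clients/filesystem-fuse
RUN_TEST_WITH_S3=1 cargo test s3_ut --lib --all-features
```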
47 changes: 40 additions & 7 deletions clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -261,22 +261,29 @@ fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
mod test {
use crate::config::AppConfig;
use crate::s3_filesystem::extract_s3_config;
use crate::s3_filesystem::tests::s3_test_config;
use crate::test_enable_with;
use crate::RUN_TEST_WITH_S3;
use opendal::layers::LoggingLayer;
use opendal::{services, Builder, Operator};

#[tokio::test]
async fn test_s3_stat() {
let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
let opendal_config = extract_s3_config(&config);

fn create_opendal(config: &AppConfig) -> Operator {
let opendal_config = extract_s3_config(config);
let builder = services::S3::from_map(opendal_config);

// Init an operator
let op = Operator::new(builder)
Operator::new(builder)
.expect("opendal create failed")
.layer(LoggingLayer::default())
.finish();
.finish()
}

#[tokio::test]
async fn s3_ut_test_s3_stat() {
test_enable_with!(RUN_TEST_WITH_S3);

let config = s3_test_config();
let op = create_opendal(&config);
let path = "/";
let list = op.list(path).await;
if let Ok(l) = list {
@@ -294,4 +301,30 @@
println!("stat error: {:?}", meta.err());
}
}

#[tokio::test]
async fn s3_ut_test_s3_delete() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/gvfs_test/test_dir/test_file";

let meta = op.stat(path).await;
if let Ok(m) = meta {
println!("stat result: {:?}", m);
} else {
println!("stat error: {:?}", meta.err());
}

let result = op.remove(vec![path.to_string()]).await;
match result {
Ok(_) => {
println!("Delete successful (or no-op).");
}
Err(e) => {
println!("Delete failed: {:?}", e);
}
}
}
}
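
The two `s3_ut_*` tests above hit a live S3-compatible endpoint, so their effects can be spot-checked from outside the process. A hypothetical check with the AWS CLI — the endpoint URL and bucket name are placeholders standing in for whatever `s3_test_config()` resolves to, not values from this diff:

```bash
# Inspect the object path used by s3_ut_test_s3_delete (placeholder
# endpoint and bucket; adjust to match your gvfs_fuse_s3 test config).
aws --endpoint-url=http://localhost:4566 \
  s3 ls s3://gvfs-test-bucket/s1/fileset1/gvfs_test/test_dir/
```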
