Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
diqiu50 committed Dec 27, 2024
1 parent 06f5c23 commit fd78faf
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 55 deletions.
95 changes: 48 additions & 47 deletions clients/filesystem-fuse/src/gvfs_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/

use crate::config::AppConfig;
use crate::error::ErrorCode::InvalidConfig;
use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
use crate::filesystem::{FileSystemContext, PathFileSystem};
use crate::gravitino_client::GravitinoClient;
use crate::gravitino_client::{Fileset, GravitinoClient};
use crate::gvfs_fileset_fs::GvfsFilesetFs;
use crate::gvfs_fuse::{CreateFsResult, FileSystemSchema};
use crate::open_dal_filesystem::OpenDalFileSystem;
use crate::utils::GvfsResult;
use std::path::Path;
use crate::utils::{extract_root_path, parse_location, GvfsResult};

const FILESET_PREFIX: &str = "gvfs://fileset/";
const GRAVITINO_FILESET_SCHEMA: &str = "gvfs";

pub async fn create_gvfs_filesystem(
mount_from: &str,
Expand Down Expand Up @@ -81,69 +79,72 @@ pub async fn create_gvfs_filesystem(
let client = GravitinoClient::new(&config.gravitino);

let (catalog, schema, fileset) = extract_fileset(mount_from)?;
let location = client
.get_fileset(&catalog, &schema, &fileset)
.await?
.storage_location;
let (schema, location) = extract_storage_filesystem(&location).unwrap();
let fileset = client.get_fileset(&catalog, &schema, &fileset).await?;

let inner_fs = create_fs_with_schema(&schema, config, fs_context)?;
let inner_fs = create_fs_with_fileset(&fileset, config, fs_context)?;

let fs = GvfsFilesetFs::new(inner_fs, Path::new(&location), client, config, fs_context).await;
let target_path = extract_root_path(fileset.storage_location.as_str())?;
let fs = GvfsFilesetFs::new(inner_fs, &target_path, client, config, fs_context).await;
Ok(CreateFsResult::Gvfs(fs))
}

fn create_fs_with_schema(
schema: &FileSystemSchema,
fn create_fs_with_fileset(
fileset: &Fileset,
config: &AppConfig,
fs_context: &FileSystemContext,
) -> GvfsResult<Box<dyn PathFileSystem>> {
let schema = extract_filesystem_scheme(&fileset.storage_location).unwrap();
match schema {
FileSystemSchema::S3 => OpenDalFileSystem::create_file_system(schema, config, fs_context),
FileSystemSchema::S3 => {
OpenDalFileSystem::create_file_system(&schema, fileset, config, fs_context)
}
}
}

pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
if !path.starts_with(FILESET_PREFIX) {
return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", path)));
}
let path = parse_location(path)?;

let path_without_prefix = &path[FILESET_PREFIX.len()..];

let parts: Vec<&str> = path_without_prefix.split('/').collect();
if path.scheme() != GRAVITINO_FILESET_SCHEMA {
return Err(InvalidConfig.to_error(format!("Invalid fileset schema: {}", path)));
}

if parts.len() != 3 {
let split = path.path_segments();
if split.is_none() {
return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", path)));
}
let split = split.unwrap().collect::<Vec<&str>>();
if split.len() != 4 {
return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", path)));
}
// todo handle mount catalog or schema

let catalog = parts[0].to_string();
let schema = parts[1].to_string();
let fileset = parts[2].to_string();

let catalog = split[1].to_string();
let schema = split[2].to_string();
let fileset = split[3].to_string();
Ok((catalog, schema, fileset))
}

pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, String)> {
pub fn extract_filesystem_scheme(path: &str) -> GvfsResult<FileSystemSchema> {
// todo need to improve the logic
if let Some(pos) = path.find("://") {
let protocol = &path[..pos];
let location = &path[pos + 3..];
let location = match location.find('/') {
Some(index) => &location[index + 1..],
None => "",
};
let location = match location.ends_with('/') {
true => location.to_string(),
false => format!("{}/", location),
};
let url = parse_location(path)?;
let scheme = url.scheme();

match protocol {
"s3" => Some((FileSystemSchema::S3, location.to_string())),
"s3a" => Some((FileSystemSchema::S3, location.to_string())),
_ => None,
}
} else {
None
match scheme {
"s3" => Ok(FileSystemSchema::S3),
"s3a" => Ok(FileSystemSchema::S3),
_ => Err(UnSupportedFilesystem.to_error(format!("Invalid storage schema: {}", path))),
}
}

#[cfg(test)]
mod tests {
use crate::gvfs_creator::extract_fileset;

/// Checks that a well-formed `gvfs://fileset/...` location is split into
/// its `(catalog, schema, fileset)` components.
#[test]
fn test_extract_fileset() {
// The leading path segment (`test`) is not part of the returned tuple;
// the expected values are the three segments that follow it.
// NOTE(review): presumably the first segment is the metalake name — confirm.
let location = "gvfs://fileset/test/c1/s1/fileset1";
let (catalog, schema, fileset) = extract_fileset(location).unwrap();
assert_eq!(catalog, "c1");
assert_eq!(schema, "s1");
assert_eq!(fileset, "fileset1");
}
}
10 changes: 5 additions & 5 deletions clients/filesystem-fuse/src/gvfs_fileset_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,30 @@ pub(crate) struct GravitinoFileSystemConfig {}
pub(crate) struct GvfsFilesetFs {
fs: Box<dyn PathFileSystem>,
client: GravitinoClient,
fileset_location: PathBuf,
target_path: PathBuf,
}

impl GvfsFilesetFs {
pub async fn new(
fs: Box<dyn PathFileSystem>,
location: &Path,
target_path: &Path,
client: GravitinoClient,
_config: &AppConfig,
_context: &FileSystemContext,
) -> Self {
Self {
fs: fs,
client: client,
fileset_location: location.into(),
target_path: target_path.into(),
}
}

fn map_fileset_path_to_raw_path(&self, path: &Path) -> PathBuf {
self.fileset_location.join(path)
self.target_path.join(path)
}

fn map_raw_path_to_fileset_path(&self, path: &Path) -> Result<PathBuf> {
path.strip_prefix(&self.fileset_location)
path.strip_prefix(&self.target_path)
.map_err(|_| Errno::from(libc::EBADF))?;
Ok(path.into())
}
Expand Down
2 changes: 1 addition & 1 deletion clients/filesystem-fuse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn main() -> fuse3::Result<()> {
}
let config = config.unwrap();
let mount_point = "gvfs";
let mount_from = "gvfs://fileset/catalog1/schema1/fileset1";
let mount_from = "gvfs://fileset/test/c1/s1/fileset1";
let handle = tokio::spawn(async move {
let result = gvfs_mount(mount_point, mount_from, &config).await;
if let Err(e) = result {
Expand Down
7 changes: 6 additions & 1 deletion clients/filesystem-fuse/src/open_dal_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ use crate::error::ErrorCode::OpenDalError;
use crate::filesystem::{
FileReader, FileStat, FileSystemCapacity, FileSystemContext, FileWriter, PathFileSystem, Result,
};
use crate::gravitino_client::Fileset;
use crate::gvfs_fuse::FileSystemSchema;
use crate::opened_file::{OpenFileFlags, OpenedFile};
use crate::utils::GvfsResult;
use crate::utils::{extract_bucket, GvfsResult};
use async_trait::async_trait;
use bytes::Bytes;
use fuse3::FileType::{Directory, RegularFile};
Expand All @@ -48,11 +49,15 @@ impl OpenDalFileSystem {

pub(crate) fn create_file_system(
schema: &FileSystemSchema,
fileset: &Fileset,
config: &AppConfig,
fs_context: &FileSystemContext,
) -> GvfsResult<Box<dyn PathFileSystem>> {
match schema {
FileSystemSchema::S3 => {
let mut opendal_config = config.extend_config.clone();
let bucket = extract_bucket(&fileset.storage_location)?;
opendal_config.insert("bucket".to_string(), bucket);
let builder = S3::from_map(config.extend_config.clone());

let op = Operator::new(builder);
Expand Down
47 changes: 46 additions & 1 deletion clients/filesystem-fuse/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,54 @@
* specific language governing permissions and limitations
* under the License.
*/
use crate::error::ErrorCode::InvalidConfig;
use crate::error::GvfsError;
use reqwest::Url;
use std::path::PathBuf;

pub type GvfsResult<T> = Result<T, GvfsError>;

/// Parses `location` into a [`Url`].
///
/// # Errors
///
/// Returns an `InvalidConfig` error carrying the parser's message when
/// `location` is not a valid URL.
pub(crate) fn parse_location(location: &str) -> GvfsResult<Url> {
    // Convert the parse failure directly instead of checking for `Err`
    // and then unwrapping the same value.
    Url::parse(location)
        .map_err(|e| InvalidConfig.to_error(format!("Invalid fileset location: {}", e)))
}

/// Returns the path component of `location` as a [`PathBuf`].
///
/// The scheme and host parts of the URL are not included in the result.
///
/// # Errors
///
/// Propagates the error from `parse_location` when `location` cannot be parsed.
pub(crate) fn extract_root_path(location: &str) -> GvfsResult<PathBuf> {
    parse_location(location).map(|parsed| PathBuf::from(parsed.path()))
}

/// Returns the host component of `location`, which is used as the bucket name.
///
/// # Errors
///
/// Propagates the error from `parse_location` when `location` cannot be
/// parsed, and returns an `InvalidConfig` error when the URL has no host.
pub(crate) fn extract_bucket(location: &str) -> GvfsResult<String> {
    let parsed = parse_location(location)?;
    parsed.host_str().map(str::to_string).ok_or_else(|| {
        InvalidConfig.to_error(format!(
            "Invalid fileset location without bucket: {}",
            location
        ))
    })
}

#[cfg(test)]
mod tests {}
mod tests {
use super::*;

/// The root path of an `s3://` location is its URL path component; the
/// bucket (`bucket`) is not included in the result.
#[test]
fn test_extract_root_path() {
let location = "s3://bucket/path/to/file";
let result = extract_root_path(location);
assert!(result.is_ok());
assert_eq!(result.unwrap(), PathBuf::from("/path/to/file"));
}

/// The bucket of an `s3://` location is the URL host component.
#[test]
fn test_extract_bucket() {
let location = "s3://bucket/path/to/file";
let result = extract_bucket(location);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "bucket");
}
}

0 comments on commit fd78faf

Please sign in to comment.