diff --git a/alluxiofs/alluxioio/README.md b/alluxiofs/alluxioio/README.md
new file mode 100644
index 0000000..ed256e5
--- /dev/null
+++ b/alluxiofs/alluxioio/README.md
@@ -0,0 +1,48 @@
+# Alluxioio package
+
+The Alluxioio package replaces Python's POSIX-related standard methods with custom methods that use Alluxiofs. By importing the package, users don't need to modify their existing code to use AlluxioFS: Python's POSIX-related methods are automatically replaced by Alluxiofs methods. The methods are replaced on the `builtins`, `os` and `shutil` modules themselves, so the replacement takes effect in the file that imports the Alluxioio package as well as in any library that looks these methods up afterwards; import Alluxioio before other libraries so that they use the replaced methods too.
+Alluxio customized methods are used only when the file or directory path starts with a supported protocol such as "s3://" or "hdfs://". Paths that don't start with a supported protocol still use Python's original methods.
+### Customized Python Methods
+
+- builtins.open
+- os.listdir
+- os.mkdir
+- os.rmdir
+- os.rename
+- os.remove
+- shutil.copy
+- os.stat
+- os.path.isdir
+- os.path.isfile
+- os.path.exists
+- os.walk
+
+### How To Use
+Add a configuration yaml file named "alluxiofs_config.yaml" to the same directory as the files in which you want to use alluxioio.
+The configuration is used to initialize the alluxiofs filesystem.
+```
+etcd_hosts: "localhost"
+etcd_port: 2379
+options:
+  alluxio.user.file.replication.max: "3"
+concurrency: 128
+worker_http_port: 8080
+preload_path: "/path/to/preload"
+target_protocol: "s3"
+target_options:
+  aws_access_key_id: "your-access-key-id"
+  aws_secret_access_key: "your-secret-access-key"
+
+```
+
+Import alluxioio as the first line of import in your code:
+```
+from alluxiofs import alluxioio
+import os
+
+with open('s3://file_name.csv', 'r') as f:
+    print(f.read())
+
+for d in os.listdir('s3://folder_name'):
+    print(d)
+```
diff --git a/alluxiofs/alluxioio/__init__.py b/alluxiofs/alluxioio/__init__.py
new file mode 100644
index 0000000..ef85161
--- /dev/null
+++ b/alluxiofs/alluxioio/__init__.py
@@ -0,0 +1,58 @@
+import builtins
+import inspect
+import os
+import shutil
+
+from .alluxio_custom_io import alluxio_copy
+from .alluxio_custom_io import alluxio_exists
+from .alluxio_custom_io import alluxio_isdir
+from .alluxio_custom_io import alluxio_isfile
+from .alluxio_custom_io import alluxio_ls
+from .alluxio_custom_io import alluxio_mkdir
+from .alluxio_custom_io import alluxio_open
+from .alluxio_custom_io import alluxio_remove
+from .alluxio_custom_io import alluxio_rename
+from .alluxio_custom_io import alluxio_rmdir
+from .alluxio_custom_io import alluxio_stat
+from .alluxio_custom_io import alluxio_walk
+from .alluxio_custom_io import set_user_config_file_path
+
+
+# Get the path of user's configuration yaml file, which is expected next to
+# the file that imported this package. inspect.stack()[1] is not that file:
+# the frames directly above a module being imported belong to the import
+# machinery ("<frozen importlib._bootstrap>"), so skip them.
+caller_frame = None
+for _frame_info in inspect.stack(0)[1:]:
+    if not _frame_info.filename.startswith("<frozen importlib"):
+        caller_frame = _frame_info
+        break
+
+if caller_frame is not None:
+    caller_file_path = caller_frame.filename
+    caller_dir = os.path.dirname(os.path.abspath(caller_file_path))
+else:
+    caller_file_path = ""
+    caller_dir = os.getcwd()
+config_file_path = os.path.join(caller_dir, "alluxiofs_config.yaml")
+if not os.path.isfile(config_file_path):
+    # Not found next to the importer: fall back to a config file in the
+    # current working directory when there is one.
+    _cwd_config_path = os.path.join(os.getcwd(), "alluxiofs_config.yaml")
+    if os.path.isfile(_cwd_config_path):
+        config_file_path = _cwd_config_path
+set_user_config_file_path(config_file_path)
+
+# Override the built-in python POSIX function
+builtins.open = alluxio_open
+os.listdir = alluxio_ls
+os.mkdir = alluxio_mkdir
+os.rmdir = alluxio_rmdir
+os.rename = alluxio_rename
+os.remove = alluxio_remove
+shutil.copy = alluxio_copy
+os.stat = alluxio_stat
+os.path.isdir = alluxio_isdir
+os.path.isfile = alluxio_isfile
+os.path.exists = alluxio_exists
+os.walk = alluxio_walk
diff --git a/alluxiofs/alluxioio/alluxio_custom_io.py b/alluxiofs/alluxioio/alluxio_custom_io.py
new file mode 100644
index 0000000..14d09d5
--- /dev/null
+++ b/alluxiofs/alluxioio/alluxio_custom_io.py
@@ -0,0 +1,277 @@
+import builtins
+import os
+import shutil
+
+import fsspec
+import yaml
+
+from alluxiofs import AlluxioFileSystem
+
+# Save the original python methods before replacing them
+# Original python methods will be used when the file path doesn't start with supported protocols such as s3 and hdfs
+original_open = builtins.open
+original_ls = os.listdir
+original_mkdir = os.mkdir
+original_rmdir = os.rmdir
+original_listdir = os.listdir
+original_rename = os.rename
+original_remove = os.remove
+original_copy = shutil.copy
+original_stat = os.stat
+original_isdir = os.path.isdir
+original_isfile = os.path.isfile
+original_exists = os.path.exists
+original_walk = os.walk
+
+file_systems = {}
+supported_file_paths = {"s3", "hdfs"}
+user_config_file_path = ""
+
+
+def set_user_config_file_path(file_path):
+    """Record the yaml file used to configure the Alluxio file systems."""
+    global user_config_file_path
+    user_config_file_path = file_path
+
+
+def initialize_alluxio_file_systems(protocol):
+    """Create and cache the Alluxio file system for ``protocol``.
+
+    The options are read from the yaml file registered through
+    ``set_user_config_file_path``. On success the file system is stored in
+    ``file_systems[protocol]`` and returned; if it cannot be created the
+    error is printed and ``None`` is returned.
+    """
+    # The config file is local: read it with the saved built-in open rather
+    # than the patched ``builtins.open``.
+    with original_open(user_config_file_path, "r") as file:
+        config = yaml.safe_load(file)
+
+    if config is None:
+        config = {}
+    if isinstance(config, dict):
+        # The protocol comes from the path being accessed. A
+        # "target_protocol" entry in the yaml would be passed a second time
+        # through **config and make the call below fail with "multiple
+        # values for keyword argument", so it is dropped here.
+        config = {
+            key: value
+            for key, value in config.items()
+            if key != "target_protocol"
+        }
+
+    # Register the Alluxio file system with fsspec for the given protocol
+    fsspec.register_implementation(
+        "alluxiofs", AlluxioFileSystem, clobber=True
+    )
+    try:
+        # Initialize the file system with the provided configuration
+        file_systems[protocol] = fsspec.filesystem(
+            "alluxiofs", target_protocol=protocol, **config
+        )
+    except Exception as e:
+        print(
+            f"Failed to initialize the Alluxio file system for protocol '{protocol}': {e}"
+        )
+        return None
+    return file_systems[protocol]
+
+
+def get_alluxio_fs(file):
+    """Return the Alluxio file system serving ``file``, or ``None``.
+
+    ``None`` means the original Python function must be used: ``file`` is not
+    a ``str`` (file descriptor, bytes, ``os.PathLike``, ``None``) or it does
+    not start with ``<protocol>://`` for a protocol in
+    ``supported_file_paths``.
+
+    Raises:
+        RuntimeError: the protocol is supported but its Alluxio file system
+            could not be initialized.
+    """
+    if not isinstance(file, str):
+        return None
+    # Require the "://" separator so that a local file literally named "s3"
+    # or "hdfs" is not mistaken for a remote path.
+    file_path_prefix, separator, _ = file.partition("://")
+    if not separator or file_path_prefix not in supported_file_paths:
+        return None
+    if file_path_prefix not in file_systems:
+        initialize_alluxio_file_systems(file_path_prefix)
+    alluxio_fs = file_systems.get(file_path_prefix)
+    if alluxio_fs is None:
+        raise RuntimeError(
+            f"No Alluxio file system is available for protocol "
+            f"'{file_path_prefix}'; check '{user_config_file_path}'"
+        )
+    return alluxio_fs
+
+
+def alluxio_open(
+    file,
+    mode="r",
+    buffering=-1,
+    encoding=None,
+    errors=None,
+    newline=None,
+    closefd=True,
+    opener=None,
+    **kwargs,
+):
+    """Replacement for ``builtins.open`` that sends s3/hdfs paths to Alluxio.
+
+    Paths served by Alluxio receive every argument including ``**kwargs``;
+    any other ``file`` is opened with the original built-in ``open``, which
+    is not given the extra ``**kwargs``.
+    """
+    alluxio_fs = get_alluxio_fs(file)
+    if alluxio_fs:
+        return alluxio_fs.open(
+            file,
+            mode=mode,
+            buffering=buffering,
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+            closefd=closefd,
+            opener=opener,
+            **kwargs,
+        )
+
+    # For other paths, use the original built-in open function
+    return original_open(
+        file,
+        mode=mode,
+        buffering=buffering,
+        encoding=encoding,
+        errors=errors,
+        newline=newline,
+        closefd=closefd,
+        opener=opener,
+    )
+
+
+def alluxio_ls(path=".", **kwargs):
+    """Replacement for ``os.listdir``.
+
+    ``path`` defaults to ``"."`` as in ``os.listdir`` so that calls without
+    an argument keep working after the function is patched in.
+    """
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        return alluxio_fs.ls(path, **kwargs)
+    return original_ls(path)
+
+
+def alluxio_mkdir(path, mode=0o777, **kwargs):
+    """Replacement for ``os.mkdir``; ``mode`` only applies to local paths."""
+    alluxio_fs = get_alluxio_fs(path)
+    if not alluxio_fs:
+        return original_mkdir(path, mode, **kwargs)
+    # s3 mkdir only creates buckets, not directories inside buckets. It
+    # accepts the bucket name without the s3:// prefix.
+    protocol, _, remainder = path.partition("://")
+    if protocol == "s3":
+        return alluxio_fs.mkdir(remainder, **kwargs)
+    return alluxio_fs.mkdir(path, **kwargs)
+
+
+def alluxio_rmdir(path, **kwargs):
+    """Replacement for ``os.rmdir``."""
+    alluxio_fs = get_alluxio_fs(path)
+    if not alluxio_fs:
+        return original_rmdir(path, **kwargs)
+    # s3 rmdir only removes buckets, not directories inside buckets. It
+    # accepts the bucket name without the s3:// prefix.
+    protocol, _, remainder = path.partition("://")
+    if protocol == "s3":
+        return alluxio_fs.rmdir(remainder, **kwargs)
+    # Other protocols used to fall through here and silently do nothing.
+    return alluxio_fs.rmdir(path, **kwargs)
+
+
+def alluxio_rename(src, dest, **kwargs):
+    """Replacement for ``os.rename``; ``src`` selects the file system."""
+    alluxio_fs = get_alluxio_fs(src)
+    if alluxio_fs:
+        alluxio_fs.mv(src, dest, **kwargs)
+    else:
+        original_rename(src, dest, **kwargs)
+
+
+def alluxio_remove(path, **kwargs):
+    """Replacement for ``os.remove``.
+
+    ``**kwargs`` (for example ``dir_fd``) are only forwarded to the original
+    ``os.remove`` for local paths.
+    """
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        alluxio_fs.rm_file(path)
+    else:
+        original_remove(path, **kwargs)
+
+
+def alluxio_copy(src, dst, **kwargs):
+    """Replacement for ``shutil.copy``; ``src`` selects the file system.
+
+    Returns the value of the original ``shutil.copy`` for local paths and
+    ``dst`` for paths copied through Alluxio.
+    """
+    alluxio_fs = get_alluxio_fs(src)
+    if alluxio_fs:
+        alluxio_fs.copy(src, dst, **kwargs)
+        return dst
+    # shutil.copy returns the path of the new file; callers rely on it.
+    return original_copy(src, dst, **kwargs)
+
+
+def alluxio_stat(path, **kwargs):
+    """Replacement for ``os.stat``.
+
+    For s3/hdfs paths the result is whatever the Alluxio file system's
+    ``info`` returns, not the value of the original ``os.stat``.
+    """
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        return alluxio_fs.info(path, **kwargs)
+    return original_stat(path, **kwargs)
+
+
+def alluxio_isdir(path, **kwargs):
+    """Replacement for ``os.path.isdir``."""
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        return alluxio_fs.isdir(path, **kwargs)
+    return original_isdir(path)
+
+
+def alluxio_isfile(path):
+    """Replacement for ``os.path.isfile``."""
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        return alluxio_fs.isfile(path)
+    return original_isfile(path)
+
+
+def alluxio_exists(path, **kwargs):
+    """Replacement for ``os.path.exists``."""
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        return alluxio_fs.exists(path, **kwargs)
+    return original_exists(path)
+
+
+def alluxio_walk(
+    path, topdown=True, onerror=None, followlinks=False, **kwargs
+):
+    """Replacement for ``os.walk``.
+
+    ``topdown``, ``onerror`` and ``followlinks`` are only passed to the
+    original ``os.walk``; the Alluxio file system receives ``path`` and
+    ``**kwargs``.
+    """
+    alluxio_fs = get_alluxio_fs(path)
+    if alluxio_fs:
+        return alluxio_fs.walk(path, **kwargs)
+    return original_walk(path, topdown, onerror, followlinks)