-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add alluxioio * Update README.md * Add alluxioio * Add alluxioio
- Loading branch information
Showing
3 changed files
with
298 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# Alluxioio package | ||
|
||
The Alluxioio package replaces Python's POSIX-related standard methods with custom methods that use Alluxiofs. By importing the package, users don't need to modify their existing code to use AlluxioFS. Python's POSIX-related methods will be automatically replaced by Alluxiofs methods. This replacement is only effective in the file that imports the Alluxioio package, as well as in external libraries that are imported after Alluxioio. | ||
Alluxio's customized methods are used only when a file or directory path starts with a supported protocol such as "s3://" or "hdfs://". Paths that don't start with a supported protocol still use Python's original methods. | ||
### Customized Python Methods | ||
|
||
- builtins.open | ||
- os.listdir | ||
- os.mkdir | ||
- os.rmdir | ||
- os.rename | ||
- os.remove | ||
- shutil.copy | ||
- os.stat | ||
- os.path.isdir | ||
- os.path.isfile | ||
- os.path.exists | ||
- os.walk | ||
|
||
### How To Use | ||
Add a YAML configuration file named "alluxiofs_config.yaml" to the same directory as the files in which you want to use alluxioio. | ||
The configurations are used to initialize alluxiofs filesystem. | ||
``` | ||
etcd_hosts: "localhost" | ||
etcd_port: 2379 | ||
options: | ||
alluxio.user.file.replication.max: "3" | ||
concurrency: 128 | ||
worker_http_port: 8080 | ||
preload_path: "/path/to/preload" | ||
target_protocol: "s3" | ||
target_options: | ||
aws_access_key_id: "your-access-key-id" | ||
aws_secret_access_key: "your-secret-access-key" | ||
``` | ||
|
||
Import alluxioio as the first import in your code: | ||
``` | ||
from alluxiofs import alluxioio | ||
import os | ||
with open('s3://file_name.csv', 'r') as f: | ||
print(f.read()) | ||
for d in os.listdir('s3://folder_name'): | ||
print(d) | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import builtins | ||
import inspect | ||
import os | ||
import shutil | ||
|
||
from .alluxio_custom_io import alluxio_copy | ||
from .alluxio_custom_io import alluxio_exists | ||
from .alluxio_custom_io import alluxio_isdir | ||
from .alluxio_custom_io import alluxio_isfile | ||
from .alluxio_custom_io import alluxio_ls | ||
from .alluxio_custom_io import alluxio_mkdir | ||
from .alluxio_custom_io import alluxio_open | ||
from .alluxio_custom_io import alluxio_remove | ||
from .alluxio_custom_io import alluxio_rename | ||
from .alluxio_custom_io import alluxio_rmdir | ||
from .alluxio_custom_io import alluxio_stat | ||
from .alluxio_custom_io import alluxio_walk | ||
from .alluxio_custom_io import set_user_config_file_path | ||
|
||
|
||
# Get the path of the user's configuration yaml file.
# While this package is being imported, the frames directly above this module
# belong to the import machinery ("<frozen importlib._bootstrap>" and friends),
# not to the user's file. Taking stack()[1] blindly therefore yields a bogus
# filename whose "directory" is just the current working directory. Skip the
# import-machinery frames so the directory is that of the importing file.
_stack = inspect.stack()
caller_frame = next(
    (
        frame_info
        for frame_info in _stack[1:]
        if not frame_info.filename.startswith("<frozen importlib")
    ),
    _stack[1],  # fall back to the previous behaviour if nothing else matches
)
del _stack
caller_file_path = caller_frame.filename
caller_dir = os.path.dirname(os.path.abspath(caller_file_path))
config_file_path = os.path.join(caller_dir, "alluxiofs_config.yaml")
set_user_config_file_path(config_file_path)

# Override the built-in Python POSIX functions with the Alluxio-aware
# wrappers. Each wrapper falls back to the saved original function when the
# path does not start with a supported protocol.
builtins.open = alluxio_open
os.listdir = alluxio_ls
os.mkdir = alluxio_mkdir
os.rmdir = alluxio_rmdir
os.rename = alluxio_rename
os.remove = alluxio_remove
shutil.copy = alluxio_copy
os.stat = alluxio_stat
os.path.isdir = alluxio_isdir
os.path.isfile = alluxio_isfile
os.path.exists = alluxio_exists
os.walk = alluxio_walk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
import builtins | ||
import os | ||
import shutil | ||
|
||
import fsspec | ||
import yaml | ||
|
||
from alluxiofs import AlluxioFileSystem | ||
|
||
# Save the original Python functions before they are replaced.
# The originals are used when a path does not start with a supported protocol
# such as "s3" or "hdfs".
original_open = builtins.open
original_ls = os.listdir
original_mkdir = os.mkdir
original_rmdir = os.rmdir
original_listdir = os.listdir  # same function object as original_ls
original_rename = os.rename
original_remove = os.remove
original_copy = shutil.copy
original_stat = os.stat
original_isdir = os.path.isdir
original_isfile = os.path.isfile
original_exists = os.path.exists
original_walk = os.walk

# Alluxio file system instances, keyed by protocol name; filled in by
# initialize_alluxio_file_systems().
file_systems = {}
# Path prefixes (the text before "://") that are routed to Alluxio.
supported_file_paths = {"s3", "hdfs"}
# Path of the user's yaml configuration file; set through
# set_user_config_file_path().
user_config_file_path = ""
|
||
|
||
def set_user_config_file_path(file_path):
    """Record where the user's Alluxio yaml configuration file lives.

    The value is stored in the module-level ``user_config_file_path`` and is
    read later, when an Alluxio file system is first initialized.
    """
    global user_config_file_path
    user_config_file_path = file_path
|
||
|
||
def initialize_alluxio_file_systems(protocol):
    """Create the Alluxio file system for ``protocol`` and cache it.

    The user's yaml configuration (``user_config_file_path``) is loaded and
    passed as keyword arguments to ``fsspec.filesystem("alluxiofs", ...)``.
    On success the instance is stored in ``file_systems[protocol]``.

    Args:
        protocol: Protocol name taken from the path prefix, e.g. ``"s3"``.

    Returns:
        None. On failure a message is printed and nothing is cached.
    """
    global file_systems
    global user_config_file_path

    # Load the user's alluxiofs configuration. Use the saved built-in open:
    # by the time this runs, builtins.open has normally been replaced by the
    # Alluxio wrapper, and the config file is always a local file.
    with original_open(user_config_file_path, "r") as file:
        config = yaml.safe_load(file)

    if config is None:
        config = {}

    # Register the Alluxio file system with fsspec
    fsspec.register_implementation(
        "alluxiofs", AlluxioFileSystem, clobber=True
    )
    try:
        # The protocol of the path being accessed decides the target. Merging
        # it into the options (instead of passing it next to **config) avoids
        # "got multiple values for keyword argument 'target_protocol'" when
        # the yaml file also contains a target_protocol entry.
        fs_options = {**config, "target_protocol": protocol}
        # Initialize the file system with the provided configuration
        file_systems[protocol] = fsspec.filesystem("alluxiofs", **fs_options)
    except Exception as e:
        # Best effort: report the failure and leave the cache untouched.
        print(
            f"Failed to initialize the Alluxio file system for protocol '{protocol}': {e}"
        )
        return None
|
||
|
||
def get_alluxio_fs(file):
    """Return the Alluxio file system responsible for ``file``, if any.

    Args:
        file: The path argument given to one of the patched functions. It may
            be anything the original functions accept (str, bytes, path-like
            object, integer file descriptor).

    Returns:
        The cached (lazily initialized) file system when ``file`` is a string
        of the form ``"<protocol>://..."`` with a supported protocol,
        otherwise ``None`` so that the caller uses the original function.

    Raises:
        KeyError: If the file system for a supported protocol could not be
            initialized.
    """
    # The patched functions are global, so this is reached with file
    # descriptors, bytes and path-like objects too; only plain strings can
    # carry a protocol prefix.
    if not isinstance(file, str):
        return None

    file_path_prefix, separator, _ = file.partition("://")
    # Without "://" the whole string is the "prefix"; a local file that is
    # literally named "s3" or "hdfs" must not be routed to Alluxio.
    if not separator:
        return None

    if file_path_prefix in supported_file_paths:
        if file_path_prefix not in file_systems:
            initialize_alluxio_file_systems(file_path_prefix)
        return file_systems[file_path_prefix]
    return None
|
||
|
||
def alluxio_open(
    file,
    mode="r",
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    closefd=True,
    opener=None,
    **kwargs,
):
    """Replacement for ``builtins.open`` that is aware of Alluxio paths.

    Paths handled by Alluxio are opened through the Alluxio file system with
    all standard arguments plus any extra keyword arguments. Every other path
    is opened with the original built-in ``open`` using the standard
    arguments only.
    """
    standard_args = dict(
        mode=mode,
        buffering=buffering,
        encoding=encoding,
        errors=errors,
        newline=newline,
        closefd=closefd,
        opener=opener,
    )

    fs = get_alluxio_fs(file)
    if not fs:
        # For other paths, use the original built-in open function
        return original_open(file, **standard_args)
    return fs.open(file, **standard_args, **kwargs)
|
||
|
||
def alluxio_ls(path=".", **kwargs):
    """Replacement for ``os.listdir`` that is aware of Alluxio paths.

    Args:
        path: Directory to list. Defaults to ``"."`` like ``os.listdir``, so
            that existing ``os.listdir()`` calls without an argument keep
            working after the function has been patched.
        **kwargs: Extra options, forwarded to the Alluxio file system only.

    Returns:
        The result of the Alluxio file system's ``ls`` for Alluxio paths,
        otherwise the result of the original ``os.listdir``.
    """
    alluxio_fs = get_alluxio_fs(path)
    if alluxio_fs:
        return alluxio_fs.ls(path, **kwargs)
    return original_ls(path)
|
||
|
||
def alluxio_mkdir(path, mode=0o777, **kwargs):
    """Replacement for ``os.mkdir`` that is aware of Alluxio paths.

    Args:
        path: Directory to create.
        mode: Permission bits; only used for non-Alluxio paths.
        **kwargs: Extra options, forwarded to whichever implementation
            handles the path.

    Returns:
        Whatever the selected ``mkdir`` implementation returns.
    """
    alluxio_fs = get_alluxio_fs(path)
    if not alluxio_fs:
        return original_mkdir(path, mode, **kwargs)

    # s3 mkdir only creates buckets, not directories inside buckets. It
    # accepts the bucket name without the s3:// prefix.
    file_path_prefix = path.split("://")[0]
    if file_path_prefix == "s3":
        bucket_name = path.split("://")[1]
        return alluxio_fs.mkdir(bucket_name, **kwargs)

    # Forward the extra options here as well, as the s3 branch does.
    return alluxio_fs.mkdir(path, **kwargs)
|
||
|
||
def alluxio_rmdir(path, **kwargs):
    """Replacement for ``os.rmdir`` that is aware of Alluxio paths.

    Args:
        path: Directory to remove.
        **kwargs: Extra options, forwarded to whichever implementation
            handles the path.

    Returns:
        Whatever the selected ``rmdir`` implementation returns.
    """
    alluxio_fs = get_alluxio_fs(path)
    if not alluxio_fs:
        return original_rmdir(path, **kwargs)

    # s3 rmdir only removes buckets, not directories inside buckets. It
    # accepts the bucket name without the s3:// prefix.
    file_path_prefix = path.split("://")[0]
    if file_path_prefix == "s3":
        bucket_name = path.split("://")[1]
        return alluxio_fs.rmdir(bucket_name, **kwargs)

    # Other Alluxio-backed protocols (e.g. hdfs) previously fell through and
    # did nothing at all; remove the directory through Alluxio, mirroring
    # what alluxio_mkdir does for these paths.
    return alluxio_fs.rmdir(path, **kwargs)
|
||
|
||
def alluxio_rename(src, dest, **kwargs):
    """Replacement for ``os.rename`` that is aware of Alluxio paths.

    The source path decides which implementation is used: the Alluxio file
    system's ``mv`` for Alluxio paths, the original ``os.rename`` otherwise.
    """
    fs = get_alluxio_fs(src)
    if not fs:
        original_rename(src, dest, **kwargs)
        return
    fs.mv(src, dest, **kwargs)
|
||
|
||
def alluxio_remove(path, **kwargs):
    """Replacement for ``os.remove`` that is aware of Alluxio paths.

    Args:
        path: File to remove.
        **kwargs: Keyword options of the original ``os.remove`` (such as
            ``dir_fd``). They only make sense for local paths, so they are
            forwarded to the original function and ignored for Alluxio paths.
            Accepting them keeps existing ``os.remove(name, dir_fd=...)``
            calls working after the function has been patched.
    """
    alluxio_fs = get_alluxio_fs(path)
    if alluxio_fs:
        alluxio_fs.rm_file(path)
    else:
        original_remove(path, **kwargs)
|
||
|
||
def alluxio_copy(src, dst, **kwargs):
    """Replacement for ``shutil.copy`` that is aware of Alluxio paths.

    Args:
        src: Source file; decides which implementation is used.
        dst: Destination path.
        **kwargs: Extra options, forwarded to whichever implementation
            handles the path.

    Returns:
        For local paths, the value returned by the original ``shutil.copy``
        (the path of the newly created file), which callers may rely on.
        For Alluxio paths, ``dst``.
    """
    alluxio_fs = get_alluxio_fs(src)
    if alluxio_fs:
        alluxio_fs.copy(src, dst, **kwargs)
        return dst
    # shutil.copy returns the destination path; do not swallow it.
    return original_copy(src, dst, **kwargs)
|
||
|
||
def alluxio_stat(path, **kwargs):
    """Replacement for ``os.stat`` that is aware of Alluxio paths.

    Alluxio paths are answered by the Alluxio file system's ``info``; all
    other paths go to the original ``os.stat``. Extra keyword arguments are
    forwarded in both cases.
    """
    fs = get_alluxio_fs(path)
    if not fs:
        return original_stat(path, **kwargs)
    return fs.info(path, **kwargs)
|
||
|
||
def alluxio_isdir(path, **kwargs):
    """Replacement for ``os.path.isdir`` that is aware of Alluxio paths.

    Extra keyword arguments are forwarded to the Alluxio file system only.
    """
    fs = get_alluxio_fs(path)
    if not fs:
        return original_isdir(path)
    return fs.isdir(path, **kwargs)
|
||
|
||
def alluxio_isfile(path):
    """Replacement for ``os.path.isfile`` that is aware of Alluxio paths."""
    fs = get_alluxio_fs(path)
    if not fs:
        return original_isfile(path)
    return fs.isfile(path)
|
||
|
||
def alluxio_exists(path, **kwargs):
    """Replacement for ``os.path.exists`` that is aware of Alluxio paths.

    Extra keyword arguments are forwarded to the Alluxio file system only.
    """
    fs = get_alluxio_fs(path)
    if not fs:
        return original_exists(path)
    return fs.exists(path, **kwargs)
|
||
|
||
def alluxio_walk(
    path, topdown=True, onerror=None, followlinks=False, **kwargs
):
    """Replacement for ``os.walk`` that is aware of Alluxio paths.

    For Alluxio paths only ``path`` and the extra keyword arguments reach the
    Alluxio file system's ``walk``; ``topdown``, ``onerror`` and
    ``followlinks`` are used by the original ``os.walk`` alone.
    """
    fs = get_alluxio_fs(path)
    if not fs:
        return original_walk(path, topdown, onerror, followlinks)
    return fs.walk(path, **kwargs)