Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alluxioio Package #54

Merged
merged 5 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions alluxiofs/alluxioio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Alluxioio package

The Alluxioio package replaces Python's POSIX-related standard methods with custom methods that use Alluxiofs. By importing the package, users don't need to modify their existing code to use AlluxioFS. Python's POSIX-related methods will be automatically replaced by Alluxiofs methods. This replacement is only effective in the file that imports the Alluxioio package, as well as in external libraries that are imported after Alluxioio.
Alluxio customized methods will be used only when the file or directory paths starts with supported protocols such as "s3://" and "hdfs://". Paths that don't start with supported protocols will still use Python's original methods.
### Customized Python Methods

- builtins.open
- os.listdir
- os.mkdir
- os.rmdir
- os.rename
- os.remove
- shutil.copy
- os.stat
- os.path.isdir
- os.path.isfile
- os.path.exists
- os.walk

### How To Use
Add a configuration yaml file with name "alluxiofs_config.yaml" in the same directory of files that you want to use alluxioio.
The configurations are used to initialize alluxiofs filesystem.
```
etcd_hosts: "localhost"
etcd_port: 2379
options:
alluxio.user.file.replication.max: "3"
concurrency: 128
worker_http_port: 8080
preload_path: "/path/to/preload"
target_protocol: "s3"
target_options:
aws_access_key_id: "your-access-key-id"
aws_secret_access_key: "your-secret-access-key"

```

Import alluxioio as the first line of import in your code:
```
from alluxiofs import alluxioio
import os

with open('s3://file_name.csv', 'r') as f:
print(f.read())

for d in os.listdir('s3://folder_name'):
print(d)
```
40 changes: 40 additions & 0 deletions alluxiofs/alluxioio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import builtins
import inspect
import os
import shutil

from .alluxio_custom_io import alluxio_copy
from .alluxio_custom_io import alluxio_exists
from .alluxio_custom_io import alluxio_isdir
from .alluxio_custom_io import alluxio_isfile
from .alluxio_custom_io import alluxio_ls
from .alluxio_custom_io import alluxio_mkdir
from .alluxio_custom_io import alluxio_open
from .alluxio_custom_io import alluxio_remove
from .alluxio_custom_io import alluxio_rename
from .alluxio_custom_io import alluxio_rmdir
from .alluxio_custom_io import alluxio_stat
from .alluxio_custom_io import alluxio_walk
from .alluxio_custom_io import set_user_config_file_path


# Get the path of user's configuration yaml file
caller_frame = inspect.stack()[1]
caller_file_path = caller_frame.filename
caller_dir = os.path.dirname(os.path.abspath(caller_file_path))
config_file_path = os.path.join(caller_dir, "alluxiofs_config.yaml")
set_user_config_file_path(config_file_path)

# Override the built-in python POSIX function
builtins.open = alluxio_open
os.listdir = alluxio_ls
os.mkdir = alluxio_mkdir
os.rmdir = alluxio_rmdir
os.rename = alluxio_rename
os.remove = alluxio_remove
shutil.copy = alluxio_copy
os.stat = alluxio_stat
os.path.isdir = alluxio_isdir
os.path.isfile = alluxio_isfile
os.path.exists = alluxio_exists
os.walk = alluxio_walk
210 changes: 210 additions & 0 deletions alluxiofs/alluxioio/alluxio_custom_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import builtins
import os
import shutil

import fsspec
import yaml

from alluxiofs import AlluxioFileSystem

# Save the original python methods before replacing them
# Original python methods will be used when the file path doesn't start with supported protocols such as s3 and hdfs
original_open = builtins.open
original_ls = os.listdir
original_mkdir = os.mkdir
original_rmdir = os.rmdir
original_listdir = os.listdir
original_rename = os.rename
original_remove = os.remove
original_copy = shutil.copy
original_stat = os.stat
original_isdir = os.path.isdir
original_isfile = os.path.isfile
original_exists = os.path.exists
original_walk = os.walk

file_systems = {}
supported_file_paths = {"s3", "hdfs"}
user_config_file_path = ""


def set_user_config_file_path(file_path):
global user_config_file_path
user_config_file_path = file_path


def initialize_alluxio_file_systems(protocol):
global file_systems
global user_config_file_path

# load user's alluxiofs configurations
with open(user_config_file_path, "r") as file:
config = yaml.safe_load(file)

if config is None:
config = {}

# Register the Alluxio file system with fsspec for the given protocol
fsspec.register_implementation(
"alluxiofs", AlluxioFileSystem, clobber=True
)
try:
# Initialize the file system with the provided configuration
file_systems[protocol] = fsspec.filesystem(
"alluxiofs", target_protocol=protocol, **config
)
except Exception as e:
print(
f"Failed to initialize the Alluxio file system for protocol '{protocol}': {e}"
)
return None


def get_alluxio_fs(file):
file_path_prefix = file.split("://")[0]
if file_path_prefix in supported_file_paths:
if file_path_prefix not in file_systems:
initialize_alluxio_file_systems(file_path_prefix)
return file_systems[file_path_prefix]
return None


def alluxio_open(
file,
mode="r",
buffering=-1,
encoding=None,
errors=None,
newline=None,
closefd=True,
opener=None,
**kwargs,
):
alluxio_fs = get_alluxio_fs(file)
if alluxio_fs:
return alluxio_fs.open(
file,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
closefd=closefd,
opener=opener,
**kwargs,
)

else:
# For other paths, use the original built-in open function
return original_open(
file,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
closefd=closefd,
opener=opener,
)


def alluxio_ls(path, **kwargs):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
return alluxio_fs.ls(path, **kwargs)

else:
return original_ls(path)


def alluxio_mkdir(path, mode=0o777, **kwargs):
alluxio_fs = get_alluxio_fs(path)

if alluxio_fs:
# s3 mkdir only create buckets, not directory in buckets. It accepts bucket name without s3:// prefix.
file_path_prefix = path.split("://")[0]
if file_path_prefix == "s3":
bucket_name = path.split("://")[1]
return alluxio_fs.mkdir(bucket_name, **kwargs)
else:
return alluxio_fs.mkdir(path)
else:
return original_mkdir(path, mode, **kwargs)


def alluxio_rmdir(path, **kwargs):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
# s3 rmdir only create buckets, not directory in buckets. It accepts bucket name without s3:// prefix.
file_path_prefix = path.split("://")[0]
if file_path_prefix == "s3":
bucket_name = path.split("://")[1]
return alluxio_fs.rmdir(bucket_name, **kwargs)
else:
return original_rmdir(path, **kwargs)


def alluxio_rename(src, dest, **kwargs):
alluxio_fs = get_alluxio_fs(src)
if alluxio_fs:
alluxio_fs.mv(src, dest, **kwargs)
else:
original_rename(src, dest, **kwargs)


def alluxio_remove(path):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
alluxio_fs.rm_file(path)
else:
original_remove(path)


def alluxio_copy(src, dst, **kwargs):
alluxio_fs = get_alluxio_fs(src)
if alluxio_fs:
alluxio_fs.copy(src, dst, **kwargs)
else:
original_copy(src, dst, **kwargs)


def alluxio_stat(path, **kwargs):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
return alluxio_fs.info(path, **kwargs)
else:
return original_stat(path, **kwargs)


def alluxio_isdir(path, **kwargs):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
return alluxio_fs.isdir(path, **kwargs)
else:
return original_isdir(path)


def alluxio_isfile(path):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
return alluxio_fs.isfile(path)
else:
return original_isfile(path)


def alluxio_exists(path, **kwargs):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
return alluxio_fs.exists(path, **kwargs)
else:
return original_exists(path)


def alluxio_walk(
path, topdown=True, onerror=None, followlinks=False, **kwargs
):
alluxio_fs = get_alluxio_fs(path)
if alluxio_fs:
return alluxio_fs.walk(path, **kwargs)
else:
return original_walk(path, topdown, onerror, followlinks)
Loading