Skip to content

Commit

Permalink
Update Alluxio Configuration Approach (#52)
Browse files Browse the repository at this point in the history
* add alluxio io logic

* update config

* update config

* update config

* update config

* update data_manager config

* address comments
  • Loading branch information
SibylYang authored Jul 24, 2024
1 parent bd39f9e commit ce10281
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 177 deletions.
89 changes: 89 additions & 0 deletions alluxiofs/client/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Optional

import humanfriendly

from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE


class _InvalidConfigError(ValueError, AssertionError):
    """
    Raised when an AlluxioClientConfig argument fails validation.

    Derives from both ValueError and AssertionError so that callers catching
    either exception type keep working.
    """


class AlluxioClientConfig:
    """
    Class responsible for creating the configuration for Alluxio Client.

    All arguments are validated in the constructor and then stored as
    attributes of the same name. Validation uses explicit ``raise`` statements
    rather than ``assert`` so that it is still enforced when Python runs with
    ``-O`` (which strips ``assert`` statements).
    """

    def __init__(
        self,
        etcd_hosts: Optional[str] = None,
        worker_hosts: Optional[str] = None,
        etcd_port=2379,
        worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
        etcd_refresh_workers_interval=120,
        page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE,
        hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
        cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE,
        etcd_username: Optional[str] = None,
        etcd_password: Optional[str] = None,
        concurrency=64,
        **kwargs,
    ):
        """
        Initializes Alluxio client configuration.

        Args:
            etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from
                in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both.
            worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format.
                Either etcd_hosts or worker_hosts should be provided, not both.
            etcd_port (int, optional): The port of each etcd server.
            worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node.
            etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD membership service periodically.
                All negative values mean the service is disabled.
            page_size (optional): A human-readable size; it is passed through
                ``humanfriendly.parse_size(page_size, binary=True)`` and the parsed result is stored.
            hash_node_per_worker (int, optional): Must be a positive integer.
            cluster_name (optional): Stored unchanged; not validated here.
            etcd_username (Optional[str], optional): ETCD username. Must be supplied together with etcd_password.
            etcd_password (Optional[str], optional): ETCD password. Must be supplied together with etcd_username.
            concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64.
            **kwargs: Extra keyword arguments are accepted and ignored by this class,
                so callers can forward a larger option set unchanged.

        Raises:
            ValueError: If any argument fails validation. The raised exception is
                also an instance of AssertionError for backward compatibility.
        """
        # The checks below run in a fixed order so that, for any given invalid
        # input, the first error reported is always the same.
        if not (etcd_hosts or worker_hosts):
            raise _InvalidConfigError(
                "Must supply either 'etcd_hosts' or 'worker_hosts'"
            )

        if etcd_hosts and worker_hosts:
            raise _InvalidConfigError(
                "Supply either 'etcd_hosts' or 'worker_hosts', not both"
            )

        if not (isinstance(etcd_port, int) and (1 <= etcd_port <= 65535)):
            raise _InvalidConfigError(
                "'etcd_port' should be an integer in the range 1-65535"
            )

        if not (
            isinstance(worker_http_port, int)
            and (1 <= worker_http_port <= 65535)
        ):
            raise _InvalidConfigError(
                "'worker_http_port' should be an integer in the range 1-65535"
            )

        if not (isinstance(concurrency, int) and concurrency > 0):
            raise _InvalidConfigError(
                "'concurrency' should be a positive integer"
            )

        if not isinstance(etcd_refresh_workers_interval, int):
            raise _InvalidConfigError(
                "'etcd_refresh_workers_interval' should be an integer"
            )

        if not (
            isinstance(hash_node_per_worker, int) and hash_node_per_worker > 0
        ):
            raise _InvalidConfigError(
                "'hash_node_per_worker' should be a positive integer"
            )

        # Parsed before the credential check so that a malformed page size is
        # reported ahead of a username/password mismatch.
        parsed_page_size = humanfriendly.parse_size(page_size, binary=True)

        if (etcd_username is None) != (etcd_password is None):
            raise _InvalidConfigError(
                "Both ETCD username and password must be set or both should be unset."
            )

        self.etcd_hosts = etcd_hosts
        self.worker_hosts = worker_hosts
        self.etcd_port = etcd_port
        self.worker_http_port = worker_http_port
        self.etcd_refresh_workers_interval = etcd_refresh_workers_interval
        self.hash_node_per_worker = hash_node_per_worker
        self.page_size = parsed_page_size
        self.cluster_name = cluster_name
        self.etcd_username = etcd_username
        self.etcd_password = etcd_password
        self.concurrency = concurrency
152 changes: 33 additions & 119 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@
"[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke."
)

from .config import AlluxioClientConfig
from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY1
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY2
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_KEY
from .const import ALLUXIO_SUCCESS_IDENTIFIER
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
from .const import FULL_PAGE_URL_FORMAT
from .const import GET_FILE_STATUS_URL_FORMAT
from .const import LIST_URL_FORMAT
Expand Down Expand Up @@ -119,14 +117,7 @@ class AlluxioClient:

def __init__(
self,
etcd_hosts=None,
worker_hosts=None,
options=None,
concurrency=64,
etcd_port=2379,
worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
test_options=None,
**kwargs,
):
"""
Inits Alluxio file system.
Expand All @@ -150,101 +141,23 @@ def __init__(
The interval to refresh worker list from ETCD membership service periodically. All negative values mean the service is disabled.
"""
# TODO(lu/chunxu) change to ETCD endpoints in format of 'http://etcd_host:port, http://etcd_host:port' & worker hosts in 'host:port, host:port' format
if not (etcd_hosts or worker_hosts):
raise ValueError(
"Must supply either 'etcd_hosts' or 'worker_hosts'"
)
if etcd_hosts and worker_hosts:
raise ValueError(
"Supply either 'etcd_hosts' or 'worker_hosts', not both"
)
if not etcd_hosts:
logger.warning(
"'etcd_hosts' not supplied. An etcd cluster is required for dynamic cluster changes."
)
if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535):
raise ValueError(
"'etcd_port' should be an integer in the range 1-65535"
)
if not isinstance(worker_http_port, int) or not (
1 <= worker_http_port <= 65535
):
raise ValueError(
"'worker_http_port' should be an integer in the range 1-65535"
)
if not isinstance(concurrency, int) or concurrency <= 0:
raise ValueError("'concurrency' should be a positive integer")
if concurrency < 10 or concurrency > 128:
logger.warning(
f"'concurrency' value of {concurrency} is outside the recommended range (10-128). "
"This may lead to suboptimal performance or resource utilization.",
)
if not isinstance(etcd_refresh_workers_interval, int):
raise ValueError(
"'etcd_refresh_workers_interval' should be an integer"
)

self.session = self._create_session(concurrency)

# parse options
page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
self.config = AlluxioClientConfig(**kwargs)
self.session = self._create_session(self.config.concurrency)
self.hash_provider = ConsistentHashProvider(self.config)
self.data_manager = None
if options:
if ALLUXIO_PAGE_SIZE_KEY in options:
page_size = options[ALLUXIO_PAGE_SIZE_KEY]
logger.debug(f"Page size is set to {page_size}")
if ALLUXIO_HASH_NODE_PER_WORKER_KEY1 in options:
hash_node_per_worker = int(
options[ALLUXIO_HASH_NODE_PER_WORKER_KEY1]
)
logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if ALLUXIO_HASH_NODE_PER_WORKER_KEY2 in options:
hash_node_per_worker = int(
options[ALLUXIO_HASH_NODE_PER_WORKER_KEY2]
)
logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if (
ALLUXIO_COMMON_EXTENSION_ENABLE in options
and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true"
):
print("alluxiocommon extension enabled.")
logger.info("alluxiocommon extension enabled.")
ondemand_pool_disabled = (
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options
and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower()
== "true"
)
self.data_manager = _DataManager(
concurrency, ondemand_pool_disabled=ondemand_pool_disabled
)
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
):
raise ValueError(
"'hash_node_per_worker' should be a positive integer"
if kwargs.get(ALLUXIO_COMMON_EXTENSION_ENABLE, False):
logger.info("alluxiocommon extension enabled.")
self.data_manager = _DataManager(
self.config.concurrency,
ondemand_pool_disabled=kwargs.get(
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE, False
),
)

self.page_size = humanfriendly.parse_size(page_size, binary=True)
test_options = test_options or {}
test_options = kwargs.get("test_options", {})
set_log_level(logger, test_options)

self.hash_provider = ConsistentHashProvider(
etcd_hosts=etcd_hosts,
etcd_port=etcd_port,
worker_hosts=worker_hosts,
worker_http_port=worker_http_port,
hash_node_per_worker=hash_node_per_worker,
options=options,
etcd_refresh_workers_interval=etcd_refresh_workers_interval,
)

def listdir(self, path):
"""
Lists the directory.
Expand Down Expand Up @@ -655,7 +568,7 @@ def _all_page_generator_alluxiocommon(
yield pages_content
if (
len(pages_content)
< fetching_pages_num_each_round * self.page_size
< fetching_pages_num_each_round * self.config.page_size
):
break
except Exception as e:
Expand Down Expand Up @@ -683,7 +596,7 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
if not page_content:
break
yield page_content
if len(page_content) < self.page_size: # last page
if len(page_content) < self.config.page_size: # last page
break
page_index += 1

Expand All @@ -693,13 +606,13 @@ def _range_page_generator_alluxiocommon(
read_urls = []
start = offset
while start < offset + length:
page_index = start // self.page_size
inpage_off = start % self.page_size
page_index = start // self.config.page_size
inpage_off = start % self.config.page_size
inpage_read_len = min(
self.page_size - inpage_off, offset + length - start
self.config.page_size - inpage_off, offset + length - start
)
page_url = None
if inpage_off == 0 and inpage_read_len == self.page_size:
if inpage_off == 0 and inpage_read_len == self.config.page_size:
page_url = FULL_PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=worker_http_port,
Expand All @@ -723,23 +636,23 @@ def _range_page_generator_alluxiocommon(
def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
):
start_page_index = offset // self.page_size
start_page_offset = offset % self.page_size
start_page_index = offset // self.config.page_size
start_page_offset = offset % self.config.page_size

end_page_index = (offset + length - 1) // self.page_size
end_page_read_to = ((offset + length - 1) % self.page_size) + 1
end_page_index = (offset + length - 1) // self.config.page_size
end_page_read_to = ((offset + length - 1) % self.config.page_size) + 1

page_index = start_page_index
while True:
try:
read_offset = 0
read_length = self.page_size
read_length = self.config.page_size
if page_index == start_page_index:
read_offset = start_page_offset
if start_page_index == end_page_index:
read_length = end_page_read_to - start_page_offset
else:
read_length = self.page_size - start_page_offset
read_length = self.config.page_size - start_page_offset
elif page_index == end_page_index:
read_length = end_page_read_to

Expand Down Expand Up @@ -1004,13 +917,14 @@ def __init__(
logger.debug(f"Page size is set to {page_size}")
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.hash_provider = ConsistentHashProvider(
etcd_hosts=etcd_hosts,
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
options=options,
etcd_refresh_workers_interval=120,
AlluxioClientConfig(
etcd_hosts=etcd_hosts,
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
)
)
self.http_port = http_port
self._loop = loop or asyncio.get_event_loop()
Expand Down
Loading

0 comments on commit ce10281

Please sign in to comment.