
Refactor dataservice #548

Merged · 7 commits · Mar 28, 2024
231 changes: 178 additions & 53 deletions python/services/dataservice/dmod/dataservice/__main__.py
@@ -6,14 +6,131 @@
 )
 
 import argparse
-from . import name as package_name
-from .service import ServiceManager
-from .service_settings import ServiceSettings
-from .dataset_manager_collection import DatasetManagerCollection
-from dmod.scheduler.job import DefaultJobUtilFactory
 from pathlib import Path
 from socket import gethostname
 
+from dmod.scheduler.job import DefaultJobUtilFactory
+from dmod.modeldata.data.filesystem_manager import FilesystemDatasetManager
+from dmod.modeldata.data.object_store_manager import ObjectStoreDatasetManager
+from dmod.core.dataset import DatasetType
+
+from . import name as package_name
+from .data_derive_util import DataDeriveUtil
+from .dataset_inquery_util import DatasetInqueryUtil
+from .dataset_manager_collection import DatasetManagerCollection
+from .service import (
+    ActiveOperationTracker,
+    DataProvisionManager,
+    RequiredDataChecksManager,
+    ServiceManager,
+    TempDataTaskManager,
+    DockerS3FSPluginHelper,
+)
+from .service_settings import ServiceSettings
+
+
+def main():
+    args = _handle_args()
+
+    if args.pycharm_debug:
+        _setup_remote_debugging(args)
+    else:
+        logging.info("Skipping data service remote debugging setup.")
+
+    listen_host = gethostname() if args.host is None else args.host
+    # Flip this here to be less confusing
+    use_obj_store = not args.no_obj_store
+    # TODO: DataProvisionManager requires object store, so this is currently required. Add functionality to allow usage
+    #  without object store. Fail fast for now.
+    assert use_obj_store, "Object store is currently required"
+
+    secrets_dir = Path('/run/secrets')
+
+    # Figure out Redis password, trying for a Docker secret first
+    if args.redis_pass_secret is not None:
+        redis_pass_secret_file = secrets_dir.joinpath(args.redis_pass_secret)
+        redis_pass = redis_pass_secret_file.read_text().strip()
+    else:
+        redis_pass = args.redis_pass
+
+    # Initialize objects that will be injected and shared by service subsystems
+    service_settings = ServiceSettings()
+    # Initialize a job util via the default factory, which requires some Redis params
+    job_util = DefaultJobUtilFactory.factory_create(redis_host=args.redis_host, redis_port=args.redis_port,
+                                                    redis_pass=redis_pass)
+    # Dataset creation and access go through this object
+    dataset_manager_collection = DatasetManagerCollection()
+    data_derive_util = DataDeriveUtil(dataset_manager_collection=dataset_manager_collection)
+    dataset_inquery_util = DatasetInqueryUtil(dataset_manager_collection=dataset_manager_collection, derive_util=data_derive_util)
+
+    # TODO: need to adjust arg groupings to allow for this to be cleaned up some
+    access_key = (
+        None
+        if args.obj_store_access_key is None
+        else (secrets_dir / args.obj_store_access_key).read_text().strip()
+    )
+    secret_key = (
+        None
+        if args.obj_store_secret_key is None
+        else (secrets_dir / args.obj_store_secret_key).read_text().strip()
+    )
+    object_store_dataset_manager = _init_object_store_dataset_manager(
+        obj_store_host=args.obj_store_host,
+        port=args.obj_store_port,
+        access_key=access_key,
+        secret_key=secret_key,
+    )
+    dataset_manager_collection.add(object_store_dataset_manager)
+
+    docker_s3fs_plugin_helper = _init_docker_s3fs_plugin_helper(
+        dataset_manager_collection=dataset_manager_collection,
+        access_key=access_key,
+        secret_key=secret_key,
+        settings=service_settings,
+    )
+
+    if args.file_dataset_config_dir is not None:
+        filesystem_dataset_manager = _init_filesystem_dataset_manager(
+            Path(args.file_dataset_config_dir)
+        )
+        dataset_manager_collection.add(filesystem_dataset_manager)
+
+    # count is used to signal when it is okay to remove temporary datasets
+    count = ActiveOperationTracker()
+
+    # Initialize background task objects
+    required_data_checks_manager = RequiredDataChecksManager(
+        job_util=job_util,
+        dataset_manager_collection=dataset_manager_collection,
+        checks_underway_tracker=count,
+        dataset_inquery_util=dataset_inquery_util,
+    )
+    data_provision_manager = DataProvisionManager(
+        job_util=job_util,
+        dataset_manager_collection=dataset_manager_collection,
+        docker_s3fs_helper=docker_s3fs_plugin_helper,
+        data_derive_util=data_derive_util,
+        provision_underway_tracker=count,
+    )
+    temp_datasets_manager = TempDataTaskManager(dataset_manager_collection=dataset_manager_collection, safe_to_exec_tracker=count)
+
+    # Handles websocket communication and async task loop
+    service_manager = ServiceManager(
+        job_util=job_util,
+        listen_host=listen_host,
+        port=args.port,
+        ssl_dir=Path(args.ssl_dir),
+        dataset_manager_collection=dataset_manager_collection,
+        dataset_inquery_util=dataset_inquery_util,
+    )
+
+    # Set up other required async tasks
+    service_manager.add_async_task(required_data_checks_manager.start())
+    service_manager.add_async_task(data_provision_manager.start())
+    service_manager.add_async_task(temp_datasets_manager.start())
+
+    service_manager.run()
+
 
 def _handle_args():
     parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -117,62 +234,70 @@ def _setup_remote_debugging(args: argparse.Namespace):
         msg = 'Warning: could not set debugging trace to {} on {} due to {} - {}'
         print(msg.format(args.remote_debug_host, args.remote_debug_port, error.__class__.__name__, str(error)))
 
-def main():
-    args = _handle_args()
-
-    if args.pycharm_debug:
-        _setup_remote_debugging(args)
-    else:
-        logging.info("Skipping data service remote debugging setup.")
-
-    listen_host = gethostname() if args.host is None else args.host
-    # Flip this here to be less confusing
-    use_obj_store = not args.no_obj_store
+
+def _init_filesystem_dataset_manager(
+    file_dataset_config_dir: Path,
+) -> FilesystemDatasetManager:
+    logging.info(
+        "Initializing manager for {} type datasets".format(DatasetType.FILESYSTEM.name)
+    )
+    mgr = FilesystemDatasetManager(serialized_files_directory=file_dataset_config_dir)
+    logging.info(
+        "{} initialized with {} existing datasets".format(
+            mgr.__class__.__name__, len(mgr.datasets)
+        )
+    )
+    return mgr
-
-    secrets_dir = Path('/run/secrets')
-
-    # Figure out Redis password, trying for a Docker secret first
-    if args.redis_pass_secret is not None:
-        redis_pass_secret_file = secrets_dir.joinpath(args.redis_pass_secret)
-        redis_pass = redis_pass_secret_file.read_text().strip()
-    else:
-        redis_pass = args.redis_pass
+
+
+def _init_object_store_dataset_manager(
+    obj_store_host: str, access_key: str, secret_key: str, port: int = 9000
+) -> ObjectStoreDatasetManager:
+    host_str = "{}:{}".format(obj_store_host, port)
+    logging.info("Initializing object store dataset manager at {}".format(host_str))
+    mgr = ObjectStoreDatasetManager(
+        obj_store_host_str=host_str, access_key=access_key, secret_key=secret_key
+    )
+    logging.info(
+        "Object store dataset manager initialized with {} existing datasets".format(
+            len(mgr.datasets)
+        )
+    )
+    return mgr
-
-    service_settings = ServiceSettings()
-
-    # Initialize a job util via the default factory, which requires some Redis params
-    job_util = DefaultJobUtilFactory.factory_create(redis_host=args.redis_host, redis_port=args.redis_port,
-                                                    redis_pass=redis_pass)
+
+
+def _init_docker_s3fs_plugin_helper(
+    dataset_manager_collection: DatasetManagerCollection,
+    access_key: str,
+    secret_key: str,
+    settings: ServiceSettings,
+    *args,
+    **kwargs
+) -> DockerS3FSPluginHelper:
+    s3fs_url_proto = settings.s3fs_url_protocol
+    s3fs_url_host = settings.s3fs_url_host
+    s3fs_url_port = settings.s3fs_url_port
+    if s3fs_url_host is not None:
+        s3fs_helper_url = "{}://{}:{}/".format(
+            s3fs_url_proto, s3fs_url_host, s3fs_url_port
+        )
+    else:
+        s3fs_helper_url = None
+
-    dataset_manager_collection = DatasetManagerCollection()
-    # Initiate a service manager WebsocketHandler implementation for primary messaging and async task loops
-    service_manager = ServiceManager(
-        job_util=job_util,
-        listen_host=listen_host,
-        port=args.port,
-        ssl_dir=Path(args.ssl_dir),
-        settings=service_settings,
-        dataset_manager_collection=dataset_manager_collection,
-    )
+    docker_s3fs_helper = DockerS3FSPluginHelper(
+        service_manager=dataset_manager_collection,
+        obj_store_access=access_key,
+        obj_store_secret=secret_key,
+        docker_image_name=settings.s3fs_vol_image_name,
+        docker_image_tag=settings.s3fs_vol_image_tag,
+        docker_networks=[settings.s3fs_helper_network],
+        docker_plugin_alias=settings.s3fs_plugin_alias,
+        obj_store_url=s3fs_helper_url,
+        *args,
+        **kwargs
+    )
+    return docker_s3fs_helper
-
-    # If we are set to use the object store ...
-    if use_obj_store:
-        # TODO: need to adjust arg groupings to allow for this to be cleaned up some
-        access_key_file = None if args.obj_store_access_key is None else secrets_dir.joinpath(args.obj_store_access_key)
-        secret_key_file = None if args.obj_store_secret_key is None else secrets_dir.joinpath(args.obj_store_secret_key)
-        service_manager.init_object_store_dataset_manager(obj_store_host=args.obj_store_host,
-                                                          port=args.obj_store_port,
-                                                          access_key=access_key_file.read_text().strip(),
-                                                          secret_key=secret_key_file.read_text().strip())
-    if args.file_dataset_config_dir is not None:
-        service_manager.init_filesystem_dataset_manager(Path(args.file_dataset_config_dir))
-
-    # Setup other required async tasks
-    service_manager.add_async_task(service_manager.manage_required_data_checks())
-    service_manager.add_async_task(service_manager.manage_data_provision())
-    service_manager.add_async_task(service_manager.manage_temp_datasets())
-
-    service_manager.run()
 
 
 if __name__ == '__main__':
2 changes: 1 addition & 1 deletion python/services/dataservice/dmod/dataservice/_version.py
@@ -1 +1 @@
-__version__ = '0.8.1'
+__version__ = '0.8.2'