refactor(WIP): refactor dataservice. split subsystems
aaraney committed Mar 14, 2024
1 parent 66b226a commit d184db0
Showing 2 changed files with 364 additions and 236 deletions.
75 changes: 62 additions & 13 deletions python/services/dataservice/dmod/dataservice/__main__.py
@@ -1,4 +1,10 @@
import logging

from python.services.evaluationservice.dmod.evaluationservice.evaluation_service import data

from .dataset_inquery_util import DatasetInqueryUtil

from .data_derive_util import DataDeriveUtil
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
@@ -9,6 +15,7 @@
from . import name as package_name
from .service import ServiceManager
from .service_settings import ServiceSettings
from .service import ServiceManager, init_docker_s3fs_plugin_helper, init_object_store_dataset_manager, init_filesystem_dataset_manager, RequiredDataChecksManager, DataProvisionManager, TempDatasetsManager, Count
from .dataset_manager_collection import DatasetManagerCollection
from dmod.scheduler.job import DefaultJobUtilFactory
from pathlib import Path
@@ -139,38 +146,80 @@ def main():
    redis_pass = args.redis_pass

    service_settings = ServiceSettings()

    # Initialize a job util via the default factory, which requires some Redis params
    job_util = DefaultJobUtilFactory.factory_create(redis_host=args.redis_host, redis_port=args.redis_port,
                                                    redis_pass=redis_pass)

    dataset_manager_collection = DatasetManagerCollection()
    data_derive_util = DataDeriveUtil(dataset_manager_collection=dataset_manager_collection)
    dataset_inquery_util = DatasetInqueryUtil(dataset_manager_collection=dataset_manager_collection, derive_util=data_derive_util)

    # Initiate a service manager WebsocketHandler implementation for primary messaging and async task loops
    service_manager = ServiceManager(
        job_util=job_util,
        listen_host=listen_host,
        port=args.port,
        ssl_dir=Path(args.ssl_dir),
        settings=service_settings,
        dataset_manager_collection=dataset_manager_collection,
        dataset_inquery_util=dataset_inquery_util,
    )

    docker_s3fs_plugin_helper = None
    # If we are set to use the object store ...
    if use_obj_store:
        # TODO: need to adjust arg groupings to allow for this to be cleaned up some
        access_key_file = None if args.obj_store_access_key is None else secrets_dir.joinpath(args.obj_store_access_key)
        secret_key_file = None if args.obj_store_secret_key is None else secrets_dir.joinpath(args.obj_store_secret_key)
        service_manager.init_object_store_dataset_manager(obj_store_host=args.obj_store_host,
                                                          port=args.obj_store_port,
                                                          access_key=access_key_file.read_text().strip(),
                                                          secret_key=secret_key_file.read_text().strip())
        access_key = (
            None
            if args.obj_store_access_key is None
            else (secrets_dir / args.obj_store_access_key).read_text().strip()
        )
        secret_key = (
            None
            if args.obj_store_secret_key is None
            else (secrets_dir / args.obj_store_secret_key).read_text().strip()
        )
        object_store_dataset_manager = init_object_store_dataset_manager(
            obj_store_host=args.obj_store_host,
            port=args.obj_store_port,
            access_key=access_key,
            secret_key=secret_key,
        )
        dataset_manager_collection.add(object_store_dataset_manager)

        docker_s3fs_plugin_helper = init_docker_s3fs_plugin_helper(
            dataset_manager_collection=dataset_manager_collection,
            access_key=access_key,
            secret_key=secret_key,
            settings=service_settings,
        )

    if args.file_dataset_config_dir is not None:
        service_manager.init_filesystem_dataset_manager(Path(args.file_dataset_config_dir))
        filesystem_dataset_manager = init_filesystem_dataset_manager(
            Path(args.file_dataset_config_dir)
        )
        dataset_manager_collection.add(filesystem_dataset_manager)


    count = Count()
    required_data_checks_manager = RequiredDataChecksManager(
        job_util=job_util,
        dataset_manager_collection=dataset_manager_collection,
        count=count,
        dataset_inquery_util=dataset_inquery_util,
    )
    # TODO: seems to only work when docker s3fs is present
    # TODO: does this need an injected data derive util?
    data_provision_manager = DataProvisionManager(job_util=job_util,
                                                  managers=dataset_manager_collection,
                                                  docker_s3fs_helper=docker_s3fs_plugin_helper,
                                                  data_derive_util=data_derive_util,
                                                  count=count,
                                                  )
    temp_datasets_manager = TempDatasetsManager(managers=dataset_manager_collection, count=count)

    # Setup other required async tasks
    service_manager.add_async_task(service_manager.manage_required_data_checks())
    service_manager.add_async_task(service_manager.manage_data_provision())
    service_manager.add_async_task(service_manager.manage_temp_datasets())
    service_manager.add_async_task(required_data_checks_manager.start())
    service_manager.add_async_task(data_provision_manager.start())
    service_manager.add_async_task(temp_datasets_manager.start())

    service_manager.run()

(Diff for the second changed file not shown.)
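The last hunk above shows the heart of the refactor: the ServiceManager.manage_*() coroutines are dropped in favor of standalone subsystem objects (RequiredDataChecksManager, DataProvisionManager, TempDatasetsManager), each scheduled through add_async_task() via its own start() coroutine. Below is a minimal, hypothetical sketch of that pattern, with simplified names and signatures rather than the actual DMOD classes; the periodic polling loop is an assumption inferred from the start() calls in the diff:

import asyncio


class SubsystemSketch:
    """Hypothetical stand-in for a subsystem manager such as
    RequiredDataChecksManager: a long-lived loop exposed as one
    start() coroutine (polling behavior assumed, not confirmed)."""

    def __init__(self, name: str, poll_seconds: float = 60.0):
        self._name = name
        self._poll_seconds = poll_seconds

    async def start(self) -> None:
        # The subsystem owns its loop; the service only schedules it.
        while True:
            await self._do_work()
            await asyncio.sleep(self._poll_seconds)

    async def _do_work(self) -> None:
        print(f"{self._name}: one pass of its check/provision logic")


class ServiceSketch:
    """Simplified stand-in for ServiceManager's task bookkeeping."""

    def __init__(self) -> None:
        self._async_tasks = []

    def add_async_task(self, coro) -> None:
        # Coroutines registered before run() are gathered and run together.
        self._async_tasks.append(coro)

    def run(self) -> None:
        async def _serve():
            await asyncio.gather(*self._async_tasks)

        asyncio.run(_serve())


service = ServiceSketch()
service.add_async_task(SubsystemSketch("required-data-checks").start())
service.add_async_task(SubsystemSketch("data-provision").start())
service.add_async_task(SubsystemSketch("temp-datasets").start())
# service.run()  # would block, driving all three loops concurrently

The design payoff, visible in the diff, is dependency injection: each subsystem receives its collaborators (job_util, the shared DatasetManagerCollection, the shared Count) at construction, so it can be built and exercised without a running ServiceManager.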
