diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 74bb5f018..ee2ea8cf5 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -9,7 +9,7 @@ import sys import time import uuid -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import yaml from clp_py_utils.clp_config import ( @@ -59,6 +59,22 @@ validate_worker_config, ) +# Constants +COMPONENT_NAMES = ( + CONTROLLER_TARGET_NAME, + DB_COMPONENT_NAME, + QUEUE_COMPONENT_NAME, + REDIS_COMPONENT_NAME, + RESULTS_CACHE_COMPONENT_NAME, + COMPRESSION_SCHEDULER_COMPONENT_NAME, + QUERY_SCHEDULER_COMPONENT_NAME, + COMPRESSION_WORKER_COMPONENT_NAME, + QUERY_WORKER_COMPONENT_NAME, + REDUCER_COMPONENT_NAME, + WEBUI_COMPONENT_NAME, + LOG_VIEWER_WEBUI_COMPONENT_NAME, +) + # Setup logging # Create logger logger = logging.getLogger("clp") @@ -981,6 +997,24 @@ def add_num_workers_argument(parser): ) +def component_should_start( + target: str, + exclude: List[str], + component_names: Tuple[str, ...], +) -> bool: + """ + Determines whether a component should start based on the target and exclusion list. + :param target: The target name specified by the user. + :param exclude: A list of component names that should be excluded from starting. + :param component_names: A tuple of components to which the target component corresponds + or depends on. + :return: True if the component should start, False otherwise. + """ + return target in (ALL_TARGET_NAME, *component_names) and all( + name not in exclude for name in component_names + ) + + def main(argv): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -993,29 +1027,31 @@ def main(argv): help="CLP package configuration file.", ) + args_parser.add_argument( + "--exclude", "-no", nargs="+", default=[], help="Exclude component(s) from being started." + ) + component_args_parser = args_parser.add_subparsers(dest="target") - component_args_parser.add_parser(CONTROLLER_TARGET_NAME) - component_args_parser.add_parser(DB_COMPONENT_NAME) - component_args_parser.add_parser(QUEUE_COMPONENT_NAME) - component_args_parser.add_parser(REDIS_COMPONENT_NAME) - component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) - component_args_parser.add_parser(COMPRESSION_SCHEDULER_COMPONENT_NAME) - component_args_parser.add_parser(QUERY_SCHEDULER_COMPONENT_NAME) - compression_worker_parser = component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME) - add_num_workers_argument(compression_worker_parser) - query_worker_parser = component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME) - add_num_workers_argument(query_worker_parser) - reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME) - add_num_workers_argument(reducer_server_parser) - component_args_parser.add_parser(WEBUI_COMPONENT_NAME) - component_args_parser.add_parser(LOG_VIEWER_WEBUI_COMPONENT_NAME) + for component in COMPONENT_NAMES: + parser = component_args_parser.add_parser(component) + if component in ( + COMPRESSION_WORKER_COMPONENT_NAME, + QUERY_WORKER_COMPONENT_NAME, + REDUCER_COMPONENT_NAME, + ): + add_num_workers_argument(parser) parsed_args = args_parser.parse_args(argv[1:]) + target = ALL_TARGET_NAME + excluded_components = parsed_args.exclude if parsed_args.target: target = parsed_args.target - else: - target = ALL_TARGET_NAME + if 0 != len(excluded_components): + for component in excluded_components: + if component not in COMPONENT_NAMES: + raise ValueError(f"Unrecognized component {component} in --exclude list.") + logger.info(f"Starting all components except: {excluded_components}") try: check_dependencies() @@ -1096,38 +1132,49 @@ def main(argv): conf_dir = clp_home / "etc" # Start components - if target in (ALL_TARGET_NAME, DB_COMPONENT_NAME): + # fmt: off + if component_should_start(target, excluded_components, + (DB_COMPONENT_NAME,)): start_db(instance_id, clp_config, conf_dir) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME,)): create_db_tables(instance_id, clp_config, container_clp_config, mounts) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUEUE_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, QUEUE_COMPONENT_NAME,)): start_queue(instance_id, clp_config) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, REDIS_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, REDIS_COMPONENT_NAME,)): start_redis(instance_id, clp_config, conf_dir) - if target in (ALL_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME,)): start_results_cache(instance_id, clp_config, conf_dir) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME,)): create_results_cache_indices(instance_id, clp_config, container_clp_config, mounts) - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - ): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, COMPRESSION_SCHEDULER_COMPONENT_NAME,)): start_compression_scheduler(instance_id, clp_config, container_clp_config, mounts) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME,)): start_query_scheduler(instance_id, clp_config, container_clp_config, mounts) - if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (COMPRESSION_WORKER_COMPONENT_NAME,)): start_compression_worker( instance_id, clp_config, container_clp_config, num_workers, mounts ) - if target in (ALL_TARGET_NAME, QUERY_WORKER_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (QUERY_WORKER_COMPONENT_NAME,)): start_query_worker(instance_id, clp_config, container_clp_config, num_workers, mounts) - if target in (ALL_TARGET_NAME, REDUCER_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (REDUCER_COMPONENT_NAME,)): start_reducer(instance_id, clp_config, container_clp_config, num_workers, mounts) - if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (WEBUI_COMPONENT_NAME,)): start_webui(instance_id, clp_config, mounts) - if target in (ALL_TARGET_NAME, LOG_VIEWER_WEBUI_COMPONENT_NAME): + if component_should_start(target, excluded_components, + (LOG_VIEWER_WEBUI_COMPONENT_NAME,)): start_log_viewer_webui(instance_id, clp_config, container_clp_config, mounts) + # fmt: on except Exception as ex: if type(ex) == ValueError: