diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index fbc2ea061..cbb95b765 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -9,7 +9,13 @@ import yaml -from clp_py_utils.clp_config import CLPConfig, CLP_DEFAULT_CREDENTIALS_FILE_PATH +from clp_py_utils.clp_config import ( + CLPConfig, + CLP_DEFAULT_CREDENTIALS_FILE_PATH, + DB_COMPONENT_NAME, + QUEUE_COMPONENT_NAME, + RESULTS_CACHE_COMPONENT_NAME +) from clp_py_utils.core import ( get_config_value, make_config_path_absolute, @@ -18,12 +24,6 @@ ) # CONSTANTS -# Component names -DB_COMPONENT_NAME = 'db' -QUEUE_COMPONENT_NAME = 'queue' -SCHEDULER_COMPONENT_NAME = 'scheduler' -WORKER_COMPONENT_NAME = 'worker' - # Paths CONTAINER_CLP_HOME = pathlib.Path('/') / 'opt' / 'clp' CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path('/') / 'mnt' / 'logs' @@ -240,23 +240,38 @@ def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: try: validate_path_could_be_dir(data_dir) except ValueError as ex: - raise ValueError(f"database data directory is invalid: {ex}") + raise ValueError(f"{DB_COMPONENT_NAME} data directory is invalid: {ex}") try: validate_path_could_be_dir(logs_dir) except ValueError as ex: - raise ValueError(f"database logs directory is invalid: {ex}") + raise ValueError(f"{DB_COMPONENT_NAME} logs directory is invalid: {ex}") - validate_port("database.port", clp_config.database.host, clp_config.database.port) + validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port) def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path): try: validate_path_could_be_dir(logs_dir) except ValueError as ex: - raise ValueError(f"queue logs directory is invalid: {ex}") + raise ValueError(f"{QUEUE_COMPONENT_NAME} logs directory is invalid: {ex}") + + validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port) + + +def validate_results_cache_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path): + try: + validate_path_could_be_dir(data_dir) + except ValueError as ex: + raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}") + + try: + validate_path_could_be_dir(logs_dir) + except ValueError as ex: + raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}") - validate_port("queue.port", clp_config.queue.host, clp_config.queue.port) + validate_port(f"{RESULTS_CACHE_COMPONENT_NAME}.port", clp_config.results_cache.host, + clp_config.results_cache.port) def validate_worker_config(clp_config: CLPConfig): diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 9deaca9f0..094054042 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -14,10 +14,6 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CONTAINER_CLP_HOME, - DB_COMPONENT_NAME, - QUEUE_COMPONENT_NAME, - SCHEDULER_COMPONENT_NAME, - WORKER_COMPONENT_NAME, check_dependencies, container_exists, CLPDockerMounts, @@ -30,9 +26,17 @@ validate_and_load_queue_credentials_file, validate_db_config, validate_queue_config, + validate_results_cache_config, validate_worker_config ) -from clp_py_utils.clp_config import CLPConfig +from clp_py_utils.clp_config import ( + CLPConfig, + DB_COMPONENT_NAME, + QUEUE_COMPONENT_NAME, + RESULTS_CACHE_COMPONENT_NAME, + SCHEDULER_COMPONENT_NAME, + WORKER_COMPONENT_NAME, +) from job_orchestration.scheduler.constants import QueueName # Setup logging @@ -78,16 +82,16 @@ def wait_for_database_to_init(container_name: str, clp_config: CLPConfig, timeou break time.sleep(1) - logger.error("Timeout while waiting for database to initialize.") + logger.error(f"Timeout while waiting for {DB_COMPONENT_NAME} to initialize.") return False def start_db(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): - logger.info("Starting database...") + logger.info(f"Starting {DB_COMPONENT_NAME}...") container_name = f'clp-{DB_COMPONENT_NAME}-{instance_id}' if container_exists(container_name): - logger.info("Database already running.") + logger.info(f"{DB_COMPONENT_NAME} already running.") return db_data_dir = clp_config.data_directory / DB_COMPONENT_NAME @@ -128,15 +132,15 @@ def start_db(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) if not wait_for_database_to_init(container_name, clp_config, 30): - raise EnvironmentError("Database did not initialize in time") + raise EnvironmentError(f"{DB_COMPONENT_NAME} did not initialize in time") - logger.info("Started database.") + logger.info(f"Started {DB_COMPONENT_NAME}.") def create_db_tables(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, mounts: CLPDockerMounts): - logger.info("Creating database tables...") + logger.info(f"Creating {DB_COMPONENT_NAME} tables...") - container_name = f'clp-db-table-creator-{instance_id}' + container_name = f'clp-{DB_COMPONENT_NAME}-table-creator-{instance_id}' # Create database config file db_config_filename = f'{container_name}.yml' @@ -175,15 +179,15 @@ def create_db_tables(instance_id: str, clp_config: CLPConfig, container_clp_conf db_config_file_path.unlink() - logger.info("Created database tables.") + logger.info(f"Created {DB_COMPONENT_NAME} tables.") def start_queue(instance_id: str, clp_config: CLPConfig): - logger.info("Starting queue...") + logger.info(f"Starting {QUEUE_COMPONENT_NAME}...") container_name = f'clp-{QUEUE_COMPONENT_NAME}-{instance_id}' if container_exists(container_name): - logger.info("Queue already running.") + logger.info(f"{QUEUE_COMPONENT_NAME} already running.") return queue_logs_dir = clp_config.logs_directory / QUEUE_COMPONENT_NAME @@ -234,15 +238,56 @@ def start_queue(instance_id: str, clp_config: CLPConfig): ] subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - logger.info("Started queue.") + logger.info(f"Started {QUEUE_COMPONENT_NAME}.") + + +def start_results_cache(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): + logger.info(f"Starting {RESULTS_CACHE_COMPONENT_NAME}...") + + container_name = f'clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}' + if container_exists(container_name): + logger.info(f"{RESULTS_CACHE_COMPONENT_NAME} already running.") + return + + data_dir = clp_config.data_directory / RESULTS_CACHE_COMPONENT_NAME + logs_dir = clp_config.logs_directory / RESULTS_CACHE_COMPONENT_NAME + + validate_results_cache_config(clp_config, data_dir, logs_dir) + + data_dir.mkdir(exist_ok=True, parents=True) + logs_dir.mkdir(exist_ok=True, parents=True) + + mounts = [ + DockerMount(DockerMountType.BIND, conf_dir / 'mongo', pathlib.Path('/') / 'etc' / 'mongo', + True), + DockerMount(DockerMountType.BIND, data_dir, pathlib.Path('/') / 'data' / 'db'), + DockerMount(DockerMountType.BIND, logs_dir, pathlib.Path('/') / 'var' / 'log' / 'mongodb'), + ] + cmd = [ + 'docker', 'run', + '-d', + '--rm', + '--name', container_name, + '-u', f'{os.getuid()}:{os.getgid()}', + ] + for mount in mounts: + cmd.append('--mount') + cmd.append(str(mount)) + append_docker_port_settings_for_host_ips(clp_config.results_cache.host, clp_config.results_cache.port, 27017, cmd) + cmd.append('mongo:7.0.1') + cmd.append('--config') + cmd.append(str(pathlib.Path('/') / 'etc' / 'mongo' / 'mongod.conf')) + subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) + + logger.info(f"Started {RESULTS_CACHE_COMPONENT_NAME}.") def start_scheduler(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, mounts: CLPDockerMounts): - logger.info("Starting scheduler...") + logger.info(f"Starting {SCHEDULER_COMPONENT_NAME}...") container_name = f'clp-{SCHEDULER_COMPONENT_NAME}-{instance_id}' if container_exists(container_name): - logger.info("Scheduler already running.") + logger.info(f"{SCHEDULER_COMPONENT_NAME} already running.") return container_config_filename = f'{container_name}.yml' @@ -282,16 +327,16 @@ def start_scheduler(instance_id: str, clp_config: CLPConfig, container_clp_confi cmd = container_start_cmd + scheduler_cmd subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - logger.info("Started scheduler.") + logger.info(f"Started {SCHEDULER_COMPONENT_NAME}.") def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, num_cpus: int, mounts: CLPDockerMounts): - logger.info("Starting worker...") + logger.info(f"Starting {WORKER_COMPONENT_NAME}...") container_name = f'clp-{WORKER_COMPONENT_NAME}-{instance_id}' if container_exists(container_name): - logger.info("Worker already running.") + logger.info(f"{WORKER_COMPONENT_NAME} already running.") return validate_worker_config(clp_config) @@ -345,7 +390,7 @@ def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: cmd = container_start_cmd + worker_cmd subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - logger.info("Started worker.") + logger.info(f"Started {WORKER_COMPONENT_NAME}.") def main(argv): @@ -359,6 +404,7 @@ def main(argv): component_args_parser = args_parser.add_subparsers(dest='component_name') component_args_parser.add_parser(DB_COMPONENT_NAME) component_args_parser.add_parser(QUEUE_COMPONENT_NAME) + component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME) worker_args_parser = component_args_parser.add_parser(WORKER_COMPONENT_NAME) worker_args_parser.add_argument('--num-cpus', type=int, default=0, @@ -425,6 +471,8 @@ def main(argv): create_db_tables(instance_id, clp_config, container_clp_config, mounts) if '' == component_name or QUEUE_COMPONENT_NAME == component_name: start_queue(instance_id, clp_config) + if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name: + start_results_cache(instance_id, clp_config, conf_dir) if '' == component_name or SCHEDULER_COMPONENT_NAME == component_name: start_scheduler(instance_id, clp_config, container_clp_config, mounts) if '' == component_name or WORKER_COMPONENT_NAME == component_name: diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index 2854f5d58..d1c8c6397 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -6,16 +6,19 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - DB_COMPONENT_NAME, - QUEUE_COMPONENT_NAME, - SCHEDULER_COMPONENT_NAME, - WORKER_COMPONENT_NAME, container_exists, get_clp_home, validate_and_load_config_file, validate_and_load_db_credentials_file, validate_and_load_queue_credentials_file ) +from clp_py_utils.clp_config import ( + DB_COMPONENT_NAME, + QUEUE_COMPONENT_NAME, + RESULTS_CACHE_COMPONENT_NAME, + SCHEDULER_COMPONENT_NAME, + WORKER_COMPONENT_NAME +) # Setup logging # Create logger @@ -49,6 +52,7 @@ def main(argv): component_args_parser = args_parser.add_subparsers(dest='component_name') component_args_parser.add_parser(DB_COMPONENT_NAME) component_args_parser.add_parser(QUEUE_COMPONENT_NAME) + component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME) component_args_parser.add_parser(WORKER_COMPONENT_NAME) @@ -92,6 +96,9 @@ def main(argv): container_config_file_path = logs_dir / f'{container_name}.yml' if container_config_file_path.exists(): container_config_file_path.unlink() + if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name: + container_name = f'clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}' + stop_container(container_name) if '' == component_name or QUEUE_COMPONENT_NAME == component_name: container_name = f'clp-{QUEUE_COMPONENT_NAME}-{instance_id}' stop_container(container_name) @@ -100,7 +107,7 @@ def main(argv): if queue_config_file_path.exists(): queue_config_file_path.unlink() if '' == component_name or DB_COMPONENT_NAME == component_name: - stop_container(f'clp-db-{instance_id}') + stop_container(f'clp-{DB_COMPONENT_NAME}-{instance_id}') if '' == component_name: # NOTE: We can only remove the instance ID file if all containers have been stopped. diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index fa025b0ff..daa8f887e 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -6,9 +6,17 @@ from .core import get_config_value, make_config_path_absolute, read_yaml_config_file, validate_path_could_be_dir # Constants +# Component names +DB_COMPONENT_NAME = 'database' +QUEUE_COMPONENT_NAME = 'queue' +RESULTS_CACHE_COMPONENT_NAME = 'results_cache' +SCHEDULER_COMPONENT_NAME = 'scheduler' +WORKER_COMPONENT_NAME = 'worker' + CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path('etc') / 'credentials.yml' CLP_METADATA_TABLE_PREFIX = 'clp_' + class Database(BaseModel): type: str = 'mariadb' host: str = 'localhost' @@ -93,6 +101,17 @@ class Scheduler(BaseModel): jobs_poll_delay: int = 1 # seconds +class ResultsCache(BaseModel): + host: str = 'localhost' + port: int = 27017 + + @validator('host') + def validate_host(cls, field): + if '' == field: + raise ValueError(f'{RESULTS_CACHE_COMPONENT_NAME}.host cannot be empty.') + return field + + class Queue(BaseModel): host: str = 'localhost' port: int = 5672 @@ -148,8 +167,9 @@ class CLPConfig(BaseModel): input_logs_directory: pathlib.Path = pathlib.Path('/') database: Database = Database() - scheduler: Scheduler = Scheduler() queue: Queue = Queue() + results_cache: ResultsCache = ResultsCache() + scheduler: Scheduler = Scheduler() credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH archive_output: ArchiveOutput = ArchiveOutput() @@ -195,8 +215,8 @@ def load_database_credentials_from_file(self): if config is None: raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.") try: - self.database.username = get_config_value(config, 'db.user') - self.database.password = get_config_value(config, 'db.password') + self.database.username = get_config_value(config, f'{DB_COMPONENT_NAME}.user') + self.database.password = get_config_value(config, f'{DB_COMPONENT_NAME}.password') except KeyError as ex: raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.") @@ -205,8 +225,8 @@ def load_queue_credentials_from_file(self): if config is None: raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.") try: - self.queue.username = get_config_value(config, "queue.user") - self.queue.password = get_config_value(config, "queue.password") + self.queue.username = get_config_value(config, f"{QUEUE_COMPONENT_NAME}.user") + self.queue.password = get_config_value(config, f"{QUEUE_COMPONENT_NAME}.password") except KeyError as ex: raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.") diff --git a/components/package-template/src/README.md b/components/package-template/src/README.md index ab4136f3e..c17fc05f7 100644 --- a/components/package-template/src/README.md +++ b/components/package-template/src/README.md @@ -113,8 +113,9 @@ On the control node, run the following commands (these must be started in the order below): ```bash -sbin/start-clp.sh db +sbin/start-clp.sh database sbin/start-clp.sh queue +sbin/start-clp.sh results_cache sbin/start-clp.sh scheduler ``` diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 56e5cffe0..3c0b94784 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -20,6 +20,10 @@ # host: "localhost" # port: 5672 # +#results_cache: +# host: "localhost" +# port: 27017 +# ## Where archives should be output to #archive_output: # directory: "var/data/archives" diff --git a/components/package-template/src/etc/credentials.template.yml b/components/package-template/src/etc/credentials.template.yml index 51cfcf865..54bd78321 100644 --- a/components/package-template/src/etc/credentials.template.yml +++ b/components/package-template/src/etc/credentials.template.yml @@ -1,5 +1,5 @@ ## Database credentials -#db: +#database: # user: "user" # password: "pass" # diff --git a/components/package-template/src/etc/mongo/mongod.conf b/components/package-template/src/etc/mongo/mongod.conf new file mode 100644 index 000000000..510d6ba5f --- /dev/null +++ b/components/package-template/src/etc/mongo/mongod.conf @@ -0,0 +1,7 @@ +net: + bindIp: 0.0.0.0 +systemLog: + destination: file + path: /var/log/mongodb/mongod.log + logAppend: true + timeStampFormat: iso8601-local