diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
index 77cbb8bca0..bb9648018d 100644
--- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -43,6 +43,7 @@
     SEARCH_QUEUE_COMPONENT_NAME,
     SEARCH_SCHEDULER_COMPONENT_NAME,
     SEARCH_WORKER_COMPONENT_NAME,
+    REDUCER_COMPONENT_NAME,
    COMPRESSION_WORKER_COMPONENT_NAME,
     COMPRESSION_JOB_HANDLER_COMPONENT_NAME,
     WEBUI_COMPONENT_NAME,
@@ -588,6 +589,71 @@ def start_worker(component_name: str, instance_id: str, clp_config: CLPConfig, w
     logger.info(f"Started {component_name}.")
 
 
+def get_host_ip():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    s.settimeout(0)
+    try:
+        # The target doesn't even have to be reachable; connecting a UDP
+        # socket only selects the outbound interface.
+        s.connect(('10.254.254.254', 1))
+        ip = s.getsockname()[0]
+    except Exception:
+        ip = '127.0.0.1'
+    finally:
+        s.close()
+    return ip
+
+
+def start_reducer(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig,
+                  num_cpus: int, mounts: CLPDockerMounts):
+    logger.info(f"Starting {REDUCER_COMPONENT_NAME}...")
+
+    container_name = f'clp-{REDUCER_COMPONENT_NAME}-{instance_id}'
+    if container_exists(container_name):
+        logger.info(f"{REDUCER_COMPONENT_NAME} already running.")
+        return
+
+    container_config_filename = f'{container_name}.yml'
+    container_config_file_path = clp_config.logs_directory / container_config_filename
+    with open(container_config_file_path, 'w') as f:
+        yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f)
+
+    logs_dir = clp_config.logs_directory / REDUCER_COMPONENT_NAME
+    logs_dir.mkdir(parents=True, exist_ok=True)
+    container_logs_dir = container_clp_config.logs_directory / REDUCER_COMPONENT_NAME
+
+    clp_site_packages_dir = CONTAINER_CLP_HOME / 'lib' / 'python3' / 'site-packages'
+    container_start_cmd = [
+        'docker', 'run',
+        '-di',
+        '--network', 'host',
+        '-w', str(CONTAINER_CLP_HOME),
+        '--name', container_name,
+        '-e', f'PYTHONPATH={clp_site_packages_dir}',
+        '-e', f'CLP_LOGS_DIR={container_logs_dir}',
+        '-e', f'CLP_LOGGING_LEVEL={clp_config.reducer.logging_level}',
+        '-e', f'CLP_HOME={CONTAINER_CLP_HOME}',
+        '--mount', str(mounts.clp_home),
+    ]
+    necessary_mounts = [
+        mounts.logs_dir,
+    ]
+    for mount in necessary_mounts:
+        if mount:
+            container_start_cmd.append('--mount')
+            container_start_cmd.append(str(mount))
+    container_start_cmd.append(clp_config.execution_container)
+
+    reducer_cmd = [
+        'python3', '-u', '-m',
+        'job_orchestration.reducer.reducer',
+        '--config', str(container_clp_config.logs_directory / container_config_filename),
+        '--host', get_host_ip(),
+        '--concurrency', str(num_cpus),
+    ]
+
+    cmd = container_start_cmd + reducer_cmd
+    subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)
+
+    logger.info(f"Started {REDUCER_COMPONENT_NAME}.")
+
+
 def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts):
     component_name = WEBUI_COMPONENT_NAME
@@ -737,6 +803,7 @@ def main(argv):
     component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
     component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
     component_args_parser.add_parser(WEBUI_QUERY_HANDLER_COMPONENT_NAME)
+    component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
     # Shortcut for multiple components
     component_args_parser.add_parser(DATABASE_COMPONENTS)
     component_args_parser.add_parser(CONTROLLER_COMPONENTS)
@@ -799,6 +866,8 @@ def main(argv):
         num_cpus = parsed_args.num_cpus
     if SEARCH_WORKER_COMPONENT_NAME == component_name and parsed_args.num_cpus != 0:
         num_cpus = parsed_args.num_cpus
+    if REDUCER_COMPONENT_NAME == component_name and parsed_args.num_cpus != 0:
+        num_cpus = parsed_args.num_cpus
 
     container_clp_config, mounts = generate_container_config(clp_config, clp_home)
@@ -836,6 +905,8 @@ def main(argv):
             start_search_scheduler(instance_id, clp_config, container_clp_config, mounts)
         if component_name in ['', SEARCH_WORKER_COMPONENT_NAME]:
             start_search_worker(instance_id, clp_config, container_clp_config, num_cpus, mounts)
+        if component_name in ['', REDUCER_COMPONENT_NAME]:
+            start_reducer(instance_id, clp_config, container_clp_config, num_cpus, mounts)
         if component_name in ['', COMPRESSION_WORKER_COMPONENT_NAME]:
             start_compression_worker(instance_id, clp_config, container_clp_config, num_cpus, mounts)
         if component_name in ['', WEBUI_QUERY_HANDLER_COMPONENT_NAME]:
diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
index 8758699a51..c5ceab96b6 100644
--- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
@@ -20,6 +20,7 @@
     SEARCH_QUEUE_COMPONENT_NAME,
     SEARCH_SCHEDULER_COMPONENT_NAME,
     SEARCH_WORKER_COMPONENT_NAME,
+    REDUCER_COMPONENT_NAME,
     COMPRESSION_WORKER_COMPONENT_NAME,
     COMPRESSION_JOB_HANDLER_COMPONENT_NAME,
     WEBUI_COMPONENT_NAME,
@@ -76,6 +77,7 @@ def main(argv):
     component_args_parser.add_parser(SEARCH_QUEUE_COMPONENT_NAME)
     component_args_parser.add_parser(SEARCH_SCHEDULER_COMPONENT_NAME)
     component_args_parser.add_parser(SEARCH_WORKER_COMPONENT_NAME)
+    component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
     component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
     component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
     component_args_parser.add_parser(WEBUI_QUERY_HANDLER_COMPONENT_NAME)
@@ -142,6 +144,8 @@ def main(argv):
             stop_container(container_name, stale_containers)
         if '' == component_name or DB_COMPONENT_NAME == component_name:
             stop_container(f'clp-{DB_COMPONENT_NAME}-{instance_id}', stale_containers)
+        if '' == component_name or REDUCER_COMPONENT_NAME == component_name:
+            stop_container(f'clp-{REDUCER_COMPONENT_NAME}-{instance_id}', stale_containers)
 
         if '' == component_name:
             # NOTE: We can only remove the instance ID file if all containers have been stopped.
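Note on get_host_ip() in start_clp.py above: because the reducer container runs with
--network host, the start script needs an address that other components can reach,
and it discovers one with the UDP-connect trick: connect() on a datagram socket
transmits nothing, it only asks the kernel to pick the outbound interface for the
(arbitrary, assumed-unreachable) target, after which getsockname() reports the local
IP of that interface. A minimal standalone sketch of the same technique (the probe
address and function name here are illustrative, not part of this patch):

    import socket

    def outbound_ip(probe_addr: str = "10.254.254.254") -> str:
        # Connecting a UDP socket selects a route but sends no packet.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect((probe_addr, 1))
            return s.getsockname()[0]  # local address of the chosen interface
        except OSError:
            return "127.0.0.1"  # no route available; fall back to loopback
        finally:
            s.close()

    print(outbound_ip())  # e.g. the machine's LAN address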
diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py
index 58f39c42cf..dd790abc0c 100644
--- a/components/clp-py-utils/clp_py_utils/clp_config.py
+++ b/components/clp-py-utils/clp_py_utils/clp_config.py
@@ -22,6 +22,7 @@
 SEARCH_QUEUE_COMPONENT_NAME = 'search_queue'
 SEARCH_SCHEDULER_COMPONENT_NAME = 'search_scheduler'
 SEARCH_WORKER_COMPONENT_NAME = 'search_worker'
+REDUCER_COMPONENT_NAME = 'reducer'
 COMPRESSION_WORKER_COMPONENT_NAME = 'compression_worker'
 WEBUI_COMPONENT_NAME = 'webui'
 WEBUI_QUERY_HANDLER_COMPONENT_NAME = 'webui_query_handler'
@@ -191,6 +192,22 @@
     def validate_logging_level(cls, field):
         validate_logging_level_static(cls, field)
         return field
 
+class Reducer(BaseModel):
+    logging_level: str = 'INFO'
+    base_port: int = 14009
+
+    @validator('logging_level')
+    def validate_logging_level(cls, field):
+        validate_logging_level_static(cls, field)
+        return field
+
+    @validator('base_port')
+    def validate_base_port(cls, field):
+        if not field > 0:
+            raise ValueError(
+                f"{cls.__name__}: base port {field} is not a valid value"
+            )
+        return field
 
 class Queue(BaseModel):
     host: str = 'localhost'
@@ -278,6 +295,7 @@ class CLPConfig(BaseModel):
     compression_worker: CompressionWorker = CompressionWorker()
     search_scheduler: SearchScheduler = SearchScheduler()
     search_worker: SearchWorker = SearchWorker()
+    reducer: Reducer = Reducer()
     webui: WebUi = WebUi()
     webui_query_handler: WebUiQueryHandler = WebUiQueryHandler()
     credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH
diff --git a/components/core/src/clo/reducer_server.cpp b/components/core/src/clo/reducer_server.cpp
index a4feaf014a..e4d9808753 100644
--- a/components/core/src/clo/reducer_server.cpp
+++ b/components/core/src/clo/reducer_server.cpp
@@ -71,7 +71,6 @@ std::ostream& operator<<(std::ostream& os, ServerStatus const& status) {
 struct ServerContext {
     boost::asio::io_context ioctx;
     boost::asio::ip::tcp::acceptor acceptor;
-    std::string ip;
     Pipeline* p;
     MYSQL* db;
     ServerStatus status;
@@ -335,7 +334,7 @@ ServerStatus assign_new_job(ServerContext* ctx) {
     for (auto job = jobs.begin(); job != jobs.end(); ++job) {
         // set reducer_ready where pending_reducer and id=job_id
         std::stringstream ss;
-        ss << "UPDATE distributed_search_jobs SET status=9, reducer_port=" << 14'009
+        ss << "UPDATE distributed_search_jobs SET status=9, reducer_port=" << ctx->port
            << ", reducer_host=\"" << ctx->host << "\" WHERE status=8 and id=" << job->first;
         std::string update = ss.str();
         if (0 != mysql_real_query(ctx->db, update.c_str(), update.length())) {
@@ -524,12 +523,10 @@ int main(int argc, char const* argv[]) {
     // Polling interval
     boost::asio::steady_timer poll_timer(ctx.ioctx, boost::asio::chrono::milliseconds(500));
 
+    std::cout << "Starting on host " << ctx.host << " port " << port << " listening successfully " << ctx.acceptor.is_open() << std::endl;
+
     // Job acquisition loop
     while (true) {
-        boost::asio::ip::address addr;
-        ctx.acceptor.local_endpoint().address(addr);
-        ctx.ip = addr.to_string();
-        std::cout << ctx.ip << std::endl;
         // Queue up polling and tcp accepting
         poll_timer.async_wait(
             boost::bind(poll_db, boost::asio::placeholders::error, &ctx, &poll_timer)
@@ -554,8 +551,9 @@ int main(int argc, char const* argv[]) {
         );
     }
 
-    if (result_documents.size() > 0)
+    if (result_documents.size() > 0) {
         ctx.results_collection.insert_many(result_documents);
+    }
 
     if (set_job_done(&ctx) == -1) {
         return -1;
diff --git a/components/job-orchestration/job_orchestration/reducer/reducer.py b/components/job-orchestration/job_orchestration/reducer/reducer.py
new file mode 100644
index 0000000000..44dde0702b
--- /dev/null
+++ b/components/job-orchestration/job_orchestration/reducer/reducer.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+
+import argparse
+import logging
+import os
+import subprocess
+import sys
+from pathlib import Path
+from typing import List
+
+from pydantic import ValidationError
+
+from clp_py_utils.clp_config import CLPConfig
+from clp_py_utils.clp_logging import get_logging_level
+from clp_py_utils.core import read_yaml_config_file
+
+# Setup console logging
+logger = logging.getLogger("reducer")
+logger.setLevel(logging.INFO)
+logging_console_handler = logging.StreamHandler()
+logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
+logging_console_handler.setFormatter(logging_formatter)
+logger.addHandler(logging_console_handler)
+# Prevents double logging from sub-loggers for specific jobs
+logger.propagate = False
+
+
+def main(argv: List[str]) -> int:
+    # fmt: off
+    args_parser = argparse.ArgumentParser(description="Spin up reducers.")
+    args_parser.add_argument('--config', '-c', required=True, help="CLP configuration file.")
+    args_parser.add_argument('--host', required=True, help="Host IP this container is running on.")
+    args_parser.add_argument('--concurrency', required=True, help="Number of reducer servers to run.")
+    # fmt: on
+
+    parsed_args = args_parser.parse_args(argv[1:])
+
+    # Setup logging to file
+    logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
+    log_file = logs_dir / "reducer.log"
+    logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8")
+    logging_file_handler.setFormatter(logging_formatter)
+    logger.addHandler(logging_file_handler)
+
+    # Update logging level based on config
+    logging_level_str = os.getenv("CLP_LOGGING_LEVEL")
+    logging_level = get_logging_level(logging_level_str)
+    logger.setLevel(logging_level)
+    logger.info(f"Start logging level = {logging.getLevelName(logging_level)}")
+
+    # Load configuration
+    config_path = Path(parsed_args.config)
+    try:
+        clp_config = CLPConfig.parse_obj(read_yaml_config_file(config_path))
+    except ValidationError as err:
+        logger.error(err)
+        return -1
+    except Exception as ex:
+        # read_yaml_config_file already logs the parsing error internally
+        logger.error(ex)
+        return -1
+
+    # Common arguments for every reducer_server instance; the per-instance
+    # port is appended below.
+    clp_home = Path(os.getenv("CLP_HOME"))
+    reducer_cmd = [
+        str(clp_home / "bin" / "reducer_server"),
+        "--db-host", clp_config.database.host,
+        "--db-port", str(clp_config.database.port),
+        "--db-user", str(clp_config.database.username),
+        "--db-pass", str(clp_config.database.password),
+        "--database", clp_config.database.name,
+        "--mongodb-database", clp_config.results_cache.db_name,
+        "--mongodb-uri", f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/",
+        "--mongodb-collection", clp_config.results_cache.results_collection_name,
+        "--host", parsed_args.host,
+        "--port",
+    ]
+
+    # Spawn one reducer_server per unit of concurrency, each listening on its
+    # own port (base_port, base_port + 1, ...)
+    reducers = []
+    concurrency = max(int(parsed_args.concurrency), 1)
+    for i in range(concurrency):
+        reducer_instance_cmd = reducer_cmd + [str(clp_config.reducer.base_port + i)]
+
+        log_file_path = logs_dir / f"reducer-{i}.log"
+        log_file = open(log_file_path, 'a')
+
+        reducers.append(
+            subprocess.Popen(
+                reducer_instance_cmd,
+                close_fds=True,
+                stdout=log_file,
+                stderr=log_file,
+            )
+        )
+
+    logger.info("Reducers started.")
+    logger.info(f"Host={parsed_args.host} Base port={clp_config.reducer.base_port} Concurrency={concurrency}")
+    for r in reducers:
+        r.wait()
+
+    logger.error("All reducers terminated.")
+
+    logger.removeHandler(logging_file_handler)
+    logging_file_handler.close()
+
+    return 0
+
+
+if "__main__" == __name__:
+    sys.exit(main(sys.argv))