diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
index 655c1dae1..74bb5f018 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -234,6 +234,54 @@ def create_db_tables(
     logger.info(f"Created {component_name} tables.")
 
 
+def create_results_cache_indices(
+    instance_id: str,
+    clp_config: CLPConfig,
+    container_clp_config: CLPConfig,
+    mounts: CLPDockerMounts,
+):
+    component_name = RESULTS_CACHE_COMPONENT_NAME
+    logger.info(f"Creating {component_name} indices...")
+
+    container_name = f"clp-{component_name}-indices-creator-{instance_id}"
+
+    clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
+    # fmt: off
+    container_start_cmd = [
+        "docker", "run",
+        "-i",
+        "--network", "host",
+        "--rm",
+        "--name", container_name,
+        "--log-driver", "local",
+        "-e", f"PYTHONPATH={clp_site_packages_dir}",
+        "-u", f"{os.getuid()}:{os.getgid()}",
+    ]
+    # fmt: on
+    necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir]
+    for mount in necessary_mounts:
+        if mount:
+            container_start_cmd.append("--mount")
+            container_start_cmd.append(str(mount))
+    container_start_cmd.append(clp_config.execution_container)
+
+    clp_py_utils_dir = clp_site_packages_dir / "clp_py_utils"
+    # fmt: off
+    create_indices_cmd = [
+        "python3",
+        str(clp_py_utils_dir / "create-results-cache-indices.py"),
+        "--uri", container_clp_config.results_cache.get_uri(),
+        "--ir-collection", container_clp_config.results_cache.ir_collection_name,
+    ]
+    # fmt: on
+
+    cmd = container_start_cmd + create_indices_cmd
+    logger.debug(" ".join(cmd))
+    subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)
+
+    logger.info(f"Created {component_name} indices.")
+
+
 def start_queue(instance_id: str, clp_config: CLPConfig):
     component_name = QUEUE_COMPONENT_NAME
     logger.info(f"Starting {component_name}...")
@@ -1058,6 +1106,8 @@ def main(argv):
             start_redis(instance_id, clp_config, conf_dir)
         if target in (ALL_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME):
            start_results_cache(instance_id, clp_config, conf_dir)
+        if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME):
+            create_results_cache_indices(instance_id, clp_config, container_clp_config, mounts)
         if target in (
             ALL_TARGET_NAME,
             CONTROLLER_TARGET_NAME,
diff --git a/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py b/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py
new file mode 100644
index 000000000..dafbd3bde
--- /dev/null
+++ b/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py
@@ -0,0 +1,42 @@
+import argparse
+import logging
+import sys
+
+from pymongo import IndexModel, MongoClient
+
+# Setup logging
+# Create logger
+logger = logging.getLogger(__file__)
+logger.setLevel(logging.INFO)
+# Setup console logging
+logging_console_handler = logging.StreamHandler()
+logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
+logging_console_handler.setFormatter(logging_formatter)
+logger.addHandler(logging_console_handler)
+
+
+def main(argv):
+    args_parser = argparse.ArgumentParser(description="Creates results cache indices for CLP.")
+    args_parser.add_argument("--uri", required=True, help="URI of the results cache.")
+    args_parser.add_argument("--ir-collection", required=True, help="Collection for IR metadata.")
+    parsed_args = args_parser.parse_args(argv[1:])
+
+    results_cache_uri = parsed_args.uri
+    ir_collection_name = parsed_args.ir_collection
+
+    try:
+        with MongoClient(results_cache_uri) as results_cache_client:
+            ir_collection = results_cache_client.get_default_database()[ir_collection_name]
+
+            file_split_id_index = IndexModel(["file_split_id"])
+            orig_file_id_index = IndexModel(["orig_file_id", "begin_msg_ix", "end_msg_ix"])
+            ir_collection.create_indexes([file_split_id_index, orig_file_id_index])
+    except Exception:
+        logger.exception("Failed to create CLP results cache indices.")
+        return -1
+
+    return 0
+
+
+if "__main__" == __name__:
+    sys.exit(main(sys.argv))
diff --git a/components/core/src/clp/clo/clo.cpp b/components/core/src/clp/clo/clo.cpp
index 4216a2c6e..f29df0306 100644
--- a/components/core/src/clp/clo/clo.cpp
+++ b/components/core/src/clp/clo/clo.cpp
@@ -198,6 +198,10 @@ bool extract_ir(CommandLineArguments const& command_line_args) {
                         clp::clo::cResultsCacheKeys::OrigFileId,
                         orig_file_id
                 ),
+                bsoncxx::builder::basic::kvp(
+                        clp::clo::cResultsCacheKeys::IrOutput::FileSplitId,
+                        file_split_id
+                ),
                 bsoncxx::builder::basic::kvp(
                         clp::clo::cResultsCacheKeys::IrOutput::BeginMsgIx,
                         static_cast<int64_t>(begin_message_ix)
diff --git a/components/core/src/clp/clo/constants.hpp b/components/core/src/clp/clo/constants.hpp
index d2eb32db4..86f7313f2 100644
--- a/components/core/src/clp/clo/constants.hpp
+++ b/components/core/src/clp/clo/constants.hpp
@@ -7,6 +7,7 @@ constexpr char OrigFileId[]{"orig_file_id"};
 
 namespace IrOutput {
 constexpr char Path[]{"path"};
+constexpr char FileSplitId[]{"file_split_id"};
 constexpr char BeginMsgIx[]{"begin_msg_ix"};
 constexpr char EndMsgIx[]{"end_msg_ix"};
 constexpr char IsLastIrChunk[]{"is_last_ir_chunk"};
diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
index 015480662..2a0f855a3 100644
--- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
@@ -64,6 +64,9 @@
 # Dictionary of active jobs indexed by job id
 active_jobs: Dict[str, QueryJob] = {}
 
+# Dictionary that maps IDs of file splits being extracted to IDs of jobs waiting for them
+active_file_split_ir_extractions: Dict[str, List[str]] = {}
+
 reducer_connection_queue: Optional[asyncio.Queue] = None
 
 
@@ -463,12 +466,13 @@ def handle_pending_query_jobs(
     db_conn_pool,
     clp_metadata_db_conn_params: Dict[str, any],
     results_cache_uri: str,
+    ir_collection_name: str,
     num_archives_to_search_per_sub_job: int,
 ) -> List[asyncio.Task]:
     global active_jobs
+    global active_file_split_ir_extractions
 
     reducer_acquisition_tasks = []
-
     pending_search_jobs = [
         job
         for job in active_jobs.values()
@@ -542,10 +546,58 @@
                     logger.error(f"Failed to set job {job_id} as failed")
                 continue
 
+            # NOTE: The following two if blocks should not be reordered since if we first check
+            # whether *an* IR file has been extracted for the requested file split, it doesn't
+            # mean that *all* IR files have been extracted for the file split (since the
+            # extraction job may still be in progress). Thus, we must first check whether the
+            # file split is in the process of being extracted, and then check whether it's
+            # already been extracted.
+
+            # Check if the file split is currently being extracted; if so, add the job ID to the
+            # list of jobs waiting for it.
+            if file_split_id in active_file_split_ir_extractions:
+                active_file_split_ir_extractions[file_split_id].append(job_id)
+                logger.info(
+                    f"Split {file_split_id} is being extracted, so mark job {job_id} as running"
+                )
+                if not set_job_or_task_status(
+                    db_conn,
+                    QUERY_JOBS_TABLE_NAME,
+                    job_id,
+                    QueryJobStatus.RUNNING,
+                    QueryJobStatus.PENDING,
+                    start_time=datetime.datetime.now(),
+                    num_tasks=0,
+                ):
+                    logger.error(f"Failed to set job {job_id} as running")
+                continue
+
+            # Check if the file split has already been extracted
+            if ir_file_exists_for_file_split(
+                results_cache_uri, ir_collection_name, file_split_id
+            ):
+                logger.info(
+                    f"Split {file_split_id} already extracted, so mark job {job_id} as done"
+                )
+                if not set_job_or_task_status(
+                    db_conn,
+                    QUERY_JOBS_TABLE_NAME,
+                    job_id,
+                    QueryJobStatus.SUCCEEDED,
+                    QueryJobStatus.PENDING,
+                    start_time=datetime.datetime.now(),
+                    num_tasks=0,
+                    duration=0,
+                ):
+                    logger.error(f"Failed to set job {job_id} as succeeded")
+                continue
+
+            active_file_split_ir_extractions[file_split_id] = [job_id]
             extract_ir_config.file_split_id = file_split_id
             new_extract_ir_job = ExtractIrJob(
                 id=job_id,
                 archive_id=archive_id,
+                file_split_id=file_split_id,
                 extract_ir_config=extract_ir_config,
                 state=InternalJobState.WAITING_FOR_DISPATCH,
             )
@@ -633,6 +685,15 @@ def found_max_num_latest_results(
     return max_timestamp_in_remaining_archives <= min_timestamp_in_top_results
 
 
+def ir_file_exists_for_file_split(
+    results_cache_uri: str, ir_collection_name: str, file_split_id: str
+):
+    with pymongo.MongoClient(results_cache_uri) as results_cache_client:
+        ir_collection = results_cache_client.get_default_database()[ir_collection_name]
+        results_count = ir_collection.count_documents({"file_split_id": file_split_id})
+        return 0 != results_count
+
+
 async def handle_finished_search_job(
     db_conn, job: SearchJob, task_results: Optional[Any], results_cache_uri: str
 ) -> None:
@@ -723,8 +784,10 @@ async def handle_finished_extract_ir_job(
     db_conn, job: ExtractIrJob, task_results: Optional[Any]
 ) -> None:
     global active_jobs
+    global active_file_split_ir_extractions
 
     job_id = job.id
+    file_split_id = job.file_split_id
     new_job_status = QueryJobStatus.SUCCEEDED
     num_tasks = len(task_results)
     if 1 != num_tasks:
@@ -761,6 +824,22 @@
         logger.info(f"Completed IR extraction job {job_id}.")
     else:
         logger.info(f"Completed IR extraction job {job_id} with failing tasks.")
+
+    waiting_jobs = active_file_split_ir_extractions[file_split_id]
+    waiting_jobs.remove(job_id)
+    for waiting_job in waiting_jobs:
+        logger.info(f"Setting status to {new_job_status.to_str()} for waiting job {waiting_job}.")
+        set_job_or_task_status(
+            db_conn,
+            QUERY_JOBS_TABLE_NAME,
+            waiting_job,
+            new_job_status,
+            QueryJobStatus.RUNNING,
+            num_tasks_completed=0,
+            duration=(datetime.datetime.now() - job.start_time).total_seconds(),
+        )
+
+    del active_file_split_ir_extractions[file_split_id]
 
     del active_jobs[job_id]
 
@@ -819,6 +898,7 @@ async def handle_jobs(
     db_conn_pool,
     clp_metadata_db_conn_params: Dict[str, any],
     results_cache_uri: str,
+    ir_collection_name: str,
     jobs_poll_delay: float,
     num_archives_to_search_per_sub_job: int,
 ) -> None:
@@ -832,6 +912,7 @@ async def handle_jobs(
             db_conn_pool,
             clp_metadata_db_conn_params,
             results_cache_uri,
+            ir_collection_name,
             num_archives_to_search_per_sub_job,
         )
         if 0 == len(reducer_acquisition_tasks):
@@ -915,6 +996,7 @@ async def main(argv: List[str]) -> int:
             True
         ),
         results_cache_uri=clp_config.results_cache.get_uri(),
+        ir_collection_name=clp_config.results_cache.ir_collection_name,
         jobs_poll_delay=clp_config.query_scheduler.jobs_poll_delay,
         num_archives_to_search_per_sub_job=batch_size,
     )
diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
index 3d4c0d7a7..5ef92a5d6 100644
--- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
+++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
@@ -59,6 +59,7 @@ def get_config(self) -> QueryJobConfig: ...
 
 class ExtractIrJob(QueryJob):
     extract_ir_config: ExtractIrJobConfig
+    file_split_id: str
     archive_id: str
 
     def get_type(self) -> QueryJobType:
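
To sanity-check the new results-cache indices outside the package, the following standalone sketch creates the same two indexes that create-results-cache-indices.py defines and then prints what the server reports. The connection URI and collection name below are placeholders for a local test instance, not values taken from this change.

from pymongo import IndexModel, MongoClient

# Placeholder connection details; substitute your deployment's results cache URI
# (it must include a database name) and IR collection name.
RESULTS_CACHE_URI = "mongodb://localhost:27017/clp-query-results"
IR_COLLECTION_NAME = "ir-files"

with MongoClient(RESULTS_CACHE_URI) as client:
    ir_collection = client.get_default_database()[IR_COLLECTION_NAME]

    # Same index definitions as create-results-cache-indices.py
    ir_collection.create_indexes(
        [
            IndexModel(["file_split_id"]),
            IndexModel(["orig_file_id", "begin_msg_ix", "end_msg_ix"]),
        ]
    )

    # Print the index names the server now reports, e.g. "file_split_id_1"
    for index in ir_collection.list_indexes():
        print(index["name"])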
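
The scheduler-side bookkeeping added in query_scheduler.py can be viewed in isolation: the first job to request a file split registers itself in active_file_split_ir_extractions and dispatches the extraction, later jobs for the same split only append their IDs, and handle_finished_extract_ir_job resolves the waiters once the extraction completes. Below is a toy model of that pattern; the function names are illustrative only and are not part of query_scheduler.py.

from typing import Dict, List

# Toy model of the scheduler's bookkeeping: file split ID -> IDs of jobs waiting on its extraction.
active_file_split_ir_extractions: Dict[str, List[str]] = {}


def request_ir_extraction(file_split_id: str, job_id: str) -> bool:
    """Returns True if the caller must dispatch a new extraction, or False if an
    extraction for this file split is already in flight and the job should wait."""
    if file_split_id in active_file_split_ir_extractions:
        active_file_split_ir_extractions[file_split_id].append(job_id)
        return False
    active_file_split_ir_extractions[file_split_id] = [job_id]
    return True


def finish_ir_extraction(file_split_id: str, extracting_job_id: str) -> List[str]:
    """Returns the IDs of the jobs that were waiting on the finished extraction."""
    waiting_jobs = active_file_split_ir_extractions.pop(file_split_id)
    waiting_jobs.remove(extracting_job_id)
    return waiting_jobs


if __name__ == "__main__":
    assert request_ir_extraction("split-1", "job-1")          # job-1 dispatches the extraction
    assert not request_ir_extraction("split-1", "job-2")      # job-2 waits on job-1's extraction
    assert finish_ir_extraction("split-1", "job-1") == ["job-2"]  # job-2 can now be marked done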