diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
index e8be862e5..24a93b50e 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -383,7 +383,7 @@ def start_search_scheduler(instance_id: str, clp_config: CLPConfig, container_cl
     container_start_cmd.append(clp_config.execution_container)
     scheduler_cmd = [
         'python3', '-u', '-m',
-        'job_orchestration.scheduler.search_scheduler',
+        'job_orchestration.search_scheduler.search_scheduler',
         '--config', str(container_clp_config.logs_directory / container_config_filename),
     ]
diff --git a/components/job-orchestration/job_orchestration/executor/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/celeryconfig.py
index f0322863f..af9e55b0b 100644
--- a/components/job-orchestration/job_orchestration/executor/celeryconfig.py
+++ b/components/job-orchestration/job_orchestration/executor/celeryconfig.py
@@ -7,14 +7,12 @@ worker_prefetch_multiplier = 1

 imports = [
     'job_orchestration.executor.compression_task',
-    'job_orchestration.executor.search_task'
 ]

 # Queue settings
 task_queue_max_priority = TASK_QUEUE_HIGHEST_PRIORITY
 task_routes = {
     'job_orchestration.executor.compression_task.compress': QueueName.COMPRESSION,
-    'job_orchestration.executor.search_task.search': QueueName.SEARCH
 }

 task_create_missing_queues = True
diff --git a/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py
index 1bb80c4b6..1a082a5b8 100644
--- a/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py
+++ b/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py
@@ -7,6 +7,7 @@
 task_routes = {
     'job_orchestration.executor.search.fs_search_task.search': QueueName.SEARCH,
 }
+task_create_missing_queues = True

 broker_url = os.getenv('BROKER_URL')
 result_backend = os.getenv('RESULT_BACKEND')
diff --git a/components/job-orchestration/job_orchestration/executor/search_task.py b/components/job-orchestration/job_orchestration/executor/search_task.py
index cf3d77237..ce6f920f0 100644
--- a/components/job-orchestration/job_orchestration/executor/search_task.py
+++ b/components/job-orchestration/job_orchestration/executor/search_task.py
@@ -76,7 +76,7 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di
     return search_successful, f"See {stderr_filename} in logs directory."

-@app.task()
+#@app.task()
 def search(job_id: int, task_id: int, search_config_json: str, archive_id: str,
            results_cache_uri: str):
     clp_home = os.getenv('CLP_HOME')
diff --git a/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py b/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py
index 0520a5a41..84dd28ba3 100644
--- a/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py
+++ b/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py
@@ -52,9 +52,9 @@ def fetch_new_search_jobs(db_cursor) -> list:
     db_cursor.execute(f"""
         SELECT {SEARCH_JOBS_TABLE_NAME}.id as job_id,
         {SEARCH_JOBS_TABLE_NAME}.status as job_status,
         {SEARCH_JOBS_TABLE_NAME}.search_config,
-        {SEARCH_JOBS_TABLE_NAME}.submission_time,
+        {SEARCH_JOBS_TABLE_NAME}.submission_time
         FROM {SEARCH_JOBS_TABLE_NAME}
-        WHERE {SEARCH_JOBS_TABLE_NAME}.status='{JobStatus.PENDING}'
+        WHERE {SEARCH_JOBS_TABLE_NAME}.status={JobStatus.PENDING}
     """)
     return db_cursor.fetchall()
@@ -63,7 +63,7 @@ def fetch_cancelling_search_jobs(db_cursor) -> list:
     db_cursor.execute(f"""
         SELECT {SEARCH_JOBS_TABLE_NAME}.id as job_id
         FROM {SEARCH_JOBS_TABLE_NAME}
-        WHERE {SEARCH_JOBS_TABLE_NAME}.status='{JobStatus.CANCELLING}'
+        WHERE {SEARCH_JOBS_TABLE_NAME}.status={JobStatus.CANCELLING}
     """)
     return db_cursor.fetchall()

@@ -220,11 +220,16 @@ def handle_jobs(
     results_cache_uri: str,
     jobs_poll_delay: float,
 ) -> None:
-    while True:
-        poll_and_submit_pending_search_jobs(db_conn, results_cache_uri)
-        poll_and_handle_cancelling_search_jobs(db_conn)
-        check_job_status_and_update_db(db_conn)
-        time.sleep(jobs_poll_delay)
+    try:
+        while True:
+            poll_and_submit_pending_search_jobs(db_conn, results_cache_uri)
+            poll_and_handle_cancelling_search_jobs(db_conn)
+            check_job_status_and_update_db(db_conn)
+            time.sleep(jobs_poll_delay)
+    except Exception as e:
+        logger.error(f"Uncaught exception in job handling loop: {e}")
+        return
+


 def main(argv: List[str]) -> int:
@@ -262,8 +267,8 @@ def main(argv: List[str]) -> int:
     )
     logger.info(f"Connected to archive database {clp_config.database.host}:{clp_config.database.port}.")

+    logger.debug(f"Job polling interval {clp_config.search_scheduler.jobs_poll_delay} seconds.")
     logger.info("Search scheduler started.")
-    logger.debug(f"Polling interval {jobs_poll_delay} seconds.")
     handle_jobs(
         db_conn=db_conn,
         results_cache_uri=clp_config.results_cache.get_uri(),
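A note on the unquoted status comparisons above: `WHERE ... status={JobStatus.PENDING}` only renders valid SQL if `JobStatus` interpolates as a number rather than as the member name. A minimal sketch of why this works, assuming `JobStatus` is an `IntEnum` (its real definition is not part of this diff, and the member values below are placeholders):

from enum import IntEnum, auto

# Hypothetical stand-in for job_orchestration's JobStatus; the real
# definition is not shown in this diff.
class JobStatus(IntEnum):
    PENDING = 0
    CANCELLING = auto()

SEARCH_JOBS_TABLE_NAME = "search_jobs"  # assumed value of the constant

# An IntEnum member formats as its integer value inside an f-string,
# so the unquoted comparison renders as a numeric SQL literal:
query = f"""
    SELECT {SEARCH_JOBS_TABLE_NAME}.id as job_id
    FROM {SEARCH_JOBS_TABLE_NAME}
    WHERE {SEARCH_JOBS_TABLE_NAME}.status={JobStatus.PENDING}
"""
print(query)  # ... WHERE search_jobs.status=0

With the old quoted form, the same f-string would have produced status='0', comparing a numeric column against a string; dropping the quotes makes the comparison numeric on both sides.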