Put search scheduler db creation into its own script
	- Old scheduling tables remain untouched and will only be removed after search/compression scheduler changes are in
gibber9809 committed Jan 23, 2024
1 parent 7af5925 commit ba65660
Showing 3 changed files with 62 additions and 22 deletions.
6 changes: 6 additions & 0 deletions components/clp-py-utils/clp_py_utils/create-db-tables.py
@@ -36,6 +36,12 @@ def main(argv):
    ]
    subprocess.run(cmd, check=True)

    cmd = [
        'python3', str(script_dir / 'initialize-search-scheduler-db.py'),
        '--config', str(config_file_path)
    ]
    subprocess.run(cmd, check=True)

    return 0


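For orientation, the hunk above adds a second subprocess call inside create-db-tables.py's main(); the `script_dir` and `config_file_path` it references are defined earlier in that file and are not shown in this diff. A minimal, self-contained sketch of how the pieces presumably fit together (the argument parsing and path setup below are assumptions for illustration, not part of the commit):

```python
# Hypothetical sketch of create-db-tables.py's overall shape; only the final
# subprocess invocation is taken verbatim from this commit's hunk.
import argparse
import pathlib
import subprocess
import sys


def main(argv):
    args_parser = argparse.ArgumentParser(description="Creates CLP's database tables.")
    args_parser.add_argument('--config', required=True, help="Database config file.")
    parsed_args = args_parser.parse_args(argv[1:])

    script_dir = pathlib.Path(__file__).parent.resolve()
    config_file_path = pathlib.Path(parsed_args.config)

    # ... earlier initialize-*.py scripts are run the same way, then the new one:
    cmd = [
        'python3', str(script_dir / 'initialize-search-scheduler-db.py'),
        '--config', str(config_file_path)
    ]
    subprocess.run(cmd, check=True)

    return 0


if '__main__' == __name__:
    sys.exit(main(sys.argv))
```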
55 changes: 55 additions & 0 deletions components/clp-py-utils/clp_py_utils/initialize-search-scheduler-db.py
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
from contextlib import closing

from clp_py_utils.clp_config import Database, SEARCH_JOBS_TABLE_NAME
from clp_py_utils.core import read_yaml_config_file
from job_orchestration.search_scheduler.common import JobStatus
from sql_adapter import SQL_Adapter

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def main(argv):
    args_parser = argparse.ArgumentParser(description="Sets up metadata tables for job orchestration.")
    args_parser.add_argument('--config', required=True, help="Database config file.")
    parsed_args = args_parser.parse_args(argv[1:])

    try:
        database_config = Database.parse_obj(read_yaml_config_file(parsed_args.config))
        if database_config is None:
            raise ValueError(f"Database configuration file '{parsed_args.config}' is empty.")
        sql_adapter = SQL_Adapter(database_config)
        with closing(sql_adapter.create_connection(True)) as scheduling_db, \
                closing(scheduling_db.cursor(dictionary=True)) as scheduling_db_cursor:
            scheduling_db_cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` (
                    `id` INT NOT NULL AUTO_INCREMENT,
                    `status` INT NOT NULL DEFAULT '{JobStatus.PENDING}',
                    `submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
                    `search_config` VARBINARY(60000) NOT NULL,
                    PRIMARY KEY (`id`) USING BTREE,
                    INDEX `JOB_STATUS` (`status`) USING BTREE
                ) ROW_FORMAT=DYNAMIC
            """)

            scheduling_db.commit()
    except:
        logger.exception("Failed to create search scheduler tables.")
        return -1

    return 0


if '__main__' == __name__:
sys.exit(main(sys.argv))
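The CREATE TABLE statement interpolates `JobStatus.PENDING` straight into the column default, so the enum's string form has to render as a plain integer. The enum itself lives in job_orchestration.search_scheduler.common and is not part of this commit; a rough sketch of the kind of definition the script assumes (member names beyond PENDING and the exact values are illustrative):

```python
# Hypothetical sketch of JobStatus from job_orchestration/search_scheduler/common.py;
# only PENDING's use as an integer column default is evidenced by this commit.
from enum import IntEnum, auto


class JobStatus(IntEnum):
    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    CANCELLING = auto()
    CANCELLED = auto()

    def __str__(self) -> str:
        # Render as the numeric value so f-string interpolation into SQL,
        # e.g. DEFAULT '{JobStatus.PENDING}', yields "0" rather than the member name.
        return str(self.value)
```

Because the DDL uses CREATE TABLE IF NOT EXISTS, running the script repeatedly is harmless; create-db-tables.py invokes it via subprocess (first hunk above), and it can presumably also be run standalone with `--config` pointing at the database config.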
23 changes: 1 addition & 22 deletions components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py
@@ -68,22 +68,6 @@ def fetch_cancelling_search_jobs(db_cursor) -> list:
    return db_cursor.fetchall()


def setup_search_jobs_table(db_conn):
    cursor = db_conn.cursor()
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` (
            `id` INT NOT NULL AUTO_INCREMENT,
            `status` INT NOT NULL DEFAULT '{JobStatus.PENDING}',
            `submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
            `search_config` VARBINARY(60000) NOT NULL,
            PRIMARY KEY (`id`) USING BTREE,
            INDEX `JOB_STATUS` (`status`) USING BTREE
        ) ROW_FORMAT=DYNAMIC
    """)
    db_conn.commit()
    cursor.close()


def set_job_status(
    db_conn, job_id: str, status: JobStatus, prev_status: Optional[JobStatus] = None, **kwargs
) -> bool:
@@ -278,17 +262,12 @@ def main(argv: List[str]) -> int:
    )

    logger.info(f"Connected to archive database {clp_config.database.host}:{clp_config.database.port}.")

    setup_search_jobs_table(db_conn)

    jobs_poll_delay = clp_config.search_scheduler.jobs_poll_delay

    logger.info("Search scheduler started.")
    logger.debug(f"Polling interval {jobs_poll_delay} seconds.")
    handle_jobs(
        db_conn=db_conn,
        results_cache_uri=clp_config.results_cache.get_uri(),
        jobs_poll_delay=jobs_poll_delay,
        jobs_poll_delay=clp_config.search_scheduler.jobs_poll_delay,
    )
    return 0

