Skip to content

Commit

Permalink
Add component to cache search results (in the future). (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
wraymo authored Jan 9, 2024
1 parent efc34ae commit 4293499
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 46 deletions.
39 changes: 27 additions & 12 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

import yaml

from clp_py_utils.clp_config import CLPConfig, CLP_DEFAULT_CREDENTIALS_FILE_PATH
from clp_py_utils.clp_config import (
CLPConfig,
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME
)
from clp_py_utils.core import (
get_config_value,
make_config_path_absolute,
Expand All @@ -18,12 +24,6 @@
)

# CONSTANTS
# Component names
DB_COMPONENT_NAME = 'db'
QUEUE_COMPONENT_NAME = 'queue'
SCHEDULER_COMPONENT_NAME = 'scheduler'
WORKER_COMPONENT_NAME = 'worker'

# Paths
CONTAINER_CLP_HOME = pathlib.Path('/') / 'opt' / 'clp'
CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path('/') / 'mnt' / 'logs'
Expand Down Expand Up @@ -240,23 +240,38 @@ def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir:
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"database data directory is invalid: {ex}")
raise ValueError(f"{DB_COMPONENT_NAME} data directory is invalid: {ex}")

try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"database logs directory is invalid: {ex}")
raise ValueError(f"{DB_COMPONENT_NAME} logs directory is invalid: {ex}")

validate_port("database.port", clp_config.database.host, clp_config.database.port)
validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port)


def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path):
try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"queue logs directory is invalid: {ex}")
raise ValueError(f"{QUEUE_COMPONENT_NAME} logs directory is invalid: {ex}")

validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port)


def validate_results_cache_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}")

try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}")

validate_port("queue.port", clp_config.queue.host, clp_config.queue.port)
validate_port(f"{RESULTS_CACHE_COMPONENT_NAME}.port", clp_config.results_cache.host,
clp_config.results_cache.port)


def validate_worker_config(clp_config: CLPConfig):
Expand Down
92 changes: 70 additions & 22 deletions components/clp-package-utils/clp_package_utils/scripts/start_clp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
CONTAINER_CLP_HOME,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
SCHEDULER_COMPONENT_NAME,
WORKER_COMPONENT_NAME,
check_dependencies,
container_exists,
CLPDockerMounts,
Expand All @@ -30,9 +26,17 @@
validate_and_load_queue_credentials_file,
validate_db_config,
validate_queue_config,
validate_results_cache_config,
validate_worker_config
)
from clp_py_utils.clp_config import CLPConfig
from clp_py_utils.clp_config import (
CLPConfig,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SCHEDULER_COMPONENT_NAME,
WORKER_COMPONENT_NAME,
)
from job_orchestration.scheduler.constants import QueueName

# Setup logging
Expand Down Expand Up @@ -78,16 +82,16 @@ def wait_for_database_to_init(container_name: str, clp_config: CLPConfig, timeou
break
time.sleep(1)

logger.error("Timeout while waiting for database to initialize.")
logger.error(f"Timeout while waiting for {DB_COMPONENT_NAME} to initialize.")
return False


def start_db(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path):
logger.info("Starting database...")
logger.info(f"Starting {DB_COMPONENT_NAME}...")

container_name = f'clp-{DB_COMPONENT_NAME}-{instance_id}'
if container_exists(container_name):
logger.info("Database already running.")
logger.info(f"{DB_COMPONENT_NAME} already running.")
return

db_data_dir = clp_config.data_directory / DB_COMPONENT_NAME
Expand Down Expand Up @@ -128,15 +132,15 @@ def start_db(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path):
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

if not wait_for_database_to_init(container_name, clp_config, 30):
raise EnvironmentError("Database did not initialize in time")
raise EnvironmentError(f"{DB_COMPONENT_NAME} did not initialize in time")

logger.info("Started database.")
logger.info(f"Started {DB_COMPONENT_NAME}.")


def create_db_tables(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, mounts: CLPDockerMounts):
logger.info("Creating database tables...")
logger.info(f"Creating {DB_COMPONENT_NAME} tables...")

container_name = f'clp-db-table-creator-{instance_id}'
container_name = f'clp-{DB_COMPONENT_NAME}-table-creator-{instance_id}'

# Create database config file
db_config_filename = f'{container_name}.yml'
Expand Down Expand Up @@ -175,15 +179,15 @@ def create_db_tables(instance_id: str, clp_config: CLPConfig, container_clp_conf

db_config_file_path.unlink()

logger.info("Created database tables.")
logger.info(f"Created {DB_COMPONENT_NAME} tables.")


def start_queue(instance_id: str, clp_config: CLPConfig):
logger.info("Starting queue...")
logger.info(f"Starting {QUEUE_COMPONENT_NAME}...")

container_name = f'clp-{QUEUE_COMPONENT_NAME}-{instance_id}'
if container_exists(container_name):
logger.info("Queue already running.")
logger.info(f"{QUEUE_COMPONENT_NAME} already running.")
return

queue_logs_dir = clp_config.logs_directory / QUEUE_COMPONENT_NAME
Expand Down Expand Up @@ -234,15 +238,56 @@ def start_queue(instance_id: str, clp_config: CLPConfig):
]
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info("Started queue.")
logger.info(f"Started {QUEUE_COMPONENT_NAME}.")


def start_results_cache(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path):
logger.info(f"Starting {RESULTS_CACHE_COMPONENT_NAME}...")

container_name = f'clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}'
if container_exists(container_name):
logger.info(f"{RESULTS_CACHE_COMPONENT_NAME} already running.")
return

data_dir = clp_config.data_directory / RESULTS_CACHE_COMPONENT_NAME
logs_dir = clp_config.logs_directory / RESULTS_CACHE_COMPONENT_NAME

validate_results_cache_config(clp_config, data_dir, logs_dir)

data_dir.mkdir(exist_ok=True, parents=True)
logs_dir.mkdir(exist_ok=True, parents=True)

mounts = [
DockerMount(DockerMountType.BIND, conf_dir / 'mongo', pathlib.Path('/') / 'etc' / 'mongo',
True),
DockerMount(DockerMountType.BIND, data_dir, pathlib.Path('/') / 'data' / 'db'),
DockerMount(DockerMountType.BIND, logs_dir, pathlib.Path('/') / 'var' / 'log' / 'mongodb'),
]
cmd = [
'docker', 'run',
'-d',
'--rm',
'--name', container_name,
'-u', f'{os.getuid()}:{os.getgid()}',
]
for mount in mounts:
cmd.append('--mount')
cmd.append(str(mount))
append_docker_port_settings_for_host_ips(clp_config.results_cache.host, clp_config.results_cache.port, 27017, cmd)
cmd.append('mongo:7.0.1')
cmd.append('--config')
cmd.append(str(pathlib.Path('/') / 'etc' / 'mongo' / 'mongod.conf'))
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info(f"Started {RESULTS_CACHE_COMPONENT_NAME}.")


def start_scheduler(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, mounts: CLPDockerMounts):
logger.info("Starting scheduler...")
logger.info(f"Starting {SCHEDULER_COMPONENT_NAME}...")

container_name = f'clp-{SCHEDULER_COMPONENT_NAME}-{instance_id}'
if container_exists(container_name):
logger.info("Scheduler already running.")
logger.info(f"{SCHEDULER_COMPONENT_NAME} already running.")
return

container_config_filename = f'{container_name}.yml'
Expand Down Expand Up @@ -282,16 +327,16 @@ def start_scheduler(instance_id: str, clp_config: CLPConfig, container_clp_confi
cmd = container_start_cmd + scheduler_cmd
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info("Started scheduler.")
logger.info(f"Started {SCHEDULER_COMPONENT_NAME}.")


def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, num_cpus: int,
mounts: CLPDockerMounts):
logger.info("Starting worker...")
logger.info(f"Starting {WORKER_COMPONENT_NAME}...")

container_name = f'clp-{WORKER_COMPONENT_NAME}-{instance_id}'
if container_exists(container_name):
logger.info("Worker already running.")
logger.info(f"{WORKER_COMPONENT_NAME} already running.")
return

validate_worker_config(clp_config)
Expand Down Expand Up @@ -345,7 +390,7 @@ def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config:
cmd = container_start_cmd + worker_cmd
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info("Started worker.")
logger.info(f"Started {WORKER_COMPONENT_NAME}.")


def main(argv):
Expand All @@ -359,6 +404,7 @@ def main(argv):
component_args_parser = args_parser.add_subparsers(dest='component_name')
component_args_parser.add_parser(DB_COMPONENT_NAME)
component_args_parser.add_parser(QUEUE_COMPONENT_NAME)
component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME)
component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME)
worker_args_parser = component_args_parser.add_parser(WORKER_COMPONENT_NAME)
worker_args_parser.add_argument('--num-cpus', type=int, default=0,
Expand Down Expand Up @@ -425,6 +471,8 @@ def main(argv):
create_db_tables(instance_id, clp_config, container_clp_config, mounts)
if '' == component_name or QUEUE_COMPONENT_NAME == component_name:
start_queue(instance_id, clp_config)
if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name:
start_results_cache(instance_id, clp_config, conf_dir)
if '' == component_name or SCHEDULER_COMPONENT_NAME == component_name:
start_scheduler(instance_id, clp_config, container_clp_config, mounts)
if '' == component_name or WORKER_COMPONENT_NAME == component_name:
Expand Down
17 changes: 12 additions & 5 deletions components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
SCHEDULER_COMPONENT_NAME,
WORKER_COMPONENT_NAME,
container_exists,
get_clp_home,
validate_and_load_config_file,
validate_and_load_db_credentials_file,
validate_and_load_queue_credentials_file
)
from clp_py_utils.clp_config import (
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SCHEDULER_COMPONENT_NAME,
WORKER_COMPONENT_NAME
)

# Setup logging
# Create logger
Expand Down Expand Up @@ -49,6 +52,7 @@ def main(argv):
component_args_parser = args_parser.add_subparsers(dest='component_name')
component_args_parser.add_parser(DB_COMPONENT_NAME)
component_args_parser.add_parser(QUEUE_COMPONENT_NAME)
component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME)
component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(WORKER_COMPONENT_NAME)

Expand Down Expand Up @@ -92,6 +96,9 @@ def main(argv):
container_config_file_path = logs_dir / f'{container_name}.yml'
if container_config_file_path.exists():
container_config_file_path.unlink()
if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name:
container_name = f'clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}'
stop_container(container_name)
if '' == component_name or QUEUE_COMPONENT_NAME == component_name:
container_name = f'clp-{QUEUE_COMPONENT_NAME}-{instance_id}'
stop_container(container_name)
Expand All @@ -100,7 +107,7 @@ def main(argv):
if queue_config_file_path.exists():
queue_config_file_path.unlink()
if '' == component_name or DB_COMPONENT_NAME == component_name:
stop_container(f'clp-db-{instance_id}')
stop_container(f'clp-{DB_COMPONENT_NAME}-{instance_id}')

if '' == component_name:
# NOTE: We can only remove the instance ID file if all containers have been stopped.
Expand Down
30 changes: 25 additions & 5 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
from .core import get_config_value, make_config_path_absolute, read_yaml_config_file, validate_path_could_be_dir

# Constants
# Component names
DB_COMPONENT_NAME = 'database'
QUEUE_COMPONENT_NAME = 'queue'
RESULTS_CACHE_COMPONENT_NAME = 'results_cache'
SCHEDULER_COMPONENT_NAME = 'scheduler'
WORKER_COMPONENT_NAME = 'worker'

CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path('etc') / 'credentials.yml'
CLP_METADATA_TABLE_PREFIX = 'clp_'


class Database(BaseModel):
type: str = 'mariadb'
host: str = 'localhost'
Expand Down Expand Up @@ -93,6 +101,17 @@ class Scheduler(BaseModel):
jobs_poll_delay: int = 1 # seconds


class ResultsCache(BaseModel):
host: str = 'localhost'
port: int = 27017

@validator('host')
def validate_host(cls, field):
if '' == field:
raise ValueError(f'{RESULTS_CACHE_COMPONENT_NAME}.host cannot be empty.')
return field


class Queue(BaseModel):
host: str = 'localhost'
port: int = 5672
Expand Down Expand Up @@ -148,8 +167,9 @@ class CLPConfig(BaseModel):
input_logs_directory: pathlib.Path = pathlib.Path('/')

database: Database = Database()
scheduler: Scheduler = Scheduler()
queue: Queue = Queue()
results_cache: ResultsCache = ResultsCache()
scheduler: Scheduler = Scheduler()
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH

archive_output: ArchiveOutput = ArchiveOutput()
Expand Down Expand Up @@ -195,8 +215,8 @@ def load_database_credentials_from_file(self):
if config is None:
raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.")
try:
self.database.username = get_config_value(config, 'db.user')
self.database.password = get_config_value(config, 'db.password')
self.database.username = get_config_value(config, f'{DB_COMPONENT_NAME}.user')
self.database.password = get_config_value(config, f'{DB_COMPONENT_NAME}.password')
except KeyError as ex:
raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.")

Expand All @@ -205,8 +225,8 @@ def load_queue_credentials_from_file(self):
if config is None:
raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.")
try:
self.queue.username = get_config_value(config, "queue.user")
self.queue.password = get_config_value(config, "queue.password")
self.queue.username = get_config_value(config, f"{QUEUE_COMPONENT_NAME}.user")
self.queue.password = get_config_value(config, f"{QUEUE_COMPONENT_NAME}.password")
except KeyError as ex:
raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.")

Expand Down
3 changes: 2 additions & 1 deletion components/package-template/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ On the control node, run the following commands (these must be started in the
order below):

```bash
sbin/start-clp.sh db
sbin/start-clp.sh database
sbin/start-clp.sh queue
sbin/start-clp.sh results_cache
sbin/start-clp.sh scheduler
```

Expand Down
Loading

0 comments on commit 4293499

Please sign in to comment.