Skip to content

Commit

Permalink
package: Rename "search" to "query" in search-cluster related variables to prepare for additional job types (y-scope#449)
Browse files Browse the repository at this point in the history

- Rename search scheduler/worker/job/task to query scheduler/worker/job/task where appropriate.
- Rename search_config to job_config in metadata DB.
- Add QueryJob base class for SearchJob.
  • Loading branch information
haiqi96 authored Jun 18, 2024
1 parent b335c11 commit 92ba15a
Show file tree
Hide file tree
Showing 27 changed files with 259 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

import msgpack
import pymongo
from clp_py_utils.clp_config import Database, ResultsCache, SEARCH_JOBS_TABLE_NAME
from clp_py_utils.clp_config import Database, QUERY_JOBS_TABLE_NAME, ResultsCache
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import SearchJobStatus
from job_orchestration.scheduler.constants import QueryJobStatus
from job_orchestration.scheduler.job_config import AggregationConfig, SearchConfig

from clp_package_utils.general import (
Expand Down Expand Up @@ -111,7 +111,7 @@ def create_and_monitor_job_in_db(
) as db_cursor:
# Create job
db_cursor.execute(
f"INSERT INTO `{SEARCH_JOBS_TABLE_NAME}` (`search_config`) VALUES (%s)",
f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`) VALUES (%s)",
(msgpack.packb(search_config.dict()),),
)
db_conn.commit()
Expand All @@ -120,16 +120,16 @@ def create_and_monitor_job_in_db(
# Wait for the job to be marked complete
while True:
db_cursor.execute(
f"SELECT `status` FROM `{SEARCH_JOBS_TABLE_NAME}` WHERE `id` = {job_id}"
f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = {job_id}"
)
# There will only ever be one row since it's impossible to have more than one job with
# the same ID
new_status = db_cursor.fetchall()[0]["status"]
db_conn.commit()
if new_status in (
SearchJobStatus.SUCCEEDED,
SearchJobStatus.FAILED,
SearchJobStatus.CANCELLED,
QueryJobStatus.SUCCEEDED,
QueryJobStatus.FAILED,
QueryJobStatus.CANCELLED,
):
break

Expand Down
56 changes: 28 additions & 28 deletions components/clp-package-utils/clp_package_utils/scripts/start_clp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
QUERY_JOBS_TABLE_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SEARCH_JOBS_TABLE_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
)
from job_orchestration.scheduler.constants import QueueName
Expand Down Expand Up @@ -416,15 +416,15 @@ def start_compression_scheduler(
)


def start_search_scheduler(
def start_query_scheduler(
instance_id: str,
clp_config: CLPConfig,
container_clp_config: CLPConfig,
mounts: CLPDockerMounts,
):
module_name = "job_orchestration.scheduler.search.search_scheduler"
module_name = "job_orchestration.scheduler.query.query_scheduler"
generic_start_scheduler(
SEARCH_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
module_name,
instance_id,
clp_config,
Expand Down Expand Up @@ -474,10 +474,10 @@ def generic_start_scheduler(
"-e", (
f"RESULT_BACKEND=redis://default:{container_clp_config.redis.password}@"
f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/"
f"{container_clp_config.redis.search_backend_database}"
f"{container_clp_config.redis.query_backend_database}"
),
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={clp_config.search_scheduler.logging_level}",
"-e", f"CLP_LOGGING_LEVEL={clp_config.query_scheduler.logging_level}",
"-u", f"{os.getuid()}:{os.getgid()}",
"--mount", str(mounts.clp_home),
]
Expand Down Expand Up @@ -529,24 +529,24 @@ def start_compression_worker(
)


def start_search_worker(
def start_query_worker(
instance_id: str,
clp_config: CLPConfig,
container_clp_config: CLPConfig,
num_cpus: int,
mounts: CLPDockerMounts,
):
celery_method = "job_orchestration.executor.search"
celery_route = f"{QueueName.SEARCH}"
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"
generic_start_worker(
SEARCH_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
instance_id,
clp_config,
clp_config.search_worker,
clp_config.query_worker,
container_clp_config,
celery_method,
celery_route,
clp_config.redis.search_backend_database,
clp_config.redis.query_backend_database,
num_cpus,
mounts,
)
Expand Down Expand Up @@ -696,7 +696,7 @@ def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts
"SqlDbClpArchivesTableName": f"{CLP_METADATA_TABLE_PREFIX}archives",
"SqlDbClpFilesTableName": f"{CLP_METADATA_TABLE_PREFIX}files",
"SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME,
"SqlDbSearchJobsTableName": SEARCH_JOBS_TABLE_NAME,
"SqlDbQueryJobsTableName": QUERY_JOBS_TABLE_NAME,
},
"public": {
"ClpStorageEngine": clp_config.package.storage_engine,
Expand Down Expand Up @@ -839,11 +839,11 @@ def main(argv):
component_args_parser.add_parser(REDIS_COMPONENT_NAME)
component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME)
component_args_parser.add_parser(COMPRESSION_SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(SEARCH_SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(QUERY_SCHEDULER_COMPONENT_NAME)
compression_worker_parser = component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
add_num_workers_argument(compression_worker_parser)
search_worker_parser = component_args_parser.add_parser(SEARCH_WORKER_COMPONENT_NAME)
add_num_workers_argument(search_worker_parser)
query_worker_parser = component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME)
add_num_workers_argument(query_worker_parser)
reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
add_num_workers_argument(reducer_server_parser)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
Expand Down Expand Up @@ -874,7 +874,7 @@ def main(argv):
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
COMPRESSION_SCHEDULER_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
):
validate_and_load_db_credentials_file(clp_config, clp_home, True)
Expand All @@ -883,19 +883,19 @@ def main(argv):
CONTROLLER_TARGET_NAME,
QUEUE_COMPONENT_NAME,
COMPRESSION_SCHEDULER_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_queue_credentials_file(clp_config, clp_home, True)
if target in (
ALL_TARGET_NAME,
CONTROLLER_TARGET_NAME,
REDIS_COMPONENT_NAME,
COMPRESSION_SCHEDULER_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_redis_credentials_file(clp_config, clp_home, True)

Expand All @@ -908,7 +908,7 @@ def main(argv):
if target in (
COMPRESSION_WORKER_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
num_workers = parsed_args.num_workers
else:
Expand Down Expand Up @@ -951,14 +951,14 @@ def main(argv):
COMPRESSION_SCHEDULER_COMPONENT_NAME,
):
start_compression_scheduler(instance_id, clp_config, container_clp_config, mounts)
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, SEARCH_SCHEDULER_COMPONENT_NAME):
start_search_scheduler(instance_id, clp_config, container_clp_config, mounts)
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME):
start_query_scheduler(instance_id, clp_config, container_clp_config, mounts)
if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME):
start_compression_worker(
instance_id, clp_config, container_clp_config, num_workers, mounts
)
if target in (ALL_TARGET_NAME, SEARCH_WORKER_COMPONENT_NAME):
start_search_worker(instance_id, clp_config, container_clp_config, num_workers, mounts)
if target in (ALL_TARGET_NAME, QUERY_WORKER_COMPONENT_NAME):
start_query_worker(instance_id, clp_config, container_clp_config, num_workers, mounts)
if target in (ALL_TARGET_NAME, REDUCER_COMPONENT_NAME):
start_reducer(instance_id, clp_config, container_clp_config, num_workers, mounts)
if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
Expand Down
20 changes: 10 additions & 10 deletions components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
)

Expand Down Expand Up @@ -88,9 +88,9 @@ def main(argv):
component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME)
component_args_parser.add_parser(COMPRESSION_SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(SEARCH_SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(QUERY_SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(SEARCH_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)

parsed_args = args_parser.parse_args(argv[1:])
Expand All @@ -116,8 +116,8 @@ def main(argv):
COMPRESSION_SCHEDULER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_queue_credentials_file(clp_config, clp_home, False)
except:
Expand Down Expand Up @@ -146,14 +146,14 @@ def main(argv):
container_config_file_path = logs_dir / f"{container_name}.yml"
if container_config_file_path.exists():
container_config_file_path.unlink()
if target in (ALL_TARGET_NAME, SEARCH_WORKER_COMPONENT_NAME):
container_name = f"clp-{SEARCH_WORKER_COMPONENT_NAME}-{instance_id}"
if target in (ALL_TARGET_NAME, QUERY_WORKER_COMPONENT_NAME):
container_name = f"clp-{QUERY_WORKER_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME):
container_name = f"clp-{COMPRESSION_WORKER_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, SEARCH_SCHEDULER_COMPONENT_NAME):
container_name = f"clp-{SEARCH_SCHEDULER_COMPONENT_NAME}-{instance_id}"
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME):
container_name = f"clp-{QUERY_SCHEDULER_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)

container_config_file_path = logs_dir / f"{container_name}.yml"
Expand Down
18 changes: 9 additions & 9 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
REDUCER_COMPONENT_NAME = "reducer"
RESULTS_CACHE_COMPONENT_NAME = "results_cache"
COMPRESSION_SCHEDULER_COMPONENT_NAME = "compression_scheduler"
SEARCH_SCHEDULER_COMPONENT_NAME = "search_scheduler"
QUERY_SCHEDULER_COMPONENT_NAME = "query_scheduler"
COMPRESSION_WORKER_COMPONENT_NAME = "compression_worker"
SEARCH_WORKER_COMPONENT_NAME = "search_worker"
QUERY_WORKER_COMPONENT_NAME = "query_worker"
WEBUI_COMPONENT_NAME = "webui"

# Target names
ALL_TARGET_NAME = ""
CONTROLLER_TARGET_NAME = "controller"

SEARCH_JOBS_TABLE_NAME = "search_jobs"
SEARCH_TASKS_TABLE_NAME = "search_tasks"
QUERY_JOBS_TABLE_NAME = "query_jobs"
QUERY_TASKS_TABLE_NAME = "query_tasks"
COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"
COMPRESSION_TASKS_TABLE_NAME = "compression_tasks"

Expand Down Expand Up @@ -163,7 +163,7 @@ def validate_logging_level(cls, field):
return field


class SearchScheduler(BaseModel):
class QueryScheduler(BaseModel):
host = "localhost"
port = 7000
jobs_poll_delay: float = 0.1 # seconds
Expand Down Expand Up @@ -197,7 +197,7 @@ def validate_logging_level(cls, field):
return field


class SearchWorker(BaseModel):
class QueryWorker(BaseModel):
logging_level: str = "INFO"

@validator("logging_level")
Expand All @@ -209,7 +209,7 @@ def validate_logging_level(cls, field):
class Redis(BaseModel):
host: str = "localhost"
port: int = 6379
search_backend_database: int = 0
query_backend_database: int = 0
compression_backend_database: int = 1
# redis can perform authentication without a username
password: typing.Optional[str]
Expand Down Expand Up @@ -361,9 +361,9 @@ class CLPConfig(BaseModel):
reducer: Reducer() = Reducer()
results_cache: ResultsCache = ResultsCache()
compression_scheduler: CompressionScheduler = CompressionScheduler()
search_scheduler: SearchScheduler = SearchScheduler()
query_scheduler: QueryScheduler = QueryScheduler()
compression_worker: CompressionWorker = CompressionWorker()
search_worker: SearchWorker = SearchWorker()
query_worker: QueryWorker = QueryWorker()
webui: WebUi = WebUi()
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH

Expand Down
Loading

0 comments on commit 92ba15a

Please sign in to comment.