clp-package: Record search job and search task statistics in the database. (y-scope#416)
wraymo authored Jun 8, 2024
1 parent 040eb51 commit 64e5941
Showing 6 changed files with 301 additions and 59 deletions.
1 change: 1 addition & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -32,6 +32,7 @@
 CONTROLLER_TARGET_NAME = "controller"

 SEARCH_JOBS_TABLE_NAME = "search_jobs"
+SEARCH_TASKS_TABLE_NAME = "search_tasks"
 COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"
 COMPRESSION_TASKS_TABLE_NAME = "compression_tasks"

components/clp-py-utils/clp_py_utils/ (script that creates the scheduling tables)
@@ -8,6 +8,7 @@
     CompressionJobStatus,
     CompressionTaskStatus,
     SearchJobStatus,
+    SearchTaskStatus,
 )
 from sql_adapter import SQL_Adapter

@@ -16,6 +17,7 @@
     COMPRESSION_TASKS_TABLE_NAME,
     Database,
     SEARCH_JOBS_TABLE_NAME,
+    SEARCH_TASKS_TABLE_NAME,
 )
 from clp_py_utils.core import read_yaml_config_file

@@ -95,14 +97,38 @@ def main(argv):
             CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` (
                 `id` INT NOT NULL AUTO_INCREMENT,
                 `status` INT NOT NULL DEFAULT '{SearchJobStatus.PENDING}',
-                `submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
+                `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
+                `num_tasks` INT NOT NULL DEFAULT '0',
+                `num_tasks_completed` INT NOT NULL DEFAULT '0',
+                `start_time` DATETIME(3) NULL DEFAULT NULL,
+                `duration` FLOAT NULL DEFAULT NULL,
                 `search_config` VARBINARY(60000) NOT NULL,
                 PRIMARY KEY (`id`) USING BTREE,
                 INDEX `JOB_STATUS` (`status`) USING BTREE
             ) ROW_FORMAT=DYNAMIC
             """
         )

+        scheduling_db_cursor.execute(
+            f"""
+            CREATE TABLE IF NOT EXISTS `{SEARCH_TASKS_TABLE_NAME}` (
+                `id` BIGINT NOT NULL AUTO_INCREMENT,
+                `status` INT NOT NULL DEFAULT '{SearchTaskStatus.PENDING}',
+                `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
+                `start_time` DATETIME(3) NULL DEFAULT NULL,
+                `duration` FLOAT NULL DEFAULT NULL,
+                `job_id` INT NOT NULL,
+                `archive_id` VARCHAR(255) NULL DEFAULT NULL,
+                PRIMARY KEY (`id`) USING BTREE,
+                INDEX `job_id` (`job_id`) USING BTREE,
+                INDEX `TASK_STATUS` (`status`) USING BTREE,
+                INDEX `TASK_START_TIME` (`start_time`) USING BTREE,
+                CONSTRAINT `search_tasks` FOREIGN KEY (`job_id`)
+                    REFERENCES `search_jobs` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION
+            ) ROW_FORMAT=DYNAMIC
+            """
+        )
+
         scheduling_db.commit()
     except:
         logger.exception("Failed to create scheduling tables.")
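Once both tables exist, per-job task statistics can be read back with a single aggregate query. A minimal sketch, reusing the SQL_Adapter connection pattern the executor below uses; the function name and the query itself are illustrative and not part of this commit:

    from contextlib import closing

    from clp_py_utils.clp_config import SEARCH_TASKS_TABLE_NAME


    def fetch_search_task_stats(sql_adapter, job_id: int):
        # Per-status task count and average task duration for one search job.
        with closing(sql_adapter.create_connection(True)) as db_conn, closing(
            db_conn.cursor(dictionary=True)
        ) as db_cursor:
            db_cursor.execute(
                f"""
                SELECT `status`, COUNT(*) AS num_tasks, AVG(`duration`) AS avg_duration
                FROM `{SEARCH_TASKS_TABLE_NAME}`
                WHERE `job_id` = %s
                GROUP BY `status`
                """,
                (job_id,),
            )
            return db_cursor.fetchall()

The job_id and TASK_STATUS indexes added above are what keep this kind of query cheap as the table grows.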
components/job-orchestration/job_orchestration/executor/search/ (search task executor)
@@ -1,22 +1,41 @@
+import datetime
 import os
 import signal
 import subprocess
 import sys
+from contextlib import closing
 from pathlib import Path
 from typing import Any, Dict

 from celery.app.task import Task
 from celery.utils.log import get_task_logger
-from clp_py_utils.clp_config import StorageEngine
+from clp_py_utils.clp_config import Database, SEARCH_TASKS_TABLE_NAME, StorageEngine
 from clp_py_utils.clp_logging import set_logging_level
+from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.search.celery import app
 from job_orchestration.scheduler.job_config import SearchConfig
-from job_orchestration.scheduler.scheduler_data import SearchTaskResult
+from job_orchestration.scheduler.scheduler_data import SearchTaskResult, SearchTaskStatus

 # Setup logging
 logger = get_task_logger(__name__)


+def update_search_task_metadata(
+    db_cursor,
+    task_id: int,
+    kv_pairs: Dict[str, Any],
+):
+    if not kv_pairs or len(kv_pairs) == 0:
+        raise ValueError("No key-value pairs provided to update search task metadata")
+
+    query = f"""
+    UPDATE {SEARCH_TASKS_TABLE_NAME}
+    SET {', '.join([f'{k}="{v}"' for k, v in kv_pairs.items()])}
+    WHERE id = {task_id}
+    """
+    db_cursor.execute(query)


 def make_command(
     storage_engine: str,
     clp_home: Path,
@@ -62,10 +81,10 @@ def make_command(

         # fmt: off
         command.extend((
-                "reducer",
-                "--host", aggregation_config.reducer_host,
-                "--port", str(aggregation_config.reducer_port),
-                "--job-id", str(aggregation_config.job_id)
+            "reducer",
+            "--host", aggregation_config.reducer_host,
+            "--port", str(aggregation_config.reducer_port),
+            "--job-id", str(aggregation_config.job_id)
         ))
         # fmt: on
     elif search_config.network_address is not None:
@@ -93,11 +112,12 @@ def make_command(
 def search(
     self: Task,
     job_id: str,
+    task_id: int,
     search_config_obj: dict,
     archive_id: str,
     clp_metadata_db_conn_params: dict,
     results_cache_uri: str,
 ) -> Dict[str, Any]:
-    task_id = str(self.request.id)
     clp_home = Path(os.getenv("CLP_HOME"))
     archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
     clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
@@ -114,26 +134,49 @@ def search(
logger.info(f"Started task for job {job_id}")

search_config = SearchConfig.parse_obj(search_config_obj)

try:
search_command = make_command(
storage_engine=clp_storage_engine,
clp_home=clp_home,
archives_dir=archive_directory,
archive_id=archive_id,
search_config=search_config,
results_cache_uri=results_cache_uri,
results_collection=job_id,
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

start_time = datetime.datetime.now()
search_status = SearchTaskStatus.RUNNING
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
try:
search_command = make_command(
storage_engine=clp_storage_engine,
clp_home=clp_home,
archives_dir=archive_directory,
archive_id=archive_id,
search_config=search_config,
results_cache_uri=results_cache_uri,
results_collection=job_id,
)
except ValueError as e:
error_message = f"Error creating search command: {e}"
logger.error(error_message)

update_search_task_metadata(
db_cursor,
task_id,
dict(status=SearchTaskStatus.FAILED, duration=0, start_time=start_time),
)
db_conn.commit()
clo_log_file.write(error_message)
clo_log_file.close()

return SearchTaskResult(
task_id=task_id,
status=SearchTaskStatus.FAILED,
duration=0,
error_log_path=clo_log_path,
).dict()

update_search_task_metadata(
db_cursor, task_id, dict(status=search_status, start_time=start_time)
)
except ValueError as e:
logger.error(f"Error creating search command: {e}")
return SearchTaskResult(
success=False,
task_id=task_id,
).dict()
db_conn.commit()

logger.info(f'Running: {" ".join(search_command)}')
search_successful = False
search_proc = subprocess.Popen(
search_command,
preexec_fn=os.setpgrp,
@@ -163,15 +206,31 @@ def sigterm_handler(_signo, _stack_frame):
     search_proc.communicate()
     return_code = search_proc.returncode
     if 0 != return_code:
+        search_status = SearchTaskStatus.FAILED
         logger.error(f"Failed search task for job {job_id} - return_code={return_code}")
     else:
-        search_successful = True
+        search_status = SearchTaskStatus.SUCCEEDED
         logger.info(f"Search task completed for job {job_id}")

     # Close log files
     clo_log_file.close()
+    duration = (datetime.datetime.now() - start_time).total_seconds()

-    return SearchTaskResult(
-        success=search_successful,
+    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
+        db_conn.cursor(dictionary=True)
+    ) as db_cursor:
+        update_search_task_metadata(
+            db_cursor, task_id, dict(status=search_status, start_time=start_time, duration=duration)
+        )
+        db_conn.commit()
+
+    search_task_result = SearchTaskResult(
+        status=search_status,
         task_id=task_id,
-    ).dict()
+        duration=duration,
+    )
+
+    if SearchTaskStatus.FAILED == search_status:
+        search_task_result.error_log_path = clo_log_path
+
+    return search_task_result.dict()
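For reference, here is the UPDATE text that update_search_task_metadata builds, reproduced with made-up values (task_id 42 and the timestamp are fabricated; neither appears in the commit):

    import datetime

    kv_pairs = dict(status=1, start_time=datetime.datetime(2024, 6, 8, 14, 30))
    task_id = 42
    query = f"""
        UPDATE search_tasks
        SET {', '.join([f'{k}="{v}"' for k, v in kv_pairs.items()])}
        WHERE id = {task_id}
    """
    print(query)
    # Output (surrounding whitespace trimmed):
    # UPDATE search_tasks
    # SET status="1", start_time="2024-06-08 14:30:00"
    # WHERE id = 42

The values are embedded as double-quoted literals rather than bound parameters; MySQL's default SQL mode coerces them back to the INT and DATETIME(3) column types, and the executor only ever passes trusted enum, numeric, and datetime values.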
components/job-orchestration/job_orchestration/scheduler/constants.py
@@ -49,3 +49,21 @@ def __str__(self) -> str:

     def to_str(self) -> str:
         return str(self.name)
+
+
+class SearchTaskStatus(IntEnum):
+    PENDING = 0
+    RUNNING = auto()
+    SUCCEEDED = auto()
+    FAILED = auto()
+    CANCELLED = auto()
+
+    @staticmethod
+    def from_str(label: str) -> SearchTaskStatus:
+        return SearchTaskStatus[label.upper()]
+
+    def __str__(self) -> str:
+        return str(self.value)
+
+    def to_str(self) -> str:
+        return str(self.name)
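A quick, hypothetical round trip through the new enum's helpers (not part of the commit):

    status = SearchTaskStatus.from_str("succeeded")
    assert status is SearchTaskStatus.SUCCEEDED
    assert str(status) == "2"              # __str__ yields the numeric value, as stored in the DB
    assert status.to_str() == "SUCCEEDED"  # to_str yields the symbolic name

Serializing to the numeric value is what lets the status columns in search_jobs and search_tasks stay plain INTs.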
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
@@ -3,7 +3,7 @@
 from enum import auto, Enum
 from typing import Any, Dict, List, Optional

-from job_orchestration.scheduler.constants import CompressionTaskStatus
+from job_orchestration.scheduler.constants import CompressionTaskStatus, SearchTaskStatus
 from job_orchestration.scheduler.job_config import SearchConfig
 from job_orchestration.scheduler.search.reducer_handler import ReducerHandlerMessageQueues
 from pydantic import BaseModel, validator
@@ -39,6 +39,9 @@ class SearchJob(BaseModel):
     id: str
     search_config: SearchConfig
     state: InternalJobState
+    start_time: Optional[datetime.datetime]
+    num_archives_to_search: int
+    num_archives_searched: int
     remaining_archives_for_search: List[Dict[str, Any]]
     current_sub_job_async_task_result: Optional[Any]
     reducer_acquisition_task: Optional[asyncio.Task]
@@ -49,5 +52,7 @@ class Config: # To allow asyncio.Task and asyncio.Queue


 class SearchTaskResult(BaseModel):
-    success: bool
+    status: SearchTaskStatus
     task_id: str
+    duration: float
+    error_log_path: Optional[str]
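Putting the model changes together with the executor above, a failed task would now report back roughly like this (hypothetical values; the log path is made up):

    result = SearchTaskResult(
        status=SearchTaskStatus.FAILED,
        task_id="42",
        duration=0.0,
        error_log_path="/var/log/clp/42-clo.log",
    )
    payload = result.dict()  # the dict the Celery task returns to the scheduler

Since error_log_path is Optional, successful tasks simply leave it unset, matching the executor, which assigns it only when search_status is FAILED.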