More refactoring and simplification of search_scheduler
gibber9809 committed Jan 23, 2024
1 parent a6332c3 commit 48ba779
Showing 10 changed files with 191 additions and 200 deletions.
1 change: 1 addition & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -15,6 +15,7 @@

CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path('etc') / 'credentials.yml'
CLP_METADATA_TABLE_PREFIX = 'clp_'
+SEARCH_JOBS_TABLE_NAME = 'distributed_search_jobs'


class Database(BaseModel):
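The new constant gives the scheduler a single name for the jobs table. As a rough sketch (not part of this commit; the table's columns are an assumption), it would be interpolated into scheduler-side SQL like so:

from clp_py_utils.clp_config import SEARCH_JOBS_TABLE_NAME

# Hypothetical scheduler query; "id" and "status" columns are assumed.
query = f"SELECT id, status FROM {SEARCH_JOBS_TABLE_NAME} WHERE status = %s"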

This file was deleted.

Empty file.
components/job-orchestration/job_orchestration/executor/search/celery.py
@@ -0,0 +1,5 @@
from celery import Celery
from . import celeryconfig # type: ignore

app = Celery("search")
app.config_from_object(celeryconfig)
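For context, a worker bound to this app could be launched along these lines (a sketch; the broker/backend URLs and the "search" queue name are assumptions, not taken from this commit):

import os

# The env vars must be set before the app (and thus celeryconfig) is imported,
# because celeryconfig reads them at import time. Values here are for local
# testing only.
os.environ.setdefault("BROKER_URL", "amqp://guest:guest@localhost:5672//")
os.environ.setdefault("RESULT_BACKEND", "rpc://")

from job_orchestration.executor.search.celery import app

if __name__ == "__main__":
    # Equivalent to `celery -A ... worker`; "search" matches QueueName.SEARCH
    # only by assumption.
    app.worker_main(["worker", "--loglevel=INFO", "--queues=search"])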
components/job-orchestration/job_orchestration/executor/search/celeryconfig.py
@@ -0,0 +1,34 @@
import os

from job_orchestration.scheduler.constants import QueueName

imports = ("job_orchestration.executor.search.fs_search_task",)

task_routes = {
'job_orchestration.executor.search.fs_search_task.search': QueueName.SEARCH,
}

broker_url = os.getenv('BROKER_URL')
result_backend = os.getenv('RESULT_BACKEND')

result_persistent = True

# Differentiate between tasks that have started vs. tasks still in the queue
task_track_started = True

accept_content = [
"application/json", # json
"application/x-python-serialize", # pickle
]

result_accept_content = [
"application/json", # json
"application/x-python-serialize", # pickle
]

# TODO: Find out how to precisely specify the serialization format for both the
# task (args + kwargs) and the task return value (instead of using json/pickle
# for everything). See also:
# https://stackoverflow.com/questions/69531560/how-do-you-configure-celery-to-use-serializer-pickle
# https://docs.celeryq.dev/en/stable/internals/protocol.html#task-messages
result_serializer = "json"
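One possible answer to the TODO above (a sketch, not part of this commit): Celery exposes separate settings for the task message and the result, so the two formats can be pinned independently instead of accepting both everywhere:

# Sketch only: pin the serializers explicitly.
# task_serializer covers the message (args + kwargs) sent to workers;
# result_serializer covers the value stored in the result backend.
task_serializer = "pickle"  # SearchConfig is a Python object, so plain json won't do as-is
result_serializer = "json"  # the search task returns a plain dict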
components/job-orchestration/job_orchestration/executor/search/fs_search_task.py
@@ -0,0 +1,108 @@
import logging
import os
import signal
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict

from celery.app.task import Task
from celery.utils.log import get_task_logger

from clp_py_utils.clp_logging import get_logging_formatter, set_logging_level

from job_orchestration.executor.search.celery import app
from job_orchestration.job_config import SearchConfig


logger = get_task_logger(__name__)

@app.task(bind=True)
def search(
self: Task,
job_id: str,
search_config: SearchConfig,
archive_id: str,
results_cache_uri: str,
) -> Dict[str, Any]:
task_id = str(self.request.id)
clp_home = Path(os.getenv("CLP_HOME"))
archive_directory = Path(os.getenv('CLP_ARCHIVE_OUTPUT_DIR'))
clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))

# Setup logging to file
worker_logs_dir = clp_logs_dir / job_id
worker_logs_dir.mkdir(exist_ok=True, parents=True)
worker_logs = worker_logs_dir / f"{task_id}.log"
logging_file_handler = logging.FileHandler(filename=worker_logs, encoding="utf-8")
logging_file_handler.setFormatter(get_logging_formatter())
logger.addHandler(logging_file_handler)
set_logging_level(logger, clp_logging_level)
stderr_log_path = worker_logs_dir / f"{task_id}-stderr.log"
stderr_log_file = open(stderr_log_path, "w")

logger.info(f"Started job {job_id}. Task Id={task_id}.")

search_cmd = [
str(clp_home / "bin" / "clo"),
results_cache_uri,
job_id,
str(archive_directory / archive_id),
search_config.wildcard_query,
]

if search_config.begin_timestamp is not None:
search_cmd.append('--tge')
search_cmd.append(str(search_config.begin_timestamp))
if search_config.end_timestamp is not None:
search_cmd.append('--tle')
search_cmd.append(str(search_config.end_timestamp))
if search_config.path_filter is not None:
search_cmd.append(search_config.path_filter)

logger.info(f'Searching: {" ".join(search_cmd)}')
search_successful = False
search_proc = subprocess.Popen(
search_cmd,
preexec_fn=os.setpgrp,
close_fds=True,
stdout=stderr_log_file,
stderr=stderr_log_file,
)

def sigterm_handler(_signo, _stack_frame):
logger.debug("Entered sigterm handler")
if search_proc.poll() is None:
logger.debug("try to kill search process")
# kill with group id for when we're running both obs and clo
os.killpg(os.getpgid(search_proc.pid), signal.SIGTERM)
os.waitpid(search_proc.pid, 0)
logger.info(f"Cancelling search task: {task_id}")
logger.debug(f"Exiting with error code {_signo + 128}")
sys.exit(_signo + 128)

# Register the function to kill the child process at exit
signal.signal(signal.SIGTERM, sigterm_handler)

logger.info("Waiting for search to finish...")
search_proc.communicate()
return_code = search_proc.returncode
if 0 != return_code:
logger.error(f"Failed to search, job {job_id}. Task Id={task_id}, return_code={return_code}")
else:
search_successful = True
logger.info(f"Search completed for job {job_id}. Task Id={task_id}")

# Close log files
stderr_log_file.close()
logger.removeHandler(logging_file_handler)
logging_file_handler.close()

results = {
'status': search_successful,
'job_id': job_id,
'task_id': task_id,
}

return results
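On the scheduler side (not shown in this diff), dispatch could look roughly like the following; the helper name and the list of archive IDs are assumptions:

from celery import group

from job_orchestration.executor.search.fs_search_task import search
from job_orchestration.job_config import SearchConfig


def dispatch_search(job_id: str, search_config: SearchConfig, archive_ids, results_cache_uri: str):
    # Fan out one search task per archive. SearchConfig travels as pickle here
    # (see accept_content in celeryconfig). apply_async returns a GroupResult
    # whose .get() yields the per-task result dicts built above.
    return group(
        search.s(
            job_id=job_id,
            search_config=search_config,
            archive_id=archive_id,
            results_cache_uri=results_cache_uri,
        )
        for archive_id in archive_ids
    ).apply_async()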
4 changes: 1 addition & 3 deletions components/job-orchestration/job_orchestration/job_config.py
@@ -30,9 +30,7 @@ class ClpIoConfig(BaseModel):


class SearchConfig(BaseModel):
-    search_controller_host: str
-    search_controller_port: int
-    query_string: str
+    wildcard_query: str
begin_timestamp: typing.Optional[int] = None
end_timestamp: typing.Optional[int] = None
path_filter: typing.Optional[str] = None
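After this change, constructing the config reduces to the query plus optional filters. A small example (values are made up; epoch-millisecond timestamps are an assumption):

from job_orchestration.job_config import SearchConfig

config = SearchConfig(
    wildcard_query="* ERROR *",
    begin_timestamp=1700000000000,
    end_timestamp=1700003600000,
)
# Pydantic v1-style serialization, e.g. for sending over Celery:
payload = config.dict()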
components/job-orchestration/job_orchestration/scheduler/constants.py
@@ -11,7 +11,6 @@ class JobStatus(IntEnum):
FAILED = auto()
CANCELLING = auto()
CANCELLED = auto()
-    NO_MATCHING_ARCHIVE = auto()

def __str__(self) -> str:
return str(self.value).lower()
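For reference, __str__ yields the member's integer value as a string (lower() is a no-op on digits), which is the form a status would take when written to the jobs table. A quick example:

from job_orchestration.scheduler.constants import JobStatus

status = JobStatus.CANCELLED
assert str(status) == str(status.value)  # e.g. "5", depending on auto() ordering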

This file was deleted.

