clp-package: Prevent query scheduler from scheduling two IR extraction jobs targeting the same file split. (y-scope#483)
haiqi96 authored Jul 23, 2024
1 parent 921a388 commit 0becae0
Showing 6 changed files with 181 additions and 1 deletion.
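
At a high level, the scheduler change keeps a map from file-split IDs to the query jobs waiting on them, so at most one IR extraction job per file split is ever dispatched. The following is a minimal, self-contained sketch of that bookkeeping pattern; the function names and return values are simplified illustrations, not the scheduler's actual API.

from typing import Dict, List

# Maps a file split ID to the IDs of jobs waiting for its extraction to finish.
active_file_split_ir_extractions: Dict[str, List[str]] = {}


def handle_extraction_request(file_split_id: str, job_id: str) -> str:
    # If another job is already extracting this split, record the new job as a
    # waiter instead of dispatching a second extraction.
    if file_split_id in active_file_split_ir_extractions:
        active_file_split_ir_extractions[file_split_id].append(job_id)
        return "wait"
    # First request for this split: register it and dispatch a new extraction job.
    active_file_split_ir_extractions[file_split_id] = [job_id]
    return "dispatch"


def handle_extraction_finished(file_split_id: str) -> List[str]:
    # Returns the waiting jobs that can now be marked with the extraction's outcome.
    return active_file_split_ir_extractions.pop(file_split_id, [])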
@@ -234,6 +234,54 @@ def create_db_tables(
logger.info(f"Created {component_name} tables.")


def create_results_cache_indices(
instance_id: str,
clp_config: CLPConfig,
container_clp_config: CLPConfig,
mounts: CLPDockerMounts,
):
component_name = RESULTS_CACHE_COMPONENT_NAME
logger.info(f"Creating {component_name} indices...")

container_name = f"clp-{component_name}-indices-creator-{instance_id}"

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
# fmt: off
container_start_cmd = [
"docker", "run",
"-i",
"--network", "host",
"--rm",
"--name", container_name,
"--log-driver", "local",
"-e", f"PYTHONPATH={clp_site_packages_dir}",
"-u", f"{os.getuid()}:{os.getgid()}",
]
# fmt: on
necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir]
for mount in necessary_mounts:
if mount:
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
container_start_cmd.append(clp_config.execution_container)

clp_py_utils_dir = clp_site_packages_dir / "clp_py_utils"
# fmt: off
create_tables_cmd = [
"python3",
str(clp_py_utils_dir / "create-results-cache-indices.py"),
"--uri", container_clp_config.results_cache.get_uri(),
"--ir-collection", container_clp_config.results_cache.ir_collection_name,
]
# fmt: on

cmd = container_start_cmd + create_tables_cmd
logger.debug(" ".join(cmd))
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info(f"Created {component_name} indices.")


def start_queue(instance_id: str, clp_config: CLPConfig):
component_name = QUEUE_COMPONENT_NAME
logger.info(f"Starting {component_name}...")
@@ -1058,6 +1106,8 @@ def main(argv):
start_redis(instance_id, clp_config, conf_dir)
if target in (ALL_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME):
start_results_cache(instance_id, clp_config, conf_dir)
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME):
create_results_cache_indices(instance_id, clp_config, container_clp_config, mounts)
if target in (
ALL_TARGET_NAME,
CONTROLLER_TARGET_NAME,
@@ -0,0 +1,42 @@
import argparse
import logging
import sys

from pymongo import IndexModel, MongoClient

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def main(argv):
args_parser = argparse.ArgumentParser(description="Creates results cache indices for CLP.")
args_parser.add_argument("--uri", required=True, help="URI of the results cache.")
args_parser.add_argument("--ir-collection", required=True, help="Collection for IR metadata.")
parsed_args = args_parser.parse_args(argv[1:])

results_cache_uri = parsed_args.uri
ir_collection_name = parsed_args.ir_collection

try:
with MongoClient(results_cache_uri) as results_cache_client:
ir_collection = results_cache_client.get_default_database()[ir_collection_name]

file_split_id_index = IndexModel(["file_split_id"])
orig_file_id_index = IndexModel(["orig_file_id", "begin_msg_ix", "end_msg_ix"])
ir_collection.create_indexes([file_split_id_index, orig_file_id_index])
except Exception:
logger.exception("Failed to create CLP results cache indices.")
return -1

return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
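
A quick way to sanity-check the script's effect is to list the collection's indexes with pymongo. The URI and collection name below are placeholders, not values defined by this commit.

from pymongo import MongoClient

# Placeholder URI and collection name; substitute the deployment's actual values.
with MongoClient("mongodb://localhost:27017/clp-query-results") as client:
    ir_collection = client.get_default_database()["ir-files"]
    for index in ir_collection.list_indexes():
        # Expect the default _id index plus the two indexes created by the script.
        print(index["name"], dict(index["key"]))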
4 changes: 4 additions & 0 deletions components/core/src/clp/clo/clo.cpp
@@ -198,6 +198,10 @@ bool extract_ir(CommandLineArguments const& command_line_args) {
clp::clo::cResultsCacheKeys::OrigFileId,
orig_file_id
),
bsoncxx::builder::basic::kvp(
clp::clo::cResultsCacheKeys::IrOutput::FileSplitId,
file_split_id
),
bsoncxx::builder::basic::kvp(
clp::clo::cResultsCacheKeys::IrOutput::BeginMsgIx,
static_cast<int64_t>(begin_message_ix)
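
With this change, each IR-chunk metadata document that clo writes to the results cache carries the file split ID alongside the existing fields, which is what lets the scheduler later look up extractions by file_split_id. Shown as a Python dict with purely illustrative placeholder values (field names come from constants.hpp below), such a document would look roughly like:

ir_chunk_metadata = {
    "path": "<path-to-extracted-ir-chunk>",
    "orig_file_id": "<original-file-id>",
    "file_split_id": "<file-split-id>",  # newly added by this commit
    "begin_msg_ix": 0,
    "end_msg_ix": 10000,
    "is_last_ir_chunk": True,
}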
1 change: 1 addition & 0 deletions components/core/src/clp/clo/constants.hpp
@@ -7,6 +7,7 @@ constexpr char OrigFileId[]{"orig_file_id"};

namespace IrOutput {
constexpr char Path[]{"path"};
constexpr char FileSplitId[]{"file_split_id"};
constexpr char BeginMsgIx[]{"begin_msg_ix"};
constexpr char EndMsgIx[]{"end_msg_ix"};
constexpr char IsLastIrChunk[]{"is_last_ir_chunk"};
@@ -64,6 +64,9 @@
# Dictionary of active jobs indexed by job id
active_jobs: Dict[str, QueryJob] = {}

# Dictionary that maps IDs of file splits being extracted to IDs of jobs waiting for them
active_file_split_ir_extractions: Dict[str, List[str]] = {}

reducer_connection_queue: Optional[asyncio.Queue] = None


@@ -463,12 +466,13 @@ def handle_pending_query_jobs(
db_conn_pool,
clp_metadata_db_conn_params: Dict[str, any],
results_cache_uri: str,
ir_collection_name: str,
num_archives_to_search_per_sub_job: int,
) -> List[asyncio.Task]:
global active_jobs
global active_file_split_ir_extractions

reducer_acquisition_tasks = []

pending_search_jobs = [
job
for job in active_jobs.values()
@@ -542,10 +546,58 @@
logger.error(f"Failed to set job {job_id} as failed")
continue

# NOTE: The following two if blocks should not be reordered since if we first check
# whether *an* IR file has been extracted for the requested file split, it doesn't
# mean that *all* IR files have been extracted for the file split (since the
# extraction job may still be in progress). Thus, we must first check whether the
# file split is in the process of being extracted, and then check whether it's
# already been extracted.

# Check if the file split is currently being extracted; if so, add the job ID to the
# list of jobs waiting for it.
if file_split_id in active_file_split_ir_extractions:
active_file_split_ir_extractions[file_split_id].append(job_id)
logger.info(
f"Split {file_split_id} is being extracted, so mark job {job_id} as running"
)
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.RUNNING,
QueryJobStatus.PENDING,
start_time=datetime.datetime.now(),
num_tasks=0,
):
logger.error(f"Failed to set job {job_id} as running")
continue

# Check if the file split has already been extracted
if ir_file_exists_for_file_split(
results_cache_uri, ir_collection_name, file_split_id
):
logger.info(
f"Split {file_split_id} already extracted, so mark job {job_id} as done"
)
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.SUCCEEDED,
QueryJobStatus.PENDING,
start_time=datetime.datetime.now(),
num_tasks=0,
duration=0,
):
logger.error(f"Failed to set job {job_id} as succeeded")
continue

active_file_split_ir_extractions[file_split_id] = [job_id]
extract_ir_config.file_split_id = file_split_id
new_extract_ir_job = ExtractIrJob(
id=job_id,
archive_id=archive_id,
file_split_id=file_split_id,
extract_ir_config=extract_ir_config,
state=InternalJobState.WAITING_FOR_DISPATCH,
)
@@ -633,6 +685,15 @@ def found_max_num_latest_results(
return max_timestamp_in_remaining_archives <= min_timestamp_in_top_results


def ir_file_exists_for_file_split(
results_cache_uri: str, ir_collection_name: str, file_split_id: str
):
with pymongo.MongoClient(results_cache_uri) as results_cache_client:
ir_collection = results_cache_client.get_default_database()[ir_collection_name]
results_count = ir_collection.count_documents({"file_split_id": file_split_id})
return 0 != results_count


async def handle_finished_search_job(
db_conn, job: SearchJob, task_results: Optional[Any], results_cache_uri: str
) -> None:
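
The count_documents lookup in ir_file_exists_for_file_split above is exactly the query that the new file_split_id index (created by create-results-cache-indices.py) is meant to serve. As a hedged illustration with placeholder URI and collection name, one can confirm the index is used by inspecting the query plan:

from pymongo import MongoClient

# Placeholder values; substitute the deployment's actual URI and IR collection name.
with MongoClient("mongodb://localhost:27017/clp-query-results") as client:
    ir_collection = client.get_default_database()["ir-files"]
    plan = ir_collection.find({"file_split_id": "<some-file-split-id>"}).explain()
    # With the index in place, the winning plan should include an IXSCAN stage
    # rather than a COLLSCAN over the whole collection.
    print(plan["queryPlanner"]["winningPlan"])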
@@ -723,8 +784,10 @@ async def handle_finished_extract_ir_job(
db_conn, job: ExtractIrJob, task_results: Optional[Any]
) -> None:
global active_jobs
global active_file_split_ir_extractions

job_id = job.id
file_split_id = job.file_split_id
new_job_status = QueryJobStatus.SUCCEEDED
num_tasks = len(task_results)
if 1 != num_tasks:
@@ -761,6 +824,22 @@
logger.info(f"Completed IR extraction job {job_id}.")
else:
logger.info(f"Completed IR extraction job {job_id} with failing tasks.")

waiting_jobs = active_file_split_ir_extractions[file_split_id]
waiting_jobs.remove(job_id)
for waiting_job in waiting_jobs:
logger.info(f"Setting status to {new_job_status.to_str()} for waiting jobs: {waiting_job}.")
set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
waiting_job,
new_job_status,
QueryJobStatus.RUNNING,
num_tasks_completed=0,
duration=(datetime.datetime.now() - job.start_time).total_seconds(),
)

del active_file_split_ir_extractions[file_split_id]
del active_jobs[job_id]


@@ -819,6 +898,7 @@ async def handle_jobs(
db_conn_pool,
clp_metadata_db_conn_params: Dict[str, any],
results_cache_uri: str,
ir_collection_name: str,
jobs_poll_delay: float,
num_archives_to_search_per_sub_job: int,
) -> None:
@@ -832,6 +912,7 @@
db_conn_pool,
clp_metadata_db_conn_params,
results_cache_uri,
ir_collection_name,
num_archives_to_search_per_sub_job,
)
if 0 == len(reducer_acquisition_tasks):
@@ -915,6 +996,7 @@ async def main(argv: List[str]) -> int:
True
),
results_cache_uri=clp_config.results_cache.get_uri(),
ir_collection_name=clp_config.results_cache.ir_collection_name,
jobs_poll_delay=clp_config.query_scheduler.jobs_poll_delay,
num_archives_to_search_per_sub_job=batch_size,
)
@@ -59,6 +59,7 @@ def get_config(self) -> QueryJobConfig: ...

class ExtractIrJob(QueryJob):
extract_ir_config: ExtractIrJobConfig
file_split_id: str
archive_id: str

def get_type(self) -> QueryJobType:
