diff --git a/docker/main/ngen-calibration/entrypoint.sh b/docker/main/ngen-calibration/entrypoint.sh
index c4d054610..f895e6eeb 100755
--- a/docker/main/ngen-calibration/entrypoint.sh
+++ b/docker/main/ngen-calibration/entrypoint.sh
@@ -1,50 +1,94 @@
-#!/bin/sh
-# Managed by the _generate_docker_cmd_args function in scheduler.py of dmod.scheduler
-#
-# $1 will have the number of nodes associated with this run
-# $2 will have comma-delimited host strings in MPI form; e.g., hostname:N,hostname:M
-# $3 will have the unique job id
-# $4 is the worker index
-# $5 will be the name of the output dataset (which will imply a directory location)
-# $6 will be the name of the hydrofabric dataset (which will imply a directory location)
-# $7 will be the name of the realization configuration dataset (which will imply a directory location)
-# $8 will be the name of the BMI configuration dataset (which will imply a directory location)
-# $9 will be the name of the partition configuration dataset (which will imply a directory location)
-# TODO: wire up $10
-# $10 will be the name of the calibration configuration dataset (which will imply a directory location)
-
-# Not yet supported
-# no-op
-MPI_NODE_COUNT="${1:?No MPI node count given}"
-# no-op
-MPI_HOST_STRING="${2:?No MPI host string given}"
-# no-op
-PARTITION_DATASET_NAME="${9:?}"
-
-JOB_ID=${3:?No Job id given}
-WORKER_INDEX=${4:?No worker index given}
-
-OUTPUT_DATASET_NAME="${5:?}"
-HYDROFABRIC_DATASET_NAME="${6:?}"
-REALIZATION_CONFIG_DATASET_NAME="${7:?}"
-BMI_CONFIG_DATASET_NAME="${8:?}"
-CALIBRATION_CONFIG_DATASET_NAME="${10:?}"
+#!/bin/bash
+# Args are managed by the _generate_docker_cmd_args function in scheduler.py of dmod.scheduler
 
+# TODO: Docker secret variable values need to be parameterized
 ACCESS_KEY_SECRET="object_store_exec_user_name"
 SECRET_KEY_SECRET="object_store_exec_user_passwd"
 DOCKER_SECRETS_DIR="/run/secrets"
 ACCESS_KEY_FILE="${DOCKER_SECRETS_DIR}/${ACCESS_KEY_SECRET}"
 SECRET_KEY_FILE="${DOCKER_SECRETS_DIR}/${SECRET_KEY_SECRET}"
 
+MPI_USER="mpi"
+if [ "$(whoami)" = "${MPI_USER}" ]; then
+    MPI_HOSTS_FILE="${HOME}/.mpi_hosts"
+else
+    MPI_HOSTS_FILE="$(su ${MPI_USER} -c 'echo "${HOME}"')/.mpi_hosts"
+fi
+RUN_SENTINEL="/home/${MPI_USER}/.run_sentinel"
+
+MPI_RUN="mpirun"
+
+NGEN_SERIAL_EXECUTABLE="/ngen/ngen/cmake_build_serial/ngen"
+NGEN_PARALLEL_EXECUTABLE="/ngen/ngen/cmake_build_parallel/ngen"
+# This will be symlinked to the parallel one currently
 NGEN_EXECUTABLE="/ngen/ngen/cmake_build/ngen"
 
 ALL_DATASET_DIR="/dmod/datasets"
-OUTPUT_DATASET_DIR="${ALL_DATASET_DIR}/output/${OUTPUT_DATASET_NAME}"
-HYDROFABRIC_DATASET_DIR="${ALL_DATASET_DIR}/hydrofabric/${HYDROFABRIC_DATASET_NAME}"
-REALIZATION_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${REALIZATION_CONFIG_DATASET_NAME}"
-BMI_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${BMI_CONFIG_DATASET_NAME}"
-PARTITION_DATASET_DIR="${ALL_DATASET_DIR}/config/${PARTITION_DATASET_NAME}"
-CALIBRATION_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${CALIBRATION_CONFIG_DATASET_NAME}"
+
+while [ ${#} -gt 0 ]; do
+    case "${1}" in
+        --config-dataset)
+            CONFIG_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --host-string)
+            MPI_HOST_STRING="${2:?}"
+            shift
+            ;;
+        --hydrofabric-dataset)
+            HYDROFABRIC_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --job-id)
+            JOB_ID="${2:?}"
+            shift
+            ;;
+        --node-count)
+            MPI_NODE_COUNT="${2:?}"
+            shift
+            ;;
+        --output-dataset)
+            OUTPUT_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --partition-dataset)
+            PARTITION_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --worker-index)
+            WORKER_INDEX="${2:?}"
+            shift
+            ;;
+    esac
+    shift
+done
+
+# Run some sanity checks
+# TODO: not sure if this is appropriate or not for calibration exec
+if [ -z "${MPI_HOST_STRING:?No MPI hosts string given}" ]; then
+    echo "Error: MPI host string is empty" >&2
+    exit 1
+fi
+# Using complement of valid range to catch non-integer values
+if ! [ "${WORKER_INDEX:?No MPI worker index/rank given}" -ge 0 ] 2>/dev/null; then
+    echo "Error: invalid value '${WORKER_INDEX}' given for MPI worker index/rank" >&2
+    exit 1
+fi
+if ! [ "${MPI_NODE_COUNT:?No MPI node count provided}" -gt 0 ] 2>/dev/null; then
+    echo "Error: invalid value '${MPI_NODE_COUNT}' given for MPI node count" >&2
+    exit 1
+fi
+
+# These serve as both sanity checks and initialization of some derived values
+OUTPUT_DATASET_DIR="${ALL_DATASET_DIR:?}/output/${OUTPUT_DATASET_NAME:?No output dataset provided}"
+HYDROFABRIC_DATASET_DIR="${ALL_DATASET_DIR}/hydrofabric/${HYDROFABRIC_DATASET_NAME:?No hydrofabric dataset provided}"
+CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${CONFIG_DATASET_NAME:?No config dataset provided}"
+
+# Require a partitioning config unless using just a single node for serial exec
+# TODO: account for CPU count more appropriately when that gets merged in for ngen image
+if [ ${MPI_NODE_COUNT:?No MPI node count provided} -gt 1 ]; then
+    PARTITION_DATASET_DIR="${ALL_DATASET_DIR:?}/config/${PARTITION_DATASET_NAME:?No partition config dataset name given}"
+fi
 
 print_date() {
     date "+%Y-%m-%d,%H:%M:%S"
@@ -83,12 +127,9 @@ load_object_store_keys_from_docker_secrets() {
 start_calibration() {
     # Start ngen calibration
     echo "$(print_date) Starting serial ngen calibration"
-    # CALIBRATION_CONFIG_FILE=${CALIBRATION_CONFIG_DATASET_DIR}/$(basename $(find ${CALIBRATION_CONFIG_DATASET_DIR} -name "*.yaml" -maxdepth 1 | head -1))
-
-    # TODO: move this to CALIBRATION_CONFIG_DATASET_DIR
-    # NOTE: assumes that calibration dataset will be in realization config dataset AND that it is
-    #  the only yaml file at the top level of that dataset.
-    CALIBRATION_CONFIG_FILE=${REALIZATION_CONFIG_DATASET_DIR}/$(basename $(find ${REALIZATION_CONFIG_DATASET_DIR} -name "*.yaml" -maxdepth 1 | head -1))
+    # Find and use copy of config in output dataset
+    CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR} -maxdepth 1 -type f \( -iname "*.yaml" -o -iname "*.yml" \) | head -1)
 
     if [ -z "${CALIBRATION_CONFIG_FILE}" ]; then
         echo "Error: NGEN calibration yaml file not found" 2>&1
@@ -106,12 +147,23 @@ start_calibration() {
 }
 
 # Sanity check that the output, hydrofabric, and config datasets are available (i.e., their directories are in place)
-check_for_dataset_dir "${REALIZATION_CONFIG_DATASET_DIR}"
-check_for_dataset_dir "${BMI_CONFIG_DATASET_DIR}"
-check_for_dataset_dir "${PARTITION_DATASET_DIR}"
-check_for_dataset_dir "${HYDROFABRIC_DATASET_DIR}"
 check_for_dataset_dir "${OUTPUT_DATASET_DIR}"
-# check_for_dataset_dir "${CALIBRATION_CONFIG_DATASET_DIR}"
+check_for_dataset_dir "${CONFIG_DATASET_DIR}"
+if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
+    check_for_dataset_dir "${PARTITION_DATASET_DIR:?No partition dataset directory defined}"
+fi
+check_for_dataset_dir "${HYDROFABRIC_DATASET_DIR}"
+
+# Copy config files to output dataset for record keeping, but only from the "main" worker node
+if [ ${WORKER_INDEX} -eq 0 ]; then
+    # TODO: perform copy of configs to output dataset outside of image (in service) for better performance
+    cp -a ${CONFIG_DATASET_DIR}/. ${OUTPUT_DATASET_DIR}
+    if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
+        # Also, when partition config present, copy that for record keeping
+        # TODO: perform copy of configs to output dataset outside of image (in service) for better performance
+        cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR}
+    fi
+fi
 
 # Move to the output dataset mounted directory
 cd ${OUTPUT_DATASET_DIR}
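
Note on the YAML lookup in start_calibration above: find's -o alternation binds more
loosely than the implicit -a joining other tests, so an ungrouped
"-type f -iname '*.yaml' -o -iname '*.yml'" would restrict only the first pattern to
regular files, and GNU find warns when -maxdepth follows other tests. A standalone
sketch of the grouped lookup (the directory value is illustrative):

    find "${OUTPUT_DATASET_DIR}" -maxdepth 1 -type f \( -iname "*.yaml" -o -iname "*.yml" \) | head -1
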
diff --git a/docker/main/ngen/entrypoint.sh b/docker/main/ngen/entrypoint.sh
index ddd97583b..1f944d87d 100755
--- a/docker/main/ngen/entrypoint.sh
+++ b/docker/main/ngen/entrypoint.sh
@@ -1,29 +1,7 @@
-#!/bin/sh
-# Managed by the _generate_docker_cmd_args function in scheduler.py of dmod.scheduler
-#
-# $1 will have the number of nodes associated with this run
-# $2 will have comma-delimited host strings in MPI form; e.g., hostname:N,hostname:M
-# $3 will have the unique job id
-# $4 is the worker index
-# $5 will be the name of the output dataset (which will imply a directory location)
-# $6 will be the name of the hydrofabric dataset (which will imply a directory location)
-# $7 will be the name of the realization configuration dataset (which will imply a directory location)
-# $8 will be the name of the BMI configuration dataset (which will imply a directory location)
-# $9 will be the name of the partition configuration dataset (which will imply a directory location)
-
-MPI_NODE_COUNT="${1:?No MPI node count given}"
-MPI_HOST_STRING="${2:?No MPI host string given}"
-JOB_ID=${3:?No Job id given}
-WORKER_INDEX=${4:?No worker index given}
-OUTPUT_DATASET_NAME="${5:?}"
-HYDROFABRIC_DATASET_NAME="${6:?}"
-REALIZATION_CONFIG_DATASET_NAME="${7:?}"
-BMI_CONFIG_DATASET_NAME="${8:?}"
-# Don't require a partitioning config when only using a single node
-if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then
-    PARTITION_DATASET_NAME="${9:?No argument for partition config dataset when expecting one for MPI-based job}"
-fi
+#!/bin/bash
+# Args are managed by the _generate_docker_cmd_args function in scheduler.py of dmod.scheduler
 
+# TODO: Docker secret variable values need to be parameterized
 ACCESS_KEY_SECRET="object_store_exec_user_name"
 SECRET_KEY_SECRET="object_store_exec_user_passwd"
 DOCKER_SECRETS_DIR="/run/secrets"
@@ -36,6 +14,7 @@ if [ "$(whoami)" = "${MPI_USER}" ]; then
 else
     MPI_HOSTS_FILE="$(su ${MPI_USER} -c 'echo "${HOME}"')/.mpi_hosts"
 fi
+RUN_SENTINEL="/home/${MPI_USER}/.run_sentinel"
 
 MPI_RUN="mpirun"
 #NGEN_EXECUTABLE="ngen"
@@ -45,13 +24,68 @@ NGEN_PARALLEL_EXECUTABLE="/ngen/ngen/cmake_build_parallel/ngen"
 NGEN_EXECUTABLE="/ngen/ngen/cmake_build/ngen"
 
 ALL_DATASET_DIR="/dmod/datasets"
-OUTPUT_DATASET_DIR="${ALL_DATASET_DIR}/output/${OUTPUT_DATASET_NAME}"
-HYDROFABRIC_DATASET_DIR="${ALL_DATASET_DIR}/hydrofabric/${HYDROFABRIC_DATASET_NAME}"
-REALIZATION_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${REALIZATION_CONFIG_DATASET_NAME}"
-BMI_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${BMI_CONFIG_DATASET_NAME}"
+
+while [ ${#} -gt 0 ]; do
+    case "${1}" in
+        --config-dataset)
+            CONFIG_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --host-string)
+            MPI_HOST_STRING="${2:?}"
+            shift
+            ;;
+        --hydrofabric-dataset)
+            HYDROFABRIC_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --job-id)
+            JOB_ID="${2:?}"
+            shift
+            ;;
+        --node-count)
+            MPI_NODE_COUNT="${2:?}"
+            shift
+            ;;
+        --output-dataset)
+            OUTPUT_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --partition-dataset)
+            PARTITION_DATASET_NAME="${2:?}"
+            shift
+            ;;
+        --worker-index)
+            WORKER_INDEX="${2:?}"
+            shift
+            ;;
+    esac
+    shift
+done
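+
+# For reference: '[' requires integer operands for -ge/-gt, so for a non-numeric value the
+# bracketed checks below fail with an error (silenced by 2>/dev/null) instead of passing;
+# e.g., '! [ "abc" -ge 0 ] 2>/dev/null' is true (rejected), '! [ "4" -ge 0 ]' is false (accepted).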
given}" ]; then + echo "Error: MPI host string is empty" > 2>&1 + exit 1 +fi +# Using complement of valid range to catch non-integer values +if ! [ "${WORKER_INDEX:?No MPI worker index/rank given}" -ge 0 ] 2>/dev/null; then + echo "Error: invalid value '${WORKER_INDEX}' given for MPI worker index/rank" > 2>&1 + exit 1 +fi +if ! [ "${MPI_NODE_COUNT:?No MPI node count provided}" -gt 0 ] 2>/dev/null; then + echo "Error: invalid value '${MPI_NODE_COUNT}' given for MPI node count" > 2>&1 + exit 1 +fi + +# These serve as both sanity checks and initialization of some derived values +OUTPUT_DATASET_DIR="${ALL_DATASET_DIR:?}/output/${OUTPUT_DATASET_NAME:?No output dataset provided}" +HYDROFABRIC_DATASET_DIR="${ALL_DATASET_DIR}/hydrofabric/${HYDROFABRIC_DATASET_NAME:?No hydrofabric dataset provided}" +CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${CONFIG_DATASET_NAME:?No config dataset provided}" + # Check if parallel processing is in effect and partition dataset is needed by testing node count or 1st node CPU count -if [ ${MPI_NODE_COUNT:?} -gt 1 ] || [ $(echo "${MPI_HOST_STRING}" | sed 's/,//' | awk -F: '{print $2}') -gt 1 ] 2>/dev/null; then - PARTITION_DATASET_DIR="${ALL_DATASET_DIR}/config/${PARTITION_DATASET_NAME:?No partition config dataset name for directory}" +if [ ${MPI_NODE_COUNT:?No MPI node count provided} -gt 1 ] || [ $(echo "${MPI_HOST_STRING}" | sed 's/,//' | awk -F: '{print $2}') -gt 1 ] 2>/dev/null; then + PARTITION_DATASET_DIR="${ALL_DATASET_DIR:?}/config/${PARTITION_DATASET_NAME:?No partition config dataset name given}" # Note that, if the above test is "false" (in particular, the CPU count check) we should ensure the host string is valid # Catch false negative due to invalid CPU count/format by taking complement of whether 1st CPU count is greater than -1 # Any bogus value will result in the pre-complemented test being "false" @@ -60,8 +94,6 @@ elif ! 
[ $(echo "${MPI_HOST_STRING}" | sed 's/,//' | awk -F: '{print $2}') -gt - exit 1 fi -RUN_SENTINEL="/home/${MPI_USER}/.run_sentinel" - print_date() { date "+%Y-%m-%d,%H:%M:%S" @@ -99,6 +131,18 @@ load_object_store_keys_from_docker_secrets() test -n "${ACCESS_KEY:-}" && test -n "${SECRET_KEY:-}" } +close_remote_workers() +{ + # Signal to remote worker nodes that they can stop their SSH process by removing this file + if [ ${MPI_NODE_COUNT:-1} -gt 1 ]; then + for i in $(echo "${MPI_HOST_STRING}" | sed 's/,/ /g'); do + _HOST_NAME=$(echo "${i}" | awk -F: '{print $1}') + ssh -q ${_HOST_NAME} rm ${RUN_SENTINEL} >/dev/null 2>&1 + done + echo "$(print_date) DEBUG: closed other worker SSH processes" + fi +} + exec_main_worker_ngen_run() { # Write (split) hoststring to a proper file @@ -131,7 +175,7 @@ exec_main_worker_ngen_run() ${MPI_RUN:?} -f "${MPI_HOSTS_FILE}" -n ${_TOTAL_CPUS} \ ${NGEN_EXECUTABLE:?} ${HYDROFABRIC_DATASET_DIR}/catchment_data.geojson "" \ ${HYDROFABRIC_DATASET_DIR}/nexus_data.geojson "" \ - ${REALIZATION_CONFIG_DATASET_DIR}/realization_config.json \ + ${CONFIG_DATASET_DIR}/realization_config.json \ ${PARTITION_DATASET_DIR}/partition_config.json \ --subdivided-hydrofabric @@ -140,14 +184,6 @@ exec_main_worker_ngen_run() echo "$(print_date) ngen mpirun command finished with return value: ${NGEN_RETURN}" - # Close the other workers by removing this file - for i in $(echo "${MPI_HOST_STRING}" | sed 's/,/ /g'); do - _HOST_NAME=$(echo "${i}" | awk -F: '{print $1}') - ssh -q ${_HOST_NAME} rm ${RUN_SENTINEL} >/dev/null 2>&1 - done - - echo "$(print_date) DEBUG: closed other worker SSH processes" - # Exit with the model's exit code return ${NGEN_RETURN} } @@ -160,7 +196,7 @@ exec_serial_ngen_run() echo "$(print_date) Executing serial build of ngen" ${NGEN_SERIAL_EXECUTABLE:?} ${HYDROFABRIC_DATASET_DIR}/catchment_data.geojson "" \ ${HYDROFABRIC_DATASET_DIR}/nexus_data.geojson "" \ - ${REALIZATION_CONFIG_DATASET_DIR}/realization_config.json + ${CONFIG_DATASET_DIR}/realization_config.json #Capture the return value to use as service exit code NGEN_RETURN=$? @@ -172,8 +208,7 @@ exec_serial_ngen_run() } # Sanity check that the output, hydrofabric, and config datasets are available (i.e., their directories are in place) -check_for_dataset_dir "${REALIZATION_CONFIG_DATASET_DIR}" -check_for_dataset_dir "${BMI_CONFIG_DATASET_DIR}" +check_for_dataset_dir "${CONFIG_DATASET_DIR}" if [ -n "${PARTITION_DATASET_DIR:-}" ]; then check_for_dataset_dir "${PARTITION_DATASET_DIR}" fi @@ -183,9 +218,24 @@ check_for_dataset_dir "${OUTPUT_DATASET_DIR}" # Move to the output dataset mounted directory cd ${OUTPUT_DATASET_DIR} + +cleanup_sshuser_exit() +{ + if [ -n "${_SSH_D_PID:-}" ] && kill -s 0 "${_SSH_D_PID}" 2>/dev/null ; then + kill "${_SSH_D_PID}" + fi +} + if [ "${WORKER_INDEX}" = "0" ]; then if [ "$(whoami)" = "${MPI_USER}" ]; then + trap close_remote_workers EXIT + # Have "main" worker copy config files to output dataset for record keeping + # TODO: perform copy of configs to output dataset outside of image (in service) for better performance + cp -a ${CONFIG_DATASET_DIR}/. ${OUTPUT_DATASET_DIR} if [ -n "${PARTITION_DATASET_DIR:-}" ]; then + # Include partition config dataset too if appropriate + # TODO: perform copy of configs to output dataset outside of image (in service) for better performance + cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR} exec_main_worker_ngen_run else exec_serial_ngen_run @@ -195,6 +245,8 @@ if [ "${WORKER_INDEX}" = "0" ]; then /usr/sbin/sshd -D & _SSH_D_PID="$!" 
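+        # Using an EXIT trap here (and below) rather than a trailing manual kill means the
+        # sshd child is reaped on error exit paths as well, not just after a clean run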
+        trap cleanup_sshuser_exit EXIT
+
         # Start the SSH daemon as a power user, but then actually run the model as our MPI_USER
         echo "$(print_date) Running exec script as '${MPI_USER:?}'"
         # Do this by just re-running this script with the same args, but as the other user
@@ -202,20 +254,17 @@ if [ "${WORKER_INDEX}" = "0" ]; then
         _EXEC_STRING="${0} ${@}"
         su ${MPI_USER:?} --session-command "${_EXEC_STRING}"
         #time su ${MPI_USER:?} --session-command "${_EXEC_STRING}"
-
-        # Once running the model finishes, kill the SSH daemon process
-        kill ${_SSH_D_PID}
     fi
 else
     echo "$(print_date) Starting SSH daemon, waiting for main job"
     /usr/sbin/sshd -D &
     _SSH_D_PID="$!"
+    trap cleanup_sshuser_exit EXIT
+
     touch ${RUN_SENTINEL}
     chown ${MPI_USER} ${RUN_SENTINEL}
-    while [ -e ${RUN_SENTINEL} ]; do
+    while [ -e ${RUN_SENTINEL} ] && kill -s 0 "${_SSH_D_PID}" 2>/dev/null ; do
         sleep 5
     done
-
-    kill ${_SSH_D_PID}
 fi
\ No newline at end of file
diff --git a/docker/main/nwm/run_model.sh b/docker/main/nwm/run_model.sh
index 7a192a57f..3d071c534 100755
--- a/docker/main/nwm/run_model.sh
+++ b/docker/main/nwm/run_model.sh
@@ -1,25 +1,55 @@
 #!/bin/bash
-# $1 will have the number of nodes associated with this run
-# $2 will have the host string in MPI form, i.e. hostname:N, hostname:M
-# $3 will have the unique job id
-# $4 is optional, and if set is the index of the distributed job/s this entrypoint is responsible for
-#    if the idx is 0, this is the main MPI worker responsible for launching the job, otherwise, it is a worker
-#    that simply needs to start the SSH daemon and wait
-if [ "x$4" != "x" ] && [ $4 == "0" ]
+while [ ${#} -gt 0 ]; do
+    case "${1}" in
+# TODO: most likely, we are eventually going to need these available too for jobs to actually function
+#        --config-dataset)
+#            CONFIG_DATASET_NAME="${2:?}"
+#            shift
+#            ;;
+#        --output-dataset)
+#            OUTPUT_DATASET_NAME="${2:?}"
+#            shift
+#            ;;
+        --host-string)
+            # MPI host string; i.e., comma-delimited hostnames and per-host cpu counts (e.g., hostname:N,hostname:M)
+            MPI_HOST_STRING="${2:?}"
+            shift
+            ;;
+        --job-id)
+            # A unique id for the job being executed
+            JOB_ID="${2:?}"
+            shift
+            ;;
+        --node-count)
+            # The number of distinct hosts/nodes in the job (different from cpu count)
+            MPI_NODE_COUNT="${2:?}"
+            shift
+            ;;
+        --worker-index)
+            # Optional index of the distributed job/s this entrypoint is responsible for;
+            # if the idx is 0, this is the main MPI worker responsible for launching the job, otherwise, it is a worker
+            # that simply needs to start the SSH daemon and wait
+            WORKER_INDEX="${2:?}"
+            shift
+            ;;
+    esac
+    shift
+done
+
+if [ "x$WORKER_INDEX" != "x" ] && [ "$WORKER_INDEX" == "0" ]
 then
     echo "Starting SSH daemon on main worker"
     sudo /usr/sbin/sshd -D &
 
-num_hosts=$1
-num_hosts=$((num_hosts + 0))
-req_id=$3
+MPI_NODE_COUNT=$((MPI_NODE_COUNT + 0))
 
 #Setup the runtime paths
 cd ${WORKDIR}
 domain_location=${WORKDIR}/domains
 output_dir=${WORKDIR}/output
 
 #Make a temp working dir
-tmp_domain=$output_dir/tmp_${req_id}
+# TODO: this needs to be the output dataset
+tmp_domain=$output_dir/tmp_${JOB_ID}
 mkdir -p $tmp_domain
 #Link the static domain and runtime tables to the working dir
 ln -s $domain_location/* $tmp_domain/
@@ -27,7 +57,7 @@ ln -s ${WORKDIR}/*.TBL $tmp_domain/
 cd $tmp_domain
 
 # write hoststring to file
-echo $2 >> hostfile
+echo $MPI_HOST_STRING >> hostfile
 
 total_cpus=0
 #Determine total CPUS and make sure hosts are running ssh
diff --git a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py
index 092164097..bda21bd99 100644
--- a/python/lib/scheduler/dmod/scheduler/scheduler.py
+++ b/python/lib/scheduler/dmod/scheduler/scheduler.py
@@ -391,28 +391,28 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
             raise RuntimeError("Unexpected request type {}: cannot build Docker CMD arg list".format(
                 job.model_request.__class__.__name__))
 
-        # For now at least, all image arg lists start the same way (first 3 args: node count, host string, and job id)
-        # TODO: this probably should be a documented standard for any future entrypoints
-        # TODO (later): probably need to move all types to recognize and use explicit flags rather than order arguments
-        docker_cmd_args = [str(len(job.allocations)), self.build_host_list(job), str(job.job_id)]
+        # For now at least, all image arg sets will have these (i.e., node count, host string, job id, and worker index)
+        docker_cmd_arg_map = {"--node-count": str(len(job.allocations)), "--host-string": self.build_host_list(job),
+                              "--job-id": str(job.job_id), "--worker-index": str(worker_index)}
 
         if isinstance(job.model_request, AbstractNgenRequest):
-            docker_cmd_args.extend(self._generate_nextgen_job_docker_cmd_args(job, worker_index))
+            docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job, worker_index))
 
-        return docker_cmd_args
+        # Finally, convert the args map to a list, with each "flag"/key immediately preceding its value
+        args_as_list = []
+        for flag_key, param_val in docker_cmd_arg_map.items():
+            args_as_list.append(flag_key)
+            args_as_list.append(param_val)
+        return args_as_list
 
-    def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
+    def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> Dict[str, str]:
         """
-        Prepare the specific Docker CMD arg applicable to Nextgen-based jobs, which start with the 4th positional arg.
+        Prepare the specific Docker CMD args applicable to Nextgen-based jobs.
 
         Generate the necessary Docker CMD arguments required for starting a specified worker container that is part of a
-        Nextgen-based job executed within Docker. In general, this function should not be used except when called by
-        ::method:`_generate_docker_cmd_args`. Further, it only applies to Nextgen-based jobs: i.e., ngen model exec or
-        ngen calibration jobs.
-
-        Since the general form of the required Docker CMD args (i.e., for any type, not only Nextgen-based jobs)
-        generated by ::method:`_generate_docker_cmd_args` always begins with the same 3 positional args, this function
-        effectively starts by producing positional argument number 4 and generates the remaining necessary CMD args.
+        Nextgen-based job executed within Docker, beyond CMD args universal to all jobs. In general, this function
+        should not be used except when called by ::method:`_generate_docker_cmd_args`. Further, it only applies to
+        Nextgen-based jobs: i.e., ngen model exec or ngen calibration jobs.
 
         Parameters
         ----------
@@ -423,9 +423,8 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
 
         Returns
        -------
-        List[str]
-            The sublist (with index ``0`` of the sublist corresponding to index ``3`` of the final list) of Docker CMD
-            args for the associated job worker container.
+        Dict[str, str]
+            The Docker CMD param flags and values.
 
         See Also
         --------
@@ -436,51 +435,24 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
             msg = "Cannot generate Nextgen-base Docker job CMD args for job {} with request of {} type"
             raise RuntimeError(msg.format(str(job.job_id), job.model_request.__class__.__name__))
 
-        # Remember, this list will start with $4 in the eventual complete Docker CMD list
-        ngen_cmd_args = []
-
-        # $4 is the worker index (where index 0 is assumed to be the lead node)
-        ngen_cmd_args.append(str(worker_index))
-
-        # $5 is the name of the output dataset (which will imply a directory location)
-        output_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.OUTPUT, max_count=1)
-        ngen_cmd_args.append(output_dataset_names[0])
+        ngen_cmd_arg_map = {
+            "--output-dataset": self._ds_names_helper(job, worker_index, DataCategory.OUTPUT, max_count=1)[0],
+            "--hydrofabric-dataset": self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)[0],
+            "--config-dataset": self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
+                                                      data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG)[0],
+        }
 
-        # $6 is the name of the hydrofabric dataset (which will imply a directory location)
-        hydrofabric_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)
-        ngen_cmd_args.append(hydrofabric_dataset_names[0])
-
-        # $7 is the name of the realization configuration dataset (which will imply a directory location)
-        realization_cfg_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
-                                                              data_format=DataFormat.NGEN_REALIZATION_CONFIG)
-        ngen_cmd_args.append(realization_cfg_dataset_names[0])
-
-        # $8 is the name of the BMI config dataset (which will imply a directory location)
-        bmi_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
-                                                         data_format=DataFormat.BMI_CONFIG)
-        ngen_cmd_args.append(bmi_config_dataset_names[0])
-
-        # $9 is the name of the partition config dataset (which will imply a directory location)
-        # TODO: this probably will eventually break things if $10 is added for calibration config dataset
-        # TODO: need to overhaul entrypoint for ngen and ngen-calibration images with flag-based args
         if job.cpu_count > 1:
             partition_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
                                                                    data_format=DataFormat.NGEN_PARTITION_CONFIG)
-            ngen_cmd_args.append(partition_config_dataset_names[0])
+            ngen_cmd_arg_map["--partition-dataset"] = partition_config_dataset_names[0]
 
         # Also do a sanity check here to ensure there is at least one forcing dataset
         self._ds_names_helper(job, worker_index, DataCategory.FORCING)
 
-        # TODO: account for differences between regular ngen execution and calibration job
-
-        # $10 is the name of the calibration config dataset (which will imply a directory location)
-        # TODO: this *might* need to be added depending on how we decide to handle calibration
-        #  configs. meaning if they are datasets or not.
-        # calibration_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
-        #                                                          data_format=DataFormat.NGEN_CAL_CONFIG)
-        # ngen_cmd_args.append(calibration_config_dataset_names[0])
+        # TODO: confirm this properly accounts for differences between ngen exec and calibration jobs
 
-        return ngen_cmd_args
+        return ngen_cmd_arg_map
 
     def _get_required_obj_store_datasets_arg_strings(self, job: 'Job', worker_index: int) -> List[str]:
         """
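
For reference, the CMD the scheduler now assembles is a flat flag/value sequence built from
the maps above; a hypothetical rendering for a two-node ngen job (dataset names illustrative):

    entrypoint.sh --node-count 2 --host-string host-a:4,host-b:4 --job-id 42 \
        --worker-index 0 --output-dataset job-42-output --hydrofabric-dataset hf-conus \
        --config-dataset job-42-composite-config --partition-dataset job-42-partitions

Each pass of the entrypoints' while/case loop consumes one flag and its value, so flag order
does not matter.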