diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bbc911e5..188e9b0d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -15,18 +15,8 @@ on: jobs: main: - name: test-py-${{ matrix.python }} - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-20.04] - python: ["3.10"] - include: - - os: ubuntu-20.04 - python: 3.10 - # the following has no effect with manual trigger - # where the ramp-workflow is specified anyway - ramp_workflow_version: master + name: test-py-3.10 + runs-on: ubuntu-latest services: @@ -49,22 +39,23 @@ jobs: with: update-conda: true activate-conda: false - python-version: ${{ matrix.python }} + python-version: '3.10' conda-channels: anaconda - name: Create envs run: | conda install --yes mamba -n base -c conda-forge + rm -f /usr/share/miniconda/pkgs/cache/*.json # workaround for mamba-org/mamba#488 mamba create --yes -n testenv python=$PYTHON_VERSION mamba env update -n testenv -f environment.yml mamba env create -f ci_tools/environment_iris_kit.yml env: - PYTHON_VERSION: ${{ matrix.python }} + PYTHON_VERSION: '3.10' - name: Install ramp-board run: | source activate testenv - if [ "$PYTHON_VERSION" == "3.8" ]; then + if [ "$PYTHON_VERSION" == "3.10" ]; then python -m pip install "dask==2021.4.1" "distributed==2021.4.1" fi if [ "${{ matrix.ramp_workflow_version }}" == "master" ]; then @@ -124,13 +115,10 @@ jobs: - uses: actions/setup-python@v2 name: Install Python with: - python-version: '3.7' + python-version: '3.10' - name: Install dependencies run: pip install flake8 black==22.3.0 - - name: Run flake8 - run: flake8 ramp-* - - name: Run black run: black --check . diff --git a/.gitignore b/.gitignore index abe84ddd..1389d28b 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ db_engine.yml prof dask-worker-space/ coverage.xml +*.swp diff --git a/doc/workers.rst b/doc/workers.rst index 77571296..c2a48bcc 100644 --- a/doc/workers.rst +++ b/doc/workers.rst @@ -551,7 +551,11 @@ Create an event config.yml (see :ref:`deploy-ramp-event`) and update the console, 'Instances' tab on the left, under 'availability zone'. * ``ami_image_name``: name you gave to the image you prepared (see :ref:`prepare_instance`). This can be found in the EC2 console, under - 'Images' -> 'AMI' tab. + 'Images' -> 'AMI' tab. Note: you do not need to give the full image name; + if you provide the generic name you chose, the latest version of the image + created by the pipeline is used automatically (e.g. 'challenge-iris' will + point to 'challenge-iris 2022-04-19T17-19-18.405Z' if that is the latest + one). * ``ami_user_name``: user name you used to ssh into your instance. Commonly 'ec2-user' or 'ubuntu'. 
* ``instance_type``: found in the EC2 console, 'Instances' tab, 'Description' diff --git a/ramp-database/ramp_database/tools/event.py b/ramp-database/ramp_database/tools/event.py index b1e928ef..ebfa359a 100644 --- a/ramp-database/ramp_database/tools/event.py +++ b/ramp-database/ramp_database/tools/event.py @@ -287,11 +287,11 @@ def add_event( session.add(event) session.commit() - X_train, y_train = event.problem.get_train_data() - cv = event.problem.module.get_cv(X_train, y_train) - for train_indices, test_indices in cv: - cv_fold = CVFold(event=event, train_is=train_indices, test_is=test_indices) - session.add(cv_fold) + # X_train, y_train = event.problem.get_train_data() + # cv = event.problem.module.get_cv(X_train, y_train) + # for train_indices, test_indices in cv: + # cv_fold = CVFold(event=event, train_is=train_indices, test_is=test_indices) + # session.add(cv_fold) score_types = event.problem.module.score_types for score_type in score_types: diff --git a/ramp-database/ramp_database/tools/leaderboard.py b/ramp-database/ramp_database/tools/leaderboard.py index c22f21ad..51bc9e5e 100644 --- a/ramp-database/ramp_database/tools/leaderboard.py +++ b/ramp-database/ramp_database/tools/leaderboard.py @@ -46,80 +46,20 @@ def _compute_leaderboard( """ record_score = [] event = session.query(Event).filter_by(name=event_name).one() - map_score_precision = { - score_type.name: score_type.precision for score_type in event.score_types - } for sub in submissions: # take only max n bag - df_scores_bag = get_bagged_scores(session, sub.id) - highest_level = df_scores_bag.index.get_level_values("n_bag").max() - df_scores_bag = df_scores_bag.loc[(slice(None), highest_level), :] - df_scores_bag.index = df_scores_bag.index.droplevel("n_bag") - df_scores_bag = df_scores_bag.round(map_score_precision) - - df_scores = get_scores(session, sub.id) - df_scores = df_scores.round(map_score_precision) - - df_time = get_time(session, sub.id) - df_time = df_time.stack().to_frame() - df_time.index = df_time.index.set_names(["fold", "step"]) - df_time = df_time.rename(columns={0: "time"}) - df_time = df_time.groupby(level="step").sum().T - - df_scores_mean = df_scores.groupby("step").mean() - df_scores_std = df_scores.groupby("step").std() - df_scores_std.fillna(0, inplace=True) - - # select only the validation and testing steps and rename them to - # public and private - map_renaming = {"valid": "public", "test": "private"} - df_scores_mean = ( - df_scores_mean.loc[list(map_renaming.keys())] - .rename(index=map_renaming) - .stack() + df = ( + get_bagged_scores(session, sub.id) + .reset_index(drop=True) + .max(axis=0) .to_frame() .T ) - df_scores_std = ( - df_scores_std.loc[list(map_renaming.keys())] - .rename(index=map_renaming) - .stack() - .to_frame() - .T - ) - df_scores_bag = df_scores_bag.rename(index=map_renaming).stack().to_frame().T - - df = pd.concat( - [df_scores_bag, df_scores_mean, df_scores_std], - axis=1, - keys=["bag", "mean", "std"], - ) - - df.columns = df.columns.set_names(["stat", "set", "score"]) - - # change the multi-index into a stacked index - df.columns = df.columns.map(lambda x: " ".join(x)) - - # add the aggregated time information - df_time.index = df.index - df_time = df_time.rename( - columns={ - "train": "train time [s]", - "valid": "validation time [s]", - "test": "test time [s]", - } - ) - df = pd.concat([df, df_time], axis=1) if leaderboard_type == "private": df["submission ID"] = sub.basename.replace("submission_", "") df["team"] = sub.team.name df["submission"] = sub.name_with_link 
if with_links else sub.name - df["contributivity"] = int(round(100 * sub.contributivity)) - df["historical contributivity"] = int( - round(100 * sub.historical_contributivity) - ) - df["max RAM [MB]"] = get_submission_max_ram(session, sub.id) df["submitted at (UTC)"] = pd.Timestamp(sub.submission_timestamp) record_score.append(df) @@ -128,53 +68,9 @@ def _compute_leaderboard( # keep only second precision for the time stamp df["submitted at (UTC)"] = df["submitted at (UTC)"].astype("datetime64[s]") + df.columns.name = None - # reordered the column - stats_order = ["bag", "mean", "std"] if leaderboard_type == "private" else ["bag"] - dataset_order = ( - ["public", "private"] if leaderboard_type == "private" else ["public"] - ) - score_order = [event.official_score_name] + [ - score_type.name - for score_type in event.score_types - if score_type.name != event.official_score_name - ] - score_list = [ - "{} {} {}".format(stat, dataset, score) - for dataset, score, stat in product(dataset_order, score_order, stats_order) - ] - # Only display train and validation time for the public leaderboard - time_list = ( - ["train time [s]", "validation time [s]", "test time [s]"] - if leaderboard_type == "private" - else ["train time [s]", "validation time [s]"] - ) - col_ordered = ( - ["team", "submission"] - + score_list - + ["contributivity", "historical contributivity"] - + time_list - + ["max RAM [MB]", "submitted at (UTC)"] - ) - if leaderboard_type == "private": - col_ordered = ["submission ID"] + col_ordered - df = df[col_ordered] - - # check if the contributivity columns are null - contrib_columns = ["contributivity", "historical contributivity"] - if (df[contrib_columns] == 0).all(axis=0).all(): - df = df.drop(columns=contrib_columns) - - df = df.sort_values( - "bag {} {}".format(leaderboard_type, event.official_score_name), - ascending=event.get_official_score_type(session).is_lower_the_better, - ) - - # rename the column name for the public leaderboard - if leaderboard_type == "public": - df = df.rename( - columns={key: value for key, value in zip(score_list, score_order)} - ) + df = df.sort_values(by="submitted at (UTC)", ascending=False) return df @@ -207,105 +103,18 @@ def _compute_competition_leaderboard( session, submissions, "private", event_name, with_links=False ) - time_list = ( - ["train time [s]", "validation time [s]", "test time [s]"] - if leaderboard_type == "private" - else ["train time [s]", "validation time [s]"] - ) - - col_selected_private = ( - ["team", "submission"] - + ["bag private " + score_name, "bag public " + score_name] - + time_list - + ["submitted at (UTC)"] - ) - leaderboard_df = private_leaderboard[col_selected_private] - leaderboard_df = leaderboard_df.rename( - columns={ - "bag private " + score_name: "private " + score_name, - "bag public " + score_name: "public " + score_name, - } - ) + def _select_best_submission(df): + df = df.sort_values('Total cost') + # Take lowest score + del df['team'] + return df.iloc[0] # select best submission for each team - best_df = ( - leaderboard_df.groupby("team").min() - if score_type.is_lower_the_better - else leaderboard_df.groupby("team").max() - ) - best_df = best_df[["public " + score_name]].reset_index() - best_df["best"] = True - - # merge to get a best indicator column then select best - leaderboard_df = pd.merge( - leaderboard_df, - best_df, - how="left", - left_on=["team", "public " + score_name], - right_on=["team", "public " + score_name], - ) - leaderboard_df = leaderboard_df.fillna(False) - leaderboard_df = 
leaderboard_df[leaderboard_df["best"]] - leaderboard_df = leaderboard_df.drop(columns="best") - - # dealing with ties: we need the lowest timestamp - best_df = leaderboard_df.groupby("team").min() - best_df = best_df[["submitted at (UTC)"]].reset_index() - best_df["best"] = True - leaderboard_df = pd.merge( - leaderboard_df, - best_df, - how="left", - left_on=["team", "submitted at (UTC)"], - right_on=["team", "submitted at (UTC)"], - ) - leaderboard_df = leaderboard_df.fillna(False) - leaderboard_df = leaderboard_df[leaderboard_df["best"]] - leaderboard_df = leaderboard_df.drop(columns="best") - - # sort by public score then by submission timestamp, compute rank - leaderboard_df = leaderboard_df.sort_values( - by=["public " + score_name, "submitted at (UTC)"], - ascending=[score_type.is_lower_the_better, True], - ) - leaderboard_df["public rank"] = np.arange(len(leaderboard_df)) + 1 + best_df = private_leaderboard.groupby("team").apply(_select_best_submission).reset_index() + best_df = best_df.sort_values(by="Total cost") + best_df.insert(0, 'rank', np.arange(1, best_df.shape[0]+1, dtype=np.int32)) - # sort by private score then by submission timestamp, compute rank - leaderboard_df = leaderboard_df.sort_values( - by=["private " + score_name, "submitted at (UTC)"], - ascending=[score_type.is_lower_the_better, True], - ) - leaderboard_df["private rank"] = np.arange(len(leaderboard_df)) + 1 - - leaderboard_df["move"] = ( - leaderboard_df["public rank"] - leaderboard_df["private rank"] - ) - leaderboard_df["move"] = [ - "{:+d}".format(m) if m != 0 else "-" for m in leaderboard_df["move"] - ] - - col_selected = ( - [ - leaderboard_type + " rank", - "team", - "submission", - leaderboard_type + " " + score_name, - ] - + time_list - + ["submitted at (UTC)"] - ) - if leaderboard_type == "private": - col_selected.insert(1, "move") - - df = leaderboard_df[col_selected] - df = df.rename( - columns={ - leaderboard_type + " " + score_name: score_name, - leaderboard_type + " rank": "rank", - } - ) - df = df.sort_values(by="rank") - return df + return best_df def get_leaderboard_all_info(session, event_name): @@ -429,7 +238,7 @@ def get_leaderboard( "submission", "submitted at (UTC)", "state", - "wating list", + "waiting list", ] else: columns = ["team", "submission", "submitted at (UTC)", "error"] @@ -447,7 +256,7 @@ def get_leaderboard( pd.Timestamp(sub.submission_timestamp), ( sub.state_with_link - if leaderboard_type == "error" + if leaderboard_type == "failed" else sub.state ), ( diff --git a/ramp-database/ramp_database/tools/submission.py b/ramp-database/ramp_database/tools/submission.py index 1e8287f0..ed2dc7c1 100644 --- a/ramp-database/ramp_database/tools/submission.py +++ b/ramp-database/ramp_database/tools/submission.py @@ -446,6 +446,7 @@ def get_time(session, submission_id): results["fold"].append(fold_id) for step in ("train", "valid", "test"): results[step].append(getattr(cv_fold, "{}_time".format(step))) + breakpoint() return pd.DataFrame(results).set_index("fold") @@ -717,21 +718,13 @@ def set_bagged_scores(session, submission_id, path_predictions): The path where the results files are located. 
""" submission = select_submission_by_id(session, submission_id) - df = pd.read_csv( - os.path.join(path_predictions, "bagged_scores.csv"), index_col=[0, 1] - ) - df_steps = df.index.get_level_values("step").unique().tolist() + with open(os.path.join(path_predictions, "score.txt")) as fh: + cost_value = float(fh.read().strip()) + for score in submission.scores: for step in ("valid", "test"): - highest_n_bag = df.index.get_level_values("n_bag").max() - if step in df_steps: - score_last_bag = float(df.loc[(step, highest_n_bag), score.score_name]) - score_all_bags = df.loc[(step, slice(None)), score.score_name].tolist() - else: - score_last_bag = float(score.event_score_type.worst) - score_all_bags = None - setattr(score, "{}_score_cv_bag".format(step), score_last_bag) - setattr(score, "{}_score_cv_bags".format(step), score_all_bags) + setattr(score, "{}_score_cv_bag".format(step), cost_value) + setattr(score, "{}_score_cv_bags".format(step), [cost_value]) session.commit() diff --git a/ramp-database/ramp_database/tools/tests/test_leaderboard.py b/ramp-database/ramp_database/tools/tests/test_leaderboard.py index cb2e1117..535fb75b 100644 --- a/ramp-database/ramp_database/tools/tests/test_leaderboard.py +++ b/ramp-database/ramp_database/tools/tests/test_leaderboard.py @@ -1,3 +1,4 @@ +import re import shutil import pytest @@ -170,6 +171,8 @@ def test_get_leaderboard(session_toy_db): session_toy_db, "failed", "iris_test", "test_user" ) assert leaderboard_failed.count("") == 1 + # check that we have a link to the log of the failed submission + assert re.match(r".*.*", leaderboard_failed, flags=re.DOTALL) # the remaining submission should be successful leaderboard_public = get_leaderboard(session_toy_db, "public", "iris_test") diff --git a/ramp-database/ramp_database/utils.py b/ramp-database/ramp_database/utils.py index 82940410..48c75a83 100644 --- a/ramp-database/ramp_database/utils.py +++ b/ramp-database/ramp_database/utils.py @@ -106,4 +106,8 @@ def check_password(password, hashed_password): is_same_password : bool Return True if the two passwords are identical. """ - return bcrypt.checkpw(_encode_string(password), _encode_string(hashed_password)) + try: + return bcrypt.checkpw(_encode_string(password), _encode_string(hashed_password)) + except ValueError: + # Some manually created password don't have an invalid salt, ignore it. 
+ return False diff --git a/ramp-engine/ramp_engine/__init__.py b/ramp-engine/ramp_engine/__init__.py index c9895e78..fed821cc 100644 --- a/ramp-engine/ramp_engine/__init__.py +++ b/ramp-engine/ramp_engine/__init__.py @@ -1,6 +1,7 @@ from .aws import AWSWorker from .dispatcher import Dispatcher # noqa from .local import CondaEnvWorker +from .cpp_runner import CppCondaEnvWorker from .remote import DaskWorker from ._version import __version__ @@ -9,6 +10,7 @@ "conda": CondaEnvWorker, "aws": AWSWorker, "dask": DaskWorker, + "conda-cpp": CppCondaEnvWorker, } __all__ = [ diff --git a/ramp-engine/ramp_engine/base.py b/ramp-engine/ramp_engine/base.py index 63cc66b5..6da837f5 100644 --- a/ramp-engine/ramp_engine/base.py +++ b/ramp-engine/ramp_engine/base.py @@ -2,6 +2,7 @@ from abc import ABCMeta, abstractmethod from datetime import datetime import subprocess +import re logger = logging.getLogger("RAMP-WORKER") @@ -191,4 +192,14 @@ def _get_traceback(content): cut_exception_text = content.find("Traceback") if cut_exception_text > 0: content = content[cut_exception_text:] + else: + content = content[-3000:] + # strip paths + content = re.sub(r"/[^\s]+/", '', content) + if content: + # Take only the last 3 lines + content = "\n".join(content.splitlines()[-3:]) + # If the output is suspiciously long, truncate it + if len(content) > 150: + content = content[-150:] return content diff --git a/ramp-engine/ramp_engine/cpp_runner.py b/ramp-engine/ramp_engine/cpp_runner.py new file mode 100644 index 00000000..46e24e95 --- /dev/null +++ b/ramp-engine/ramp_engine/cpp_runner.py @@ -0,0 +1,355 @@ +import logging +import sys +import os +import shutil +from datetime import datetime +import time +import subprocess +from pathlib import Path + +from .base import _get_traceback +from .conda import _conda_info_envs, _get_conda_env_path +from .local import CondaEnvWorker + +logger = logging.getLogger("RAMP-WORKER") + + +COMPILATION_ERROR = 1220 +RUNTIME_ERROR = 1221 +SCORING_ERROR = 1222 + + +def get_conda_cmd(cmd: list[str], options: list[str] = None, memory="512m", with_java: bool=True) -> list[str]: + + if options is None: + options = [] + if with_java: + image = "tomcat:10-jdk11" + else: + image = "ubuntu:kinetic-20220830" + cmd_full = ( + [ + "docker", + "run", + "-i", + "--rm", + "--network", + "none", + "-v", + "/home/ubuntu/miniforge3/:/home/ubuntu/miniforge3/:ro", + "-v", + "/etc/passwd:/etc/passwd:ro", + "-v", + "/etc/group:/etc/group:ro", + ] + + options + + ["-m", memory, image] + + cmd + ) + return cmd_full + + +class CppCondaEnvWorker(CondaEnvWorker): + """Local worker which compiles C++/Java submissions, runs them in Docker, + and scores them using a conda environment. + + Parameters + ---------- + config : dict + Configuration dictionary to set the worker. The following parameters + should be set: + + * 'conda_env': the name of the conda environment to use. If not + specified, the base environment will be used. + * 'kit_dir': path to the directory of the RAMP kit; + * 'data_dir': path to the directory of the data; + * 'submissions_dir': path to the directory containing the + submissions; + * `logs_dir`: path to the directory where the log of the + submission will be stored; + * `predictions_dir`: path to the directory where the + predictions of the submission will be stored. + * 'timeout': timeout after a given number of seconds when + running the worker. If not provided, a default of 7200 + is used. + submission : str + Name of the RAMP submission to be handled by the worker. + + Attributes + ---------- + status : str + The status of the worker. 
It should be one of the following states: + + * 'initialized': the worker has been instantiated. + * 'setup': the worker has been set up. + * 'error': setup failed / training couldn't be started + * 'running': the worker is training the submission. + * 'finished': the worker finished training the submission. + * 'collected': the results of the training have been collected. + """ + + def __init__(self, config, submission): + super().__init__(config=config, submission=submission) + + def setup(self): + """Set up the worker. + + The worker will find the path to the conda environment to use using + the configuration passed when instantiating the worker. + """ + # sanity check for the configuration variable + for required_param in ( + "kit_dir", + "data_dir", + "submissions_dir", + "logs_dir", + "predictions_dir", + ): + self._check_config_name(self.config, required_param) + # find the path to the conda environment + env_name = self.config.get("conda_env", "base") + conda_info = _conda_info_envs() + + self._python_bin_path = _get_conda_env_path(conda_info, env_name, self) + + super().setup() + + def teardown(self): + """Remove the predictions stored within the submission.""" + if self.status not in ("collected", "retry"): + raise ValueError("Collect the results before killing the worker.") + output_training_dir = os.path.join( + self.config["kit_dir"], + "submissions", + self.submission, + "training_output", + ) + if os.path.exists(output_training_dir): + shutil.rmtree(output_training_dir) + super().teardown() + + def is_cpp_submission(self) -> bool: + """Return True if the submission is C++, False if it is a Java one""" + + submission_dir = Path(self.config["submissions_dir"]) / self.submission + + if (submission_dir / "solution.cpp").exists() and ( + len((submission_dir / "solution.cpp").read_text().strip()) > 10 + ): + return True + else: + return False + + def launch_submission(self): + """Launch the submission. + + The submission is compiled (C++ or Java), run on the test cases inside + Docker containers, and scored using the conda environment given in the + configuration, each step in a subprocess so as not to block the main process. + """ + if self.status == "running": + raise ValueError( + "Wait for the submission to be processed before " "launching a new one." 
+ ) + + self._log_dir = os.path.join(self.config["logs_dir"], self.submission) + os.makedirs(self._log_dir, exist_ok=True) + self._log_file = open(os.path.join(self._log_dir, "log"), "wb+") + submission_dir = os.path.join( + self.config["submissions_dir"], + self.submission, + ) + output_dir = os.path.join(submission_dir, "training_output") + os.makedirs(output_dir, exist_ok=True) + INCLUDE_DIR = Path(self.config["data_dir"]).resolve() + DATA_DIR = Path(self.config["data_dir"]).resolve() + + self.status = "finished" + + self._return_code = 0 + + is_cpp = self.is_cpp_submission() + if is_cpp: + bin_path = os.path.join(submission_dir, "main") + + try: + subprocess.check_call( + [ + "gcc", + os.path.join(submission_dir, "solution.cpp"), + #f"-I{INCLUDE_DIR / 'include' / 'cpp'}", + "-lstdc++", + "-O3", + "-w", + "-o", + bin_path, + ], + stderr=self._log_file, + stdout=self._log_file, + ) + except subprocess.CalledProcessError as err: + + self._return_code = COMPILATION_ERROR + return + else: + bin_path = os.path.join(submission_dir, "main") + + try: + subprocess.check_call( + [ + "javac", + os.path.join(submission_dir, "Main.java"), + ], + stderr=self._log_file, + stdout=self._log_file, + ) + except subprocess.CalledProcessError as err: + + self._return_code = COMPILATION_ERROR + return + + # Compilation passed, clean up the log + shutil.copy( + os.path.join(self._log_dir, "log"), + os.path.join(self._log_dir, "compilation-log"), + ) + self._log_file.truncate(0) + + # Run solution in batches + batch_size = 5 + (Path(output_dir) / "output").mkdir(exist_ok=True) + for n_batch in range(4): + t0 = time.perf_counter() + procs = [] + for sub_idx in range(batch_size): + idx = batch_size * n_batch + sub_idx + # We have 9 test cases in total + if idx > 9: + continue + if is_cpp: + p = subprocess.Popen( + get_conda_cmd( + # Make sure the process is killed as we cannot kill it from outside + ["timeout", "22", str(bin_path)], + options=["-v", f"{submission_dir}:{submission_dir}:ro"], + ), + stdout=open(os.path.join(output_dir, f"output/case{idx}.out"), "wb+"), + stderr=self._log_file, + stdin=open(os.path.join(DATA_DIR, f"input/case{idx}.in"), "rb"), + ) + else: + p = subprocess.Popen( + get_conda_cmd( + # Make sure the process is killed as we cannot kill it from outside + ["timeout", "22", "java", "Main"], + options=["-v", f"{submission_dir}:{submission_dir}:ro", "-w", f"{submission_dir}"], + ), + stdout=open(os.path.join(output_dir, f"output/case{idx}.out"), "wb+"), + stderr=self._log_file, + stdin=open(os.path.join(DATA_DIR, f"input/case{idx}.in"), "rb"), + ) + + procs.append(p) + for p in procs: + # Time remaining for this batch (evaluated in parallel) + dt = max(t0 + self.timeout - time.perf_counter(), 0) + if dt == 0: + self.status = "timeout" + self._return_code = 124 + return + try: + p.communicate(timeout=dt) + self._return_code = max(p.returncode, self._return_code) + except subprocess.TimeoutExpired: + for p in procs: + p.kill() + self.status = "timeout" + self._return_code = 124 + return + + + if self._return_code > 0: + return + + # Running the model passed, clean up the log + shutil.copy( + os.path.join(self._log_dir, "log"), os.path.join(self._log_dir, "run-log") + ) + self._log_file.truncate(0) + + # Score the solution + judger_path = os.path.join( + self.config["data_dir"], "validation/judge_ramp.py" + ) + try: + subprocess.check_call( + [ + os.path.join(self._python_bin_path, "python"), + judger_path, + os.path.join(DATA_DIR, "input"), + os.path.join(output_dir, "output"), + output_dir, 
+ ], + stderr=self._log_file, + stdout=self._log_file, + ) + except subprocess.CalledProcessError as err: + self._return_code = SCORING_ERROR + return + + def collect_results(self): + """Collect the results after the submission has completed. + + Be aware that calling ``collect_results()`` before the submission has + finished will block the Python main process while waiting for the + submission to be processed. Use ``worker.status`` to know the status of + the worker beforehand. + """ + if self.status == "initialized": + raise ValueError( + "The worker has not been set up and no submission " + "was launched. Call the method setup() and " + "launch_submission() before collecting the " + "results." + ) + elif self.status == "setup": + raise ValueError( + "No submission was launched. Call the method " + "launch_submission() and then try again to " + "collect the results." + ) + if self.status in ["finished", "running", "timeout"]: + with open(os.path.join(self._log_dir, "log"), "rb") as f: + log_output = f.read() + error_msg = _get_traceback(log_output.decode("utf-8")) + if self.status == "timeout": + error_msg += "\nWorker killed due to timeout after {}s.".format( + self.timeout + ) + if self.status == "timeout": + returncode = 124 + else: + returncode = self._return_code + pred_dir = os.path.join(self.config["predictions_dir"], self.submission) + output_training_dir = os.path.join( + self.config["submissions_dir"], + self.submission, + "training_output", + ) + if returncode: + if returncode == 139: + error_msg = "Segmentation fault (core dumped)" + + # copy the predictions into the disk + # no need to create the directory, it will be handled by copytree + shutil.rmtree(pred_dir, ignore_errors=True) + shutil.copytree(output_training_dir, pred_dir) + self.status = "collected" + return (returncode, error_msg) + + def check_timeout(self): + """We use a different timeout mechanism""" + return None + + def _is_submission_finished(self): + """The parallelism happens at the level of test cases""" + return True diff --git a/ramp-engine/ramp_engine/tests/test_conda_worker.py b/ramp-engine/ramp_engine/tests/test_conda_worker.py index 6d55787a..f5471cbf 100644 --- a/ramp-engine/ramp_engine/tests/test_conda_worker.py +++ b/ramp-engine/ramp_engine/tests/test_conda_worker.py @@ -8,6 +8,7 @@ from ramp_engine.local import CondaEnvWorker from ramp_engine.remote import DaskWorker from ramp_engine.conda import _conda_info_envs +from ramp ALL_WORKERS = [CondaEnvWorker, DaskWorker] diff --git a/ramp-frontend/ramp_frontend/templates/event.html b/ramp-frontend/ramp_frontend/templates/event.html index 7700de76..31668ba4 100644 --- a/ramp-frontend/ramp_frontend/templates/event.html +++ b/ramp-frontend/ramp_frontend/templates/event.html @@ -52,12 +52,16 @@ Description + +
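
Note on usage: the engine changes above register the new worker under the "conda-cpp" key, so an event would presumably select it via a worker_type of conda-cpp in its worker configuration. The sketch below is illustrative only: the paths, submission name, and timeout are hypothetical, while the config keys and the setup/launch/collect/teardown lifecycle come from the CppCondaEnvWorker code in this patch.

from ramp_engine import CppCondaEnvWorker

# All paths are placeholders; they only need to match the layout the worker
# expects: solution.cpp or Main.java inside the submission directory, and
# input/case*.in plus validation/judge_ramp.py under data_dir.
config = {
    "conda_env": "testenv",            # optional, defaults to "base"
    "kit_dir": "/srv/ramp/kits/challenge",
    "data_dir": "/srv/ramp/data/challenge",
    "submissions_dir": "/srv/ramp/events/challenge/submissions",
    "logs_dir": "/srv/ramp/events/challenge/logs",
    "predictions_dir": "/srv/ramp/events/challenge/predictions",
    "timeout": 120,                    # seconds allowed per batch of test cases
}

worker = CppCondaEnvWorker(config=config, submission="submission_000042")
worker.setup()                         # resolves the conda env used for scoring
worker.launch_submission()             # compiles, runs the test cases, scores
returncode, error_msg = worker.collect_results()
print(worker.status, returncode, error_msg)   # status is "collected" here
worker.teardown()                      # removes the submission's training_output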