From 9e88f221679cbe99a8e7ebcb4d89ac29cd68b7e0 Mon Sep 17 00:00:00 2001
From: Chi Wang
Date: Fri, 11 Feb 2022 20:14:10 -0800
Subject: [PATCH] fix a bug when using ray & update ray on aml (#455)

* fix a bug when using ray & update ray on aml

When using with_parameters(), the config argument must be the first argument in the trainable function.

* make training function runnable standalone
---
 flaml/automl.py                              | 82 +++++++++++---------
 flaml/tune/trial_runner.py                   |  4 +
 flaml/tune/tune.py                           |  8 +-
 flaml/version.py                             |  2 +-
 test/.Docker/Dockerfile-cpu                  |  4 +-
 test/ray/distribute_automl.py                |  8 +-
 test/ray/distribute_tune.py                  | 53 ++++++-------
 test/run_distribute_automl.py                | 16 ++--
 test/run_distribute_tune.py                  | 16 ++--
 website/docs/Examples/Integrate - AzureML.md | 27 +++----
 10 files changed, 117 insertions(+), 103 deletions(-)
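Note on the fix (an illustrative sketch, not part of the diff below): `ray.tune.with_parameters()` binds large objects to the trainable by keyword and always passes the sampled `config` as the first positional argument, which is why `_compute_with_config_base` becomes a `@staticmethod` taking `config_w_resource` first. The names in this sketch are made up for illustration:

```python
from ray import tune


def trainable(config, data=None):
    # `config` must be the first parameter; `data` is bound by with_parameters().
    tune.report(score=config["x"] * len(data))


tune.run(
    tune.with_parameters(trainable, data=list(range(10))),
    config={"x": tune.uniform(0, 1)},
    num_samples=4,
)
```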
diff --git a/flaml/automl.py b/flaml/automl.py
index f7fa4a130d..02915df071 100644
--- a/flaml/automl.py
+++ b/flaml/automl.py
@@ -213,41 +213,42 @@ def _prepare_sample_train_data(self, sample_size):
             groups = self.groups_all
         return sampled_X_train, sampled_y_train, sampled_weight, groups
 
-    def _compute_with_config_base(self, estimator, config_w_resource):
+    @staticmethod
+    def _compute_with_config_base(config_w_resource, state, estimator):
         if "FLAML_sample_size" in config_w_resource:
             sample_size = int(config_w_resource["FLAML_sample_size"])
         else:
-            sample_size = self.data_size[0]
+            sample_size = state.data_size[0]
         (
             sampled_X_train,
             sampled_y_train,
             sampled_weight,
             groups,
-        ) = self._prepare_sample_train_data(sample_size)
+        ) = state._prepare_sample_train_data(sample_size)
         if sampled_weight is not None:
-            weight = self.fit_kwargs["sample_weight"]
-            self.fit_kwargs["sample_weight"] = sampled_weight
+            weight = state.fit_kwargs["sample_weight"]
+            state.fit_kwargs["sample_weight"] = sampled_weight
         else:
             weight = None
         if groups is not None:
-            self.fit_kwargs["groups"] = groups
+            state.fit_kwargs["groups"] = groups
         config = config_w_resource.copy()
         if "FLAML_sample_size" in config:
             del config["FLAML_sample_size"]
         budget = (
             None
-            if self.time_budget is None
-            else self.time_budget - self.time_from_start
-            if sample_size == self.data_size[0]
-            else (self.time_budget - self.time_from_start)
+            if state.time_budget is None
+            else state.time_budget - state.time_from_start
+            if sample_size == state.data_size[0]
+            else (state.time_budget - state.time_from_start)
             / 2
             * sample_size
-            / self.data_size[0]
+            / state.data_size[0]
         )
 
-        if _is_nlp_task(self.task):
-            self.fit_kwargs["X_val"] = self.X_val
-            self.fit_kwargs["y_val"] = self.y_val
+        if _is_nlp_task(state.task):
+            state.fit_kwargs["X_val"] = state.X_val
+            state.fit_kwargs["y_val"] = state.y_val
 
         (
             trained_estimator,
@@ -258,41 +259,42 @@ def _compute_with_config_base(self, estimator, config_w_resource):
         ) = compute_estimator(
             sampled_X_train,
             sampled_y_train,
-            self.X_val,
-            self.y_val,
-            self.weight_val,
-            self.groups_val,
-            self.train_time_limit
+            state.X_val,
+            state.y_val,
+            state.weight_val,
+            state.groups_val,
+            state.train_time_limit
             if budget is None
-            else min(budget, self.train_time_limit),
-            self.kf,
+            else min(budget, state.train_time_limit),
+            state.kf,
             config,
-            self.task,
+            state.task,
             estimator,
-            self.eval_method,
-            self.metric,
-            self.best_loss,
-            self.n_jobs,
-            self.learner_classes.get(estimator),
-            self.log_training_metric,
-            self.fit_kwargs,
+            state.eval_method,
+            state.metric,
+            state.best_loss,
+            state.n_jobs,
+            state.learner_classes.get(estimator),
+            state.log_training_metric,
+            state.fit_kwargs,
         )
-        if self.retrain_final and not self.model_history:
+        if state.retrain_final and not state.model_history:
             trained_estimator.cleanup()
-        if _is_nlp_task(self.task):
-            del self.fit_kwargs["X_val"]
-            del self.fit_kwargs["y_val"]
+        if _is_nlp_task(state.task):
+            del state.fit_kwargs["X_val"]
+            del state.fit_kwargs["y_val"]
 
         result = {
             "pred_time": pred_time,
-            "wall_clock_time": time.time() - self._start_time_flag,
+            "wall_clock_time": time.time() - state._start_time_flag,
             "metric_for_logging": metric_for_logging,
             "val_loss": val_loss,
             "trained_estimator": trained_estimator,
         }
         if sampled_weight is not None:
-            self.fit_kwargs["sample_weight"] = weight
+            state.fit_kwargs["sample_weight"] = weight
+        tune.report(**result)
         return result
 
     def _train_with_config(
@@ -1672,12 +1674,14 @@ def trainable(self) -> Callable[[dict], Optional[float]]:
 
             search_state.training_function = with_parameters(
                 AutoMLState._compute_with_config_base,
-                self=self._state,
+                state=self._state,
                 estimator=estimator,
             )
         else:
             search_state.training_function = partial(
-                AutoMLState._compute_with_config_base, self._state, estimator
+                AutoMLState._compute_with_config_base,
+                state=self._state,
+                estimator=estimator,
             )
         states = self._search_states
         mem_res = self._mem_thres
@@ -2461,7 +2465,9 @@ def _search_sequential(self):
                 )
             if not search_state.search_alg:
                 search_state.training_function = partial(
-                    AutoMLState._compute_with_config_base, self._state, estimator
+                    AutoMLState._compute_with_config_base,
+                    state=self._state,
+                    estimator=estimator,
                 )
                 search_space = search_state.search_space
             if self._sample:
diff --git a/flaml/tune/trial_runner.py b/flaml/tune/trial_runner.py
index 3419dc3ec5..ec305a8c06 100644
--- a/flaml/tune/trial_runner.py
+++ b/flaml/tune/trial_runner.py
@@ -125,3 +125,7 @@ def step(self) -> Trial:
             trial = None
         self.running_trial = trial
         return trial
+
+    def stop_trial(self, trial):
+        super().stop_trial(trial)
+        self.running_trial = None
diff --git a/flaml/tune/tune.py b/flaml/tune/tune.py
index 396f83a2eb..61f400c25a 100644
--- a/flaml/tune/tune.py
+++ b/flaml/tune/tune.py
@@ -90,7 +90,9 @@ def compute_with_config(config):
     result = kwargs
     if _metric:
         result[DEFAULT_METRIC] = _metric
-    trial = _runner.running_trial
+    trial = getattr(_runner, "running_trial", None)
+    if not trial:
+        return None
     if _running_trial == trial:
         _training_iteration += 1
     else:
@@ -102,11 +104,11 @@ def compute_with_config(config):
         del result["config"][INCUMBENT_RESULT]
     for key, value in trial.config.items():
         result["config/" + key] = value
-    _runner.process_trial_result(_runner.running_trial, result)
+    _runner.process_trial_result(trial, result)
     result["time_total_s"] = trial.last_update_time - trial.start_time
     if _verbose > 2:
         logger.info(f"result: {result}")
-    if _runner.running_trial.is_finished():
+    if trial.is_finished():
         return None
     else:
         return True
diff --git a/flaml/version.py b/flaml/version.py
index 50533e307d..f5b77301f5 100644
--- a/flaml/version.py
+++ b/flaml/version.py
@@ -1 +1 @@
-__version__ = "0.9.6"
+__version__ = "0.9.7"
diff --git a/test/.Docker/Dockerfile-cpu b/test/.Docker/Dockerfile-cpu
index 3c450b9351..da2570cf44 100644
--- a/test/.Docker/Dockerfile-cpu
+++ b/test/.Docker/Dockerfile-cpu
@@ -1,8 +1,8 @@
-FROM python:3.7
+FROM mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04
 
 RUN pip install azureml-core
 RUN pip install flaml[blendsearch,ray]
-
+RUN pip install ray-on-aml
 
 EXPOSE 8265
 EXPOSE 6379
diff --git a/test/ray/distribute_automl.py b/test/ray/distribute_automl.py
index c9d6c24be9..14f15a0d00 100644
--- a/test/ray/distribute_automl.py
+++ b/test/ray/distribute_automl.py
@@ -1,15 +1,17 @@
+from ray_on_aml.core import Ray_On_AML
 from flaml import AutoML
 
 
 def _test_ray_classification():
     from sklearn.datasets import make_classification
-    import ray
 
-    ray.init(address="auto")
     X, y = make_classification(1000, 10)
     automl = AutoML()
     automl.fit(X, y, time_budget=10, task="classification", n_concurrent_trials=2)
 
 
 if __name__ == "__main__":
-    _test_ray_classification()
+    ray_on_aml = Ray_On_AML()
+    ray = ray_on_aml.getRay()
+    if ray:
+        _test_ray_classification()
diff --git a/test/ray/distribute_tune.py b/test/ray/distribute_tune.py
index 7a737d16f8..991082e8ec 100644
--- a/test/ray/distribute_tune.py
+++ b/test/ray/distribute_tune.py
@@ -1,4 +1,4 @@
-import ray
+from ray_on_aml.core import Ray_On_AML
 import lightgbm as lgb
 import numpy as np
 from sklearn.datasets import load_breast_cancer
@@ -7,11 +7,6 @@
 from flaml import tune
 from flaml.model import LGBMEstimator
 
-X, y = load_breast_cancer(return_X_y=True)
-X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)
-ray.init(address="auto")
-X_train_ref = ray.put(X_train)
-
 
 def train_breast_cancer(config):
     params = LGBMEstimator(**config).params
@@ -24,25 +19,31 @@ def train_breast_cancer(config):
 
 
 if __name__ == "__main__":
-    flaml_lgbm_search_space = LGBMEstimator.search_space(X_train.shape)
-    config_search_space = {
-        hp: space["domain"] for hp, space in flaml_lgbm_search_space.items()
-    }
-    low_cost_partial_config = {
-        hp: space["low_cost_init_value"]
-        for hp, space in flaml_lgbm_search_space.items()
-        if "low_cost_init_value" in space
-    }
+    ray_on_aml = Ray_On_AML()
+    ray = ray_on_aml.getRay()
+    if ray:
+        X, y = load_breast_cancer(return_X_y=True)
+        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)
+        X_train_ref = ray.put(X_train)
+        flaml_lgbm_search_space = LGBMEstimator.search_space(X_train.shape)
+        config_search_space = {
+            hp: space["domain"] for hp, space in flaml_lgbm_search_space.items()
+        }
+        low_cost_partial_config = {
+            hp: space["low_cost_init_value"]
+            for hp, space in flaml_lgbm_search_space.items()
+            if "low_cost_init_value" in space
+        }
 
-    analysis = tune.run(
-        train_breast_cancer,
-        metric="mean_accuracy",
-        mode="max",
-        config=config_search_space,
-        num_samples=-1,
-        time_budget_s=60,
-        use_ray=True,
-    )
+        analysis = tune.run(
+            train_breast_cancer,
+            metric="mean_accuracy",
+            mode="max",
+            config=config_search_space,
+            num_samples=-1,
+            time_budget_s=60,
+            use_ray=True,
+        )
 
-    # print("Best hyperparameters found were: ", analysis.best_config)
-    print("The best trial's result: ", analysis.best_trial.last_result)
+        # print("Best hyperparameters found were: ", analysis.best_config)
+        print("The best trial's result: ", analysis.best_trial.last_result)
diff --git a/test/run_distribute_automl.py b/test/run_distribute_automl.py
index fb6bc23367..8490a8c205 100644
--- a/test/run_distribute_automl.py
+++ b/test/run_distribute_automl.py
@@ -1,5 +1,6 @@
 import time
 from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
+from azureml.core.runconfig import RunConfiguration, DockerConfiguration
 
 ws = Workspace.from_config()
 ray_environment_name = "aml-ray-cpu"
@@ -18,20 +19,21 @@
 )
 time.sleep(10)
 
+command = ["python distribute_automl.py"]
 env = Environment.get(workspace=ws, name=ray_environment_name)
 compute_target = ws.compute_targets["cpucluster"]
-command = ["python automl.py"]
+aml_run_config = RunConfiguration(communicator="OpenMpi")
+aml_run_config.target = compute_target
+aml_run_config.docker = DockerConfiguration(use_docker=True)
+aml_run_config.environment = env
+aml_run_config.node_count = 2
 config = ScriptRunConfig(
     source_directory="ray/",
     command=command,
-    compute_target=compute_target,
-    environment=env,
+    run_config=aml_run_config,
 )
-config.run_config.node_count = 2
-config.run_config.environment_variables["_AZUREML_CR_START_RAY"] = "true"
-config.run_config.environment_variables["AZUREML_COMPUTE_USE_COMMON_RUNTIME"] = "true"
 
-exp = Experiment(ws, "test-ray")
+exp = Experiment(ws, "distribute-automl")
 run = exp.submit(config)
 print(run.get_portal_url())  # link to ml.azure.com
 run.wait_for_completion(show_output=True)
diff --git a/test/run_distribute_tune.py b/test/run_distribute_tune.py
index 8f91d25925..34c35d9fd1 100644
--- a/test/run_distribute_tune.py
+++ b/test/run_distribute_tune.py
@@ -1,5 +1,6 @@
 import time
 from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
+from azureml.core.runconfig import RunConfiguration, DockerConfiguration
 
 ws = Workspace.from_config()
 ray_environment_name = "aml-ray-cpu"
@@ -18,20 +19,21 @@
 )
 time.sleep(10)
 
+command = ["python distribute_tune.py"]
 env = Environment.get(workspace=ws, name=ray_environment_name)
 compute_target = ws.compute_targets["cpucluster"]
-command = ["python distribute_tune.py"]
+aml_run_config = RunConfiguration(communicator="OpenMpi")
+aml_run_config.target = compute_target
+aml_run_config.docker = DockerConfiguration(use_docker=True)
+aml_run_config.environment = env
+aml_run_config.node_count = 2
 config = ScriptRunConfig(
     source_directory="ray/",
     command=command,
-    compute_target=compute_target,
-    environment=env,
+    run_config=aml_run_config,
 )
-config.run_config.node_count = 2
-config.run_config.environment_variables["_AZUREML_CR_START_RAY"] = "true"
-config.run_config.environment_variables["AZUREML_COMPUTE_USE_COMMON_RUNTIME"] = "true"
 
-exp = Experiment(ws, "test-ray")
+exp = Experiment(ws, "distribute-tune")
 run = exp.submit(config)
 print(run.get_portal_url())  # link to ml.azure.com
 run.wait_for_completion(show_output=True)
diff --git a/website/docs/Examples/Integrate - AzureML.md b/website/docs/Examples/Integrate - AzureML.md
index 4675970fd2..fc085fa9ad 100644
--- a/website/docs/Examples/Integrate - AzureML.md
+++ b/website/docs/Examples/Integrate - AzureML.md
@@ -128,40 +128,35 @@ If the computer target "cpucluster" already exists, it will not be recreated.
 
 #### Run distributed AutoML job
 
-Assuming you have an automl script like [ray/distribute_automl.py](https://github.com/microsoft/FLAML/blob/main/test/ray/distribute_automl.py). It uses `ray.init(address="auto")` to initialize the cluster, and uses `n_concurrent_trials=k` to inform `AutoML.fit()` to perform k concurrent trials in parallel.
+Assuming you have an automl script like [ray/distribute_automl.py](https://github.com/microsoft/FLAML/blob/main/test/ray/distribute_automl.py). It uses `n_concurrent_trials=k` to inform `AutoML.fit()` to perform k concurrent trials in parallel.
 
 Submit an AzureML job as the following:
 
 ```python
 from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
+from azureml.core.runconfig import RunConfiguration, DockerConfiguration
 
 command = ["python distribute_automl.py"]
-ray_environment_name = 'aml-ray-cpu'
+ray_environment_name = "aml-ray-cpu"
 env = Environment.get(workspace=ws, name=ray_environment_name)
+aml_run_config = RunConfiguration(communicator="OpenMpi")
+aml_run_config.target = compute_target
+aml_run_config.docker = DockerConfiguration(use_docker=True)
+aml_run_config.environment = env
+aml_run_config.node_count = 2
 config = ScriptRunConfig(
-    source_directory='ray/',
+    source_directory="ray/",
     command=command,
-    compute_target=compute_target,
-    environment=env,
+    run_config=aml_run_config,
 )
-config.run_config.node_count = 2
-config.run_config.environment_variables["_AZUREML_CR_START_RAY"] = "true"
-config.run_config.environment_variables["AZUREML_COMPUTE_USE_COMMON_RUNTIME"] = "true"
-
-exp = Experiment(ws, 'distribute-automl')
+exp = Experiment(ws, "distribute-automl")
 run = exp.submit(config)
 print(run.get_portal_url())  # link to ml.azure.com
 run.wait_for_completion(show_output=True)
 ```
 
-The line
-`
-config.run_config.environment_variables["_AZUREML_CR_START_RAY"] = "true"
-`
-tells AzureML to start ray on each node of the cluster. This ia a feature in preview and it is subject to change in future. It is applicable to dedicated VMs only.
-
 #### Run distributed tune job
 
 Prepare a script like [ray/distribute_tune.py](https://github.com/microsoft/FLAML/blob/main/test/ray/distribute_tune.py). Replace the command in the above example with:
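For the tune job, the replacement command would mirror `test/run_distribute_tune.py` from this patch, for example:

```python
command = ["python distribute_tune.py"]
```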