Skip to content

Commit

Permalink
fix a bug when using ray & update ray on aml (#455)
Browse files Browse the repository at this point in the history
* fix a bug when using ray & update ray on aml
When using with_parameters(), the config argument must be the first argument in the trainable function.
* make training function runnable standalone
  • Loading branch information
sonichi authored Feb 12, 2022
1 parent b4d3124 commit 9e88f22
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 103 deletions.
82 changes: 44 additions & 38 deletions flaml/automl.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,41 +213,42 @@ def _prepare_sample_train_data(self, sample_size):
groups = self.groups_all
return sampled_X_train, sampled_y_train, sampled_weight, groups

def _compute_with_config_base(self, estimator, config_w_resource):
@staticmethod
def _compute_with_config_base(config_w_resource, state, estimator):
if "FLAML_sample_size" in config_w_resource:
sample_size = int(config_w_resource["FLAML_sample_size"])
else:
sample_size = self.data_size[0]
sample_size = state.data_size[0]
(
sampled_X_train,
sampled_y_train,
sampled_weight,
groups,
) = self._prepare_sample_train_data(sample_size)
) = state._prepare_sample_train_data(sample_size)
if sampled_weight is not None:
weight = self.fit_kwargs["sample_weight"]
self.fit_kwargs["sample_weight"] = sampled_weight
weight = state.fit_kwargs["sample_weight"]
state.fit_kwargs["sample_weight"] = sampled_weight
else:
weight = None
if groups is not None:
self.fit_kwargs["groups"] = groups
state.fit_kwargs["groups"] = groups
config = config_w_resource.copy()
if "FLAML_sample_size" in config:
del config["FLAML_sample_size"]
budget = (
None
if self.time_budget is None
else self.time_budget - self.time_from_start
if sample_size == self.data_size[0]
else (self.time_budget - self.time_from_start)
if state.time_budget is None
else state.time_budget - state.time_from_start
if sample_size == state.data_size[0]
else (state.time_budget - state.time_from_start)
/ 2
* sample_size
/ self.data_size[0]
/ state.data_size[0]
)

if _is_nlp_task(self.task):
self.fit_kwargs["X_val"] = self.X_val
self.fit_kwargs["y_val"] = self.y_val
if _is_nlp_task(state.task):
state.fit_kwargs["X_val"] = state.X_val
state.fit_kwargs["y_val"] = state.y_val

(
trained_estimator,
Expand All @@ -258,41 +259,42 @@ def _compute_with_config_base(self, estimator, config_w_resource):
) = compute_estimator(
sampled_X_train,
sampled_y_train,
self.X_val,
self.y_val,
self.weight_val,
self.groups_val,
self.train_time_limit
state.X_val,
state.y_val,
state.weight_val,
state.groups_val,
state.train_time_limit
if budget is None
else min(budget, self.train_time_limit),
self.kf,
else min(budget, state.train_time_limit),
state.kf,
config,
self.task,
state.task,
estimator,
self.eval_method,
self.metric,
self.best_loss,
self.n_jobs,
self.learner_classes.get(estimator),
self.log_training_metric,
self.fit_kwargs,
state.eval_method,
state.metric,
state.best_loss,
state.n_jobs,
state.learner_classes.get(estimator),
state.log_training_metric,
state.fit_kwargs,
)
if self.retrain_final and not self.model_history:
if state.retrain_final and not state.model_history:
trained_estimator.cleanup()

if _is_nlp_task(self.task):
del self.fit_kwargs["X_val"]
del self.fit_kwargs["y_val"]
if _is_nlp_task(state.task):
del state.fit_kwargs["X_val"]
del state.fit_kwargs["y_val"]

result = {
"pred_time": pred_time,
"wall_clock_time": time.time() - self._start_time_flag,
"wall_clock_time": time.time() - state._start_time_flag,
"metric_for_logging": metric_for_logging,
"val_loss": val_loss,
"trained_estimator": trained_estimator,
}
if sampled_weight is not None:
self.fit_kwargs["sample_weight"] = weight
state.fit_kwargs["sample_weight"] = weight
tune.report(**result)
return result

def _train_with_config(
Expand Down Expand Up @@ -1672,12 +1674,14 @@ def trainable(self) -> Callable[[dict], Optional[float]]:

search_state.training_function = with_parameters(
AutoMLState._compute_with_config_base,
self=self._state,
state=self._state,
estimator=estimator,
)
else:
search_state.training_function = partial(
AutoMLState._compute_with_config_base, self._state, estimator
AutoMLState._compute_with_config_base,
state=self._state,
estimator=estimator,
)
states = self._search_states
mem_res = self._mem_thres
Expand Down Expand Up @@ -2461,7 +2465,9 @@ def _search_sequential(self):
)
if not search_state.search_alg:
search_state.training_function = partial(
AutoMLState._compute_with_config_base, self._state, estimator
AutoMLState._compute_with_config_base,
state=self._state,
estimator=estimator,
)
search_space = search_state.search_space
if self._sample:
Expand Down
4 changes: 4 additions & 0 deletions flaml/tune/trial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,7 @@ def step(self) -> Trial:
trial = None
self.running_trial = trial
return trial

def stop_trial(self, trial):
super().stop_trial(trial)
self.running_trial = None
8 changes: 5 additions & 3 deletions flaml/tune/tune.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def compute_with_config(config):
result = kwargs
if _metric:
result[DEFAULT_METRIC] = _metric
trial = _runner.running_trial
trial = getattr(_runner, "running_trial", None)
if not trial:
return None
if _running_trial == trial:
_training_iteration += 1
else:
Expand All @@ -102,11 +104,11 @@ def compute_with_config(config):
del result["config"][INCUMBENT_RESULT]
for key, value in trial.config.items():
result["config/" + key] = value
_runner.process_trial_result(_runner.running_trial, result)
_runner.process_trial_result(trial, result)
result["time_total_s"] = trial.last_update_time - trial.start_time
if _verbose > 2:
logger.info(f"result: {result}")
if _runner.running_trial.is_finished():
if trial.is_finished():
return None
else:
return True
Expand Down
2 changes: 1 addition & 1 deletion flaml/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.9.6"
__version__ = "0.9.7"
4 changes: 2 additions & 2 deletions test/.Docker/Dockerfile-cpu
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
FROM python:3.7
FROM mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04

RUN pip install azureml-core
RUN pip install flaml[blendsearch,ray]

RUN pip install ray-on-aml

EXPOSE 8265
EXPOSE 6379
Expand Down
8 changes: 5 additions & 3 deletions test/ray/distribute_automl.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from ray_on_aml.core import Ray_On_AML
from flaml import AutoML


def _test_ray_classification():
from sklearn.datasets import make_classification
import ray

ray.init(address="auto")
X, y = make_classification(1000, 10)
automl = AutoML()
automl.fit(X, y, time_budget=10, task="classification", n_concurrent_trials=2)


if __name__ == "__main__":
_test_ray_classification()
ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()
if ray:
_test_ray_classification()
53 changes: 27 additions & 26 deletions test/ray/distribute_tune.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import ray
from ray_on_aml.core import Ray_On_AML
import lightgbm as lgb
import numpy as np
from sklearn.datasets import load_breast_cancer
Expand All @@ -7,11 +7,6 @@
from flaml import tune
from flaml.model import LGBMEstimator

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)
ray.init(address="auto")
X_train_ref = ray.put(X_train)


def train_breast_cancer(config):
params = LGBMEstimator(**config).params
Expand All @@ -24,25 +19,31 @@ def train_breast_cancer(config):


if __name__ == "__main__":
flaml_lgbm_search_space = LGBMEstimator.search_space(X_train.shape)
config_search_space = {
hp: space["domain"] for hp, space in flaml_lgbm_search_space.items()
}
low_cost_partial_config = {
hp: space["low_cost_init_value"]
for hp, space in flaml_lgbm_search_space.items()
if "low_cost_init_value" in space
}
ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()
if ray:
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)
X_train_ref = ray.put(X_train)
flaml_lgbm_search_space = LGBMEstimator.search_space(X_train.shape)
config_search_space = {
hp: space["domain"] for hp, space in flaml_lgbm_search_space.items()
}
low_cost_partial_config = {
hp: space["low_cost_init_value"]
for hp, space in flaml_lgbm_search_space.items()
if "low_cost_init_value" in space
}

analysis = tune.run(
train_breast_cancer,
metric="mean_accuracy",
mode="max",
config=config_search_space,
num_samples=-1,
time_budget_s=60,
use_ray=True,
)
analysis = tune.run(
train_breast_cancer,
metric="mean_accuracy",
mode="max",
config=config_search_space,
num_samples=-1,
time_budget_s=60,
use_ray=True,
)

# print("Best hyperparameters found were: ", analysis.best_config)
print("The best trial's result: ", analysis.best_trial.last_result)
# print("Best hyperparameters found were: ", analysis.best_config)
print("The best trial's result: ", analysis.best_trial.last_result)
16 changes: 9 additions & 7 deletions test/run_distribute_automl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import RunConfiguration, DockerConfiguration

ws = Workspace.from_config()
ray_environment_name = "aml-ray-cpu"
Expand All @@ -18,20 +19,21 @@
)
time.sleep(10)

command = ["python distribute_automl.py"]
env = Environment.get(workspace=ws, name=ray_environment_name)
compute_target = ws.compute_targets["cpucluster"]
command = ["python automl.py"]
aml_run_config = RunConfiguration(communicator="OpenMpi")
aml_run_config.target = compute_target
aml_run_config.docker = DockerConfiguration(use_docker=True)
aml_run_config.environment = env
aml_run_config.node_count = 2
config = ScriptRunConfig(
source_directory="ray/",
command=command,
compute_target=compute_target,
environment=env,
run_config=aml_run_config,
)
config.run_config.node_count = 2
config.run_config.environment_variables["_AZUREML_CR_START_RAY"] = "true"
config.run_config.environment_variables["AZUREML_COMPUTE_USE_COMMON_RUNTIME"] = "true"

exp = Experiment(ws, "test-ray")
exp = Experiment(ws, "distribute-automl")
run = exp.submit(config)
print(run.get_portal_url()) # link to ml.azure.com
run.wait_for_completion(show_output=True)
16 changes: 9 additions & 7 deletions test/run_distribute_tune.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import RunConfiguration, DockerConfiguration

ws = Workspace.from_config()
ray_environment_name = "aml-ray-cpu"
Expand All @@ -18,20 +19,21 @@
)
time.sleep(10)

command = ["python distribute_tune.py"]
env = Environment.get(workspace=ws, name=ray_environment_name)
compute_target = ws.compute_targets["cpucluster"]
command = ["python distribute_tune.py"]
aml_run_config = RunConfiguration(communicator="OpenMpi")
aml_run_config.target = compute_target
aml_run_config.docker = DockerConfiguration(use_docker=True)
aml_run_config.environment = env
aml_run_config.node_count = 2
config = ScriptRunConfig(
source_directory="ray/",
command=command,
compute_target=compute_target,
environment=env,
run_config=aml_run_config,
)
config.run_config.node_count = 2
config.run_config.environment_variables["_AZUREML_CR_START_RAY"] = "true"
config.run_config.environment_variables["AZUREML_COMPUTE_USE_COMMON_RUNTIME"] = "true"

exp = Experiment(ws, "test-ray")
exp = Experiment(ws, "distribute-tune")
run = exp.submit(config)
print(run.get_portal_url()) # link to ml.azure.com
run.wait_for_completion(show_output=True)
Loading

0 comments on commit 9e88f22

Please sign in to comment.