From 5e8308749fc3326e83e4fe8ee3fc7e68d6fd1b9b Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 31 Dec 2020 02:01:37 +0000 Subject: [PATCH] add bayesian search --- fugue_tune/hyperopt.py | 71 ++++++++++++++++++++++++ fugue_tune/space.py | 90 +++++++++++++++++++++++++++---- fugue_tune/tuner.py | 45 +++++++++------- requirements.txt | 2 +- setup.py | 2 +- tests/fugue_tune/test_convert.py | 16 ++++++ tests/fugue_tune/test_hyperopt.py | 41 ++++++++++++++ tests/fugue_tune/test_space.py | 26 ++++++++- tests/fugue_tune/test_tuner.py | 8 ++- 9 files changed, 266 insertions(+), 35 deletions(-) create mode 100644 fugue_tune/hyperopt.py create mode 100644 tests/fugue_tune/test_hyperopt.py diff --git a/fugue_tune/hyperopt.py b/fugue_tune/hyperopt.py new file mode 100644 index 0000000..7624490 --- /dev/null +++ b/fugue_tune/hyperopt.py @@ -0,0 +1,71 @@ +from typing import Any, Dict, Tuple, Set + +from fugue_tune.tunable import Tunable +from fugue_tune.tuner import ObjectiveRunner +from fugue_tune.space import StochasticExpression, Rand, Choice +from hyperopt import fmin, tpe, hp, Trials, STATUS_OK +import numpy as np + + +class HyperoptRunner(ObjectiveRunner): + def __init__(self, max_iter: int, seed: int = 0): + self._max_iter = max_iter + self._seed = seed + + def run( + self, tunable: Tunable, kwargs: Dict[str, Any], hp_keys: Set[str] + ) -> Dict[str, Any]: + static_params, stochastic_params = self._split(kwargs) + keys = list(stochastic_params.keys()) + + def obj(args) -> Dict[str, Any]: + params = {k: v for k, v in zip(keys, args)} + tunable.run(**static_params, **params) + hp = {k: v for k, v in tunable.hp.items() if k in hp_keys} + return { + "loss": tunable.error, + "status": STATUS_OK, + "error": tunable.error, + "hp": hp, + "metadata": tunable.metadata, + } + + trials = Trials() + fmin( + obj, + space=list(stochastic_params.values()), + algo=tpe.suggest, + max_evals=self._max_iter, + trials=trials, + show_progressbar=False, + rstate=np.random.RandomState(self._seed), + ) + + return { + "error": trials.best_trial["result"]["error"], + "hp": trials.best_trial["result"]["hp"], + "metadata": trials.best_trial["result"]["metadata"], + } + + def _split(self, kwargs: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]: + static_params: Dict[str, Any] = {} + stochastic_params: Dict[str, Any] = {} + for k, v in kwargs.items(): + if isinstance(v, StochasticExpression): + if isinstance(v, Rand): + stochastic_params[k] = self.convert_rand(k, v) + elif isinstance(v, Choice): + stochastic_params[k] = self.convert_choice(k, v) + else: + raise NotImplementedError(v) # pragma: no cover + else: + static_params[k] = v + return static_params, stochastic_params + + def convert_rand(self, k: str, v: Rand) -> Any: + if v.q is None and not v.log and not v.normal: + return hp.uniform(k, v.start, v.end) + raise NotImplementedError(k, v) # pragma: no cover + + def convert_choice(self, k: str, v: Choice) -> Any: + return hp.choice(k, v.values) diff --git a/fugue_tune/space.py b/fugue_tune/space.py index e9622ac..0621a8e 100644 --- a/fugue_tune/space.py +++ b/fugue_tune/space.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import Any, Dict, Iterable, List, Tuple, no_type_check +from typing import Any, Dict, Iterable, List, Tuple, no_type_check, Optional from fugue_tune.iter import dict_product, product @@ -12,21 +12,74 @@ def __iter__(self) -> Iterable[Any]: yield from self._values -class Choice(object): +class StochasticExpression(object): + @property + def jsondict(self) -> Dict[str, Any]: # pragma: no cover + raise NotImplementedError + + def __eq__(self, other: Any): + return isinstance(other, type(self)) and self.jsondict == other.jsondict + + +class Choice(StochasticExpression): def __init__(self, *args: Any): self._values = list(args) - def __iter__(self) -> Iterable[Any]: - yield from self._values + @property + def values(self) -> List[Any]: + return self._values + @property + def jsondict(self) -> Dict[str, Any]: + return dict(_expr_="choice", values=self.values) + + +class Rand(StochasticExpression): + def __init__( + self, + start: float, + end: float, + q: Optional[float] = None, + log: bool = False, + normal: bool = False, + ): + self.start = start + self.end = end + self.q = q + self.log = log + self.normal = normal -class Rand(object): - def __init__(self, start: float, end: float, q: float, log: bool, normal: bool): - self._start = start - self._end = end - self._q = q - self._log = log - self._normal = normal + @property + def jsondict(self) -> Dict[str, Any]: + res = dict( + _expr_="rand", + start=self.start, + end=self.end, + log=self.log, + normal=self.normal, + ) + if self.q is not None: + res["q"] = self.q + return res + + +def decode(value: Any) -> Any: + if isinstance(value, str): + return value + elif isinstance(value, list): + return [decode(v) for v in value] + elif isinstance(value, dict): + if "_expr_" in value: + e = value.pop("_expr_") + if e == "choice": # TODO: embeded rand is not supported + return Choice(*value["values"]) + if e == "rand": + return Rand(**value) + raise ValueError(e) # pragma: no cover + else: + return {k: decode(v) for k, v in value.items()} + else: + return value # TODO: make this inherit from iterable? @@ -43,6 +96,10 @@ def __iter__(self) -> Iterable[Dict[str, Any]]: tp[0][tp[1]] = tp[2] yield deepcopy(self._value) + def encode(self) -> Iterable[Any]: + for s in self: # type: ignore + yield self._encode_value(s) + def __mul__(self, other: Any) -> "HorizontalSpace": return HorizontalSpace(self, other) @@ -63,6 +120,17 @@ def _search(self, parent: Any, key: Any) -> None: def _grid_wrapper(self, parent: Any, key: Any) -> List[Tuple[Any, Any, Any]]: return [(parent, key, x) for x in parent[key]] + def _encode_value(self, value: Any) -> Any: + if isinstance(value, StochasticExpression): + return value.jsondict + elif isinstance(value, str): + return value + elif isinstance(value, list): + return [self._encode_value(v) for v in value] + elif isinstance(value, dict): + return {k: self._encode_value(v) for k, v in value.items()} + return value + class HorizontalSpace(Space): def __init__(self, *args: Any, **kwargs: Any): diff --git a/fugue_tune/tuner.py b/fugue_tune/tuner.py index 6d7fd02..cd81831 100644 --- a/fugue_tune/tuner.py +++ b/fugue_tune/tuner.py @@ -15,11 +15,20 @@ Transformer, WorkflowDataFrame, ) -from triad import assert_or_throw from triad.utils.convert import get_caller_global_local_vars from fugue_tune.convert import _to_tunable -from fugue_tune.space import Choice, Grid, Rand, Space +from fugue_tune.space import Space, decode +from fugue_tune.tunable import Tunable + + +class ObjectiveRunner(object): + def run( + self, tunable: Tunable, kwargs: Dict[str, Any], hp_keys: Set[str] + ) -> Dict[str, Any]: + tunable.run(**kwargs) + hp = {k: v for k, v in tunable.hp.items() if k in hp_keys} + return {"error": tunable.error, "hp": hp, "metadata": tunable.metadata} class Tuner(object): @@ -28,6 +37,7 @@ def tune( # noqa: C901 params_df: WorkflowDataFrame, tunable: Any, distributable: Optional[bool] = None, + objective_runner: Optional[ObjectiveRunner] = None, ) -> WorkflowDataFrame: t = _to_tunable( # type: ignore tunable, *get_caller_global_local_vars(), distributable @@ -35,10 +45,13 @@ def tune( # noqa: C901 if distributable is None: distributable = t.distributable + if objective_runner is None: + objective_runner = ObjectiveRunner() + # input_has: __fmin_params__:str # schema: *,__fmin_value__:double,__fmin_metadata__:str def compute_transformer( - df: Iterable[Dict[str, Any]], load: Any = None + df: Iterable[Dict[str, Any]] ) -> Iterable[Dict[str, Any]]: for row in df: dfs: Dict[str, Any] = {} @@ -50,13 +63,14 @@ def compute_transformer( dfs[key] = pd.read_parquet(v) dfs_keys.add(key) for params in json.loads(row["__fmin_params__"]): - t.run(**dfs, **params) - res = dict(row) - res["__fmin_params__"] = json.dumps( - {pk: pv for pk, pv in t.hp.items() if pk not in dfs_keys} + p = decode(params) + best = objective_runner.run( # type: ignore + t, dict(**dfs, **p), set(p.keys()) ) - res["__fmin_value__"] = t.error - res["__fmin_metadata__"] = json.dumps(t.metadata) + res = dict(row) + res["__fmin_params__"] = json.dumps(best["hp"]) + res["__fmin_value__"] = best["error"] + res["__fmin_metadata__"] = json.dumps(best["metadata"]) yield res # input_has: __fmin_params__:str @@ -112,13 +126,13 @@ def space_to_df( self, wf: FugueWorkflow, space: Space, batch_size: int = 1, shuffle: bool = True ) -> WorkflowDataFrame: def get_data() -> Iterable[List[Any]]: - it = list(space) # type: ignore + it = list(space.encode()) # type: ignore if shuffle: random.seed(0) random.shuffle(it) res: List[Any] = [] for a in it: - res.append(self._convert_hp(a)) + res.append(a) if batch_size == len(res): yield [json.dumps(res)] res = [] @@ -126,12 +140,3 @@ def get_data() -> Iterable[List[Any]]: yield [json.dumps(res)] return wf.df(IterableDataFrame(get_data(), "__fmin_params__:str")) - - def _convert_hp(self, params: Dict[str, Any]) -> Dict[str, Any]: - return {k: self._convert_single(v) for k, v in params.items()} - - def _convert_single(self, param: Any) -> Any: - assert_or_throw( - not isinstance(param, (Grid, Rand, Choice)), NotImplementedError(param) - ) - return param diff --git a/requirements.txt b/requirements.txt index 1083c60..c617a53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -. +.[all] # test requirements pre-commit diff --git a/setup.py b/setup.py index bddb6e4..0a35970 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def get_version() -> str: keywords="fugue incubator experiment", url="http://github.com/fugue-project/fugue-incubator", install_requires=["fugue==0.5.0.dev1"], - extras_require={}, + extras_require={"hyperopt": ["hyperopt"], "all": ["hyperopt"]}, classifiers=[ # "3 - Alpha", "4 - Beta" or "5 - Production/Stable" "Development Status :: 3 - Alpha", diff --git a/tests/fugue_tune/test_convert.py b/tests/fugue_tune/test_convert.py index 6e14658..bf9edb2 100644 --- a/tests/fugue_tune/test_convert.py +++ b/tests/fugue_tune/test_convert.py @@ -28,6 +28,18 @@ def t5(e: ExecutionEngine, a: int) -> float: def t6(a: int, e: ExecutionEngine) -> float: pass + def t7(**kwargs) -> float: + pass + + def t8(**kwargs: int) -> float: + pass + + def t9(a: int, **kwargs: int) -> float: + pass + + def t10(e: ExecutionEngine, a: int, **kwargs: int) -> float: + pass + assert t1 is _to_tunable(t1)._func assert _to_tunable(t1).distributable assert _to_tunable(t1, distributable=True).distributable @@ -39,6 +51,10 @@ def t6(a: int, e: ExecutionEngine) -> float: assert t5 is _to_tunable(t5)._func assert not _to_tunable(t5).distributable assert not _to_tunable(t5, distributable=False).distributable + + assert t8 is _to_tunable(t8)._func + assert t9 is _to_tunable(t9)._func + assert t10 is _to_tunable(t10)._func # with execution engine, distributable can't be true raises(FugueTuneCompileError, lambda: _to_tunable(t5, distributable=True)) diff --git a/tests/fugue_tune/test_hyperopt.py b/tests/fugue_tune/test_hyperopt.py new file mode 100644 index 0000000..37d2401 --- /dev/null +++ b/tests/fugue_tune/test_hyperopt.py @@ -0,0 +1,41 @@ +import pandas as pd + +from fugue_tune.convert import tunable +from fugue_tune.hyperopt import HyperoptRunner +from fugue_tune.space import Choice, Rand, Space, Grid + +from typing import Dict, Any +from fugue import FugueWorkflow +from fugue_tune.tuner import Tuner + + +def test_run(): + @tunable() + def func(df: pd.DataFrame, a: float, b: float, c: int) -> Dict[str, Any]: + return {"error": a * a + b * b + df.shape[0] + c, "metadata": {"d": 1}} + + pdf = pd.DataFrame([[0]], columns=["a"]) + runner = HyperoptRunner(100, seed=3) + + res = runner.run( + func, dict(df=pdf, b=Rand(-100, 100), a=10, c=Choice(1, -1)), {"a", "b", "c"} + ) + assert res["error"] < 103.0 + assert res["hp"]["a"] == 10 + assert abs(res["hp"]["b"]) < 3.0 + assert res["hp"]["c"] == -1 + assert len(res) == 3 + assert res["metadata"] == {"d": 1} + + +def test_wf(): + @tunable() + def func(a: float, b: float, c: int) -> float: + return a * a + b * b + c + + t = Tuner() + with FugueWorkflow() as dag: + space = t.space_to_df( + dag, Space(a=Grid(1, 2), b=Rand(-100, 100), c=Choice(1, -1)) + ) + t.tune(space, func, objective_runner=HyperoptRunner(100, seed=3)).show() diff --git a/tests/fugue_tune/test_space.py b/tests/fugue_tune/test_space.py index 8963b63..979e77c 100644 --- a/tests/fugue_tune/test_space.py +++ b/tests/fugue_tune/test_space.py @@ -1,5 +1,15 @@ from pytest import raises -from fugue_tune.space import Grid, Space, HorizontalSpace, VerticalSpace + +from fugue_tune.space import ( + Choice, + Grid, + HorizontalSpace, + Rand, + Space, + VerticalSpace, + decode, +) +import json def test_single_space(): @@ -131,3 +141,17 @@ def test_operators(): dict(c="a"), dict(c="b"), ] == list(s1 + [dict(c="a"), dict(c="b")]) + + +def test_encode_decode(): + s1 = Space( + a=Grid(1, 2), + b=Rand(0, 1.0, 0.2, log=True, normal=False), + c=Choice(1, 2, 3), + d=[Grid(1, 2), Rand(0, 2.0)], + e={"x": "xx", "y": Choice("a", "b")}, + ) + actual = [decode(x) for x in s1.encode()] + assert list(s1) == actual + for x in s1.encode(): + print(json.dumps(x, indent=2)) diff --git a/tests/fugue_tune/test_tuner.py b/tests/fugue_tune/test_tuner.py index 5ada543..e4aecf5 100644 --- a/tests/fugue_tune/test_tuner.py +++ b/tests/fugue_tune/test_tuner.py @@ -80,4 +80,10 @@ def t1(a: int, p: pd.DataFrame, b: int) -> float: t = dag.df([[0, 1], [1, 2], [0, 2]], "x:int,y:int") df = tuner.serialize_df(t, "p", str(tmpdir)).cross_join(s.broadcast()) tuner.tune(df, t1, distributable=distributable).show() - + + for distributable in [True, False, None]: + with FugueWorkflow() as dag: + s = tuner.space_to_df(dag, Space(a=Grid(0, 1), b=Grid(2, 3)), batch_size=3) + t = dag.df([[0, 1], [1, 2], [0, 2]], "x:int,y:int").partition(by=["x"]) + df = tuner.serialize_df(t, "p", str(tmpdir)).cross_join(s.broadcast()) + tuner.tune(df, t1, distributable=distributable).show()