Skip to content

Commit

Permalink
Merge pull request #6 from fugue-project/0.0.3.1
Browse files Browse the repository at this point in the history
add bayesian search
  • Loading branch information
goodwanghan authored Dec 31, 2020
2 parents 49e144b + 5e83087 commit 581d49d
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 35 deletions.
71 changes: 71 additions & 0 deletions fugue_tune/hyperopt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Any, Dict, Tuple, Set

from fugue_tune.tunable import Tunable
from fugue_tune.tuner import ObjectiveRunner
from fugue_tune.space import StochasticExpression, Rand, Choice
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
import numpy as np


class HyperoptRunner(ObjectiveRunner):
def __init__(self, max_iter: int, seed: int = 0):
self._max_iter = max_iter
self._seed = seed

def run(
self, tunable: Tunable, kwargs: Dict[str, Any], hp_keys: Set[str]
) -> Dict[str, Any]:
static_params, stochastic_params = self._split(kwargs)
keys = list(stochastic_params.keys())

def obj(args) -> Dict[str, Any]:
params = {k: v for k, v in zip(keys, args)}
tunable.run(**static_params, **params)
hp = {k: v for k, v in tunable.hp.items() if k in hp_keys}
return {
"loss": tunable.error,
"status": STATUS_OK,
"error": tunable.error,
"hp": hp,
"metadata": tunable.metadata,
}

trials = Trials()
fmin(
obj,
space=list(stochastic_params.values()),
algo=tpe.suggest,
max_evals=self._max_iter,
trials=trials,
show_progressbar=False,
rstate=np.random.RandomState(self._seed),
)

return {
"error": trials.best_trial["result"]["error"],
"hp": trials.best_trial["result"]["hp"],
"metadata": trials.best_trial["result"]["metadata"],
}

def _split(self, kwargs: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
static_params: Dict[str, Any] = {}
stochastic_params: Dict[str, Any] = {}
for k, v in kwargs.items():
if isinstance(v, StochasticExpression):
if isinstance(v, Rand):
stochastic_params[k] = self.convert_rand(k, v)
elif isinstance(v, Choice):
stochastic_params[k] = self.convert_choice(k, v)
else:
raise NotImplementedError(v) # pragma: no cover
else:
static_params[k] = v
return static_params, stochastic_params

def convert_rand(self, k: str, v: Rand) -> Any:
if v.q is None and not v.log and not v.normal:
return hp.uniform(k, v.start, v.end)
raise NotImplementedError(k, v) # pragma: no cover

def convert_choice(self, k: str, v: Choice) -> Any:
return hp.choice(k, v.values)
90 changes: 79 additions & 11 deletions fugue_tune/space.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Tuple, no_type_check
from typing import Any, Dict, Iterable, List, Tuple, no_type_check, Optional

from fugue_tune.iter import dict_product, product

Expand All @@ -12,21 +12,74 @@ def __iter__(self) -> Iterable[Any]:
yield from self._values


class Choice(object):
class StochasticExpression(object):
@property
def jsondict(self) -> Dict[str, Any]: # pragma: no cover
raise NotImplementedError

def __eq__(self, other: Any):
return isinstance(other, type(self)) and self.jsondict == other.jsondict


class Choice(StochasticExpression):
def __init__(self, *args: Any):
self._values = list(args)

def __iter__(self) -> Iterable[Any]:
yield from self._values
@property
def values(self) -> List[Any]:
return self._values

@property
def jsondict(self) -> Dict[str, Any]:
return dict(_expr_="choice", values=self.values)


class Rand(StochasticExpression):
def __init__(
self,
start: float,
end: float,
q: Optional[float] = None,
log: bool = False,
normal: bool = False,
):
self.start = start
self.end = end
self.q = q
self.log = log
self.normal = normal

class Rand(object):
def __init__(self, start: float, end: float, q: float, log: bool, normal: bool):
self._start = start
self._end = end
self._q = q
self._log = log
self._normal = normal
@property
def jsondict(self) -> Dict[str, Any]:
res = dict(
_expr_="rand",
start=self.start,
end=self.end,
log=self.log,
normal=self.normal,
)
if self.q is not None:
res["q"] = self.q
return res


def decode(value: Any) -> Any:
if isinstance(value, str):
return value
elif isinstance(value, list):
return [decode(v) for v in value]
elif isinstance(value, dict):
if "_expr_" in value:
e = value.pop("_expr_")
if e == "choice": # TODO: embeded rand is not supported
return Choice(*value["values"])
if e == "rand":
return Rand(**value)
raise ValueError(e) # pragma: no cover
else:
return {k: decode(v) for k, v in value.items()}
else:
return value


# TODO: make this inherit from iterable?
Expand All @@ -43,6 +96,10 @@ def __iter__(self) -> Iterable[Dict[str, Any]]:
tp[0][tp[1]] = tp[2]
yield deepcopy(self._value)

def encode(self) -> Iterable[Any]:
for s in self: # type: ignore
yield self._encode_value(s)

def __mul__(self, other: Any) -> "HorizontalSpace":
return HorizontalSpace(self, other)

Expand All @@ -63,6 +120,17 @@ def _search(self, parent: Any, key: Any) -> None:
def _grid_wrapper(self, parent: Any, key: Any) -> List[Tuple[Any, Any, Any]]:
return [(parent, key, x) for x in parent[key]]

def _encode_value(self, value: Any) -> Any:
if isinstance(value, StochasticExpression):
return value.jsondict
elif isinstance(value, str):
return value
elif isinstance(value, list):
return [self._encode_value(v) for v in value]
elif isinstance(value, dict):
return {k: self._encode_value(v) for k, v in value.items()}
return value


class HorizontalSpace(Space):
def __init__(self, *args: Any, **kwargs: Any):
Expand Down
45 changes: 25 additions & 20 deletions fugue_tune/tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@
Transformer,
WorkflowDataFrame,
)
from triad import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars

from fugue_tune.convert import _to_tunable
from fugue_tune.space import Choice, Grid, Rand, Space
from fugue_tune.space import Space, decode
from fugue_tune.tunable import Tunable


class ObjectiveRunner(object):
def run(
self, tunable: Tunable, kwargs: Dict[str, Any], hp_keys: Set[str]
) -> Dict[str, Any]:
tunable.run(**kwargs)
hp = {k: v for k, v in tunable.hp.items() if k in hp_keys}
return {"error": tunable.error, "hp": hp, "metadata": tunable.metadata}


class Tuner(object):
Expand All @@ -28,17 +37,21 @@ def tune( # noqa: C901
params_df: WorkflowDataFrame,
tunable: Any,
distributable: Optional[bool] = None,
objective_runner: Optional[ObjectiveRunner] = None,
) -> WorkflowDataFrame:
t = _to_tunable( # type: ignore
tunable, *get_caller_global_local_vars(), distributable
)
if distributable is None:
distributable = t.distributable

if objective_runner is None:
objective_runner = ObjectiveRunner()

# input_has: __fmin_params__:str
# schema: *,__fmin_value__:double,__fmin_metadata__:str
def compute_transformer(
df: Iterable[Dict[str, Any]], load: Any = None
df: Iterable[Dict[str, Any]]
) -> Iterable[Dict[str, Any]]:
for row in df:
dfs: Dict[str, Any] = {}
Expand All @@ -50,13 +63,14 @@ def compute_transformer(
dfs[key] = pd.read_parquet(v)
dfs_keys.add(key)
for params in json.loads(row["__fmin_params__"]):
t.run(**dfs, **params)
res = dict(row)
res["__fmin_params__"] = json.dumps(
{pk: pv for pk, pv in t.hp.items() if pk not in dfs_keys}
p = decode(params)
best = objective_runner.run( # type: ignore
t, dict(**dfs, **p), set(p.keys())
)
res["__fmin_value__"] = t.error
res["__fmin_metadata__"] = json.dumps(t.metadata)
res = dict(row)
res["__fmin_params__"] = json.dumps(best["hp"])
res["__fmin_value__"] = best["error"]
res["__fmin_metadata__"] = json.dumps(best["metadata"])
yield res

# input_has: __fmin_params__:str
Expand Down Expand Up @@ -112,26 +126,17 @@ def space_to_df(
self, wf: FugueWorkflow, space: Space, batch_size: int = 1, shuffle: bool = True
) -> WorkflowDataFrame:
def get_data() -> Iterable[List[Any]]:
it = list(space) # type: ignore
it = list(space.encode()) # type: ignore
if shuffle:
random.seed(0)
random.shuffle(it)
res: List[Any] = []
for a in it:
res.append(self._convert_hp(a))
res.append(a)
if batch_size == len(res):
yield [json.dumps(res)]
res = []
if len(res) > 0:
yield [json.dumps(res)]

return wf.df(IterableDataFrame(get_data(), "__fmin_params__:str"))

def _convert_hp(self, params: Dict[str, Any]) -> Dict[str, Any]:
return {k: self._convert_single(v) for k, v in params.items()}

def _convert_single(self, param: Any) -> Any:
assert_or_throw(
not isinstance(param, (Grid, Rand, Choice)), NotImplementedError(param)
)
return param
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.
.[all]

# test requirements
pre-commit
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_version() -> str:
keywords="fugue incubator experiment",
url="http://github.com/fugue-project/fugue-incubator",
install_requires=["fugue==0.5.0.dev1"],
extras_require={},
extras_require={"hyperopt": ["hyperopt"], "all": ["hyperopt"]},
classifiers=[
# "3 - Alpha", "4 - Beta" or "5 - Production/Stable"
"Development Status :: 3 - Alpha",
Expand Down
16 changes: 16 additions & 0 deletions tests/fugue_tune/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ def t5(e: ExecutionEngine, a: int) -> float:
def t6(a: int, e: ExecutionEngine) -> float:
pass

def t7(**kwargs) -> float:
pass

def t8(**kwargs: int) -> float:
pass

def t9(a: int, **kwargs: int) -> float:
pass

def t10(e: ExecutionEngine, a: int, **kwargs: int) -> float:
pass

assert t1 is _to_tunable(t1)._func
assert _to_tunable(t1).distributable
assert _to_tunable(t1, distributable=True).distributable
Expand All @@ -39,6 +51,10 @@ def t6(a: int, e: ExecutionEngine) -> float:
assert t5 is _to_tunable(t5)._func
assert not _to_tunable(t5).distributable
assert not _to_tunable(t5, distributable=False).distributable

assert t8 is _to_tunable(t8)._func
assert t9 is _to_tunable(t9)._func
assert t10 is _to_tunable(t10)._func
# with execution engine, distributable can't be true
raises(FugueTuneCompileError, lambda: _to_tunable(t5, distributable=True))

Expand Down
41 changes: 41 additions & 0 deletions tests/fugue_tune/test_hyperopt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pandas as pd

from fugue_tune.convert import tunable
from fugue_tune.hyperopt import HyperoptRunner
from fugue_tune.space import Choice, Rand, Space, Grid

from typing import Dict, Any
from fugue import FugueWorkflow
from fugue_tune.tuner import Tuner


def test_run():
@tunable()
def func(df: pd.DataFrame, a: float, b: float, c: int) -> Dict[str, Any]:
return {"error": a * a + b * b + df.shape[0] + c, "metadata": {"d": 1}}

pdf = pd.DataFrame([[0]], columns=["a"])
runner = HyperoptRunner(100, seed=3)

res = runner.run(
func, dict(df=pdf, b=Rand(-100, 100), a=10, c=Choice(1, -1)), {"a", "b", "c"}
)
assert res["error"] < 103.0
assert res["hp"]["a"] == 10
assert abs(res["hp"]["b"]) < 3.0
assert res["hp"]["c"] == -1
assert len(res) == 3
assert res["metadata"] == {"d": 1}


def test_wf():
@tunable()
def func(a: float, b: float, c: int) -> float:
return a * a + b * b + c

t = Tuner()
with FugueWorkflow() as dag:
space = t.space_to_df(
dag, Space(a=Grid(1, 2), b=Rand(-100, 100), c=Choice(1, -1))
)
t.tune(space, func, objective_runner=HyperoptRunner(100, seed=3)).show()
Loading

0 comments on commit 581d49d

Please sign in to comment.