Skip to content

Commit

Permalink
start Pipeline implementation (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
EthanJamesLew committed Mar 8, 2023
1 parent 72c935c commit aa4ef80
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 0 deletions.
19 changes: 19 additions & 0 deletions autokoopman/core/hyperparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ def __contains__(self, item) -> bool:
def name(self):
return self._name

@name.setter
def name(self, new_name):
self._name = new_name

def __repr__(self):
return f"<{self.__class__.__name__} Name: {self.name}>"

Expand Down Expand Up @@ -100,12 +104,27 @@ def __init__(self, name: str, domain_lower: int, domain_upper: int, step=1):
class ParameterSpace(Parameter):
"""an interval hull"""

@classmethod
def from_parameter_spaces(cls, name: str, spaces: Sequence["ParameterSpace"]):
import copy

coords = []
for space in spaces:
for p in space:
pi = copy.deepcopy(p)
pi.name = f"{name}-{pi.name}"
coords.append(pi)
return cls(name, coords)

def __init__(self, name: str, coords: Sequence[Parameter]):
super(ParameterSpace, self).__init__(name)
self._coords = coords
self._cdict = {c.name: c for c in self._coords}

def is_member(self, item) -> bool:
assert (
len(item) == self.dimension
), f"item must be a sequence have have the same number of elements as the ParameterSpace dimension"
return all([itemi in coordi for itemi, coordi in zip(item, self._coords)])

def random(self):
Expand Down
81 changes: 81 additions & 0 deletions autokoopman/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Pipelines
map parameters spaces to execution artifacts
"""
import copy
import abc
from typing import Optional, Sequence, Any
from autokoopman.core.hyperparameter import ParameterSpace
from autokoopman.core.trajectory import TrajectoriesData


class Pipeline:
this_parameter_space = ParameterSpace("", [])

def __init__(self, name: str) -> None:
self._param_space = self.this_parameter_space
self._next_stages: Sequence[Pipeline] = []
self.name = name

@abc.abstractmethod
def execute(self, inputs, params: Optional[Sequence[Any]]) -> Any:
"""execute this stage only"""
raise NotImplementedError

def run(self, inputs, params: Optional[Sequence[Any]]):
"""run full pipeline"""
# input checks
assert params in self.parameter_space
params = self._split_inputs(params)

# run the current stage
results = self.execute(inputs, params[0])

# if no other stages, return the results, else flow them through the following stages
if len(self._next_stages) == 0:
return results
else:
rem = [
stage.run(results, p) for stage, p in zip(self._next_stages, params[1:])
]
if len(rem) == 1:
return rem[0]
else:
return tuple(rem)

def add_post_pipeline(self, next_pipeline: "Pipeline"):
assert isinstance(
next_pipeline, Pipeline
), f"next pipeline must be a Pipeline object"
self._next_stages.append(next_pipeline)

def __or__(self, next: Any):
if not isinstance(next, Pipeline):
raise ValueError(f"{next} must be a Pipeline")

# create a new instance
n = copy.deepcopy(self)
n.add_post_pipeline(next)
return n

def _split_inputs(self, inputs: Sequence[Any]):
idx = self._param_space.dimension
inps = [inputs[0:idx]]
for stage in self._next_stages:
inps.append(inputs[idx : (idx + stage.parameter_space.dimension)])
idx += stage.parameter_space.dimension
return inps

@property
def parameter_space(self):
return ParameterSpace.from_parameter_spaces(
self.name,
[
self._param_space,
*[stage.parameter_space for stage in self._next_stages],
],
)


class TrajectoryPreprocessor(Pipeline):
def run(self, inputs, params: Optional[Sequence[Any]]) -> TrajectoriesData:
return super().run(inputs, params)
30 changes: 30 additions & 0 deletions test/unit_test/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import autokoopman.core.pipeline as apipe
import autokoopman.core.hyperparameter as ahyp


def test_compositionality():
"""test that the compositionality works"""

class Succ(apipe.Pipeline):
def execute(self, x, _):
return x + 1

class Source(apipe.Pipeline):
def execute(self, x, __):
return x

class Identity(apipe.Pipeline):
def execute(self, x, _):
return x

head = Source("head")
fork1 = Succ("fork1")

fork1 |= Identity("stage3")
fork1 |= Succ("stage4")

head |= Succ("stage2")
head |= fork1

assert head.run(0, []) == (1, (1, 2))
assert head.run(5, []) == (6, (6, 7))

0 comments on commit aa4ef80

Please sign in to comment.