Skip to content

Commit

Permalink
Add sample_params to existing effects
Browse files Browse the repository at this point in the history
  • Loading branch information
felipeangelimvieira committed Dec 15, 2024
1 parent a276300 commit c5752c3
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 45 deletions.
81 changes: 75 additions & 6 deletions extension_templates/effect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,66 @@
from prophetverse.utils.frame_to_array import series_to_tensor_or_array


class MySimpleEffectName(BaseEffect):
"""Base class for effects."""

_tags = {
# Supports multivariate data? Can this
# Effect be used with Multiariate prophet?
"supports_multivariate": False,
# If no columns are found, should
# _predict be skipped?
"skip_predict_if_no_match": True,
# Should only the indexes related to the forecasting horizon be passed to
# _transform?
"filter_indexes_with_forecating_horizon_at_transform": True,
}

def __init__(self, param1: Any, param2: Any):
self.param1 = param1
self.param2 = param2

def _sample_params(self, data, predicted_effects):
# call numpyro.sample to sample the parameters of the effect
# return a dictionary with the sampled parameters, where
# key is the name of the parameter and value is the sampled value
return {}

def _predict(
self,
data: Any,
predicted_effects: Dict[str, jnp.ndarray],
params: Dict[str, jnp.ndarray],
) -> jnp.ndarray:
"""Apply and return the effect values.
Parameters
----------
data : Any
Data obtained from the transformed method.
predicted_effects : Dict[str, jnp.ndarray], optional
A dictionary containing the predicted effects, by default None.
params : Dict[str, jnp.ndarray]
A dictionary containing the sampled parameters of the effect.
Returns
-------
jnp.ndarray
An array with shape (T,1) for univariate timeseries, or (N, T, 1) for
multivariate timeseries, where T is the number of timepoints and N is the
number of series.
"""
# predicted effects come with the following shapes:
# (T, 1) shaped array for univariate timeseries
# (N, T, 1) shaped array for multivariate timeseries, where N is the number of
# series

# Here you use the params to compute the effect.
raise NotImplementedError("Subclasses must implement _predict()")


class MyEffectName(BaseEffect):
"""Base class for effects."""

Expand Down Expand Up @@ -76,10 +136,17 @@ def _transform(self, X: pd.DataFrame, fh: pd.Index) -> Any:
array = series_to_tensor_or_array(X)
return array

def predict(
def _sample_params(self, data, predicted_effects):
# call numpyro.sample to sample the parameters of the effect
# return a dictionary with the sampled parameters, where
# key is the name of the parameter and value is the sampled value
return {}

def _predict(
self,
data: Dict,
data: Any,
predicted_effects: Dict[str, jnp.ndarray],
params: Dict[str, jnp.ndarray],
) -> jnp.ndarray:
"""Apply and return the effect values.
Expand All @@ -91,18 +158,20 @@ def predict(
predicted_effects : Dict[str, jnp.ndarray], optional
A dictionary containing the predicted effects, by default None.
params : Dict[str, jnp.ndarray]
A dictionary containing the sampled parameters of the effect.
Returns
-------
jnp.ndarray
An array with shape (T,1) for univariate timeseries, or (N, T, 1) for
multivariate timeseries, where T is the number of timepoints and N is the
number of series.
"""
# Get the trend
# predicted effects come with the following shapes:
# (T, 1) shaped array for univariate timeseries
# (N, T, 1) shaped array for multivariate timeseries, where N is the number of
# series
# trend: jnp.ndarray = predicted_effects["trend"]
# Or user predicted_effects.get("trend") to return None if the trend is
# not found

# Here you use the params to compute the effect.
raise NotImplementedError("Subclasses must implement _predict()")
49 changes: 38 additions & 11 deletions src/prophetverse/effects/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,33 @@ def sample_params(
Dict
A dictionary containing the sampled parameters.
"""
if predicted_effects is None:
predicted_effects = {}

Check warning on line 277 in src/prophetverse/effects/base.py

View check run for this annotation

Codecov / codecov/patch

src/prophetverse/effects/base.py#L277

Added line #L277 was not covered by tests

return self._sample_params(data, predicted_effects)

def _sample_params(
self,
data: Dict,
predicted_effects: Optional[Dict[str, jnp.ndarray]] = None,
data: Any,
predicted_effects: Dict[str, jnp.ndarray],
):
"""Sample parameters from the prior distribution.
Should be implemented by subclasses to provide the actual sampling logic.
Parameters
----------
data : Any
The data to be used for sampling the parameters, obtained from
`transform` method.
predicted_effects : Dict[str, jnp.ndarray]
A dictionary containing the predicted effects, by default None.
Returns
-------
Dict
A dictionary containing the sampled parameters.
"""
return {}

def _predict(
Expand Down Expand Up @@ -336,9 +356,10 @@ class BaseAdditiveOrMultiplicativeEffect(BaseEffect):
or "multiplicative".
"""

def __init__(self, effect_mode="additive"):
def __init__(self, effect_mode="additive", base_effect_name: str = "trend"):

self.effect_mode = effect_mode
self.base_effect_name = base_effect_name

if effect_mode not in ["additive", "multiplicative"]:
raise ValueError(
Expand Down Expand Up @@ -372,23 +393,29 @@ def predict(
number of series.
"""
if predicted_effects is None:
predicted_effects = {}

Check warning on line 396 in src/prophetverse/effects/base.py

View check run for this annotation

Codecov / codecov/patch

src/prophetverse/effects/base.py#L396

Added line #L396 was not covered by tests

if params is None:
params = self.sample_params(data, predicted_effects)

if (
self.base_effect_name not in predicted_effects
and self.effect_mode == "multiplicative"
):
raise ValueError(
"BaseAdditiveOrMultiplicativeEffect requires trend in"
+ " predicted_effects"
)

trend = predicted_effects["trend"]
if trend.ndim == 1:
trend = trend.reshape((-1, 1))

if params is None:
params = self.sample_params(data, predicted_effects)
x = super().predict(
data=data, predicted_effects=predicted_effects, params=params
)
x = x.reshape(trend.shape)

if self.effect_mode == "additive":
return x

return trend * x
base_effect = predicted_effects[self.base_effect_name]
if base_effect.ndim == 1:
base_effect = base_effect.reshape((-1, 1))
x = x.reshape(base_effect.shape)
return base_effect * x
2 changes: 1 addition & 1 deletion src/prophetverse/effects/exact_likelihood.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def _predict(

with numpyro.handlers.mask(mask=obs_mask):
numpyro.sample(
"lift_experiment",
"exact_likelihood:ignore",
dist.Normal(x, self.prior_scale),
obs=observed_reference_value,
)
Expand Down
19 changes: 18 additions & 1 deletion src/prophetverse/effects/hill.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,24 @@ def __init__(

super().__init__(effect_mode=effect_mode)

def _sample_params(self, data, predicted_effects=None):
def _sample_params(
self, data, predicted_effects: Dict[str, jnp.ndarray]
) -> Dict[str, jnp.ndarray]:
"""
Sample the parameters of the effect.
Parameters
----------
data : Any
Data obtained from the transformed method.
predicted_effects : Dict[str, jnp.ndarray]
A dictionary containing the predicted effects
Returns
-------
Dict[str, jnp.ndarray]
A dictionary containing the sampled parameters of the effect.
"""
return {
"half_max": numpyro.sample("half_max", self.half_max_prior),
"slope": numpyro.sample("slope", self.slope_prior),
Expand Down
51 changes: 37 additions & 14 deletions src/prophetverse/effects/lift_likelihood.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def fit(self, y: pd.DataFrame, X: pd.DataFrame, scale: float = 1):
-------
None
"""
self.effect.fit(X=X, y=y, scale=scale)
self.effect_ = self.effect.clone()
self.effect_.fit(X=X, y=y, scale=scale)
self.timeseries_scale = scale
super().fit(X=X, y=y, scale=scale)

Expand All @@ -106,8 +107,13 @@ def _transform(self, X: pd.DataFrame, fh: pd.Index) -> Dict[str, Any]:
Dictionary with data for the lift and for the inner effect
"""
data_dict = {}
data_dict["inner_effect_data"] = self.effect._transform(X, fh=fh)
data_dict["inner_effect_data"] = self.effect_._transform(X, fh=fh)

# Check if fh and self.lift_test_results have same index type
if not isinstance(fh, self.lift_test_results.index.__class__):
raise TypeError(

Check warning on line 114 in src/prophetverse/effects/lift_likelihood.py

View check run for this annotation

Codecov / codecov/patch

src/prophetverse/effects/lift_likelihood.py#L114

Added line #L114 was not covered by tests
"fh and self.lift_test_results must have the same index type"
)
X_lift = self.lift_test_results.reindex(fh, fill_value=jnp.nan)

data_dict["observed_lift"] = (
Expand All @@ -119,8 +125,25 @@ def _transform(self, X: pd.DataFrame, fh: pd.Index) -> Dict[str, Any]:

return data_dict

def _sample_params(self, data, predicted_effects=None):
return self.effect.sample_params(
def _sample_params(self, data, predicted_effects):
"""
Sample the parameters of the effect.
Calls the sample_params method of the inner effect.
Parameters
----------
data : Any
Data obtained from the transformed method.
predicted_effects : Dict[str, jnp.ndarray]
A dictionary containing the predicted effects
Returns
-------
Dict[str, jnp.ndarray]
A dictionary containing the sampled parameters of the effect.
"""
return self.effect_.sample_params(
data=data["inner_effect_data"], predicted_effects=predicted_effects
)

Expand Down Expand Up @@ -150,35 +173,35 @@ def _predict(
x_end = data["x_end"].reshape((-1, 1))
obs_mask = data["obs_mask"]

effect_params = self.effect.sample_params(
data=data["inner_effect_data"],
predicted_effects=predicted_effects,
)

predicted_effects_masked = {
k: v[obs_mask] for k, v in predicted_effects.items()
}

x = self.effect.predict(
# Call the effect a first time
x = self.effect_.predict(
data=data["inner_effect_data"],
predicted_effects=predicted_effects,
params=params,
)

y_start = self.effect.predict(
# Get the start and end values
y_start = self.effect_.predict(
data=x_start,
predicted_effects=predicted_effects_masked,
params=effect_params,
params=params,
)
y_end = self.effect.predict(
data=x_end, predicted_effects=predicted_effects_masked, params=effect_params
y_end = self.effect_.predict(
data=x_end, predicted_effects=predicted_effects_masked, params=params
)

# Calculate the delta_y
delta_y = jnp.abs(y_end - y_start)

with numpyro.handlers.scale(scale=self.likelihood_scale):
distribution = GammaReparametrized(delta_y, self.prior_scale)

# Add :ignore so that the model removes this
# sample when organizing the output dataframe
numpyro.sample(
"lift_experiment:ignore",
distribution,
Expand Down
8 changes: 4 additions & 4 deletions src/prophetverse/effects/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(

super().__init__(effect_mode=effect_mode)

def _sample_params(self, data, predicted_effects=None):
def _sample_params(self, data, predicted_effects):

n_features = data.shape[-1]

Expand All @@ -54,7 +54,7 @@ def _sample_params(self, data, predicted_effects=None):
def _predict(
self,
data: Any,
predicted_effects: Optional[Dict[str, jnp.ndarray]],
predicted_effects: Dict[str, jnp.ndarray],
params: Dict[str, jnp.ndarray],
) -> jnp.ndarray:
"""Apply and return the effect values.
Expand All @@ -64,8 +64,8 @@ def _predict(
data : Any
Data obtained from the transformed method.
predicted_effects : Dict[str, jnp.ndarray], optional
A dictionary containing the predicted effects, by default None.
predicted_effects : Dict[str, jnp.ndarray]
A dictionary containing the predicted effects
Returns
-------
Expand Down
16 changes: 13 additions & 3 deletions src/prophetverse/effects/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,19 @@ def __init__(
self.rate_prior = rate_prior or dist.Gamma(1, 1)
super().__init__(effect_mode=effect_mode)

def _sample_params(self, data, predicted_effects):
scale = numpyro.sample("log_scale", self.scale_prior)
rate = numpyro.sample("log_rate", self.rate_prior)
return {
"scale": scale,
"rate": rate,
}

def _predict( # type: ignore[override]
self,
data: jnp.ndarray,
predicted_effects: Optional[Dict[str, jnp.ndarray]] = None,
predicted_effects: Dict[str, jnp.ndarray],
params: Dict[str, jnp.ndarray],
) -> jnp.ndarray:
"""Apply and return the effect values.
Expand All @@ -60,8 +69,9 @@ def _predict( # type: ignore[override]
multivariate timeseries, where T is the number of timepoints and N is the
number of series.
"""
scale = numpyro.sample("log_scale", self.scale_prior)
rate = numpyro.sample("log_rate", self.rate_prior)
scale = params["scale"]
rate = params["rate"]

effect = scale * jnp.log(jnp.clip(rate * data + 1, 1e-8, None))

return effect
Loading

0 comments on commit c5752c3

Please sign in to comment.