Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xgboost #405

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions numalogic/models/forecast/variants/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from numalogic.models.forecast.variants.naive import BaselineForecaster, SeasonalNaiveForecaster
from numalogic.models.forecast.variants.naive import (
BaselineForecaster,
SeasonalNaiveForecaster,
)
from numalogic.models.forecast.variants.xgboost import XGBoostForecaster

__all__ = ["BaselineForecaster", "SeasonalNaiveForecaster"]
__all__ = ["BaselineForecaster", "SeasonalNaiveForecaster", "XGBoostForecaster"]
150 changes: 150 additions & 0 deletions numalogic/models/forecast/variants/xgboost.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import logging

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
from xgboost import XGBRegressor, callback

from numalogic.tools.data import ForecastDataset
from numalogic.transforms._covariates import CovariatesGenerator

_LOGGER = logging.getLogger(__name__)


def _check_data_format(df) -> bool:
if not isinstance(df, pd.DataFrame):
raise TypeError("df should be of type pd.DataFrame")

Check warning on line 17 in numalogic/models/forecast/variants/xgboost.py

View check run for this annotation

Codecov / codecov/patch

numalogic/models/forecast/variants/xgboost.py#L17

Added line #L17 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you cover these lines with tests?

if not df.shape[1] > 0:
raise ValueError("df should have more than 0 column")

Check warning on line 19 in numalogic/models/forecast/variants/xgboost.py

View check run for this annotation

Codecov / codecov/patch

numalogic/models/forecast/variants/xgboost.py#L19

Added line #L19 was not covered by tests
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("df index should be of type pd.DatetimeIndex")

Check warning on line 21 in numalogic/models/forecast/variants/xgboost.py

View check run for this annotation

Codecov / codecov/patch

numalogic/models/forecast/variants/xgboost.py#L21

Added line #L21 was not covered by tests
return True


class XGBoostForecaster:
"""
A forecaster that uses XGBoost regressor to predict future values.

Args:
____
horizon: number of time steps to predict into the future
seq_len: number of time steps to consider for prediction
l_rate: learning rate for the XGBoost regress
regressor_params: additional parameters for the XGBoost regressor
"""

__slots__ = (
"_horizon",
"_seq_len",
"_val_split",
"_model",
"_early_stop_callback",
"_early_stopping",
)

def __init__(
self,
horizon: int,
seq_len: int,
early_stopping=True,
val_split: float = 0.1,
**regressor_params,
):
self._horizon = horizon
self._seq_len = seq_len
self._val_split = 0
self._early_stopping = early_stopping
if early_stopping:
self._val_split = val_split
early_stop_callback = callback.EarlyStopping(
rounds=20, metric_name="rmse", save_best=True, maximize=False, min_delta=1e-4
)
default_params = {
"learning_rate": 0.1,
"n_estimators": 1000,
"booster": "gbtree",
"max_depth": 7,
"min_child_weight": 1,
"gamma": 0.0,
"subsample": 0.9,
"colsample_bytree": 0.8,
"reg_alpha": 0.1,
"nthread": 4,
"seed": 27,
"objective": "reg:squarederror",
"random_state": 42,
}
if early_stopping:
default_params.update({"callbacks": [early_stop_callback]})
if regressor_params:
default_params.update(default_params)

Check warning on line 81 in numalogic/models/forecast/variants/xgboost.py

View check run for this annotation

Codecov / codecov/patch

numalogic/models/forecast/variants/xgboost.py#L81

Added line #L81 was not covered by tests

self._model = XGBRegressor(**default_params)

def prepare_data(self, x: np.array):
"""
Prepare data in the format required for forecasting.

Args:
----
x: np.array: input data
seq_len: int: sequence length
horizon: int: forecast horizon
"""
ds = ForecastDataset(x, seq_len=self._seq_len, horizon=self._horizon)
dataloaders = DataLoader(ds, batch_size=1)

X = np.empty((0, self._seq_len, x.shape[1]))
Y = np.empty((0, self._horizon, 1))
for x, y in dataloaders:
X = np.concatenate([X, x.numpy()], axis=0)
Y = np.concatenate([Y, y[:, :, 0].unsqueeze(-1).numpy()], axis=0)
X = X.reshape(X.shape[0], -1)
Y = Y.reshape(Y.shape[0], -1)
return X, Y

def fit(self, df: pd.DataFrame):
_check_data_format(df)

# Split the data into training and validation sets
train_df = df.iloc[: int(len(df) * (1 - self._val_split)), :]
val_df = df.iloc[int(len(df) * (1 - self._val_split)) :, :] if self._val_split else None

# Transform and prepare the training data
transformed_train_data = CovariatesGenerator().transform(train_df)
x_train, y_train = self.prepare_data(transformed_train_data)

# Fit the model with or without validation data
if val_df is not None:
transformed_val_data = CovariatesGenerator().transform(val_df)
x_val, y_val = self.prepare_data(transformed_val_data)
_LOGGER.info("Fitting the model with validation data")
self._model.fit(x_train, y_train, eval_set=[(x_val, y_val)], verbose=True)
else:
_LOGGER.info("Fitting the model without validation data")
self._model.fit(x_train, y_train, verbose=False)

def predict_horizon(self, df: pd.DataFrame) -> np.ndarray:
_check_data_format(df)
transformed_test_data = CovariatesGenerator().transform(df)
_LOGGER.info("Predicting the horizon")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove log

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or maybe use debug

x_test, y_test = self.prepare_data(transformed_test_data)
return self._model.predict(x_test)

def predict_last(self, df: pd.DataFrame) -> np.ndarray:
_check_data_format(df)
transformed_test_data = CovariatesGenerator().transform(df)
_LOGGER.info("Predicting the last value")
x_test, y_test = self.prepare_data(transformed_test_data)
return self._model.predict(x_test[-1].reshape(1, -1))

def save_artifacts(self, path: str) -> None:
artifact = {"model": self._model}
torch.save(artifact, path)
_LOGGER.info("Model saved at %s", path)

def load_artifacts(self, path: str) -> None:
artifact = torch.load(path)
self._model = artifact["model"]
_LOGGER.info(f"Model loaded from {path}")
55 changes: 55 additions & 0 deletions numalogic/tools/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,61 @@
return self._data[(idx * self._stride) : (idx * self._stride) + self._seq_len]


class ForecastDataset(StreamingDataset):
"""
A Dataset for generating sequences for forecasting tasks.

Args:
----
data: A numpy array containing the input data in the shape of (batch, num_features).
seq_len: Length of the sliding window sequences to be generated from the input data
horizon: The forecast horizon
stride: Stride to jump between sequences; defaults to 1
"""

def __init__(self, data: npt.NDArray[float], seq_len: int, horizon: int, stride: int = 1):
super().__init__(data, seq_len, stride=stride)
self.horizon = horizon

def create_seq(
self, input_: npt.NDArray[float]
) -> Generator[tuple[npt.NDArray[float], npt.NDArray[float]], None, None]:
r"""Yields sequences of specified length from the input data.

Args:
----
input_: A numpy array containing the input data.

Yields
------
A tuple of subarray of size (seq_len, num_features)
and forecast horizon of size (seq_len, num_features)
from the input data.
"""
idx = 0
while idx < len(self._data) - self._seq_len - self.horizon + 1:
yield (
input_[idx : idx + self._seq_len],
input_[idx + self._seq_len : idx + self._seq_len + self.horizon],
)
idx += self._stride

def __getitem__(self, idx: int) -> npt.NDArray[float]:
r"""Retrieves a sequence from the input data at the specified index."""
if isinstance(idx, slice):
raise NotImplementedError("Slicing not supported!")

Check warning on line 239 in numalogic/tools/data.py

View check run for this annotation

Codecov / codecov/patch

numalogic/tools/data.py#L239

Added line #L239 was not covered by tests
if idx >= len(self):
raise IndexError(f"{idx} out of bound!")
return (

Check warning on line 242 in numalogic/tools/data.py

View check run for this annotation

Codecov / codecov/patch

numalogic/tools/data.py#L241-L242

Added lines #L241 - L242 were not covered by tests
self._data[idx : idx + self._seq_len],
self._data[idx + self._seq_len : idx + self._seq_len + self.horizon],
)

def __len__(self) -> int:
r"""Returns the number of sequences that can be generated from the input data."""
return (len(self._data) - self._seq_len - self.horizon) // self._stride + 1

Check warning on line 249 in numalogic/tools/data.py

View check run for this annotation

Codecov / codecov/patch

numalogic/tools/data.py#L249

Added line #L249 was not covered by tests


class StreamingDataLoader(DataLoader):
"""
A DataLoader for convenience that uses StreamingDataset for handling time series data.
Expand Down
60 changes: 60 additions & 0 deletions numalogic/transforms/_covariates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from typing import Union

import numpy as np
import pandas as pd
from darts import TimeSeries
from darts.utils.timeseries_generation import datetime_attribute_timeseries

from numalogic.base import StatelessTransformer

_LOGGER = logging.getLogger(__name__)


class CovariatesGenerator(StatelessTransformer):
"""A transformer/generator that generates covariates for a timeseries dataset.

Args:
----
timestamp_column_name: The name of the timestamp column
columns_to_preserve: The columns to preserve in the dataset
*covariate_attributes: The tuple of attributes to consider for generating covariates
"""

def __init__(
self,
timestamp_column_name: str = "timestamp",
columns_to_preserve: Union[str, list[str]] = "value",
*covariate_attributes: tuple[str],
):
self.covariate_attributes = (
covariate_attributes if covariate_attributes else ("dayofweek", "month", "dayofyear")
)
self.timestamp_column_name = timestamp_column_name
self.columns_to_preserve = columns_to_preserve

def _get_covariates(self, df: pd.DataFrame):
covariates = []
_LOGGER.info("Generating covariates for %s", self.covariate_attributes)
for attribute in self.covariate_attributes:
day_series = datetime_attribute_timeseries(
TimeSeries.from_dataframe(
df.reset_index(), self.timestamp_column_name, self.columns_to_preserve
),
attribute=attribute,
one_hot=False,
cyclic=True,
).values()
covariates.append(day_series)
return np.concatenate(covariates, axis=1)

def transform(self, input_: pd.DataFrame, **__):
"""
Generate covariates for a timeseries dataset.

Args:
----
data: np.array: input data
"""
covariates = self._get_covariates(input_)
return np.concatenate([input_.to_numpy(), covariates], axis=1)
4 changes: 3 additions & 1 deletion numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
status=Status.RUNTIME_ERROR,
)
msgs = Messages(
get_trainer_message(keys, _stream_conf, payload, **_metric_label_values),
get_trainer_message(
keys=keys, stream_conf=_stream_conf, payload=payload, **_metric_label_values
),
)
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))
Expand Down
1 change: 0 additions & 1 deletion numalogic/udfs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from pandas import DataFrame
from pynumaflow.mapper import Message


from numalogic.registry import ArtifactManager, ArtifactData
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.tools.types import KEYS, redis_client_t
Expand Down
Loading
Loading