From 934d8cfe28a964915545ffa262ba162c9a85612f Mon Sep 17 00:00:00 2001 From: Avik Basu <3485425+ab93@users.noreply.github.com> Date: Tue, 14 May 2024 13:41:10 -0700 Subject: [PATCH] Feat/vanilla ae refactor (#379) - Refactor Vanilla AE - Support static filters in Druid - Add Percentile Scaler --------- Signed-off-by: Avik Basu Co-authored-by: Gulshan Bhatia Co-authored-by: Kushal Batra <34571348+s0nicboOm@users.noreply.github.com> Co-authored-by: Nandita Koppisetty --- numalogic/backtest/_prom.py | 35 ++++- numalogic/config/_config.py | 2 +- numalogic/config/factory.py | 12 +- numalogic/connectors/__init__.py | 6 +- numalogic/connectors/_config.py | 8 + numalogic/connectors/druid/_druid.py | 68 ++++++-- numalogic/models/autoencoder/base.py | 22 +-- numalogic/models/autoencoder/variants/conv.py | 9 +- numalogic/models/autoencoder/variants/lstm.py | 8 +- .../autoencoder/variants/transformer.py | 6 +- .../models/autoencoder/variants/vanilla.py | 147 +++++++++++++----- numalogic/models/threshold/_static.py | 4 +- numalogic/models/vae/base.py | 9 +- numalogic/models/vae/variants/conv.py | 2 +- numalogic/tools/data.py | 2 +- numalogic/tools/loss.py | 34 ++++ numalogic/transforms/__init__.py | 6 +- numalogic/transforms/_movavg.py | 56 ++----- numalogic/transforms/_postprocess.py | 10 ++ numalogic/transforms/_scaler.py | 70 +++++++++ numalogic/udfs/inference.py | 3 +- numalogic/udfs/postprocess.py | 44 ++++-- numalogic/udfs/trainer/_base.py | 26 ++-- numalogic/udfs/trainer/_druid.py | 1 + poetry.lock | 26 ++-- pyproject.toml | 3 +- tests/connectors/test_druid.py | 60 +++++++ tests/transforms/test_postprocess.py | 118 +++++++------- tests/transforms/test_transforms.py | 17 +- tests/udfs/test_inference.py | 4 +- tests/udfs/test_pipeline.py | 65 ++++---- tests/udfs/test_postprocess.py | 14 +- 32 files changed, 612 insertions(+), 285 deletions(-) create mode 100644 numalogic/tools/loss.py diff --git a/numalogic/backtest/_prom.py b/numalogic/backtest/_prom.py index 9fc044c7..d5d2fbff 100644 --- a/numalogic/backtest/_prom.py +++ b/numalogic/backtest/_prom.py @@ -49,6 +49,9 @@ class OutDataFrames: thresh_out: pd.DataFrame postproc_out: pd.DataFrame unified_out: pd.DataFrame + static_out: Optional[pd.DataFrame] = None + static_features: Optional[pd.DataFrame] = None + adjusted_unified: Optional[pd.DataFrame] = None class PromBacktester: @@ -135,9 +138,14 @@ def train_models( x_train = df_train.to_numpy(dtype=np.float32) LOGGER.info("Training data shape: %s", x_train.shape) + if self.nlconf.trainer.transforms: + train_txs = PreprocessFactory().get_pipeline_instance(self.nlconf.trainer.transforms) + else: + train_txs = None artifacts = UDFFactory.get_udf_cls("promtrainer").compute( model=ModelFactory().get_instance(self.nlconf.model), input_=x_train, + trainer_transform=train_txs, preproc_clf=PreprocessFactory().get_pipeline_instance(self.nlconf.preprocess), threshold_clf=ThresholdFactory().get_instance(self.nlconf.threshold), numalogic_cfg=self.nlconf, @@ -238,6 +246,8 @@ def generate_scores( x_recon = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32) raw_scores = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32) + unified_raw_scores = np.zeros((len(ds), 1), dtype=np.float32) + feature_scores = np.zeros((len(ds), n_feat), dtype=np.float32) unified_scores = np.zeros((len(ds), 1), dtype=np.float32) @@ -253,12 +263,17 @@ def generate_scores( winscores = postproc_udf.compute_feature_scores( raw_scores[idx], self.nlconf.score.window_agg + ) # (nfeat,) + + unified_raw_scores[idx] = 
postproc_udf.compute_unified_score( + winscores, + feat_agg_conf=self.nlconf.score.feature_agg, ) feature_scores[idx] = postproc_udf.compute_postprocess(postproc_func, winscores) - unified_scores[idx] = postproc_udf.compute_unified_score( - feature_scores[idx], self.nlconf.score.feature_agg + unified_scores[idx] = postproc_udf.compute_postprocess( + postproc_func, unified_raw_scores[idx] ) x_recon = self.window_inverse(x_recon) @@ -274,7 +289,7 @@ def generate_scores( [np.full((len(x_test) - len(ds), 1), fill_value=np.nan), unified_scores] ) - return self._construct_output( + out_dfs = self._construct_output( df_test, preproc_out=x_scaled, nn_out=x_recon, @@ -282,6 +297,16 @@ def generate_scores( postproc_out=feature_scores, unified_out=unified_scores, ) + if self.nlconf.score.adjust: + static_scores = self.generate_static_scores(df_test) + out_dfs.static_out = static_scores["static_unified"] + out_dfs.static_features = static_scores["static_features"] + + out_dfs.adjusted_unified = pd.concat( + [out_dfs.unified_out, out_dfs.static_out], axis=1 + ).max(axis=1) + + return out_dfs def generate_static_scores(self, df: pd.DataFrame) -> pd.DataFrame: if not self.nlconf.score.adjust: @@ -305,12 +330,12 @@ def generate_static_scores(self, df: pd.DataFrame) -> pd.DataFrame: ) feature_scores = np.vstack( [ - np.full((self.seq_len - 1, len(metrics)), fill_value=np.nan), + np.full((len(x_test) - len(ds), len(metrics)), fill_value=np.nan), feature_scores, ] ) unified_scores = np.vstack( - [np.full((self.seq_len - 1, 1), fill_value=np.nan), unified_scores] + [np.full((len(x_test) - len(ds), 1), fill_value=np.nan), unified_scores] ) dfs = { "input": df, diff --git a/numalogic/config/_config.py b/numalogic/config/_config.py index 67f37eb9..a37469e7 100644 --- a/numalogic/config/_config.py +++ b/numalogic/config/_config.py @@ -157,7 +157,7 @@ class NumalogicConf: trainer: TrainerConf = field(default_factory=TrainerConf) preprocess: list[ModelInfo] = field(default_factory=list) threshold: ModelInfo = field(default_factory=lambda: ModelInfo(name="StdDevThreshold")) - postprocess: ModelInfo = field( + postprocess: Optional[ModelInfo] = field( default_factory=lambda: ModelInfo(name="TanhNorm", stateful=False) ) score: ScoreConf = field(default_factory=lambda: ScoreConf()) diff --git a/numalogic/config/factory.py b/numalogic/config/factory.py index 28898b10..b0fc73e8 100644 --- a/numalogic/config/factory.py +++ b/numalogic/config/factory.py @@ -51,6 +51,8 @@ class PreprocessFactory(_ObjectFactory): GaussianNoiseAdder, DifferenceTransform, FlattenVector, + PercentileScaler, + ExpMovingAverage, ) _CLS_MAP: ClassVar[dict] = { @@ -65,6 +67,8 @@ class PreprocessFactory(_ObjectFactory): "GaussianNoiseAdder": GaussianNoiseAdder, "DifferenceTransform": DifferenceTransform, "FlattenVector": FlattenVector, + "PercentileScaler": PercentileScaler, + "ExpMovingAverage": ExpMovingAverage, } def get_pipeline_instance(self, objs_info: list[ModelInfo]): @@ -82,9 +86,13 @@ def get_pipeline_instance(self, objs_info: list[ModelInfo]): class PostprocessFactory(_ObjectFactory): """Factory class to create postprocess instances.""" - from numalogic.transforms import TanhNorm, ExpMovingAverage + from numalogic.transforms import TanhNorm, ExpMovingAverage, SigmoidNorm - _CLS_MAP: ClassVar[dict] = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage} + _CLS_MAP: ClassVar[dict] = { + "TanhNorm": TanhNorm, + "ExpMovingAverage": ExpMovingAverage, + "SigmoidNorm": SigmoidNorm, + } class ThresholdFactory(_ObjectFactory): diff --git 
a/numalogic/connectors/__init__.py b/numalogic/connectors/__init__.py index 6f33b047..8c8e2d19 100644 --- a/numalogic/connectors/__init__.py +++ b/numalogic/connectors/__init__.py @@ -10,7 +10,6 @@ RDSConf, RDSFetcherConf, ) -from numalogic.connectors.rds import RDSFetcher from numalogic.connectors.prometheus import PrometheusFetcher __all__ = [ @@ -26,6 +25,11 @@ "RDSFetcherConf", ] +if find_spec("boto3"): + from numalogic.connectors.rds import RDSFetcher # noqa: F401 + + __all__.append("RDSFetcher") + if find_spec("pydruid"): from numalogic.connectors.druid import DruidFetcher # noqa: F401 diff --git a/numalogic/connectors/_config.py b/numalogic/connectors/_config.py index 13638fca..8a5c6ea7 100644 --- a/numalogic/connectors/_config.py +++ b/numalogic/connectors/_config.py @@ -36,11 +36,19 @@ class Pivot: index: str = "timestamp" columns: list[str] = field(default_factory=list) value: list[str] = field(default_factory=lambda: ["count"]) + agg: list[str] = field(default_factory=lambda: ["sum"]) + + +@dataclass +class FilterConf: + inclusion_filters: Optional[list[dict]] = None + exclusion_filters: Optional[list[dict]] = None @dataclass class DruidFetcherConf: datasource: str + static_filters: Optional[FilterConf] = None dimensions: list[str] = field(default_factory=list) aggregations: dict = field(default_factory=dict) group_by: list[str] = field(default_factory=list) diff --git a/numalogic/connectors/druid/_druid.py b/numalogic/connectors/druid/_druid.py index c62d4fec..121baa4f 100644 --- a/numalogic/connectors/druid/_druid.py +++ b/numalogic/connectors/druid/_druid.py @@ -10,7 +10,7 @@ from pydruid.utils.filters import Filter from numalogic.connectors._base import DataFetcher -from numalogic.connectors._config import Pivot +from numalogic.connectors._config import Pivot, FilterConf from typing import Optional, Final from numalogic.tools.exceptions import DruidFetcherError @@ -34,6 +34,24 @@ def make_filter_pairs(filter_keys: list[str], filter_values: list[str]) -> dict[ return dict(zip(filter_keys, filter_values)) +def _combine_in_filters(filters_list) -> Filter: + return Filter(type="and", fields=[Filter(**item) for item in filters_list]) + + +def _combine_ex_filters(filters_list) -> Filter: + filters = _combine_in_filters(filters_list) + return Filter(type="not", field=filters) + + +def _make_static_filters(filters: FilterConf) -> Filter: + filter_list = [] + if filters.inclusion_filters: + filter_list.append(_combine_in_filters(filters.inclusion_filters)) + if filters.exclusion_filters: + filter_list.append(_combine_ex_filters(filters.exclusion_filters)) + return Filter(type="and", fields=filter_list) + + def build_params( datasource: str, dimensions: list[str], @@ -41,6 +59,7 @@ def build_params( granularity: str, hours: float, delay: float, + static_filters: Optional[FilterConf] = None, aggregations: Optional[list[str]] = None, post_aggregations: Optional[list[str]] = None, reference_dt: Optional[datetime] = None, @@ -52,6 +71,7 @@ def build_params( dimensions: The dimensions to group by filter_pairs: Indicates which rows of data to include in the query + static_filters: Static filters passed from config granularity: Time bucket to aggregate data by hour, day, minute, etc., hours: Hours from now to skip training. delay: Added delay to the fetch query from current time. 
@@ -69,6 +89,11 @@ def build_params( type="and", fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()], ) + if static_filters: + _LOGGER.debug("Static Filters are present!") + _static_filters = _make_static_filters(static_filters) + _filter = Filter(type="and", fields=[_static_filters, _filter]) + reference_dt = reference_dt or datetime.now(pytz.utc) end_dt = reference_dt - timedelta(hours=delay) _LOGGER.debug("Querying with end_dt: %s, that is with delay of %s hrs", end_dt, delay) @@ -118,6 +143,7 @@ def fetch( dimensions: list[str], delay: float = 3.0, granularity: str = "minute", + static_filters: Optional[FilterConf] = None, aggregations: Optional[dict] = None, post_aggregations: Optional[dict] = None, group_by: Optional[list[str]] = None, @@ -135,6 +161,7 @@ def fetch( dimensions: The dimensions to group by delay: Added delay to the fetch query from current time. granularity: Time bucket to aggregate data by hour, day, minute, etc. + static_filters: user defined filters aggregations: A map from aggregator name to one of the ``pydruid.utils.aggregators`` e.g., ``doublesum`` post_aggregations: postaggregations map @@ -152,6 +179,7 @@ def fetch( datasource=datasource, dimensions=dimensions, filter_pairs=filter_pairs, + static_filters=static_filters, granularity=granularity, hours=hours, delay=delay, @@ -169,12 +197,16 @@ def fetch( if group_by: df = df.groupby(by=group_by).sum().reset_index() - if pivot and pivot.columns: - df = df.pivot( - index=pivot.index, - columns=pivot.columns, - values=pivot.value, - ) + # TODO: performance review + if pivot: + pivoted_frames = [] + for idx, column in enumerate(pivot.columns): + _df = df.pivot_table( + index=pivot.index, columns=[column], values=pivot.value, aggfunc=pivot.agg[idx] + ) + pivoted_frames.append(_df) + + df = pd.concat(pivoted_frames, axis=1, join="outer") df.columns = df.columns.map("{0[1]}".format) df.reset_index(inplace=True) @@ -193,6 +225,7 @@ def chunked_fetch( dimensions: list[str], delay: float = 3.0, granularity: str = "minute", + static_filter: Optional[FilterConf] = None, aggregations: Optional[dict] = None, post_aggregations: Optional[dict] = None, group_by: Optional[list[str]] = None, @@ -213,6 +246,7 @@ def chunked_fetch( granularity: Time bucket to aggregate data by hour, day, minute, etc. 
aggregations: A map from aggregator name to one of the ``pydruid.utils.aggregators`` e.g., ``doublesum`` + static_filter: user defined filters post_aggregations: postaggregations map group_by: List of columns to group by pivot: Pivot configuration @@ -245,6 +279,7 @@ def chunked_fetch( datasource=datasource, dimensions=dimensions, filter_pairs=filter_pairs, + static_filters=static_filter, granularity=granularity, hours=min(chunked_hours, hours - hours_elapsed), delay=delay, @@ -259,21 +294,22 @@ def chunked_fetch( _LOGGER.debug("Fetching data concurrently with %s threads", max_threads) with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [executor.submit(self._fetch, **params) for params in qparams] - for future in futures: - chunked_dfs.append(future.result()) - + chunked_dfs.extend(future.result() for future in futures) df = pd.concat(chunked_dfs, axis=0, ignore_index=True) df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6 if group_by: df = df.groupby(by=group_by).sum().reset_index() - if pivot and pivot.columns: - df = df.pivot( - index=pivot.index, - columns=pivot.columns, - values=pivot.value, - ) + if pivot: + pivoted_frames = [] + for idx, column in enumerate(pivot.columns): + _df = df.pivot_table( + index=pivot.index, columns=[column], values=pivot.value, aggfunc=pivot.agg[idx] + ) + pivoted_frames.append(_df) + + df = pd.concat(pivoted_frames, axis=1, join="outer") df.columns = df.columns.map("{0[1]}".format) df.reset_index(inplace=True) diff --git a/numalogic/models/autoencoder/base.py b/numalogic/models/autoencoder/base.py index 1b9fdf4b..131bb9d7 100644 --- a/numalogic/models/autoencoder/base.py +++ b/numalogic/models/autoencoder/base.py @@ -12,10 +12,10 @@ from typing import Any -import torch.nn.functional as F from torch import Tensor, optim from numalogic.base import TorchModel +from numalogic.tools.loss import get_loss_fn class BaseAE(TorchModel): @@ -41,19 +41,9 @@ def __init__( super().__init__() self.lr = lr self.optim_algo = optim_algo - self.criterion = self.init_criterion(loss_fn) + self.criterion = get_loss_fn(loss_fn) self.weight_decay = weight_decay - @staticmethod - def init_criterion(loss_fn: str): - if loss_fn == "huber": - return F.huber_loss - if loss_fn == "l1": - return F.l1_loss - if loss_fn == "mse": - return F.mse_loss - raise NotImplementedError(f"Unsupported loss function provided: {loss_fn}") - def init_optimizer(self, optim_algo: str): if optim_algo == "adam": return optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.weight_decay) @@ -67,9 +57,9 @@ def configure_shape(self, x: Tensor) -> Tensor: """Method to configure the batch shape for each type of model architecture.""" return x - def _get_reconstruction_loss(self, batch: Tensor) -> Tensor: + def get_reconstruction_loss(self, batch: Tensor, reduction="mean") -> Tensor: _, recon = self.forward(batch) - return self.criterion(batch, recon) + return self.criterion(batch, recon, reduction=reduction) def reconstruction(self, batch: Tensor) -> Tensor: _, recon = self.forward(batch) @@ -80,11 +70,11 @@ def configure_optimizers(self) -> dict[str, Any]: return {"optimizer": optimizer} def training_step(self, batch: Tensor, batch_idx: int) -> Tensor: - recon_loss = self._get_reconstruction_loss(batch) + recon_loss = self.get_reconstruction_loss(batch) self.log("train_loss", recon_loss, on_epoch=True, on_step=False) return recon_loss def validation_step(self, batch: Tensor, batch_idx: int) -> Tensor: - recon_loss = self._get_reconstruction_loss(batch) + 
recon_loss = self.get_reconstruction_loss(batch) self.log("val_loss", recon_loss) return recon_loss diff --git a/numalogic/models/autoencoder/variants/conv.py b/numalogic/models/autoencoder/variants/conv.py index 494aff1b..1fd9c7e8 100644 --- a/numalogic/models/autoencoder/variants/conv.py +++ b/numalogic/models/autoencoder/variants/conv.py @@ -202,6 +202,7 @@ class Conv1dAE(BaseAE): dec_activation: The final activation for the decoder Supported values include: ("sigmoid", "tanh", "relu") If None then no output activation is added + **kwargs: BaseAE kwargs Note: Length of list/tuple of enc_channels and enc_kernel_sizes must be equal """ @@ -284,10 +285,6 @@ def encode(self, batch: Tensor) -> Tensor: batch = self.configure_shape(batch) return self.encoder(batch) - def _get_reconstruction_loss(self, batch: Tensor) -> Tensor: - _, recon = self.forward(batch) - return self.criterion(batch, recon) - def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0) -> Tensor: """Returns reconstruction for streaming input.""" recon = self.reconstruction(batch) @@ -308,7 +305,7 @@ class SparseConv1dAE(Conv1dAE): ---- beta: Penalty factor (Defaults to 1e-3) rho: Sparsity parameter value (Defaults to 0.05) - **kwargs: VanillaAE kwargs + **kwargs: Conv1dAE kwargs """ def __init__(self, beta: float = 1e-3, rho: float = 0.05, *args, **kwargs): @@ -335,7 +332,7 @@ def kl_divergence(self, activations: Tensor) -> Tensor: ) return torch.sum(torch.clamp(kl_loss, max=1.0)) - def _get_reconstruction_loss(self, batch) -> Tensor: + def get_reconstruction_loss(self, batch: Tensor, reduction="mean") -> Tensor: latent, recon = self.forward(batch) loss = self.criterion(batch, recon) penalty = self.kl_divergence(latent) diff --git a/numalogic/models/autoencoder/variants/lstm.py b/numalogic/models/autoencoder/variants/lstm.py index e077b4f2..a83c55b0 100644 --- a/numalogic/models/autoencoder/variants/lstm.py +++ b/numalogic/models/autoencoder/variants/lstm.py @@ -107,7 +107,7 @@ def __init__( embedding_dim: int, encoder_layers: int = 1, decoder_layers: int = 1, - **kwargs + **kwargs, ): super().__init__(**kwargs) @@ -146,7 +146,7 @@ def forward(self, x: Tensor) -> tuple[Tensor, Tensor]: decoded = self.decoder(encoded) return encoded, decoded - def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0): + def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0) -> Tensor: """Returns reconstruction for streaming input.""" recon = self.reconstruction(batch) return self.criterion(batch, recon, reduction="none") @@ -166,7 +166,7 @@ class SparseLSTMAE(LSTMAE): ---- beta: regularization parameter (Defaults to 1e-3) rho: sparsity parameter value (Defaults to 0.05) - **kwargs: VanillaAE kwargs + **kwargs: LSTMAE kwargs """ def __init__(self, beta=1e-3, rho=0.05, *args, **kwargs): @@ -192,7 +192,7 @@ def kl_divergence(self, activations: Tensor) -> Tensor: _dim = 0 if rho_hat.dim() == 1 else 1 return kl_loss(torch.log_softmax(rho_hat, dim=_dim), torch.softmax(rho, dim=_dim)) - def _get_reconstruction_loss(self, batch): + def get_reconstruction_loss(self, batch: Tensor, reduction="mean") -> Tensor: latent, recon = self.forward(batch) loss = self.criterion(batch, recon) penalty = self.kl_divergence(latent) diff --git a/numalogic/models/autoencoder/variants/transformer.py b/numalogic/models/autoencoder/variants/transformer.py index a9f61737..955ef975 100644 --- a/numalogic/models/autoencoder/variants/transformer.py +++ b/numalogic/models/autoencoder/variants/transformer.py @@ 
-313,7 +313,7 @@ def __init__( dim_feedforward: int = 2048, dropout: float = 0.1, activation: nn.Module = nn.ReLU(), - **kwargs + **kwargs, ): super().__init__(**kwargs) self.n_features = n_features @@ -348,7 +348,7 @@ def forward(self, batch: Tensor) -> tuple[Tensor, Tensor]: decoded = self.decoder(batch, encoded) return encoded, decoded - def _get_reconstruction_loss(self, batch): + def get_reconstruction_loss(self, batch): _, recon = self.forward(batch) x = batch.view(-1, self.n_features, self.seq_len) return self.criterion(x, recon) @@ -400,7 +400,7 @@ def kl_divergence(self, activations: Tensor) -> Tensor: _dim = 0 if rho_hat.dim() == 1 else 1 return kl_loss(torch.log_softmax(rho_hat, dim=_dim), torch.softmax(rho, dim=_dim)) - def _get_reconstruction_loss(self, batch): + def get_reconstruction_loss(self, batch): latent, recon = self.forward(batch) x = batch.view(-1, self.n_features, self.seq_len) loss = self.criterion(x, recon) diff --git a/numalogic/models/autoencoder/variants/vanilla.py b/numalogic/models/autoencoder/variants/vanilla.py index 5580f6d8..991f4831 100644 --- a/numalogic/models/autoencoder/variants/vanilla.py +++ b/numalogic/models/autoencoder/variants/vanilla.py @@ -20,8 +20,8 @@ from numalogic.tools.exceptions import LayerSizeMismatchError -class _Encoder(nn.Module): - r"""Encoder module for the autoencoder module. +class _VanillaEncoder(nn.Module): + r"""Encoder module for the VanillaAE. Args: ---- @@ -32,11 +32,19 @@ class _Encoder(nn.Module): """ - def __init__(self, seq_len: int, n_features: int, layersizes: Sequence[int], dropout_p: float): + def __init__( + self, + seq_len: int, + n_features: int, + layersizes: Sequence[int], + dropout_p: float, + batchnorm: bool, + ): super().__init__() self.seq_len = seq_len self.n_features = n_features self.dropout_p = dropout_p + self.bnorm = batchnorm layers = self._construct_layers(layersizes) self.encoder = nn.Sequential(*layers) @@ -56,23 +64,17 @@ def _construct_layers(self, layersizes: Sequence[int]) -> nn.ModuleList: start_layersize = self.seq_len for lsize in layersizes[:-1]: - layers.extend( - [ - nn.Linear(start_layersize, lsize), - nn.BatchNorm1d(self.n_features), - nn.Tanh(), - nn.Dropout(p=self.dropout_p), - ] - ) + _l = [nn.Linear(start_layersize, lsize)] + if self.bnorm: + _l.append(nn.BatchNorm1d(self.n_features)) + layers.extend([*_l, nn.Tanh(), nn.Dropout(p=self.dropout_p)]) start_layersize = lsize - layers.extend( - [ - nn.Linear(start_layersize, layersizes[-1]), - nn.BatchNorm1d(self.n_features), - nn.ReLU(), - ] - ) + _l = [nn.Linear(start_layersize, layersizes[-1])] + if self.bnorm: + _l.append(nn.BatchNorm1d(self.n_features)) + layers.extend([*_l, nn.Tanh(), nn.Dropout(p=self.dropout_p)]) + return layers def forward(self, x: Tensor) -> Tensor: @@ -91,11 +93,19 @@ class _Decoder(nn.Module): """ - def __init__(self, seq_len: int, n_features: int, layersizes: Sequence[int], dropout_p: float): + def __init__( + self, + seq_len: int, + n_features: int, + layersizes: Sequence[int], + dropout_p: float, + batchnorm: bool, + ): super().__init__() self.seq_len = seq_len self.n_features = n_features self.dropout_p = dropout_p + self.bnorm = batchnorm layers = self._construct_layers(layersizes) self.decoder = nn.Sequential(*layers) @@ -117,14 +127,10 @@ def _construct_layers(self, layersizes: Sequence[int]) -> nn.ModuleList: layers = nn.ModuleList() for idx, _ in enumerate(layersizes[:-1]): - layers.extend( - [ - nn.Linear(layersizes[idx], layersizes[idx + 1]), - nn.BatchNorm1d(self.n_features), - nn.Tanh(), - 
nn.Dropout(p=self.dropout_p), - ] - ) + _l = [nn.Linear(layersizes[idx], layersizes[idx + 1])] + if self.bnorm: + _l.append(nn.BatchNorm1d(self.n_features)) + layers.extend([*_l, nn.Tanh(), nn.Dropout(p=self.dropout_p)]) layers.append(nn.Linear(layersizes[-1], self.seq_len)) return layers @@ -135,11 +141,13 @@ class VanillaAE(BaseAE): Args: ---- - signal_len: sequence length / window length + seq_len: sequence length / window length n_features: num of features encoder_layersizes: encoder layer size (default = Sequence[int] = (16, 8)) decoder_layersizes: decoder layer size (default = Sequence[int] = (8, 16)) dropout_p: the dropout value (default=0.25) + batchnorm: Flag to enable batch normalization (default=False) + **kwargs: BaseAE kwargs """ def __init__( @@ -149,6 +157,7 @@ def __init__( encoder_layersizes: Sequence[int] = (16, 8), decoder_layersizes: Sequence[int] = (8, 16), dropout_p: float = 0.25, + batchnorm: bool = False, **kwargs, ): super().__init__(**kwargs) @@ -162,17 +171,19 @@ def __init__( f"does not match first layersize of decoder: {decoder_layersizes[0]}" ) - self.encoder = _Encoder( + self.encoder = _VanillaEncoder( seq_len=seq_len, n_features=n_features, layersizes=encoder_layersizes, dropout_p=dropout_p, + batchnorm=batchnorm, ) self.decoder = _Decoder( seq_len=seq_len, n_features=n_features, layersizes=decoder_layersizes, dropout_p=dropout_p, + batchnorm=batchnorm, ) self.encoder.apply(self.init_weights) @@ -181,7 +192,7 @@ def __init__( @staticmethod def init_weights(m: nn.Module) -> None: """Initialize the parameters in the model.""" - if type(m) == nn.Linear: + if isinstance(m, nn.Linear): nn.init.xavier_normal_(m.weight) def forward(self, batch: Tensor) -> tuple[Tensor, Tensor]: @@ -190,16 +201,42 @@ def forward(self, batch: Tensor) -> tuple[Tensor, Tensor]: decoded = self.decoder(encoded) return encoded, torch.swapdims(decoded, 1, 2) - def _get_reconstruction_loss(self, batch: Tensor): - _, recon = self.forward(batch) - return self.criterion(batch, recon) - def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0): """Returns reconstruction for streaming input.""" recon = self.reconstruction(batch) return self.criterion(batch, recon, reduction="none") +class _SparseVanillaEncoder(_VanillaEncoder): + def _construct_layers(self, layersizes: Sequence[int]) -> nn.ModuleList: + r"""Utility function to generate a simple feedforward network layer. + + Args: + ---- + layersizes: layer size + + Returns + ------- + A simple feedforward network layer of type nn.ModuleList + """ + layers = nn.ModuleList() + start_layersize = self.seq_len + + for lsize in layersizes[:-1]: + _l = [nn.Linear(start_layersize, lsize)] + if self.bnorm: + _l.append(nn.BatchNorm1d(self.n_features)) + layers.extend([*_l, nn.Tanh(), nn.Dropout(p=self.dropout_p)]) + start_layersize = lsize + + _l = [nn.Linear(start_layersize, layersizes[-1])] + if self.bnorm: + _l.append(nn.BatchNorm1d(self.n_features)) + layers.extend([*_l, nn.ReLU()]) + + return layers + + class SparseVanillaAE(VanillaAE): r"""Sparse Autoencoder for a fully connected network. It inherits from VanillaAE class and serves as a wrapper around base network models. 
@@ -212,13 +249,45 @@ class SparseVanillaAE(VanillaAE): Args: ---- + seq_len: sequence length / window length + n_features: num of features + encoder_layersizes: encoder layer size (default = Sequence[int] = (16, 8)) + decoder_layersizes: decoder layer size (default = Sequence[int] = (8, 16)) + dropout_p: the dropout value (default=0.25) + batchnorm: Flag to enable batch normalization (default=False) beta: Regularization factor (Defaults to 1e-3) rho: Sparsity parameter value (Defaults to 0.05) - **kwargs: VanillaAE kwargs + **kwargs: BaseAE kwargs """ - def __init__(self, beta=1e-3, rho=0.05, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, + seq_len: int, + n_features: int = 1, + encoder_layersizes: Sequence[int] = (16, 8), + decoder_layersizes: Sequence[int] = (8, 16), + dropout_p: float = 0.25, + batchnorm: bool = True, + beta=1e-3, + rho=0.05, + **kwargs, + ): + super().__init__( + seq_len, + n_features=n_features, + encoder_layersizes=encoder_layersizes, + decoder_layersizes=decoder_layersizes, + dropout_p=dropout_p, + batchnorm=batchnorm, + **kwargs, + ) + self.encoder = _SparseVanillaEncoder( + seq_len=seq_len, + n_features=n_features, + layersizes=encoder_layersizes, + dropout_p=dropout_p, + batchnorm=batchnorm, + ) self.beta = beta self.rho = rho @@ -241,9 +310,9 @@ def kl_divergence(self, activations: Tensor) -> Tensor: ) return torch.sum(torch.clamp(kl_loss, max=1.0)) - def _get_reconstruction_loss(self, batch: Tensor) -> Tensor: + def get_reconstruction_loss(self, batch: Tensor, reduction="mean") -> Tensor: latent, recon = self.forward(batch) - loss = self.criterion(batch, recon) + loss = self.criterion(batch, recon, reduction=reduction) penalty = self.kl_divergence(latent) return loss + (self.beta * penalty) diff --git a/numalogic/models/threshold/_static.py b/numalogic/models/threshold/_static.py index f33011f3..7ac031a2 100644 --- a/numalogic/models/threshold/_static.py +++ b/numalogic/models/threshold/_static.py @@ -121,4 +121,6 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]: with values being anomaly scores. 
""" self._validate_input(x) - return self.score_limit / (1.0 + np.exp(-self.coeff * (x.copy() - self.upper_limits))) + exp_arg = -self.coeff * (x - self.upper_limits) + exp_arg = np.clip(exp_arg, -88.72, 88.72) + return self.score_limit / (1.0 + np.exp(exp_arg)) diff --git a/numalogic/models/vae/base.py b/numalogic/models/vae/base.py index 3103b4d6..963140ec 100644 --- a/numalogic/models/vae/base.py +++ b/numalogic/models/vae/base.py @@ -47,17 +47,16 @@ def configure_optimizers(self) -> dict: optimizer = optim.Adam(self.parameters(), lr=self._lr, weight_decay=self.weight_decay) return {"optimizer": optimizer} - def recon_loss(self, batch: Tensor, recon: Tensor, reduction: str = "sum"): + def get_reconstruction_loss(self, batch: Tensor, reduction: str = "sum"): + _, recon = self.forward(batch) return self.criterion(batch, recon, reduction=reduction) def validation_step(self, batch: Tensor, batch_idx: int) -> Tensor: """Validation step for the model.""" - p, recon = self.forward(batch) - loss = self.recon_loss(batch, recon) + loss = self.get_reconstruction_loss(batch) self.log("val_loss", loss) return loss def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0) -> Tensor: """Prediction step for the model.""" - p, recon = self.forward(batch) - return self.recon_loss(batch, recon, reduction="none") + return self.get_reconstruction_loss(batch, reduction="none") diff --git a/numalogic/models/vae/variants/conv.py b/numalogic/models/vae/variants/conv.py index 80c8f01e..5800b829 100644 --- a/numalogic/models/vae/variants/conv.py +++ b/numalogic/models/vae/variants/conv.py @@ -232,7 +232,7 @@ def training_step(self, batch: Tensor, batch_idx: int) -> Tensor: """Training step for the model.""" p, recon = self.forward(batch) kld_loss = self.kld_loss(p) - recon_loss = self.recon_loss(batch, recon) + recon_loss = self.criterion(batch, recon, reduction="sum") train_loss = recon_loss + (self.beta * kld_loss) self.log_dict( { diff --git a/numalogic/tools/data.py b/numalogic/tools/data.py index dd38a066..10f956c8 100644 --- a/numalogic/tools/data.py +++ b/numalogic/tools/data.py @@ -191,7 +191,7 @@ def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]: return np.stack(output) if idx >= len(self): raise IndexError(f"{idx} out of bound!") - return self._data[idx : idx + self._seq_len] + return self._data[(idx * self._stride) : (idx * self._stride) + self._seq_len] class StreamingDataLoader(DataLoader): diff --git a/numalogic/tools/loss.py b/numalogic/tools/loss.py new file mode 100644 index 00000000..7cbe5095 --- /dev/null +++ b/numalogic/tools/loss.py @@ -0,0 +1,34 @@ +from collections.abc import Callable + +import torch.nn.functional as F +from torch import Tensor + + +def l2_loss(input_: Tensor, target: Tensor, reduction: str = "mean") -> Tensor: + """Compute the Torch MSE (L2) loss multiplied with a factor of 0.5.""" + return 0.5 * F.mse_loss(input_, target, reduction=reduction) + + +def get_loss_fn(loss_fn: str) -> Callable: + """ + Get the loss function based on the provided loss name. 
+ + Args: + ---- + loss_fn: loss function name (huber, l1, mse) + + Returns + ------- + Callable: loss function + + Raises + ------ + NotImplementedError: If unsupported loss function provided + """ + if loss_fn == "huber": + return F.huber_loss + if loss_fn == "l1": + return F.l1_loss + if loss_fn == "mse": + return l2_loss + raise NotImplementedError(f"Unsupported loss function provided: {loss_fn}") diff --git a/numalogic/transforms/__init__.py b/numalogic/transforms/__init__.py index 55186b83..80a2f426 100644 --- a/numalogic/transforms/__init__.py +++ b/numalogic/transforms/__init__.py @@ -14,7 +14,7 @@ feature engineering and postprocessing. """ -from numalogic.transforms._scaler import TanhScaler +from numalogic.transforms._scaler import TanhScaler, PercentileScaler from numalogic.transforms._stateless import ( LogTransformer, StaticPowerTransformer, @@ -24,7 +24,7 @@ FlattenVector, ) from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator -from numalogic.transforms._postprocess import TanhNorm, tanh_norm +from numalogic.transforms._postprocess import TanhNorm, tanh_norm, SigmoidNorm __all__ = [ "TanhScaler", @@ -38,4 +38,6 @@ "GaussianNoiseAdder", "DifferenceTransform", "FlattenVector", + "PercentileScaler", + "SigmoidNorm", ] diff --git a/numalogic/transforms/_movavg.py b/numalogic/transforms/_movavg.py index 67608e08..0afa2c1b 100644 --- a/numalogic/transforms/_movavg.py +++ b/numalogic/transforms/_movavg.py @@ -10,6 +10,7 @@ # limitations under the License. import numpy as np +import pandas as pd from numalogic.base import StatelessTransformer from numalogic.tools.exceptions import InvalidDataShapeError @@ -71,7 +72,7 @@ def expmov_avg_aggregator( class ExpMovingAverage(StatelessTransformer): - r"""Calculate the exponential moving averages for a vector. + r"""Calculate the exponentially weighted moving averages for per feature column. This transformation returns an array where each element "n" is given by the expression: @@ -80,66 +81,33 @@ class ExpMovingAverage(StatelessTransformer): "1.0 - beta" denotes the weight given to the latest element. - Without bias correction, early values can tend more towards zero, since V(0) = 0 - Bias correction helps inhibit this issue by dividing with (1 - beta**i) - Args: ---- beta: how much weight to give to the previous weighted average - bias_correction: flag to perform bias correction (default: true) - - Note: this only supports single feature input array. Raises ------ ValueError: if beta is not between 0 and 1 """ - __slots__ = ("beta", "bias_correction") + __slots__ = ("alpha",) - def __init__(self, beta: float, bias_correction: bool = True): + def __init__(self, beta: float = 0.5): if beta <= 0.0 or beta >= 1.0: raise ValueError("beta only accepts values between 0 and 1 (not inclusive)") - self.beta = beta - self.bias_correction = bias_correction + self.alpha = 1.0 - beta - def transform(self, input_: npt.NDArray[float], **__): + def transform(self, input_: npt.NDArray[float], **__) -> npt.NDArray[float]: r"""Returns transformed output. 
Args: ---- input_: input column vector - Raises - ------ - InvalidDataShapeError: if input array is not single featured + Returns + ------- + Transformed output """ - _allow_only_single_feature(input_) - - # alpha is the weight given to the latest element - alpha = 1.0 - self.beta - n = len(input_) - - theta = input_.reshape(-1, 1) - theta_tril = np.multiply(theta.T, np.tril(np.ones((n, n)))) - powers = np.arange(1, n + 1).reshape(-1, 1) - - # Calculate increasing powers of beta of the form, - # [beta, beta**2, .., beta**n] - beta_powers = np.power(self.beta, powers) - - # Calculate the array of reciprocals of beta powers of form, - # [beta**(-1), beta**(-2), .., beta**(-n)] - beta_arr_inv = np.reciprocal(beta_powers) - - # Calculate the summation of the ratio between (theta(i) / beta**i), - # [ theta(1)/beta, sum(theta(1)/beta, theta(2)/beta**2), .., ] - theta_beta_ratio = theta_tril @ beta_arr_inv - - # Elemental multiply with beta powers - exp_avg = alpha * np.multiply(beta_powers, theta_beta_ratio) - if not self.bias_correction: - return exp_avg - - # Calculate array of 1 / (1 - beta**i) values - return np.divide(exp_avg, 1.0 - beta_powers) + x_df = pd.DataFrame(input_, dtype=np.float32, copy=True) + x_smoothed = x_df.ewm(alpha=self.alpha).mean().to_numpy(dtype=np.float32) + return np.ascontiguousarray(x_smoothed, dtype=np.float32) diff --git a/numalogic/transforms/_postprocess.py b/numalogic/transforms/_postprocess.py index 1e4ad149..899fe1cc 100644 --- a/numalogic/transforms/_postprocess.py +++ b/numalogic/transforms/_postprocess.py @@ -48,3 +48,13 @@ def __init__(self, scale_factor=10, smooth_factor=10): def transform(self, input_: npt.NDArray[float], **__) -> npt.NDArray[float]: return tanh_norm(input_, scale_factor=self.scale_factor, smooth_factor=self.smooth_factor) + + +class SigmoidNorm(StatelessTransformer): + def __init__(self, scale_factor: float = 10.0, smooth_factor: float = 0.5): + super().__init__() + self.scale_factor = scale_factor + self.smooth_factor = smooth_factor + + def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]: + return self.scale_factor / (1.0 + np.exp(5 - (self.smooth_factor * x))) diff --git a/numalogic/transforms/_scaler.py b/numalogic/transforms/_scaler.py index 39c59031..49e092aa 100644 --- a/numalogic/transforms/_scaler.py +++ b/numalogic/transforms/_scaler.py @@ -11,12 +11,15 @@ import logging +from typing import Optional import numpy as np import numpy.typing as npt +from sklearn.preprocessing import MinMaxScaler from typing_extensions import Self from numalogic.base import BaseTransformer +from numalogic.transforms._stateless import DataClipper LOGGER = logging.getLogger(__name__) @@ -69,3 +72,70 @@ def fit_transform(self, x: npt.NDArray[float], y=None, **_) -> npt.NDArray[float def _check_if_constant(self, x: npt.NDArray[float]) -> None: delta = np.max(x, axis=0) - np.min(x, axis=0) self._std[delta < self._eps] = 1.0 + + +class PercentileScaler(BaseTransformer): + """ + Scales the data based on the percentiles of the data. + + Args: + ----- + max_percentile: float, optional + The upper percentile to clip the data. + Default is 99. + min_percentile: float, optional + The lower percentile to clip the data. + If None, minimum value of the data is used. + Default is None. 
+ """ + + def __init__( + self, max_percentile: float = 99, min_percentile: Optional[float] = None, eps: float = 1e-2 + ): + self._max_px = max_percentile + self._min_px = min_percentile + self.tx = MinMaxScaler() + + self._data_pth_max = None + self._data_pth_min = None + self._eps = eps + + @property + def data_pth_max(self) -> float: + return self._data_pth_max + + @property + def data_pth_min(self) -> float: + return self._data_pth_min + + def fit(self, x: npt.NDArray[float]) -> Self: + data_max_px = np.percentile(x, self._max_px, axis=0) + data_max = np.max(x, axis=0) + + if self._min_px is None: + data_min_px = np.min(x, axis=0) + else: + data_min_px = np.percentile(x, self._min_px, axis=0) + + p_ranges = data_max_px - data_min_px + + for idx, _range in enumerate(p_ranges): + if _range <= self._eps: + LOGGER.warning( + "Max and Min percentile difference is less than " "epsilon: %s for column %s", + self._eps, + idx, + ) + data_max_px[idx] = data_max[idx] + + self._data_pth_max = data_max_px + self._data_pth_min = data_min_px + + x_clipped = DataClipper(lower=data_min_px, upper=data_max_px).transform(x) + return self.tx.fit(x_clipped) + + def fit_transform(self, x: npt.NDArray[float], y=None, **_): + return self.fit(x).transform(x) + + def transform(self, x: npt.NDArray[float]) -> npt.NDArray[float]: + return self.tx.transform(x) diff --git a/numalogic/udfs/inference.py b/numalogic/udfs/inference.py index 38ea614e..0f19d7b8 100644 --- a/numalogic/udfs/inference.py +++ b/numalogic/udfs/inference.py @@ -84,8 +84,7 @@ def compute(cls, model: artifact_t, input_: npt.NDArray[float], **_) -> npt.NDAr model.eval() try: with torch.no_grad(): - _, out = model.forward(x) - recon_err = model.criterion(out, x, reduction="none") + recon_err = model.get_reconstruction_loss(x, reduction="none") except Exception as err: raise RuntimeError("Model forward pass failed!") from err return np.ascontiguousarray(recon_err).squeeze(0) diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py index 1d44b9fd..540c503a 100644 --- a/numalogic/udfs/postprocess.py +++ b/numalogic/udfs/postprocess.py @@ -122,7 +122,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: load_latest=LOAD_LATEST, vertex=self._vtx, ) - postproc_tx = self.postproc_factory.get_instance(postprocess_cfg) + postproc_tx = ( + self.postproc_factory.get_instance(postprocess_cfg) if postprocess_cfg else None + ) + if not postproc_tx: + logger.info("Postprocess model is absent!") if thresh_artifact is None: payload = replace( @@ -139,17 +143,19 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: # Postprocess payload try: - # Compute anomaly scores per feature - a_features = self.compute( + # Compute anomaly scores + a_unified, a_features = self.compute( model=thresh_artifact.artifact, input_=payload.get_data(), score_conf=_conf.numalogic_conf.score, postproc_tx=postproc_tx, ) # (nfeat,) - - # Compute unified score - a_unified = self.compute_unified_score( - a_features, feat_agg_conf=_conf.numalogic_conf.score.feature_agg + payload = replace( + payload, + metadata={ + "threshold": thresh_artifact.artifact.threshold, + **payload.metadata, + }, ) # Calculate adjusted unified score @@ -303,7 +309,7 @@ def compute( postproc_tx=None, score_conf: Optional[ScoreConf] = None, **_, - ) -> NDArray[float]: + ) -> tuple[float, NDArray[float]]: """ Compute thresholding, window aggregation followed by postprocess. 
@@ -316,7 +322,7 @@ def compute( Returns ------- - Output data of shape (n_features, ) + Tuple of combined/unified score (float), and feature scores of shape (n_features,) Raises ------ @@ -326,13 +332,23 @@ def compute( _struct_log.warning("Score config not provided, using default values") score_conf = ScoreConf() - scores = cls.compute_threshold(model, input_) # (seqlen x nfeat) - win_scores = cls.compute_feature_scores( - scores, win_agg_conf=score_conf.window_agg + thresh_scores = cls.compute_threshold(model, input_) # (seqlen x nfeat) + + # Aggregate over the sequence length + raw_scores = cls.compute_feature_scores( + thresh_scores, win_agg_conf=score_conf.window_agg ) # (nfeat,) + + # Aggregate over the features + unified_raw_score = cls.compute_unified_score(raw_scores, score_conf.feature_agg) # float + if postproc_tx: - win_scores = cls.compute_postprocess(postproc_tx, win_scores) # (nfeat,) - return win_scores # (nfeat, ) + # Postprocess the raw scores + feature_scores = cls.compute_postprocess(postproc_tx, raw_scores) # (nfeat,) + unified_score = cls.compute_postprocess(postproc_tx, unified_raw_score) # float + return unified_score, feature_scores + + return unified_raw_score, raw_scores @classmethod def compute_threshold(cls, model: artifact_t, input_: NDArray[float]) -> NDArray[float]: diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py index c71b3759..cfeb25a0 100644 --- a/numalogic/udfs/trainer/_base.py +++ b/numalogic/udfs/trainer/_base.py @@ -33,6 +33,7 @@ ) from numalogic.udfs.entities import TrainerPayload from numalogic.udfs.tools import TrainMsgDeduplicator +import torch _struct_log = configure_logger() @@ -121,8 +122,12 @@ def compute( model, train_dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size) ) train_reconerr = trainer.predict( - model, dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size) - ).numpy() + model, + dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size), + unbatch=False, + ) + train_reconerr = torch.mean(train_reconerr, dim=1).numpy() + dict_artifacts["inference"] = KeyedArtifact( dkeys=[numalogic_cfg.model.name], artifact=model, stateful=numalogic_cfg.model.stateful ) @@ -235,12 +240,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: _add_summary( summary=NAN_SUMMARY, labels=_metric_label_values, - data=nan_counter, + data=np.sum(nan_counter), ) _add_summary( summary=INF_SUMMARY, labels=_metric_label_values, - data=inf_counter, + data=np.sum(inf_counter), ) # Initialize artifacts @@ -366,7 +371,7 @@ def get_feature_arr( raw_df: pd.DataFrame, metrics: list[str], fill_value: float = 0.0, - ) -> tuple[npt.NDArray[float], float, float]: + ) -> tuple[npt.NDArray[float], pd.Series, pd.Series]: """ Get feature array from the raw dataframe. 
@@ -381,14 +386,15 @@ def get_feature_arr( nan_counter: Number of nan values inf_counter: Number of inf values """ - nan_counter = 0 - for col in metrics: + nan_counter = np.zeros(len(metrics), dtype=int) + inf_counter = np.zeros(len(metrics), dtype=int) + for idx, col in enumerate(metrics): if col not in raw_df.columns: raw_df[col] = fill_value - nan_counter += len(raw_df) + nan_counter[idx] += len(raw_df) feat_df = raw_df[metrics] - nan_counter += raw_df.isna().sum().all() - inf_counter = np.isinf(feat_df).sum().all() + nan_counter += feat_df.isna().sum() + inf_counter = np.isinf(feat_df).sum() feat_df = feat_df.fillna(fill_value).replace([np.inf, -np.inf], fill_value) return feat_df.to_numpy(dtype=np.float32), nan_counter, inf_counter diff --git a/numalogic/udfs/trainer/_druid.py b/numalogic/udfs/trainer/_druid.py index f21f9d77..7a348bae 100644 --- a/numalogic/udfs/trainer/_druid.py +++ b/numalogic/udfs/trainer/_druid.py @@ -120,6 +120,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]: datasource=_fetcher_conf.datasource, filter_keys=_stream_conf.composite_keys, filter_values=payload.composite_keys, + static_filters=_fetcher_conf.static_filters, dimensions=list(_fetcher_conf.dimensions), delay=self.dataconn_conf.delay_hrs, granularity=_fetcher_conf.granularity, diff --git a/poetry.lock b/poetry.lock index 4513c4de..c7af659f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "aiohttp" @@ -406,17 +406,17 @@ css = ["tinycss2 (>=1.1.0,<1.3)"] [[package]] name = "boto3" -version = "1.34.99" +version = "1.34.101" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.34.99-py3-none-any.whl", hash = "sha256:b54084d000483b578757df03ce39a819fbba47071c9aa98611beb8806bcecd45"}, - {file = "boto3-1.34.99.tar.gz", hash = "sha256:6f600b3fe0bda53476395c902d9af5a47294c93ec52a9cdc2b926a9dc705ce79"}, + {file = "boto3-1.34.101-py3-none-any.whl", hash = "sha256:79b93f3370ea96ce838042bc2eac0c996aee204b01e7e6452eb77abcbe697d6a"}, + {file = "boto3-1.34.101.tar.gz", hash = "sha256:1d854b5880e185db546b4c759fcb664bf3326275064d2b44229cc217e8be9d7e"}, ] [package.dependencies] -botocore = ">=1.34.99,<1.35.0" +botocore = ">=1.34.101,<1.35.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -425,13 +425,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.34.99" +version = "1.34.101" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.34.99-py3-none-any.whl", hash = "sha256:18c68bdeb0ffb73290912b0c96204fc36d3128f00a00b5cdc35ac34d66225f1c"}, - {file = "botocore-1.34.99.tar.gz", hash = "sha256:cafe569e2136cb33cb0e5dd32fb1c0e1503ddc1413d3be215df8ddf05e69137a"}, + {file = "botocore-1.34.101-py3-none-any.whl", hash = "sha256:f145e8b4b8fc9968f5eb695bdc2fcc8e675df7fbc3c56102dc1f5471be6baf35"}, + {file = "botocore-1.34.101.tar.gz", hash = "sha256:01f3802d25558dd7945d83884bf6885e2f84e1ff27f90b5f09614966fe18c18f"}, ] [package.dependencies] @@ -2699,13 +2699,13 @@ tbb = "==2021.*" [[package]] name = "mlflow-skinny" -version = "2.12.1" +version = "2.12.2" description = "MLflow is an open source platform for the complete machine learning lifecycle" optional = true python-versions = ">=3.8" files = [ - {file = "mlflow_skinny-2.12.1-py3-none-any.whl", hash = "sha256:51539de93a7f8b74b2d6b4307f204235469f6fadd19078599abd12a4bcf6b5b0"}, - {file = "mlflow_skinny-2.12.1.tar.gz", hash = "sha256:a5c3bb2f111867db988d4cdd782b6224fca4fc3d86706e9e587c379973ce7353"}, + {file = "mlflow_skinny-2.12.2-py3-none-any.whl", hash = "sha256:c7944fbe7019e8da05115f1776b67cf64766f3455c94e02d13aa2b444116d37d"}, + {file = "mlflow_skinny-2.12.2.tar.gz", hash = "sha256:44839fa7beb8bf6fbfd8a417d63eb5577b1de489aabcb0f926bc3b621535f584"}, ] [package.dependencies] @@ -2715,7 +2715,7 @@ entrypoints = "<1" gitpython = ">=3.1.9,<4" importlib-metadata = ">=3.7.0,<4.7.0 || >4.7.0,<8" packaging = "<25" -protobuf = ">=3.12.0,<6" +protobuf = ">=3.12.0,<5" pytz = "<2025" pyyaml = ">=5.1,<7" requests = ">=2.17.3,<3" @@ -3445,6 +3445,7 @@ optional = false python-versions = ">=3.9" files = [ {file = "pandas-2.2.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90c6fca2acf139569e74e8781709dccb6fe25940488755716d1d354d6bc58bce"}, + {file = "pandas-2.2.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7adfc142dac335d8c1e0dcbd37eb8617eac386596eb9e1a1b77791cf2498238"}, {file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4abfe0be0d7221be4f12552995e58723c7422c80a659da13ca382697de830c08"}, {file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8635c16bf3d99040fdf3ca3db669a7250ddf49c55dc4aa8fe0ae0fa8d6dcc1f0"}, {file = "pandas-2.2.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:40ae1dffb3967a52203105a077415a86044a2bea011b5f321c6aa64b379a3f51"}, @@ -3465,6 +3466,7 @@ files = [ {file = "pandas-2.2.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:43498c0bdb43d55cb162cdc8c06fac328ccb5d2eabe3cadeb3529ae6f0517c32"}, {file = "pandas-2.2.2-cp312-cp312-win_amd64.whl", hash = "sha256:d187d355ecec3629624fccb01d104da7d7f391db0311145817525281e2804d23"}, {file = "pandas-2.2.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0ca6377b8fca51815f382bd0b697a0814c8bda55115678cbc94c30aacbb6eff2"}, + {file = "pandas-2.2.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9057e6aa78a584bc93a13f0a9bf7e753a5e9770a30b4d758b8d5f2a62a9433cd"}, {file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:001910ad31abc7bf06f49dcc903755d2f7f3a9186c0c040b827e522e9cef0863"}, {file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:66b479b0bd07204e37583c191535505410daa8df638fd8e75ae1b383851fe921"}, {file = "pandas-2.2.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:a77e9d1c386196879aa5eb712e77461aaee433e54c68cf253053a73b7e49c33a"}, diff --git 
a/pyproject.toml b/pyproject.toml index 6c883709..dc4819c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "numalogic" -version = "0.9.1" +version = "0.10.0a1" description = "Collection of operational Machine Learning models and tools." authors = ["Numalogic Developers"] packages = [{ include = "numalogic" }] @@ -39,7 +39,6 @@ structlog = "^24.1.0" mlflow-skinny = { version = "^2.0", optional = true } redis = { extras = ["hiredis"], version = "^5.0", optional = true } boto3 = { version = "^1.24.64", optional = true } - pydruid = { version = "^0.6", optional = true } PyMySQL = { version = "^1.1.0", optional = true } diff --git a/tests/connectors/test_druid.py b/tests/connectors/test_druid.py index ffeb4c2f..bbd43f87 100644 --- a/tests/connectors/test_druid.py +++ b/tests/connectors/test_druid.py @@ -96,6 +96,45 @@ def group_by(*_, **__): mocker.patch.object(PyDruid, "groupby", side_effect=group_by) +@pytest.fixture +def mock_group_by_multi_column(mocker): + """Creates a Mock for PyDruid's groupby method for doubles sketch.""" + + def group_by(*_, **__): + """Mock group by response for doubles sketch from druid.""" + result = [ + { + "event": { + "service_alias": "identity.authn.signin", + "env": "prod", + "status": 200, + "http_status": "2xx", + "gw_gen": "T", + "count": 20, + }, + "timestamp": "2023-09-06T07:50:00.000Z", + "version": "v1", + }, + { + "event": { + "service_alias": "identity.authn.signin", + "env": "prod", + "status": 500, + "http_status": "5xx", + "gw_gen": "T", + "count": 10, + }, + "timestamp": "2023-09-06T07:53:00.000Z", + "version": "v1", + }, + ] + query = pydruid.query.Query(query_dict={}, query_type="groupBy") + query.parse(json.dumps(result)) + return query + + mocker.patch.object(PyDruid, "groupby", side_effect=group_by) + + def test_fetch(setup, mock_group_by): start, end, fetcher = setup _out = fetcher.fetch( @@ -251,3 +290,24 @@ def test_chunked_fetch_err(get_args): **get_args, chunked_hours=0, ) + + +def test_multi_column_pivot(setup, mock_group_by_multi_column): + start, end, fetcher = setup + _out = fetcher.fetch( + filter_keys=["authtype", "slane"], + filter_values=["browserUserAgent", "sw1"], + dimensions=["http_status", "status", "gw_gen"], + datasource="ip-apigw-telegraf-druid", + aggregations={"count": aggregators.doublesum("count")}, + group_by=["timestamp", "http_status", "status", "gw_gen"], + hours=2, + pivot=Pivot( + index="timestamp", + columns=["http_status", "status", "gw_gen"], + value=["count"], + agg=["sum", "sum", "count"], + ), + ) + print(_out) + assert (2, 6) == _out.shape diff --git a/tests/transforms/test_postprocess.py b/tests/transforms/test_postprocess.py index b1249005..9fd76eaf 100644 --- a/tests/transforms/test_postprocess.py +++ b/tests/transforms/test_postprocess.py @@ -1,78 +1,80 @@ -import unittest - import numpy as np from sklearn.pipeline import make_pipeline +import pytest + +from numalogic.transforms import ( + tanh_norm, + TanhNorm, + ExpMovingAverage, + SigmoidNorm, + expmov_avg_aggregator, +) + + +def test_tanh_norm_func(): + arr = np.arange(10) + scores = tanh_norm(arr) + assert sum(scores) == pytest.approx(39.52, 0.01) # places=2 + -from numalogic.transforms import tanh_norm, TanhNorm, ExpMovingAverage, expmov_avg_aggregator -from numalogic.tools.exceptions import InvalidDataShapeError +def test_tanh_norm_clf(): + arr = np.arange(10).reshape(5, 2) + clf = TanhNorm() + scores = clf.fit_transform(arr) + assert arr.shape == scores.shape + assert np.sum(scores) == pytest.approx(39.52, 
0.01) # places=2 -class TestPostprocess(unittest.TestCase): - def test_tanh_norm_func(self): - arr = np.arange(10) - scores = tanh_norm(arr) - self.assertAlmostEqual(sum(scores), 39.52, places=2) +def test_exp_mov_avg_estimator(): + beta = 0.9 + arr = np.arange(1, 11).reshape(-1, 1) + clf = ExpMovingAverage(beta) + out = clf.fit_transform(arr) - def test_tanh_norm_clf(self): - arr = np.arange(10).reshape(5, 2) - clf = TanhNorm() - scores = clf.fit_transform(arr) + expected = expmov_avg_aggregator(arr, beta) - self.assertTupleEqual(arr.shape, scores.shape) - self.assertAlmostEqual(np.sum(scores), 39.52, places=2) + assert arr.shape == out.shape + assert expected == pytest.approx(out[-1].item(), 0.01) # places=2 + assert out.data.c_contiguous - def test_exp_mov_avg_estimator_01(self): - arr = np.arange(1, 11).reshape(-1, 1) - clf = ExpMovingAverage(0.9) - out = clf.fit_transform(arr) - self.assertTupleEqual(arr.shape, out.shape) - self.assertAlmostEqual(expmov_avg_aggregator(arr, 0.9), out[-1].item(), places=3) - def test_exp_mov_avg_estimator_02(self): - arr = np.arange(1, 11).reshape(-1, 1) - clf = ExpMovingAverage(0.9, bias_correction=False) - out = clf.fit_transform(arr) - self.assertTupleEqual(arr.shape, out.shape) - self.assertAlmostEqual( - expmov_avg_aggregator(arr, 0.9, bias_correction=False), out[-1].item(), places=3 - ) +def test_exp_mov_avg_estimator_err(): + with pytest.raises(ValueError): + ExpMovingAverage(1.1) - def test_exp_mov_avg_estimator_err(self): - with self.assertRaises(ValueError): - ExpMovingAverage(1.1) + with pytest.raises(ValueError): + ExpMovingAverage(0.0) - with self.assertRaises(ValueError): - ExpMovingAverage(0.0) + with pytest.raises(ValueError): + ExpMovingAverage(1.0) - with self.assertRaises(ValueError): - ExpMovingAverage(1.0) - with self.assertRaises(InvalidDataShapeError): - clf = ExpMovingAverage(0.1) - clf.fit_transform(np.arange(10).reshape(1, -1)) +def test_exp_mov_avg_agg(): + arr = np.arange(1, 11) + val = expmov_avg_aggregator(arr, 0.9) + assert isinstance(val, float) + assert val < 10 - with self.assertRaises(InvalidDataShapeError): - clf = ExpMovingAverage(0.1) - clf.fit_transform(np.arange(12).reshape((2, 3, -1))) - def test_exp_mov_avg_agg(self): - arr = np.arange(1, 11) - val = expmov_avg_aggregator(arr, 0.9) - self.assertIsInstance(val, float) - self.assertLess(val, 10) +def test_exp_mov_avg_agg_err(): + arr = np.arange(1, 11) + with pytest.raises(ValueError): + expmov_avg_aggregator(arr, 1.01) - def test_exp_mov_avg_agg_err(self): - arr = np.arange(1, 11) - with self.assertRaises(ValueError): - expmov_avg_aggregator(arr, 1.01) - def test_postproc_pl(self): - x = np.arange(1, 11).reshape(-1, 1) - pl = make_pipeline(TanhNorm(), ExpMovingAverage(0.9)) - out = pl.transform(x) - self.assertTupleEqual(x.shape, out.shape) +def test_postproc_pl(): + x = np.arange(1, 11).reshape(-1, 1) + pl = make_pipeline(TanhNorm(), ExpMovingAverage(0.9)) + out = pl.transform(x) + assert x.shape == out.shape -if __name__ == "__main__": - unittest.main() +def test_sig_norm(): + x = np.arange(1, 11).reshape(-1, 1) + clf = SigmoidNorm() + out = clf.fit_transform(x) + assert x.shape == out.shape + assert out.data.c_contiguous + assert np.all(out >= 0) + assert np.all(out <= 10) diff --git a/tests/transforms/test_transforms.py b/tests/transforms/test_transforms.py index 9c81b2f7..810b5071 100644 --- a/tests/transforms/test_transforms.py +++ b/tests/transforms/test_transforms.py @@ -2,7 +2,12 @@ import numpy as np import pytest -from numpy.testing import 
assert_almost_equal, assert_array_less, assert_array_equal +from numpy.testing import ( + assert_almost_equal, + assert_array_less, + assert_array_equal, + assert_allclose, +) from sklearn.pipeline import make_pipeline from numalogic.base import StatelessTransformer @@ -14,6 +19,7 @@ GaussianNoiseAdder, DifferenceTransform, FlattenVector, + PercentileScaler, ) RNG = np.random.default_rng(42) @@ -171,3 +177,12 @@ def test_flattenvector(): assert data.shape[1] == 1 assert clf.inverse_transform(data).shape[1] == 2 + + +def test_percentile_scaler(): + x = np.abs(RNG.random((10, 3))) + tx = PercentileScaler(max_percentile=99, eps=1e-6) + x_ = tx.fit_transform(x) + assert x.shape == x_.shape + assert_allclose(tx.data_pth_max, np.percentile(x, 99, axis=0)) + assert_allclose(tx.data_pth_min, np.min(x, axis=0)) diff --git a/tests/udfs/test_inference.py b/tests/udfs/test_inference.py index 6ca1eb00..dc8eb0bd 100644 --- a/tests/udfs/test_inference.py +++ b/tests/udfs/test_inference.py @@ -265,7 +265,7 @@ def test_model_pass_error_01(udf, udf_args, mocker): RedisRegistry, "load", return_value=ArtifactData( - artifact=VanillaAE(seq_len=12, n_features=1), + artifact=VanillaAE(seq_len=10, n_features=1), extras=dict(version="0", timestamp=time.time(), source="registry"), metadata={}, ), @@ -282,7 +282,7 @@ def test_model_pass_error_02(udf_with_adjust, udf_args, mocker): RedisRegistry, "load", return_value=ArtifactData( - artifact=VanillaAE(seq_len=12, n_features=1), + artifact=VanillaAE(seq_len=10, n_features=1), extras=dict(version="0", timestamp=time.time(), source="registry"), metadata={}, ), diff --git a/tests/udfs/test_pipeline.py b/tests/udfs/test_pipeline.py index 7ee8e4c1..e4e0ee5e 100644 --- a/tests/udfs/test_pipeline.py +++ b/tests/udfs/test_pipeline.py @@ -1,18 +1,16 @@ import logging import os -import unittest from datetime import datetime -from fakeredis import FakeServer, FakeStrictRedis from omegaconf import OmegaConf from orjson import orjson +import pytest from numalogic._constants import TESTS_DIR -from numalogic.udfs._config import PipelineConf +from numalogic.udfs import PipelineConf from numalogic.udfs.payloadtx import PayloadTransformer from tests.udfs.utility import input_json_from_file logging.basicConfig(level=logging.DEBUG) -REDIS_CLIENT = FakeStrictRedis(server=FakeServer()) KEYS = ["service-mesh", "1", "2"] DATUM = input_json_from_file(os.path.join(TESTS_DIR, "udfs", "resources", "data", "stream.json")) @@ -22,34 +20,31 @@ } -class TestPipelineUDF(unittest.TestCase): - def setUp(self) -> None: - _given_conf = OmegaConf.load(os.path.join(TESTS_DIR, "udfs", "resources", "_config.yaml")) - _given_conf_2 = OmegaConf.load( - os.path.join(TESTS_DIR, "udfs", "resources", "_config2.yaml") - ) - schema = OmegaConf.structured(PipelineConf) - pl_conf = PipelineConf(**OmegaConf.merge(schema, _given_conf)) - pl_conf_2 = PipelineConf(**OmegaConf.merge(schema, _given_conf_2)) - self.udf1 = PayloadTransformer(pl_conf=pl_conf) - self.udf2 = PayloadTransformer(pl_conf=pl_conf_2) - self.udf1.register_conf("druid-config", pl_conf.stream_confs["druid-config"]) - self.udf2.register_conf("druid-config", pl_conf_2.stream_confs["druid-config"]) - - def test_pipeline_1(self): - msgs = self.udf1(KEYS, DATUM) - self.assertEqual(2, len(msgs)) - for msg in msgs: - data_payload = orjson.loads(msg.value) - self.assertTrue(data_payload["pipeline_id"]) - - def test_pipeline_2(self): - msgs = self.udf2(KEYS, DATUM) - self.assertEqual(1, len(msgs)) - for msg in msgs: - data_payload = orjson.loads(msg.value) - 
self.assertTrue(data_payload["pipeline_id"]) - - -if __name__ == "__main__": - unittest.main() +@pytest.fixture +def setup(): + _given_conf = OmegaConf.load(os.path.join(TESTS_DIR, "udfs", "resources", "_config.yaml")) + _given_conf_2 = OmegaConf.load(os.path.join(TESTS_DIR, "udfs", "resources", "_config2.yaml")) + schema = OmegaConf.structured(PipelineConf) + pl_conf = PipelineConf(**OmegaConf.merge(schema, _given_conf)) + pl_conf_2 = PipelineConf(**OmegaConf.merge(schema, _given_conf_2)) + udf1 = PayloadTransformer(pl_conf=pl_conf) + udf2 = PayloadTransformer(pl_conf=pl_conf_2) + udf1.register_conf("druid-config", pl_conf.stream_confs["druid-config"]) + udf2.register_conf("druid-config", pl_conf_2.stream_confs["druid-config"]) + return udf1, udf2 + + +def test_pipeline_1(setup): + msgs = setup[0](KEYS, DATUM) + assert 2 == len(msgs) + for msg in msgs: + data_payload = orjson.loads(msg.value) + assert data_payload["pipeline_id"] + + +def test_pipeline_2(setup): + msgs = setup[1](KEYS, DATUM) + assert 1 == len(msgs) + for msg in msgs: + data_payload = orjson.loads(msg.value) + assert data_payload["pipeline_id"] diff --git a/tests/udfs/test_postprocess.py b/tests/udfs/test_postprocess.py index 4101eece..e0b5e853 100644 --- a/tests/udfs/test_postprocess.py +++ b/tests/udfs/test_postprocess.py @@ -14,6 +14,7 @@ from numalogic._constants import TESTS_DIR from numalogic.models.threshold import StdDevThreshold from numalogic.registry import RedisRegistry, ArtifactData +from numalogic.transforms import TanhNorm from numalogic.udfs import PipelineConf from numalogic.udfs.entities import Header, TrainerPayload, Status, OutputPayload from numalogic.udfs.postprocess import PostprocessUDF @@ -172,6 +173,15 @@ def test_postprocess_runtime_err_02(udf, mocker, bad_artifact): assert msgs[1].tags == ["staticthresh"] -def test_compute(udf, artifact): - x_inferred = udf.compute(artifact.artifact, np.asarray(DATA["data"])) +def test_compute_without_postproc(udf, artifact): + y_unified, x_inferred = udf.compute(artifact.artifact, np.asarray(DATA["data"])) + assert isinstance(y_unified, float) + assert x_inferred.shape == (2,) + + +def test_compute_with_postproc(udf, artifact): + y_unified, x_inferred = udf.compute( + artifact.artifact, np.asarray(DATA["data"]), postproc_tx=TanhNorm() + ) + assert isinstance(y_unified, float) assert x_inferred.shape == (2,)
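
A minimal usage sketch of the new loss helpers added above in numalogic/tools/loss.py (get_loss_fn and l2_loss). The tensors below are made-up inputs chosen only to show the reduction behaviour; they are not part of the patch.

    import torch
    from numalogic.tools.loss import get_loss_fn, l2_loss

    a = torch.ones(4, 12, 1)     # illustrative batch: (batch, seq_len, n_features)
    b = torch.zeros(4, 12, 1)

    loss_fn = get_loss_fn("mse")                    # resolves to l2_loss, i.e. 0.5 * MSE
    print(loss_fn(a, b))                            # tensor(0.5000)
    print(l2_loss(a, b, reduction="none").shape)    # per-element loss, same shape as input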
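
A similar sketch for the new PercentileScaler preprocess transform; the constructor arguments and eps value mirror the added unit test, while the random data and injected spikes are illustrative assumptions.

    import numpy as np
    from numalogic.transforms import PercentileScaler

    rng = np.random.default_rng(42)
    x = rng.random((100, 3)).astype(np.float32)   # illustrative data: 100 rows x 3 features
    x[::25] *= 50.0                               # inject spikes so the 99th-percentile clip matters

    scaler = PercentileScaler(max_percentile=99, eps=1e-6)
    x_scaled = scaler.fit_transform(x)            # clip at the 99th percentile, then min-max scale

    print(x_scaled.shape)         # (100, 3)
    print(scaler.data_pth_max)    # per-column clip values computed during fit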
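
Finally, a hedged sketch of how the new static_filters option on the Druid fetcher could be wired up. The datasource, dimensions, and filter values are placeholders, and the DruidFetcher constructor arguments (router url and query endpoint) are assumptions not shown in this patch; each filter dict is expanded into a pydruid Filter(**item) by _make_static_filters.

    from pydruid.utils import aggregators

    from numalogic.connectors import DruidFetcher
    from numalogic.connectors._config import FilterConf, Pivot

    # Hypothetical filters: keep prod traffic, drop the "T" gateway generation.
    static_filters = FilterConf(
        inclusion_filters=[{"type": "selector", "dimension": "env", "value": "prod"}],
        exclusion_filters=[{"type": "selector", "dimension": "gw_gen", "value": "T"}],
    )

    fetcher = DruidFetcher(url="http://localhost:8888", endpoint="druid/v2/")  # assumed ctor args
    df = fetcher.fetch(
        datasource="some-datasource",                           # placeholder
        filter_keys=["service"],                                # placeholder composite keys
        filter_values=["my-service"],
        dimensions=["http_status"],
        static_filters=static_filters,
        aggregations={"count": aggregators.doublesum("count")},
        granularity="minute",
        hours=24,
        pivot=Pivot(index="timestamp", columns=["http_status"], value=["count"], agg=["sum"]),
    )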