diff --git a/docs/autoencoders.md b/docs/autoencoders.md index b68edef9..a264ad6f 100644 --- a/docs/autoencoders.md +++ b/docs/autoencoders.md @@ -25,10 +25,10 @@ Here we are using `VanillaAE`, a Vanilla Autoencoder model. ```python from numalogic.models.autoencoder.variants import VanillaAE -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer model = VanillaAE(seq_len=12, n_features=3) -trainer = AutoencoderTrainer(max_epochs=50, enable_progress_bar=True) +trainer = TimeseriesTrainer(max_epochs=50, enable_progress_bar=True) trainer.fit(model, datamodule=datamodule) ``` diff --git a/docs/quick-start.md b/docs/quick-start.md index 6018f2a2..8dbd4031 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -20,7 +20,7 @@ pip install numalogic ## Numalogic as a Library -Numalogic can be used as an independent library, and it provides various ML models and tools. Here, we are using the `AutoencoderTrainer`. Refer to [training section](autoencoders.md) for other available options. +Numalogic can be used as an independent library, and it provides various ML models and tools. Here, we are using the `TimeseriesTrainer`. Refer to [training section](autoencoders.md) for other available options. In this example, the train data set has numbers ranging from 1-10. Whereas in the test data set, there are data points that go out of this range, which the algorithm should be able to detect as anomalies. @@ -28,7 +28,7 @@ In this example, the train data set has numbers ranging from 1-10. Whereas in th import numpy as np from sklearn.preprocessing import StandardScaler from torch.utils.data import DataLoader -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer from numalogic.models.autoencoder.variants import VanillaAE from numalogic.models.threshold import StdDevThreshold from numalogic.transforms import TanhNorm @@ -57,7 +57,7 @@ model = VanillaAE(seq_len=SEQ_LEN, n_features=1) train_dataset = StreamingDataset(train_data, seq_len=SEQ_LEN) # Define the trainer, and fit the model. -trainer = AutoencoderTrainer(max_epochs=30, enable_progress_bar=True) +trainer = TimeseriesTrainer(max_epochs=30, enable_progress_bar=True) trainer.fit(model, train_dataloaders=DataLoader(train_dataset)) # Get the training reconstruction error from the model. diff --git a/examples/conv_ae.ipynb b/examples/conv_ae.ipynb index d52737e4..975644f2 100644 --- a/examples/conv_ae.ipynb +++ b/examples/conv_ae.ipynb @@ -329,7 +329,7 @@ ], "source": [ "from numalogic.models.autoencoder.variants import Conv1dAE\n", - "from numalogic.models.autoencoder import AutoencoderTrainer\n", + "from numalogic.models.autoencoder import TimeseriesTrainer\n", "\n", "model_1 = Conv1dAE(seq_len=SEQ_LEN, in_channels=1, enc_channels=(16, 32, 8), enc_kernel_sizes=3)\n", "print(model_1)" @@ -367,7 +367,7 @@ } ], "source": [ - "trainer = AutoencoderTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", + "trainer = TimeseriesTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", "trainer.fit(model_1, train_dataloaders=DataLoader(train_dataset, batch_size=BATCH_SIZE))" ], "metadata": { @@ -565,7 +565,7 @@ } ], "source": [ - "trainer = AutoencoderTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", + "trainer = TimeseriesTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", "trainer.fit(model_2, train_dataloaders=DataLoader(train_dataset, batch_size=BATCH_SIZE))" ], "metadata": { @@ -756,7 +756,7 @@ ], "source": [ "\n", - "trainer = AutoencoderTrainer(accelerator=\"cpu\", max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", + "trainer = TimeseriesTrainer(accelerator=\"cpu\", max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", "trainer.fit(model_3, train_dataloaders=DataLoader(StreamingDataset(x_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE))" ], "metadata": { diff --git a/examples/multi_udf/src/udf/inference.py b/examples/multi_udf/src/udf/inference.py index 179e4b73..4ff92886 100644 --- a/examples/multi_udf/src/udf/inference.py +++ b/examples/multi_udf/src/udf/inference.py @@ -2,7 +2,7 @@ import os import numpy.typing as npt -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer from numalogic.udfs import NumalogicUDF from numalogic.registry import MLflowRegistry, ArtifactData from numalogic.tools.data import StreamingDataset @@ -36,7 +36,7 @@ def _infer(artifact_data: ArtifactData, stream_data: npt.NDArray[float]) -> list main_model = artifact_data.artifact streamloader = DataLoader(StreamingDataset(stream_data, WIN_SIZE)) - trainer = AutoencoderTrainer() + trainer = TimeseriesTrainer() reconerr = trainer.predict(main_model, dataloaders=streamloader) return reconerr.tolist() diff --git a/examples/multi_udf/src/udf/train.py b/examples/multi_udf/src/udf/train.py index 1292ef73..ff137c6c 100644 --- a/examples/multi_udf/src/udf/train.py +++ b/examples/multi_udf/src/udf/train.py @@ -4,7 +4,7 @@ import cachetools import numpy.typing as npt import pandas as pd -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer from numalogic.models.autoencoder.variants import Conv1dAE from numalogic.models.threshold import StdDevThreshold from numalogic.udfs import NumalogicUDF @@ -34,7 +34,7 @@ def __init__(self): self.model_key = "ae::model" def _save_artifact( - self, model, skeys: list[str], dkeys: list[str], _: Optional[AutoencoderTrainer] = None + self, model, skeys: list[str], dkeys: list[str], _: Optional[TimeseriesTrainer] = None ) -> None: """Saves the model in the registry.""" self.registry.save(skeys=skeys, dkeys=dkeys, artifact=model) @@ -81,7 +81,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: # Train the autoencoder model datamodule = TimeseriesDataModule(WIN_SIZE, train_data, batch_size=BATCH_SIZE) model = Conv1dAE(seq_len=WIN_SIZE, in_channels=train_data.shape[1]) - trainer = AutoencoderTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True) + trainer = TimeseriesTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True) trainer.fit(model, datamodule=datamodule) # Get reconstruction error of the training set diff --git a/examples/quick-start.ipynb b/examples/quick-start.ipynb index fa87efca..237ead9d 100644 --- a/examples/quick-start.ipynb +++ b/examples/quick-start.ipynb @@ -389,7 +389,7 @@ { "data": { "text/plain": "", - "application/javascript": "\n setTimeout(function() {\n var nbb_cell_id = 60;\n var nbb_unformatted_code = \"from torch.utils.data import DataLoader\\nfrom numalogic.models.autoencoder import AutoencoderTrainer\\nfrom numalogic.models.autoencoder.variants import Conv1dAE\\nfrom numalogic.tools.data import StreamingDataset\\n\\nSEQ_LEN = 24 # length of the sequence\\nMAX_EPOCHS = 30 # number of epochs to run\\nBATCH_SIZE = 64 # training batch size\\n\\nmodel = Conv1dAE(seq_len=SEQ_LEN, in_channels=1, enc_channels=(8, 4))\\nmodel\";\n var nbb_formatted_code = \"from torch.utils.data import DataLoader\\nfrom numalogic.models.autoencoder import AutoencoderTrainer\\nfrom numalogic.models.autoencoder.variants import Conv1dAE\\nfrom numalogic.tools.data import StreamingDataset\\n\\nSEQ_LEN = 24 # length of the sequence\\nMAX_EPOCHS = 30 # number of epochs to run\\nBATCH_SIZE = 64 # training batch size\\n\\nmodel = Conv1dAE(seq_len=SEQ_LEN, in_channels=1, enc_channels=(8, 4))\\nmodel\";\n var nbb_cells = Jupyter.notebook.get_cells();\n for (var i = 0; i < nbb_cells.length; ++i) {\n if (nbb_cells[i].input_prompt_number == nbb_cell_id) {\n if (nbb_cells[i].get_text() == nbb_unformatted_code) {\n nbb_cells[i].set_text(nbb_formatted_code);\n }\n break;\n }\n }\n }, 500);\n " + "application/javascript": "\n setTimeout(function() {\n var nbb_cell_id = 60;\n var nbb_unformatted_code = \"from torch.utils.data import DataLoader\\nfrom numalogic.models.autoencoder import TimeseriesTrainer\\nfrom numalogic.models.autoencoder.variants import Conv1dAE\\nfrom numalogic.tools.data import StreamingDataset\\n\\nSEQ_LEN = 24 # length of the sequence\\nMAX_EPOCHS = 30 # number of epochs to run\\nBATCH_SIZE = 64 # training batch size\\n\\nmodel = Conv1dAE(seq_len=SEQ_LEN, in_channels=1, enc_channels=(8, 4))\\nmodel\";\n var nbb_formatted_code = \"from torch.utils.data import DataLoader\\nfrom numalogic.models.autoencoder import TimeseriesTrainer\\nfrom numalogic.models.autoencoder.variants import Conv1dAE\\nfrom numalogic.tools.data import StreamingDataset\\n\\nSEQ_LEN = 24 # length of the sequence\\nMAX_EPOCHS = 30 # number of epochs to run\\nBATCH_SIZE = 64 # training batch size\\n\\nmodel = Conv1dAE(seq_len=SEQ_LEN, in_channels=1, enc_channels=(8, 4))\\nmodel\";\n var nbb_cells = Jupyter.notebook.get_cells();\n for (var i = 0; i < nbb_cells.length; ++i) {\n if (nbb_cells[i].input_prompt_number == nbb_cell_id) {\n if (nbb_cells[i].get_text() == nbb_unformatted_code) {\n nbb_cells[i].set_text(nbb_formatted_code);\n }\n break;\n }\n }\n }, 500);\n " }, "metadata": {}, "output_type": "display_data" @@ -397,7 +397,7 @@ ], "source": [ "from torch.utils.data import DataLoader\n", - "from numalogic.models.autoencoder import AutoencoderTrainer\n", + "from numalogic.models.autoencoder import TimeseriesTrainer\n", "from numalogic.models.autoencoder.variants import Conv1dAE\n", "from numalogic.tools.data import StreamingDataset\n", "\n", @@ -440,14 +440,14 @@ { "data": { "text/plain": "", - "application/javascript": "\n setTimeout(function() {\n var nbb_cell_id = 61;\n var nbb_unformatted_code = \"trainer = AutoencoderTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\\ntrainer.fit(\\n model,\\n train_dataloaders=DataLoader(StreamingDataset(X_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE),\\n val_dataloaders=DataLoader(StreamingDataset(X_val, seq_len=SEQ_LEN), batch_size=BATCH_SIZE)\\n)\";\n var nbb_formatted_code = \"trainer = AutoencoderTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\\ntrainer.fit(\\n model,\\n train_dataloaders=DataLoader(\\n StreamingDataset(X_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE\\n ),\\n val_dataloaders=DataLoader(\\n StreamingDataset(X_val, seq_len=SEQ_LEN), batch_size=BATCH_SIZE\\n ),\\n)\";\n var nbb_cells = Jupyter.notebook.get_cells();\n for (var i = 0; i < nbb_cells.length; ++i) {\n if (nbb_cells[i].input_prompt_number == nbb_cell_id) {\n if (nbb_cells[i].get_text() == nbb_unformatted_code) {\n nbb_cells[i].set_text(nbb_formatted_code);\n }\n break;\n }\n }\n }, 500);\n " + "application/javascript": "\n setTimeout(function() {\n var nbb_cell_id = 61;\n var nbb_unformatted_code = \"trainer = TimeseriesTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\\ntrainer.fit(\\n model,\\n train_dataloaders=DataLoader(StreamingDataset(X_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE),\\n val_dataloaders=DataLoader(StreamingDataset(X_val, seq_len=SEQ_LEN), batch_size=BATCH_SIZE)\\n)\";\n var nbb_formatted_code = \"trainer = TimeseriesTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\\ntrainer.fit(\\n model,\\n train_dataloaders=DataLoader(\\n StreamingDataset(X_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE\\n ),\\n val_dataloaders=DataLoader(\\n StreamingDataset(X_val, seq_len=SEQ_LEN), batch_size=BATCH_SIZE\\n ),\\n)\";\n var nbb_cells = Jupyter.notebook.get_cells();\n for (var i = 0; i < nbb_cells.length; ++i) {\n if (nbb_cells[i].input_prompt_number == nbb_cell_id) {\n if (nbb_cells[i].get_text() == nbb_unformatted_code) {\n nbb_cells[i].set_text(nbb_formatted_code);\n }\n break;\n }\n }\n }, 500);\n " }, "metadata": {}, "output_type": "display_data" } ], "source": [ - "trainer = AutoencoderTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", + "trainer = TimeseriesTrainer(max_epochs=MAX_EPOCHS, enable_progress_bar=True)\n", "trainer.fit(\n", " model,\n", " train_dataloaders=DataLoader(StreamingDataset(X_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE),\n", diff --git a/numalogic/blocks/_nn.py b/numalogic/blocks/_nn.py index a343b8d0..c28a7c10 100644 --- a/numalogic/blocks/_nn.py +++ b/numalogic/blocks/_nn.py @@ -14,7 +14,7 @@ import numpy.typing as npt from numalogic.blocks import Block -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer from numalogic.tools.data import StreamingDataset from numalogic.tools.types import nn_model_t, state_dict_t @@ -64,7 +64,7 @@ def fit( ------- The error of the model on the input data. """ - trainer = AutoencoderTrainer(**trainer_kwargs) + trainer = TimeseriesTrainer(**trainer_kwargs) ds = StreamingDataset(input_, self.seq_len) trainer.fit(self._artifact, train_dataloaders=DataLoader(ds, batch_size=batch_size)) reconerr = trainer.predict( diff --git a/numalogic/config/_config.py b/numalogic/config/_config.py index e3d965f6..c9045e0d 100644 --- a/numalogic/config/_config.py +++ b/numalogic/config/_config.py @@ -11,7 +11,7 @@ from dataclasses import dataclass, field -from typing import Optional, Any +from typing import Any from omegaconf import MISSING @@ -75,14 +75,12 @@ class LightningTrainerConf: accelerator: str = "auto" max_epochs: int = 50 - logger: bool = False + logger: bool = True + log_freq: int = 5 check_val_every_n_epoch: int = 5 - log_every_n_steps: int = 20 enable_checkpointing: bool = False - enable_progress_bar: bool = True + enable_progress_bar: bool = False enable_model_summary: bool = True - limit_val_batches: bool = 0 - callbacks: Optional[Any] = None @dataclass diff --git a/numalogic/config/factory.py b/numalogic/config/factory.py index a82ca89b..2ad4e483 100644 --- a/numalogic/config/factory.py +++ b/numalogic/config/factory.py @@ -106,6 +106,7 @@ class ModelFactory(_ObjectFactory): TransformerAE, SparseTransformerAE, ) + from numalogic.models.vae.variants import Conv1dVAE _CLS_MAP: ClassVar[dict] = { "VanillaAE": VanillaAE, @@ -116,13 +117,16 @@ class ModelFactory(_ObjectFactory): "SparseLSTMAE": SparseLSTMAE, "TransformerAE": TransformerAE, "SparseTransformerAE": SparseTransformerAE, + "Conv1dVAE": Conv1dVAE, } class RegistryFactory(_ObjectFactory): """Factory class to create registry instances.""" - _CLS_SET: ClassVar[frozenset] = frozenset({"RedisRegistry", "MLflowRegistry"}) + _CLS_SET: ClassVar[frozenset] = frozenset( + {"RedisRegistry", "MLflowRegistry", "DynamoDBRegistry"} + ) def get_instance(self, object_info: Union[ModelInfo, RegistryInfo]): import numalogic.registry as reg diff --git a/numalogic/models/autoencoder/__init__.py b/numalogic/models/autoencoder/__init__.py index 296eb72f..bc712ddf 100644 --- a/numalogic/models/autoencoder/__init__.py +++ b/numalogic/models/autoencoder/__init__.py @@ -10,6 +10,6 @@ # limitations under the License. -from numalogic.models.autoencoder.trainer import AutoencoderTrainer +from numalogic.tools.trainer import TimeseriesTrainer -__all__ = ["AutoencoderTrainer"] +__all__ = ["TimeseriesTrainer"] diff --git a/numalogic/models/autoencoder/base.py b/numalogic/models/autoencoder/base.py index 5786cc76..1b9fdf4b 100644 --- a/numalogic/models/autoencoder/base.py +++ b/numalogic/models/autoencoder/base.py @@ -44,23 +44,6 @@ def __init__( self.criterion = self.init_criterion(loss_fn) self.weight_decay = weight_decay - self._total_train_loss = 0.0 - self._total_val_loss = 0.0 - - @property - def total_train_loss(self): - return self._total_train_loss - - @property - def total_val_loss(self): - return self._total_val_loss - - def reset_train_loss(self): - self._total_train_loss = 0.0 - - def reset_val_loss(self): - self._total_val_loss = 0.0 - @staticmethod def init_criterion(loss_fn: str): if loss_fn == "huber": @@ -97,11 +80,11 @@ def configure_optimizers(self) -> dict[str, Any]: return {"optimizer": optimizer} def training_step(self, batch: Tensor, batch_idx: int) -> Tensor: - loss = self._get_reconstruction_loss(batch) - self._total_train_loss += loss.detach().item() - return loss + recon_loss = self._get_reconstruction_loss(batch) + self.log("train_loss", recon_loss, on_epoch=True, on_step=False) + return recon_loss def validation_step(self, batch: Tensor, batch_idx: int) -> Tensor: - loss = self._get_reconstruction_loss(batch) - self._total_val_loss += loss.detach().item() - return loss + recon_loss = self._get_reconstruction_loss(batch) + self.log("val_loss", recon_loss) + return recon_loss diff --git a/numalogic/models/autoencoder/trainer.py b/numalogic/models/autoencoder/trainer.py deleted file mode 100644 index bff09853..00000000 --- a/numalogic/models/autoencoder/trainer.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2022 The Numaproj Authors. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -import sys -import warnings - -import pytorch_lightning as pl -import torch -from pytorch_lightning import Trainer -from torch import Tensor - -from numalogic.tools.callbacks import ProgressDetails -from numalogic.tools.data import inverse_window -from typing import Optional - -_LOGGER = logging.getLogger(__name__) - - -class AutoencoderTrainer(Trainer): - r"""A PyTorch Lightning Trainer for Autoencoder models. - - Args: - ---- - max_epochs: The maximum number of epochs to train for. (default: 100) - logger: The logger to use. (default: False) - check_val_every_n_epoch: The number of epochs between validation checks. (default: 5) - enable_checkpointing: Whether to enable checkpointing. (default: False) - enable_progress_bar: Whether to enable the progress bar. (default: False) - enable_model_summary: Whether to enable the model summary. (default: False) - callbacks: A list of callbacks to use. (default: None) - **trainer_kw: Additional keyword arguments to pass to the Lightning Trainer. - """ - - def __init__( - self, - max_epochs=100, - logger=False, - check_val_every_n_epoch=5, - enable_checkpointing=False, - enable_progress_bar=False, - enable_model_summary=False, - callbacks=None, - **trainer_kw - ): - if (not callbacks) and enable_progress_bar: - callbacks = ProgressDetails() - - if not sys.warnoptions: - warnings.simplefilter("ignore", category=UserWarning) - - super().__init__( - logger=logger, - max_epochs=max_epochs, - check_val_every_n_epoch=check_val_every_n_epoch, - enable_checkpointing=enable_checkpointing, - enable_progress_bar=enable_progress_bar, - enable_model_summary=enable_model_summary, - callbacks=callbacks, - **trainer_kw - ) - - def predict(self, model: Optional[pl.LightningModule] = None, unbatch=True, **kwargs) -> Tensor: - r"""Predicts the output of the model. - - Args: - ---- - model: The model to predict with. (default: None) - unbatch: Whether to inverse window the output. (default: True) - **kwargs: Additional keyword arguments to pass to the Lightning - trainers predict method. - """ - recon_err = super().predict(model, **kwargs) - recon_err = torch.vstack(recon_err) - if unbatch: - return inverse_window(recon_err, method="keep_last") - return recon_err diff --git a/numalogic/models/autoencoder/variants/conv.py b/numalogic/models/autoencoder/variants/conv.py index 42d339e7..32a1482f 100644 --- a/numalogic/models/autoencoder/variants/conv.py +++ b/numalogic/models/autoencoder/variants/conv.py @@ -342,5 +342,5 @@ def _get_reconstruction_loss(self, batch) -> Tensor: def validation_step(self, batch: Tensor, batch_idx: int) -> Tensor: recon = self.reconstruction(batch) loss = self.criterion(batch, recon) - self._total_val_loss += loss.detach().item() + self.log("val_loss", loss) return loss diff --git a/numalogic/models/autoencoder/variants/vanilla.py b/numalogic/models/autoencoder/variants/vanilla.py index 149d6901..5580f6d8 100644 --- a/numalogic/models/autoencoder/variants/vanilla.py +++ b/numalogic/models/autoencoder/variants/vanilla.py @@ -156,8 +156,6 @@ def __init__( self.dropout_prob = dropout_p self.n_features = n_features - self.save_hyperparameters() - if encoder_layersizes[-1] != decoder_layersizes[0]: raise LayerSizeMismatchError( f"Last layersize of encoder: {encoder_layersizes[-1]} " @@ -252,5 +250,5 @@ def _get_reconstruction_loss(self, batch: Tensor) -> Tensor: def validation_step(self, batch: Tensor, batch_idx: int) -> Tensor: recon = self.reconstruction(batch) loss = self.criterion(batch, recon) - self._total_val_loss += loss.detach().item() + self.log("val_loss", loss) return loss diff --git a/numalogic/models/vae/__init__.py b/numalogic/models/vae/__init__.py index c6f64d4e..3e0f6dec 100644 --- a/numalogic/models/vae/__init__.py +++ b/numalogic/models/vae/__init__.py @@ -1,3 +1,3 @@ -from numalogic.models.vae.trainer import VAETrainer +from numalogic.models.vae.variants import Conv1dVAE -__all__ = ["VAETrainer"] +__all__ = ["Conv1dVAE"] diff --git a/numalogic/models/vae/trainer.py b/numalogic/tools/trainer.py similarity index 75% rename from numalogic/models/vae/trainer.py rename to numalogic/tools/trainer.py index 0800bd36..756f18a1 100644 --- a/numalogic/models/vae/trainer.py +++ b/numalogic/tools/trainer.py @@ -1,17 +1,32 @@ +# Copyright 2022 The Numaproj Authors. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging import sys import warnings -from typing import Optional import torch -from torch import Tensor from pytorch_lightning import Trainer, LightningModule +from torch import Tensor from numalogic.tools.callbacks import ConsoleLogger from numalogic.tools.data import inverse_window +from typing import Optional + +_LOGGER = logging.getLogger(__name__) -class VAETrainer(Trainer): - """A PyTorch Lightning Trainer for VAE models. +class TimeseriesTrainer(Trainer): + """A PyTorch Lightning Trainer for timeseries NN models in numalogic. Args: ---- @@ -27,7 +42,7 @@ class VAETrainer(Trainer): def __init__( self, - max_epochs: int = 100, + max_epochs: int = 50, logger: bool = True, log_freq: int = 5, check_val_every_n_epoch: int = 5, diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py index 6831d203..ea6578ba 100644 --- a/numalogic/udfs/postprocess.py +++ b/numalogic/udfs/postprocess.py @@ -138,9 +138,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: composite_keys=payload.composite_keys, timestamp=payload.end_ts, unified_anomaly=np.max(anomaly_scores), - data={ - _metric: _score for _metric, _score in zip(payload.metrics, anomaly_scores) - }, + data=self._per_feature_score(payload.metrics, anomaly_scores), # TODO: add model version, & emit as ML metrics metadata=payload.metadata, ) @@ -148,7 +146,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: "%s - Successfully post-processed, Keys: %s, Scores: %s", out_payload.uuid, out_payload.composite_keys, - out_payload.data, + out_payload.unified_anomaly, ) messages.append(Message(keys=keys, value=out_payload.to_json(), tags=["output"])) @@ -168,6 +166,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: ) return messages + @staticmethod + def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[str, float]: + if scores.shape[0] == 1: + # TODO improve this to incorporate per feature anomaly insights + return {_name: scores.item() for _name in feat_names} + return dict(zip(feat_names, scores)) + @classmethod def compute( cls, model: artifact_t, input_: NDArray[float], postproc_clf=None, **_ @@ -198,4 +203,4 @@ def compute( _LOGGER.debug( "Time taken in postprocess compute: %.4f sec", time.perf_counter() - _start_time ) - return score + return score.reshape(-1) diff --git a/numalogic/udfs/trainer.py b/numalogic/udfs/trainer.py index 0845c062..bded2481 100644 --- a/numalogic/udfs/trainer.py +++ b/numalogic/udfs/trainer.py @@ -16,7 +16,7 @@ from numalogic.config._config import NumalogicConf from numalogic.config.factory import ConnectorFactory from numalogic.connectors import DruidFetcherConf -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer from numalogic.tools.data import StreamingDataset from numalogic.tools.exceptions import ConfigNotFoundError, RedisRegistryError from numalogic.tools.types import redis_client_t, artifact_t, KEYS, KeyedArtifact @@ -170,7 +170,7 @@ def compute( ) train_ds = StreamingDataset(input_, model.seq_len) - trainer = AutoencoderTrainer(**asdict(trainer_cfg.pltrainer_conf)) + trainer = TimeseriesTrainer(**asdict(trainer_cfg.pltrainer_conf)) trainer.fit( model, train_dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size) ) diff --git a/pyproject.toml b/pyproject.toml index 2b59a2af..6bc7a336 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "numalogic" -version = "0.6.0a11" +version = "0.6.0rc0" description = "Collection of operational Machine Learning models and tools." authors = ["Numalogic Developers"] packages = [{ include = "numalogic" }] diff --git a/tests/config/test_config.py b/tests/config/test_config.py index 25e115ab..ba1c7918 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -26,9 +26,10 @@ NumalogicConf, ModelInfo, ) -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.models.autoencoder import TimeseriesTrainer from numalogic.models.autoencoder.variants import SparseVanillaAE, SparseConv1dAE, LSTMAE from numalogic.models.threshold import StdDevThreshold +from numalogic.models.vae.variants import Conv1dVAE from numalogic.transforms import LogTransformer, TanhNorm from numalogic.tools.exceptions import UnknownConfigArgsError @@ -72,13 +73,13 @@ def test_postprocess(self): def test_trainer(self): trainer_cfg = self.conf.trainer - trainer = AutoencoderTrainer(**trainer_cfg.pltrainer_conf) - self.assertIsInstance(trainer, AutoencoderTrainer) + trainer = TimeseriesTrainer(**trainer_cfg.pltrainer_conf) + self.assertIsInstance(trainer, TimeseriesTrainer) self.assertEqual(trainer.max_epochs, 40) class TestFactory(unittest.TestCase): - def test_instance(self): + def test_instance_01(self): factory = ModelFactory() model = factory.get_instance( ModelInfo( @@ -88,6 +89,16 @@ def test_instance(self): ) self.assertIsInstance(model, SparseConv1dAE) + def test_instance_02(self): + factory = ModelFactory() + model = factory.get_instance( + ModelInfo( + name="Conv1dVAE", + conf={"seq_len": 12, "n_features": 3, "latent_dim": 1}, + ) + ) + self.assertIsInstance(model, Conv1dVAE) + def test_cls(self): factory = ModelFactory() model_cls = factory.get_cls("LSTMAE") diff --git a/tests/models/autoencoder/test_trainer.py b/tests/models/autoencoder/test_trainer.py index 7ff77af1..b007bcea 100644 --- a/tests/models/autoencoder/test_trainer.py +++ b/tests/models/autoencoder/test_trainer.py @@ -1,3 +1,4 @@ +import logging import math import os import unittest @@ -8,7 +9,7 @@ from torch.utils.data import DataLoader from numalogic._constants import TESTS_DIR -from numalogic.models.autoencoder import AutoencoderTrainer +from numalogic.tools.trainer import TimeseriesTrainer from numalogic.models.autoencoder.variants import ( Conv1dAE, LSTMAE, @@ -27,6 +28,8 @@ ACCELERATOR = "cuda" if torch.cuda.is_available() else "cpu" torch.manual_seed(42) +logging.basicConfig(level=logging.DEBUG) + class TestAutoencoderTrainer(unittest.TestCase): x_train = None @@ -48,7 +51,7 @@ def setUpClass(cls) -> None: def test_trainer_01(self): model = Conv1dAE(seq_len=SEQ_LEN, in_channels=self.x_train.shape[1], enc_channels=(4, 8)) datamodule = TimeseriesDataModule(SEQ_LEN, self.x_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer( + trainer = TimeseriesTrainer( accelerator=ACCELERATOR, max_epochs=EPOCHS, enable_progress_bar=True, @@ -65,9 +68,7 @@ def test_trainer_02(self): datamodule = TimeseriesDataModule( SEQ_LEN, self.x_train, val_split_ratio=0.1, batch_size=BATCH_SIZE ) - trainer = AutoencoderTrainer( - accelerator=ACCELERATOR, max_epochs=EPOCHS, enable_progress_bar=True - ) + trainer = TimeseriesTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, log_freq=1) trainer.fit(model, datamodule=datamodule) y_train = trainer.predict(model, dataloaders=datamodule.train_dataloader()) val_size = math.floor(0.1 * len(self.x_train)) @@ -82,7 +83,9 @@ def test_trainer_03(self): datamodule = TimeseriesDataModule( SEQ_LEN, self.x_train, val_split_ratio=0.3, batch_size=BATCH_SIZE ) - trainer = AutoencoderTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, barebones=True) + trainer = TimeseriesTrainer( + accelerator=ACCELERATOR, max_epochs=EPOCHS, barebones=True, logger=False + ) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.x_test, SEQ_LEN), batch_size=16) @@ -92,7 +95,9 @@ def test_trainer_03(self): def test_trainer_04(self): model = SparseVanillaAE(seq_len=SEQ_LEN, n_features=self.x_train.shape[1]) datamodule = TimeseriesDataModule(SEQ_LEN, self.x_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, barebones=True) + trainer = TimeseriesTrainer( + accelerator=ACCELERATOR, max_epochs=EPOCHS, barebones=True, logger=False + ) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.x_test, SEQ_LEN)) @@ -104,8 +109,12 @@ def test_trainer_05(self): datamodule = TimeseriesDataModule( SEQ_LEN, self.x_train, val_split_ratio=0.25, batch_size=BATCH_SIZE ) - trainer = AutoencoderTrainer( - accelerator=ACCELERATOR, max_epochs=EPOCHS, barebones=True, limit_val_batches=1 + trainer = TimeseriesTrainer( + accelerator=ACCELERATOR, + max_epochs=EPOCHS, + barebones=True, + limit_val_batches=1, + logger=False, ) trainer.fit(model, datamodule=datamodule) @@ -119,7 +128,7 @@ def test_trainer_06_w_val(self): val_dataset = StreamingDataset(self.x_val, SEQ_LEN) test_dataset = StreamingDataset(self.x_test, SEQ_LEN) - trainer = AutoencoderTrainer( + trainer = TimeseriesTrainer( accelerator=ACCELERATOR, max_epochs=EPOCHS, enable_progress_bar=True ) trainer.fit( @@ -145,7 +154,7 @@ def test_trainer_07_wo_val(self): test_dataset = StreamingDataset(self.x_test, SEQ_LEN) - trainer = AutoencoderTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS) + trainer = TimeseriesTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS) trainer.fit(model, train_dataloaders=DataLoader(train_dataset, batch_size=BATCH_SIZE)) y_train = trainer.predict( diff --git a/tests/models/autoencoder/variants/test_conv.py b/tests/models/autoencoder/variants/test_conv.py index 18f662d7..4d2f9ca4 100644 --- a/tests/models/autoencoder/variants/test_conv.py +++ b/tests/models/autoencoder/variants/test_conv.py @@ -9,7 +9,7 @@ from numalogic._constants import TESTS_DIR from numalogic.tools.data import TimeseriesDataModule, StreamingDataset -from numalogic.models.autoencoder.trainer import AutoencoderTrainer +from numalogic.tools.trainer import TimeseriesTrainer from numalogic.models.autoencoder.variants import Conv1dAE from numalogic.models.autoencoder.variants.conv import SparseConv1dAE @@ -38,11 +38,11 @@ def setUpClass(cls) -> None: def test_conv1d_1(self): model = Conv1dAE(seq_len=SEQ_LEN, in_channels=self.X_train.shape[1], enc_channels=[8]) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(max_epochs=2, enable_progress_bar=True, fast_dev_run=True) + trainer = TimeseriesTrainer(max_epochs=2, enable_progress_bar=True, fast_dev_run=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer() + stream_trainer = TimeseriesTrainer() test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) self.assertTupleEqual(self.X_val.shape, test_reconerr.shape) @@ -57,13 +57,13 @@ def test_conv1d_2(self): optim_algo="rmsprop", ) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer( + trainer = TimeseriesTrainer( accelerator=ACCELERATOR, enable_progress_bar=True, fast_dev_run=True ) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer(accelerator=ACCELERATOR) + stream_trainer = TimeseriesTrainer(accelerator=ACCELERATOR) test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) self.assertTupleEqual(self.X_val.shape, test_reconerr.shape) @@ -76,7 +76,7 @@ def test_conv1d_encode(self): dec_activation="sigmoid", ) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(accelerator=ACCELERATOR, fast_dev_run=True) + trainer = TimeseriesTrainer(accelerator=ACCELERATOR, fast_dev_run=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=256) @@ -122,13 +122,13 @@ def test_sparse_conv1d(self): dec_activation="relu", ) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer( + trainer = TimeseriesTrainer( accelerator=ACCELERATOR, enable_progress_bar=True, fast_dev_run=True ) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer(accelerator=ACCELERATOR) + stream_trainer = TimeseriesTrainer(accelerator=ACCELERATOR) test_reconerr = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False) self.assertTupleEqual((229, SEQ_LEN, self.X_train.shape[1]), test_reconerr.size()) diff --git a/tests/models/autoencoder/variants/test_lstm.py b/tests/models/autoencoder/variants/test_lstm.py index fe3376e7..c0f05425 100644 --- a/tests/models/autoencoder/variants/test_lstm.py +++ b/tests/models/autoencoder/variants/test_lstm.py @@ -9,7 +9,7 @@ from numalogic._constants import TESTS_DIR from numalogic.tools.data import TimeseriesDataModule, StreamingDataset -from numalogic.models.autoencoder.trainer import AutoencoderTrainer +from numalogic.tools.trainer import TimeseriesTrainer from numalogic.models.autoencoder.variants import LSTMAE from numalogic.models.autoencoder.variants.lstm import SparseLSTMAE @@ -38,26 +38,26 @@ def setUpClass(cls) -> None: def test_lstm_ae(self): model = LSTMAE(seq_len=SEQ_LEN, no_features=2, embedding_dim=15, weight_decay=1e-3) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer( + trainer = TimeseriesTrainer( accelerator=ACCELERATOR, fast_dev_run=True, enable_progress_bar=True ) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer(accelerator=ACCELERATOR) + stream_trainer = TimeseriesTrainer(accelerator=ACCELERATOR) test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) self.assertTupleEqual(self.X_val.shape, test_reconerr.shape) def test_sparse_lstm_ae(self): model = SparseLSTMAE(seq_len=SEQ_LEN, no_features=2, embedding_dim=15, loss_fn="mse") datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer( + trainer = TimeseriesTrainer( accelerator=ACCELERATOR, fast_dev_run=True, enable_progress_bar=True ) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer(accelerator=ACCELERATOR) + stream_trainer = TimeseriesTrainer(accelerator=ACCELERATOR) test_reconerr = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False) self.assertListEqual([229, SEQ_LEN, self.X_train.shape[1]], list(test_reconerr.size())) diff --git a/tests/models/autoencoder/variants/test_transformers.py b/tests/models/autoencoder/variants/test_transformers.py index 80aef9f9..67e4aeb0 100644 --- a/tests/models/autoencoder/variants/test_transformers.py +++ b/tests/models/autoencoder/variants/test_transformers.py @@ -9,7 +9,7 @@ from numalogic._constants import TESTS_DIR from numalogic.tools.data import StreamingDataset, TimeseriesDataModule -from numalogic.models.autoencoder.trainer import AutoencoderTrainer +from numalogic.tools.trainer import TimeseriesTrainer from numalogic.models.autoencoder.variants import TransformerAE from numalogic.models.autoencoder.variants.transformer import SparseTransformerAE @@ -44,22 +44,22 @@ def test_transformer(self): num_decoder_layers=1, ) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(accelerator="cpu", fast_dev_run=True, enable_progress_bar=True) + trainer = TimeseriesTrainer(accelerator="cpu", fast_dev_run=True, enable_progress_bar=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer(accelerator="cpu") + stream_trainer = TimeseriesTrainer(accelerator="cpu") test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) self.assertTupleEqual(self.X_val.shape, test_reconerr.shape) def test_sparse_transformer(self): model = SparseTransformerAE(seq_len=SEQ_LEN, n_features=self.X_train.shape[1], loss_fn="l1") datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(accelerator="cpu", fast_dev_run=True, enable_progress_bar=True) + trainer = TimeseriesTrainer(accelerator="cpu", fast_dev_run=True, enable_progress_bar=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer(accelerator="cpu") + stream_trainer = TimeseriesTrainer(accelerator="cpu") test_reconerr = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False) self.assertListEqual([229, SEQ_LEN, self.X_train.shape[1]], list(test_reconerr.size())) diff --git a/tests/models/autoencoder/variants/test_vanilla.py b/tests/models/autoencoder/variants/test_vanilla.py index 9d643e3b..61b280d3 100644 --- a/tests/models/autoencoder/variants/test_vanilla.py +++ b/tests/models/autoencoder/variants/test_vanilla.py @@ -9,7 +9,7 @@ from numalogic._constants import TESTS_DIR from numalogic.tools.data import StreamingDataset, TimeseriesDataModule -from numalogic.models.autoencoder.trainer import AutoencoderTrainer +from numalogic.tools.trainer import TimeseriesTrainer from numalogic.models.autoencoder.variants.vanilla import VanillaAE, SparseVanillaAE from numalogic.tools.exceptions import LayerSizeMismatchError @@ -37,11 +37,11 @@ def setUpClass(cls) -> None: def test_vanilla(self): model = VanillaAE(seq_len=SEQ_LEN, n_features=self.X_train.shape[1], weight_decay=1e-3) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(fast_dev_run=True, enable_progress_bar=True) + trainer = TimeseriesTrainer(fast_dev_run=True, enable_progress_bar=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer() + stream_trainer = TimeseriesTrainer() test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) self.assertTupleEqual(self.X_val.shape, test_reconerr.shape) @@ -50,11 +50,11 @@ def test_sparse_vanilla(self): seq_len=SEQ_LEN, n_features=self.X_train.shape[1], loss_fn="l1", optim_algo="adagrad" ) datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(fast_dev_run=True, enable_progress_bar=True) + trainer = TimeseriesTrainer(fast_dev_run=True, enable_progress_bar=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.X_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = AutoencoderTrainer() + stream_trainer = TimeseriesTrainer() test_reconerr = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False) self.assertTupleEqual((229, SEQ_LEN, self.X_train.shape[1]), test_reconerr.size()) @@ -98,7 +98,7 @@ def test_train_err_01(self): def test_train_err_02(self): model = VanillaAE(SEQ_LEN, n_features=2, optim_algo="random") datamodule = TimeseriesDataModule(SEQ_LEN, self.X_train, batch_size=BATCH_SIZE) - trainer = AutoencoderTrainer(max_epochs=EPOCHS, enable_progress_bar=True) + trainer = TimeseriesTrainer(max_epochs=EPOCHS, enable_progress_bar=True) with self.assertRaises(NotImplementedError): trainer.fit(model, datamodule=datamodule) diff --git a/tests/models/vae/test_conv.py b/tests/models/vae/test_conv.py index 398324cf..e4ea9346 100644 --- a/tests/models/vae/test_conv.py +++ b/tests/models/vae/test_conv.py @@ -9,7 +9,7 @@ from torch.utils.data import DataLoader from numalogic._constants import TESTS_DIR -from numalogic.models.vae import VAETrainer +from numalogic.tools.trainer import TimeseriesTrainer from numalogic.models.vae.variants import Conv1dVAE from numalogic.tools.data import TimeseriesDataModule, StreamingDataset from numalogic.tools.exceptions import ModelInitializationError @@ -42,11 +42,11 @@ def setUpClass(cls) -> None: def test_model_01(self): model = Conv1dVAE(seq_len=SEQ_LEN, n_features=2, latent_dim=1, loss_fn="l1") datamodule = TimeseriesDataModule(SEQ_LEN, self.x_train, batch_size=BATCH_SIZE) - trainer = VAETrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, fast_dev_run=True) + trainer = TimeseriesTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, fast_dev_run=True) trainer.fit(model, datamodule=datamodule) streamloader = DataLoader(StreamingDataset(self.x_val, SEQ_LEN), batch_size=BATCH_SIZE) - stream_trainer = VAETrainer(accelerator=ACCELERATOR) + stream_trainer = TimeseriesTrainer(accelerator=ACCELERATOR) test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) test_reconerr_w_seq = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False) @@ -55,7 +55,7 @@ def test_model_01(self): def test_model_02(self): model = Conv1dVAE(seq_len=SEQ_LEN, n_features=2, latent_dim=1, conv_channels=(8, 4)) - trainer = VAETrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, log_freq=1) + trainer = TimeseriesTrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, log_freq=1) trainer.fit( model, train_dataloaders=DataLoader( diff --git a/tests/udfs/test_postprocess.py b/tests/udfs/test_postprocess.py index aea1be83..fcf86d7e 100644 --- a/tests/udfs/test_postprocess.py +++ b/tests/udfs/test_postprocess.py @@ -16,7 +16,7 @@ from numalogic.registry import RedisRegistry, LocalLRUCache from numalogic.tools.exceptions import ModelKeyNotFound from numalogic.udfs import PipelineConf -from numalogic.udfs.entities import Header, TrainerPayload, Status +from numalogic.udfs.entities import Header, TrainerPayload, Status, OutputPayload from numalogic.udfs.postprocess import PostprocessUDF logging.basicConfig(level=logging.DEBUG) @@ -127,6 +127,8 @@ def test_postprocess_all_model_present_01(self): ) msg = self.udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW)) + payload = OutputPayload(**orjson.loads(msg[0].value)) + self.assertListEqual(data["metrics"], list(payload.data)) self.assertEqual(1, len(msg)) def test_postprocess_all_model_present_02(self): @@ -148,7 +150,8 @@ def test_postprocess_all_model_present_02(self): ) msg = udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW)) - print(msg) + payload = OutputPayload(**orjson.loads(msg[0].value)) + self.assertListEqual(data["metrics"], list(payload.data)) self.assertEqual(1, len(msg)) @patch("numalogic.udfs.postprocess.PostprocessUDF.compute", Mock(side_effect=RuntimeError))