Skip to content

Commit

Permalink
feat: static thresh udf
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Mar 1, 2024
1 parent 2cb9776 commit c0f605c
Show file tree
Hide file tree
Showing 11 changed files with 444 additions and 106 deletions.
2 changes: 2 additions & 0 deletions numalogic/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
TrainerConf,
ScoreConf,
AggregatorConf,
ScoreAdjustConf,
)
from numalogic.config.factory import (
ModelFactory,
Expand All @@ -41,6 +42,7 @@
"RegistryFactory",
"TrainerConf",
"ScoreConf",
"ScoreAdjustConf",
"AggregatorConf",
"AggregatorFactory",
]
79 changes: 0 additions & 79 deletions numalogic/tools/adjust.py

This file was deleted.

19 changes: 16 additions & 3 deletions numalogic/udfs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
_increment_counter,
)
from numalogic.udfs.entities import StreamPayload, Status
from numalogic.udfs.tools import _load_artifact, _update_info_metric, get_trainer_message
from numalogic.udfs.tools import (
_load_artifact,
_update_info_metric,
get_trainer_message,
get_static_thresh_message,
)

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -131,7 +136,10 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

# Send training request if artifact loading is not successful
if not artifact_data:
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))

Check warning on line 141 in numalogic/udfs/inference.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/inference.py#L141

Added line #L141 was not covered by tests
return msgs

# Perform inference
try:
Expand All @@ -145,7 +153,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload.composite_keys,
payload.metrics,
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
# Send training request if inference fails
msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))
return msgs

msgs = Messages()
status = (
Expand All @@ -162,6 +174,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
**payload.metadata,
},
)
# Send trainer message if artifact is stale
if status == Status.ARTIFACT_STALE:
msgs.append(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

Expand Down
14 changes: 11 additions & 3 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
_increment_counter,
)
from numalogic.udfs.entities import StreamPayload, Header, Status, OutputPayload
from numalogic.udfs.tools import _load_artifact, get_trainer_message
from numalogic.udfs.tools import _load_artifact, get_trainer_message, get_static_thresh_message

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
Expand Down Expand Up @@ -123,7 +123,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload = replace(
payload, status=Status.ARTIFACT_NOT_FOUND, header=Header.TRAIN_REQUEST
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
# Send training request if artifact loading is not successful
msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))
return msgs

if payload.header == Header.STATIC_INFERENCE:
_LOGGER.warning("Static inference not supported in postprocess yet")

Check warning on line 133 in numalogic/udfs/postprocess.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/postprocess.py#L133

Added line #L133 was not covered by tests
Expand Down Expand Up @@ -154,7 +158,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload.composite_keys,
payload.metrics,
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
# Send training request if postprocess fails
msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))
return msgs

payload = replace(
payload,
Expand Down
15 changes: 12 additions & 3 deletions numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
_load_artifact,
_update_info_metric,
get_trainer_message,
get_static_thresh_message,
)

# TODO: move to config
Expand Down Expand Up @@ -136,7 +137,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Drop message if dataframe shape conditions are not met
if raw_df.shape[0] < _stream_conf.window_size or raw_df.shape[1] != len(_conf.metrics):
_LOGGER.critical("Dataframe shape: (%f, %f) error ", raw_df.shape[0], raw_df.shape[1])
print(_metric_label_values)
_increment_counter(
counter=DATASHAPE_ERROR_COUNTER,
labels=_metric_label_values,
Expand Down Expand Up @@ -174,7 +174,10 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)
payload = replace(payload, status=Status.ARTIFACT_FOUND)
else:
return Messages(get_trainer_message(keys, _stream_conf, payload))
msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))

Check warning on line 179 in numalogic/udfs/preprocess.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/preprocess.py#L179

Added line #L179 was not covered by tests
return msgs
# Model will not be in registry
else:
# Load configuration for the config_id
Expand Down Expand Up @@ -220,7 +223,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload,
status=Status.RUNTIME_ERROR,
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
msgs = Messages(
get_trainer_message(keys, _stream_conf, payload, *_metric_label_values),
)
if _conf.numalogic_conf.score.adjust:
msgs.append(get_static_thresh_message(keys, payload))
return msgs

_increment_counter(
counter=MSG_PROCESSED_COUNTER,
labels=_metric_label_values,
Expand Down
170 changes: 170 additions & 0 deletions numalogic/udfs/staticthresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import logging

from orjson import orjson
from pynumaflow.mapper import Datum, Messages, Message

from numalogic.config import AggregatorFactory, ScoreAdjustConf, AggregatorConf
from numalogic.models.threshold import SigmoidThreshold
from numalogic.tools.aggregators import aggregate_window, aggregate_features
from numalogic.udfs import NumalogicUDF, PipelineConf
import numpy.typing as npt

from numalogic.udfs.entities import StreamPayload, OutputPayload


_LOGGER = logging.getLogger(__name__)


class StaticThresholdUDF(NumalogicUDF):
"""
Static thresholding UDF, which computes the static anomaly scores.
Args:
pl_conf: PipelineConf object
"""

def __init__(self, pl_conf: PipelineConf):
super().__init__(pl_conf=pl_conf, _vtx="staticthresh")

def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
Processes the input data and computes the static anomaly scores.
Args:
-------
keys: List of keys
datum: Datum object.
Returns
-------
Messages instance
"""
payload = StreamPayload(**orjson.loads(datum.value))
conf = self.get_ml_pipeline_conf(payload.config_id, payload.pipeline_id)
adjust_conf = conf.numalogic_conf.score.adjust

if not adjust_conf:
_LOGGER.warning(

Check warning on line 47 in numalogic/udfs/staticthresh.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/staticthresh.py#L47

Added line #L47 was not covered by tests
"%s - No score adjust config found for config_id: %s, pipeline_id: %s",
)
return Messages(Message.to_drop())

Check warning on line 50 in numalogic/udfs/staticthresh.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/staticthresh.py#L50

Added line #L50 was not covered by tests

try:
y_features = self.compute(
input_=payload.get_data(original=True, metrics=list(adjust_conf.upper_limits)),
adjust_conf=adjust_conf,
)
y_unified = self.compute_unified_score(y_features, adjust_conf.feature_agg)
except RuntimeError:
_LOGGER.exception(
"%s - Error occurred while computing static anomaly scores",
payload.uuid,
)
return Messages(Message.to_drop())

out_payload = OutputPayload(
uuid=payload.uuid,
config_id=payload.config_id,
pipeline_id=payload.pipeline_id,
composite_keys=payload.composite_keys,
timestamp=payload.end_ts,
unified_anomaly=y_unified,
data=self._additional_scores(payload.metrics, y_features, y_unified),
metadata=payload.metadata,
)
return Messages(Message(keys=keys, value=out_payload.to_json(), tags=["output"]))

@staticmethod
def _additional_scores(
feat_names: list[str], y_features: npt.NDArray[float], y_unified: float
) -> dict[str, float]:
"""
Additional scores to be computed.
Args:
-------
feat_names: List of feature names
y_features: Anomaly scores
y_unified: Unified anomaly score
Returns
-------
Additional scores
"""
scores_payload = {"unified_ST": y_unified}
if (scores_len := len(y_features)) == len(feat_names):
_LOGGER.debug(
"Scores length: %s does not match feat_names: %s",
scores_len,
feat_names,
)
scores_payload |= dict(zip(feat_names, y_features))
return scores_payload

@classmethod
def compute(
cls, input_: npt.NDArray[float], adjust_conf: ScoreAdjustConf, **_
) -> npt.NDArray[float]:
"""
Compute static thresholding over the raw input features.
Args:
input_: Input data
adjust_conf: Score adjust Config
Returns
-------
npt.NDArray[float]
"""
scorer = SigmoidThreshold(*adjust_conf.upper_limits.values())
try:
return cls.compute_feature_scores(scorer.score_samples(input_), adjust_conf.window_agg)
except Exception as err:
raise RuntimeError("Static Thresholding failed!") from err

@classmethod
def compute_feature_scores(
cls, scores: npt.NDArray[float], win_agg_conf: AggregatorConf
) -> npt.NDArray[float]:
"""
Aggregate scores over the window length.
Args:
-------
scores: anomaly scores (Shape: seq_len x n_features)
win_agg_conf: Window aggregator Config
Returns
-------
Aggregated scores of shape (n_features,)
"""
return aggregate_window(
scores,
agg_func=AggregatorFactory.get_func(win_agg_conf.method),
**win_agg_conf.conf,
)

@classmethod
def compute_unified_score(
cls, scores: npt.NDArray[float], feat_agg_conf: AggregatorConf
) -> float:
"""
Aggregate scores over the features to get a unified score.
Args:
-------
scores: anomaly scores (Shape: n_features, )
feat_agg_conf: Feature aggregator Config
Returns
-------
Unified score (float)
"""
try:
return aggregate_features(
scores.reshape(1, -1),
agg_func=AggregatorFactory.get_func(feat_agg_conf.method),
**feat_agg_conf.conf,
).item()
except Exception as err:
raise RuntimeError("Unified Score computation failed!") from err

Check warning on line 170 in numalogic/udfs/staticthresh.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/staticthresh.py#L169-L170

Added lines #L169 - L170 were not covered by tests
Loading

0 comments on commit c0f605c

Please sign in to comment.