Skip to content

Commit

Permalink
make pr changes from comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rchan26 committed Oct 17, 2023
1 parent 1e89417 commit 52fda32
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 113 deletions.
146 changes: 33 additions & 113 deletions src/signature_mahalanobis_knn/sig_mahal_knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from sklearn.metrics import roc_auc_score
from sklearn.neighbors import NearestNeighbors

from signature_mahalanobis_knn.mahal_distance import Mahalanobis
from signature_mahalanobis_knn.utils import plot_roc_curve

X_OR_SIGNATURE_ERROR_MSG = "Either X or signatures must be provided"


class SignatureMahalanobisKNN:
Expand Down Expand Up @@ -59,12 +59,39 @@ def fit(
Keyword arguments passed to the signature transformer if
signatures are not provided and X is provided.
See sktime documentation for
`sktime.transformations.panel.signature_based.SignatureTransformer`.
`sktime.transformations.panel.signature_based.SignatureTransformer`
for more details on what arguments are available.
Some notable options are:
- augmentation_list: tuple[str], Possible augmentation strings are
['leadlag', 'ir', 'addtime', 'cumsum', 'basepoint']
- window_name: str, String from
['global', 'sliding', 'expanding', 'dyadic']
- window_depth: int, The depth of the dyadic window.
(Active only if `window_name == 'dyadic']`.
- window_length: int, The length of the sliding/expanding window.
(Active only if `window_name in ['sliding, 'expanding'].
- window_step: int, The step of the sliding/expanding window.
(Active only if `window_name in ['sliding, 'expanding'].
- rescaling: "pre" or "post",
- "pre": rescale the path last signature term should
be roughly O(1)
- "post": Rescals the output signature by multiplying
the depth-d term by d!. Aim is that every term become ~O(1).
- sig_tfm: One of: ['signature', 'logsignature']).
- depth: int, Signature truncation depth.
By default, the following arguments are used:
- augmentation_list: ("addtime",)
- window_name: "global"
- window_depth: None
- window_length: None
- window_step: None
- rescaling: None
- sig_tfm: "signature"
- depth: 2
"""
if signatures is None:
if X is None:
msg = "Either X or signatures must be provided"
raise ValueError(msg)
raise ValueError(X_OR_SIGNATURE_ERROR_MSG)

from sktime.transformations.panel.signature_based import (
SignatureTransformer,
Expand Down Expand Up @@ -145,8 +172,7 @@ def conformance(

if signatures is None:
if X is None:
msg = "Either X or signatures must be provided"
raise ValueError(msg)
raise ValueError(X_OR_SIGNATURE_ERROR_MSG)
if self.signature_transform is None:
msg = "Must fit the model first"
raise ValueError(msg)
Expand All @@ -165,109 +191,3 @@ def conformance(
)

return distances

def compute_auc_given_dists(
self,
distances_in: np.ndarray,
distances_out: np.ndarray,
plot: bool = False,
title: str = "",
) -> float:
"""
Compute ROC AUC given the distances of inliers and outliers.
Parameters
----------
distances_in : np.ndarray
KNN distances for the inlier data points.
distances_out : np.ndarray
KNN distances for the outlier data points.
plot : bool, optional
Whether to plot the ROC curve, by default False
title : str, optional
Title for the ROC curve plot, by default "".
Only used when plot is True.
Returns
-------
float
ROC AUC score.
"""
# replace infinity with twice of the maximum value, hacky, may need more thoughts
distances_in[distances_in == np.inf] = np.nan
distances_out[distances_out == np.inf] = np.nan
max_val = max(np.nanmax(distances_in), np.nanmax(distances_out))
distances_in = np.nan_to_num(distances_in, max_val * 2)
distances_out = np.nan_to_num(distances_out, max_val * 2)

y_true = [0] * len(distances_in) + [1] * len(distances_out)
y_score = np.concatenate([distances_in, distances_out])
roc_auc = roc_auc_score(
y_true=y_true,
y_score=y_score,
)

if plot:
plot_roc_curve(
y_true=y_true,
y_score=y_score,
roc_auc=roc_auc,
title=title,
)

return roc_auc

def compute_auc(
self,
test_in: np.ndarray,
test_out: np.ndarray,
is_signature: bool,
plot: bool = False,
title: str = "",
) -> float:
"""
Compute ROC AUC given the data points of inliers and outliers.
Parameters
----------
test_in : np.ndarray
Data points from the inlier class.
test_out : np.ndarray
Data points from the outlier class.
is_signature : bool
Whether the data provided are signatures or not.
If True, then test_in and test_out are signatures.
If False, then test_in and test_out are data points
and the signatures will be computed.
plot : bool, optional
Whether to plot the ROC curve, by default False
title : str, optional
Title for the ROC curve plot, by default "".
Only used when plot is True.
Returns
-------
float
ROC AUC score.
"""
# compute KNN distances for the signatures of the data points
# for both inliers and outliers of distribution data
if is_signature:
distances_in = self.conformance(signatures=test_in)
distances_out = self.conformance(signatures=test_out)
else:
distances_in = self.conformance(X=test_in)
distances_out = self.conformance(X=test_out)

# convert to the default data type of the arrays in
# the mahalanobis distance object
distances_in = distances_in.astype(self.mahal_distance.default_dtype)
distances_out = distances_out.astype(self.mahal_distance.default_dtype)

# compute AUC for the inliers and outliers
return self.compute_auc_given_dists(
distances_in=distances_in,
distances_out=distances_out,
plot=plot,
title=title,
)
113 changes: 113 additions & 0 deletions src/signature_mahalanobis_knn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve

from signature_mahalanobis_knn.sig_mahal_knn import SignatureMahalanobisKNN


def plot_roc_curve(
y_true: np.ndarray,
Expand Down Expand Up @@ -48,3 +50,114 @@ def plot_roc_curve(
plt.show()

return roc_auc


def compute_auc_given_dists(
distances_in: np.ndarray,
distances_out: np.ndarray,
plot: bool = False,
title: str = "",
) -> float:
"""
Compute ROC AUC given the distances of inliers and outliers.
Parameters
----------
distances_in : np.ndarray
KNN distances for the inlier data points.
distances_out : np.ndarray
KNN distances for the outlier data points.
plot : bool, optional
Whether to plot the ROC curve, by default False
title : str, optional
Title for the ROC curve plot, by default "".
Only used when plot is True.
Returns
-------
float
ROC AUC score.
"""
# replace infinity with twice of the maximum value, hacky, may need more thoughts
distances_in[distances_in == np.inf] = np.nan
distances_out[distances_out == np.inf] = np.nan
max_val = max(np.nanmax(distances_in), np.nanmax(distances_out))
distances_in = np.nan_to_num(distances_in, max_val * 2)
distances_out = np.nan_to_num(distances_out, max_val * 2)

y_true = [0] * len(distances_in) + [1] * len(distances_out)
y_score = np.concatenate([distances_in, distances_out])
roc_auc = roc_auc_score(
y_true=y_true,
y_score=y_score,
)

if plot:
plot_roc_curve(
y_true=y_true,
y_score=y_score,
roc_auc=roc_auc,
title=title,
)

return roc_auc


def compute_auc(
signature_mahalanobis_knn: SignatureMahalanobisKNN,
test_in: np.ndarray,
test_out: np.ndarray,
is_signature: bool,
plot: bool = False,
title: str = "",
) -> float:
"""
Compute ROC AUC given the data points of inliers and outliers.
Parameters
----------
test_in : np.ndarray
Data points from the inlier class.
test_out : np.ndarray
Data points from the outlier class.
is_signature : bool
Whether the data provided are signatures or not.
If True, then test_in and test_out are signatures.
If False, then test_in and test_out are data points
and the signatures will be computed.
plot : bool, optional
Whether to plot the ROC curve, by default False
title : str, optional
Title for the ROC curve plot, by default "".
Only used when plot is True.
Returns
-------
float
ROC AUC score.
"""
# compute KNN distances for the signatures of the data points
# for both inliers and outliers of distribution data
if is_signature:
distances_in = signature_mahalanobis_knn.conformance(signatures=test_in)
distances_out = signature_mahalanobis_knn.conformance(signatures=test_out)
else:
distances_in = signature_mahalanobis_knn.conformance(X=test_in)
distances_out = signature_mahalanobis_knn.conformance(X=test_out)

# convert to the default data type of the arrays in
# the mahalanobis distance object
distances_in = distances_in.astype(
signature_mahalanobis_knn.mahal_distance.default_dtype
)
distances_out = distances_out.astype(
signature_mahalanobis_knn.mahal_distance.default_dtype
)

# compute AUC for the inliers and outliers
return compute_auc_given_dists(
distances_in=distances_in,
distances_out=distances_out,
plot=plot,
title=title,
)

0 comments on commit 52fda32

Please sign in to comment.