diff --git a/pyproject.toml b/pyproject.toml index 7e34221..03c7bc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "sktime", "numpy", "scikit-learn", + "matplotlib", "numba", ] diff --git a/src/signature_mahalanobis_knn/mahal_distance.py b/src/signature_mahalanobis_knn/mahal_distance.py index 8162e74..eb8f207 100644 --- a/src/signature_mahalanobis_knn/mahal_distance.py +++ b/src/signature_mahalanobis_knn/mahal_distance.py @@ -41,14 +41,16 @@ def __init__( # numerical rank of the corpus self.numerical_rank: int | None = None - def fit(self, X: np.ndarray, **kwargs) -> None: + def fit(self, X: np.ndarray, y: None = None, **kwargs) -> None: # noqa: ARG002 """ Fit the object to a corpus X. - :param X: numpy array, panel data representing the corpus, each row is a data point - :param y: No use, here for interface consistency - - :return: None + Parameters + ---------- + X : np.ndarray + Panel data representing the corpus, each row is a data point. + y: None + Not used, present for API consistency by convention. """ # mean centering self.mu = np.mean(X, axis=0) @@ -57,8 +59,8 @@ def fit(self, X: np.ndarray, **kwargs) -> None: U, S, Vt = np.linalg.svd(X) k = np.sum(self.svd_thres <= S) # detected numerical rank self.numerical_rank = k - self.Vt = Vt[:k] - self.S = S[:k] + self.Vt = Vt[:k].astype(self.default_dtype) + self.S = S[:k].astype(self.default_dtype) @staticmethod @jit(nopython=True) # Observe 6 times speed up on pen-digit dataset @@ -73,20 +75,31 @@ def calc_distance( """ Compute the variance norm between x1 and x2 using the precomputed SVD. - :param x1: 1D array, row vector - :param x2: 1D array, row vector - :param Vt: 2D array, truncated right singular matrix transposed of the corpus - :param S: 1D array, truncated singular values of the corpus - :subspace_thres: float, threshold to decide whether a point is in the data subspace - - :return: a value representing distance between x, y + Parameters + ---------- + x1 : np.ndarray + One-dimensional array. + x2 : np.ndarray + One-dimensional array. + Vt : np.ndarray + Two-dimensional arrat, truncated right singular matrix transposed of the corpus. + S : np.ndarray + One-dimensional array, truncated singular values of the corpus. + subspace_thres : float + Threshold to decide whether a point is in the data subspace. + + Returns + ------- + float + Value representing distance between x, y. """ x = x1 - x2 - # quantifies the amount that x is outside the row-subspace - if np.linalg.norm(x) < zero_thres: + norm_x = np.linalg.norm(x) + if norm_x < zero_thres: return 0.0 - rho = np.linalg.norm(x - x @ Vt.T @ Vt) / np.linalg.norm(x) + # quantifies the amount that x is outside the row-subspace + rho = np.linalg.norm(x - x @ Vt.T @ Vt) / norm_x if rho > subspace_thres: return np.inf @@ -94,17 +107,28 @@ def calc_distance( def distance(self, x1: np.ndarray, x2: np.ndarray) -> float: """ - Compute the variance norm between x1 and x2. - - :param x1: 1D array, row vector - :param x2: 1D array, row vector + Compute the variance norm between x1 and x2 using the precomputed SVD. - :return: a value representing distance between x, y + Parameters + ---------- + x1 : np.ndarray + One-dimensional array. + x2 : np.ndarray + One-dimensional array. + + Returns + ------- + float + Value representing distance between x, y. """ if self.numerical_rank is None: msg = "Mahalanobis distance is not fitted yet." raise ValueError(msg) + # ensure inputs are the right data type + x1 = x1.astype(self.default_dtype) + x2 = x2.astype(self.default_dtype) + return self.calc_distance( x1=x1, x2=x2, diff --git a/src/signature_mahalanobis_knn/sig_mahal_knn.py b/src/signature_mahalanobis_knn/sig_mahal_knn.py index 1e3c182..4573138 100644 --- a/src/signature_mahalanobis_knn/sig_mahal_knn.py +++ b/src/signature_mahalanobis_knn/sig_mahal_knn.py @@ -3,128 +3,190 @@ import numpy as np import pandas as pd from joblib import Parallel, delayed -from sklearn.metrics import roc_auc_score from sklearn.neighbors import NearestNeighbors -from sktime.transformations.panel.signature_based import SignatureTransformer from signature_mahalanobis_knn.mahal_distance import Mahalanobis +X_OR_SIGNATURE_ERROR_MSG = "Either X or signatures must be provided" +MODEL_NOT_FITTED_ERROR_MSG = "Must fit the model first" + class SignatureMahalanobisKNN: def __init__( self, - n_jobs=-2, - augmentation_list=("addtime",), - window_name="global", - window_depth=None, - window_length=None, - window_step=None, - rescaling=None, - sig_tfm="signature", - depth=2, + n_jobs: int = -2, ): """ - __description__ - - :param n_jobs: parameter for joblib, number of parallel processors to use. - :param augmentation_list: Possible augmentation strings are - ['leadlag', 'ir', 'addtime', 'cumsum', 'basepoint'] - :param window_name: str, String from ['global', 'sliding', 'expanding', 'dyadic'] - :param window_depth: int, The depth of the dyadic window. (Active only if - `window_name == 'dyadic']`. - :param window_length: int, The length of the sliding/expanding window. (Active - only if `window_name in ['sliding, 'expanding']. - :param window_step: int, The step of the sliding/expanding window. (Active - only if `window_name in ['sliding, 'expanding']. - :param rescaling: "pre" or "post", - "pre": rescale the path last signature term should be roughly O(1) - "post": Rescals the output signature by multiplying the depth-d term by d!. - Aim is that every term become ~O(1). - :param sig_tfm: One of: ['signature', 'logsignature']). - :param depth: int, Signature truncation depth. + Parameters + ---------- + n_jobs : int, optional + Parameter for joblib, number of parallel processors to use, by default -2. + -1 means using all processors, -2 means using all processors but one. """ - self.signature_transform = SignatureTransformer( - augmentation_list=augmentation_list, - window_name=window_name, - window_depth=window_depth, - window_length=window_length, - window_step=window_step, - rescaling=rescaling, - sig_tfm=sig_tfm, - depth=depth, - ) + self.signature_transform = None self.n_jobs = n_jobs + self.mahal_distance = None + self.signatures = None self.knn = None - def fit(self, X): + def fit( + self, + X: np.ndarray | None = None, + signatures: np.ndarray | None = None, + knn_algorithm: str = "auto", + **kwargs, + ) -> None: """ - __description__ - - :param X: Must support index operation X[i] where each X[i] returns a data point in the corpus - - :return: + Fit the KNN model with the corpus of signatures. + If signatures is not provided, then X must be provided + to compute the signatures. + If signatures is provided, then X is ignored. + + Parameters + ---------- + X : np.ndarray | None, optional + Data points, by default None. + Must support index operation X[i] where + each X[i] returns a data point in the corpus. + signatures : np.ndarray | None, optional + Signatures of the data points, by default None. + Must support index operation X[i] where + each X[i] returns a data point in the corpus. + knn_algorithm : str, optional + Algorithm used to compute the nearest neighbors + (see scikit-learn documentation for `sklearn.neighbors.NearestNeighbors`), + by default "auto". + **kwargs + Keyword arguments passed to the signature transformer if + signatures are not provided and X is provided. + See sktime documentation for + `sktime.transformations.panel.signature_based.SignatureTransformer` + for more details on what arguments are available. + Some notable options are: + - augmentation_list: tuple[str], Possible augmentation strings are + ['leadlag', 'ir', 'addtime', 'cumsum', 'basepoint'] + - window_name: str, String from + ['global', 'sliding', 'expanding', 'dyadic'] + - window_depth: int, The depth of the dyadic window. + (Active only if `window_name == 'dyadic']`. + - window_length: int, The length of the sliding/expanding window. + (Active only if `window_name in ['sliding, 'expanding']. + - window_step: int, The step of the sliding/expanding window. + (Active only if `window_name in ['sliding, 'expanding']. + - rescaling: "pre" or "post", + - "pre": rescale the path last signature term should + be roughly O(1) + - "post": Rescals the output signature by multiplying + the depth-d term by d!. Aim is that every term become ~O(1). + - sig_tfm: One of: ['signature', 'logsignature']). + - depth: int, Signature truncation depth. + By default, the following arguments are used: + - augmentation_list: ("addtime",) + - window_name: "global" + - window_depth: None + - window_length: None + - window_step: None + - rescaling: None + - sig_tfm: "signature" + - depth: 2 """ - sigs = Parallel(n_jobs=self.n_jobs)( - delayed(self.signature_transform.fit_transform)(X[i]) for i in range(len(X)) - ) - sigs = pd.concat(sigs) - - mahal_distance = Mahalanobis() - mahal_distance.fit(sigs) - + if signatures is None: + if X is None: + raise ValueError(X_OR_SIGNATURE_ERROR_MSG) + + from sktime.transformations.panel.signature_based import ( + SignatureTransformer, + ) + + # set default kwargs for signature transformer if not provided + if kwargs == {}: + kwargs = { + "augmentation_list": ("addtime",), + "window_name": "global", + "window_depth": None, + "window_length": None, + "window_step": None, + "rescaling": None, + "sig_tfm": "signature", + "depth": 2, + } + + self.signature_transform = SignatureTransformer( + **kwargs, + ) + + # compute signatures + sigs = Parallel(n_jobs=self.n_jobs)( + delayed(self.signature_transform.fit_transform)(X[i]) + for i in range(len(X)) + ) + self.signatures = pd.concat(sigs) + else: + self.signatures = signatures + + # fit mahalanobis distance for the signatures + self.mahal_distance = Mahalanobis() + self.mahal_distance.fit(self.signatures) + + # fit knn for the mahalanobis distance knn = NearestNeighbors( - metric=mahal_distance.distance, n_jobs=self.n_jobs, algorithm="auto" + metric=self.mahal_distance.distance, + n_jobs=self.n_jobs, + algorithm=knn_algorithm, ) - knn.fit(sigs) + knn.fit(self.signatures) self.knn = knn - def conformance(self, X): + def conformance( + self, + X: np.ndarray | None = None, + signatures: np.ndarray | None = None, + ) -> np.ndarray: """ - __description__ - - :param X: Must support index operation X[i] where each X[i] returns a data point - - :return: + Compute the conformance scores for the data points either passed in + directly as X or the signatures of the data points in signatures. + If signatures is not provided, then X must be provided + to compute the signatures. + If signatures is provided, then X is ignored. + + Must call fit() method first. + + Parameters + ---------- + X : np.ndarray | None, optional + Data points, by default None. + Must support index operation X[i] where + each X[i] returns a data point in the corpus. + signatures : np.ndarray | None, optional + Signatures of the data points, by default None. + Must support index operation X[i] where + each X[i] returns a data point in the corpus. + + Returns + ------- + np.ndarray + Conformance scores for data points provided. """ - sigs = Parallel(n_jobs=self.n_jobs)( - delayed(self.signature_transform.fit_transform)(X[i]) for i in range(len(X)) + if self.knn is None: + raise ValueError(MODEL_NOT_FITTED_ERROR_MSG) + + if signatures is None: + if X is None: + raise ValueError(X_OR_SIGNATURE_ERROR_MSG) + if self.signature_transform is None: + raise ValueError(MODEL_NOT_FITTED_ERROR_MSG) + + # compute signatures + sigs = Parallel(n_jobs=self.n_jobs)( + delayed(self.signature_transform.fit_transform)(X[i]) + for i in range(len(X)) + ) + signatures = pd.concat(sigs) + + # compute KNN distances for the signatures of the data points + # against the signatures of the corpus + distances, _ = self.knn.kneighbors( + signatures, n_neighbors=1, return_distance=True ) - sigs = pd.concat(sigs) - - distances, _ = self.knn.kneighbors(sigs, n_neighbors=1) return distances - - def compute_auc(self, test_in, test_out): - """ - __description__ - - :param test_in: - :param test_out: - - :return: - """ - distances_in = self.conformance(test_in) - distances_out = self.conformance(test_out) - return self.compute_auc_given_dists(distances_in, distances_out) - - def compute_auc_given_dists(self, distances_in, distances_out): - """ - __description__ - - :param distances_in: - :param distances_out: - - :return: - """ - # replace infinity with twice of the maximum value, hacky, may need more thoughts - distances_in[distances_in == np.inf] = np.nan - distances_out[distances_out == np.inf] = np.nan - max_val = max(np.nanmax(distances_in), np.nanmax(distances_out)) - distances_in = np.nan_to_num(distances_in, max_val * 2) - distances_out = np.nan_to_num(distances_out, max_val * 2) - - return roc_auc_score( - [False] * len(distances_in) + [True] * len(distances_out), - np.concatenate([distances_in, distances_out]), - ) diff --git a/src/signature_mahalanobis_knn/utils.py b/src/signature_mahalanobis_knn/utils.py new file mode 100644 index 0000000..5542ba7 --- /dev/null +++ b/src/signature_mahalanobis_knn/utils.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import matplotlib.pyplot as plt +import numpy as np +from sklearn.metrics import roc_auc_score, roc_curve + +from signature_mahalanobis_knn.sig_mahal_knn import SignatureMahalanobisKNN + + +def plot_roc_curve( + y_true: np.ndarray, + y_score: np.ndarray, + roc_auc: float | None = None, + title: str = "", +) -> float: + """ + Plot the ROC curve given the true labels and the scores. + + Parameters + ---------- + y_true : np.ndarray + True binary labels. + y_score : np.ndarray + Target scores. + roc_auc : float | None, optional + ROC AUC, by default None. + If None, then it will be computed. + title : str, optional + Title for the ROC curve plot, by default "". + + Returns + ------- + float + ROC AUC score. + """ + + # compute and plot metrics + fp_rate, tp_rate, _ = roc_curve(y_true, y_score) + if roc_auc is None: + roc_auc = roc_auc_score(y_true, y_score) + + plt.title(f"Receiver Operating Characteristic {title}") + plt.plot(fp_rate, tp_rate, "b", label=f"AUC = {round(roc_auc, 2)}") + plt.legend(loc="lower right") + plt.plot([0, 1], [0, 1], "r--") + plt.xlim([0, 1]) + plt.ylim([0, 1]) + plt.ylabel("True Positive Rate") + plt.xlabel("False Positive Rate") + plt.show() + + return roc_auc + + +def compute_auc_given_dists( + distances_in: np.ndarray, + distances_out: np.ndarray, + plot: bool = False, + title: str = "", +) -> float: + """ + Compute ROC AUC given the distances of inliers and outliers. + + Parameters + ---------- + distances_in : np.ndarray + KNN distances for the inlier data points. + distances_out : np.ndarray + KNN distances for the outlier data points. + plot : bool, optional + Whether to plot the ROC curve, by default False + title : str, optional + Title for the ROC curve plot, by default "". + Only used when plot is True. + + Returns + ------- + float + ROC AUC score. + """ + # replace infinity with twice of the maximum value, hacky, may need more thoughts + distances_in[distances_in == np.inf] = np.nan + distances_out[distances_out == np.inf] = np.nan + max_val = max(np.nanmax(distances_in), np.nanmax(distances_out)) + distances_in = np.nan_to_num(distances_in, max_val * 2) + distances_out = np.nan_to_num(distances_out, max_val * 2) + + y_true = [0] * len(distances_in) + [1] * len(distances_out) + y_score = np.concatenate([distances_in, distances_out]) + roc_auc = roc_auc_score( + y_true=y_true, + y_score=y_score, + ) + + if plot: + plot_roc_curve( + y_true=y_true, + y_score=y_score, + roc_auc=roc_auc, + title=title, + ) + + return roc_auc + + +def compute_auc( + signature_mahalanobis_knn: SignatureMahalanobisKNN, + test_in: np.ndarray, + test_out: np.ndarray, + is_signature: bool, + plot: bool = False, + title: str = "", +) -> float: + """ + Compute ROC AUC given the data points of inliers and outliers. + + Parameters + ---------- + test_in : np.ndarray + Data points from the inlier class. + test_out : np.ndarray + Data points from the outlier class. + is_signature : bool + Whether the data provided are signatures or not. + If True, then test_in and test_out are signatures. + If False, then test_in and test_out are data points + and the signatures will be computed. + plot : bool, optional + Whether to plot the ROC curve, by default False + title : str, optional + Title for the ROC curve plot, by default "". + Only used when plot is True. + + Returns + ------- + float + ROC AUC score. + """ + # compute KNN distances for the signatures of the data points + # for both inliers and outliers of distribution data + if is_signature: + distances_in = signature_mahalanobis_knn.conformance(signatures=test_in) + distances_out = signature_mahalanobis_knn.conformance(signatures=test_out) + else: + distances_in = signature_mahalanobis_knn.conformance(X=test_in) + distances_out = signature_mahalanobis_knn.conformance(X=test_out) + + # convert to the default data type of the arrays in + # the mahalanobis distance object + distances_in = distances_in.astype( + signature_mahalanobis_knn.mahal_distance.default_dtype + ) + distances_out = distances_out.astype( + signature_mahalanobis_knn.mahal_distance.default_dtype + ) + + # compute AUC for the inliers and outliers + return compute_auc_given_dists( + distances_in=distances_in, + distances_out=distances_out, + plot=plot, + title=title, + )