From 01b71c913f2a72056b35456b3ac30adf75180796 Mon Sep 17 00:00:00 2001
From: fkuehlein
Date: Tue, 13 Aug 2024 11:37:46 +0200
Subject: [PATCH] MAINT: extend caching and review `Surrogates` API

Extend caching according to 93ab748 to
- `RecurrenceNetwork` and child classes
- `Surrogates`

Review API of `Surrogates` to enable caching and reduce redundancy
- remove unhashable argument `original_data` in surrogate methods,
  use respective attribute
- make embedding a property as in `RecurrencePlot`
- substitute static method `normalize_time_series_array()` with
  regular method `normalize_original_data()` with same functionality
- add attributes `self.N`, `self.n_time`
- adapt tests, implement additional tests

Resolve and re-enable pylint flag `no-member`
---
 pyproject.toml                               |   2 +-
 .../inter_system_recurrence_network.py       |  21 +-
 .../timeseries/joint_recurrence_network.py   |  12 -
 .../timeseries/recurrence_network.py         |  12 -
 src/pyunicorn/timeseries/surrogates.py       | 306 ++++++++----------
 tests/test_core/test_cache.py                |   3 +
 tests/test_timeseries/test_timeseries.py     |  91 ++++--
 7 files changed, 222 insertions(+), 225 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index d6a1102..c46c8ed 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,7 +47,7 @@ jobs = 0
 disable = [
     "duplicate-code", "invalid-name", "fixme",
     "missing-docstring", "no-else-return",
-    "arguments-differ", "no-member", "no-name-in-module"
+    "arguments-differ", "no-name-in-module"
 ]
 
 [tool.pylint.format]
diff --git a/src/pyunicorn/timeseries/inter_system_recurrence_network.py b/src/pyunicorn/timeseries/inter_system_recurrence_network.py
index ee61b9d..6d148a9 100644
--- a/src/pyunicorn/timeseries/inter_system_recurrence_network.py
+++ b/src/pyunicorn/timeseries/inter_system_recurrence_network.py
@@ -18,6 +18,9 @@
 analysis (RQA) and recurrence network analysis.
 """
 
+from typing import Tuple
+from collections.abc import Hashable
+
 # array object and fast numerics
 import numpy as np
 
@@ -211,22 +214,8 @@ def __str__(self):
     #  Service methods
     #
 
-    def clear_cache(self):
-        """
-        Clean up memory by deleting information that can be recalculated from
-        basic data.
-
-        Extends the clean up methods of the parent classes.
-        """
-        # Call clean up of RecurrencePlot objects
-        self.rp_x.clear_cache()
-        self.rp_y.clear_cache()
-
-        # Call clean up of CrossRecurrencePlot object
-        self.crp_xy.clear_cache()
-
-        # Call clean up of InteractingNetworks
-        InteractingNetworks.clear_cache(self)
+    def __cache_state__(self) -> Tuple[Hashable, ...]:
+        return (self.rp_x, self.rp_y, self.crp_xy,)
 
 #
 #  Methods to handle inter system recurrence networks
diff --git a/src/pyunicorn/timeseries/joint_recurrence_network.py b/src/pyunicorn/timeseries/joint_recurrence_network.py
index df461b3..2d8a940 100644
--- a/src/pyunicorn/timeseries/joint_recurrence_network.py
+++ b/src/pyunicorn/timeseries/joint_recurrence_network.py
@@ -158,18 +158,6 @@ def __str__(self):
             f"{JointRecurrencePlot.__str__(self)}\n"
             f"{Network.__str__(self)}")
 
-    def clear_cache(self):
-        """
-        Clean up memory by deleting information that can be recalculated from
-        basic data.
-
-        Extends the clean up methods of the parent classes.
-        """
-        # Call clean up of RecurrencePlot
-        JointRecurrencePlot.clear_cache(self)
-        # Call clean up of Network
-        Network.clear_cache(self)
-
 #
 #  Methods to handle recurrence networks
 #
diff --git a/src/pyunicorn/timeseries/recurrence_network.py b/src/pyunicorn/timeseries/recurrence_network.py
index 379207a..9030d68 100644
--- a/src/pyunicorn/timeseries/recurrence_network.py
+++ b/src/pyunicorn/timeseries/recurrence_network.py
@@ -152,18 +152,6 @@ def __str__(self):
             f"{RecurrencePlot.__str__(self)}\n"
             f"{Network.__str__(self)}")
 
-    def clear_cache(self):
-        """
-        Clean up memory by deleting information that can be recalculated from
-        basic data.
-
-        Extends the clean up methods of the parent classes.
-        """
-        # Call clean up of RecurrencePlot
-        RecurrencePlot.clear_cache(self)
-        # Call clean up of Network
-        Network.clear_cache(self)
-
 #
 #  Methods to handle recurrence networks
 #
diff --git a/src/pyunicorn/timeseries/surrogates.py b/src/pyunicorn/timeseries/surrogates.py
index e2130de..660790a 100644
--- a/src/pyunicorn/timeseries/surrogates.py
+++ b/src/pyunicorn/timeseries/surrogates.py
@@ -17,6 +17,9 @@
 multivariate data and generating time series surrogates.
 """
 
+from typing import Tuple
+from collections.abc import Hashable
+
 # array object and fast numerics
 import numpy as np
 from numpy import random
@@ -24,7 +27,9 @@
 # easy progress bar handling
 from tqdm import trange
 
-from ..core._ext.types import to_cy, ADJ, DEGREE, FIELD, DFIELD
+from ..core.cache import Cached
+
+from ..core._ext.types import to_cy, ADJ, DEGREE, DFIELD
 from ._ext.numerics import _embed_time_series_array, _recurrence_plot, \
     _twins_s, _twin_surrogates_s, _test_pearson_correlation, \
     _test_mutual_information
@@ -34,7 +39,7 @@
 
 
 #
-class Surrogates:
+class Surrogates(Cached):
     """
     Encapsulates structures and methods related to surrogate time series.
 
@@ -72,14 +77,14 @@ def __init__(self, original_data, silence_level=1):
         self.silence_level = silence_level
         """(string) - The inverse level of verbosity of the object."""
 
+        (self.N, self.n_time) = self.original_data.shape
+
+        self._mut_embedding: int = 0
+        self._embedding = None
+        """The embedded time series."""
+
         # Set flags
         self._normalized = False
-        self._fft_cached = False
-        self._twins_cached = False
-
-        # Cache
-        self._twins = None
-        self._original_data_fft = None
 
     def __str__(self):
         """
@@ -87,13 +92,21 @@ def __str__(self):
         """
         return f"Surrogates: time series shape {self.original_data.shape}."
 
-    def clear_cache(self):
-        """Clean up cache."""
-        try:
-            del self._original_data_fft
-            del self._twins
-        except AttributeError:
-            pass
+    def __cache_state__(self) -> Tuple[Hashable, ...]:
+        return (self._mut_embedding,)
+
+    @property
+    def embedding(self) -> np.ndarray:
+        """
+        The embedded time series / phase space trajectory
+        (time, embedding dimension).
+        """
+        return self._embedding
+
+    @embedding.setter
+    def embedding(self, embedding: np.ndarray):
+        self._embedding = to_cy(embedding, DFIELD)
+        self._mut_embedding += 1
 
     #
     #  Methods for testing purposes
@@ -122,47 +135,15 @@ def SmallTestData():
     #
 
     @staticmethod
-    def normalize_time_series_array(time_series_array):
-        """
-        :index:`Normalize` an array of
-        time series to zero mean and unit variance individually for each
-        individual time series.
-
-        **Modifies the given array in place!**
-
-        **Examples:**
-
-        >>> ts = Surrogates.SmallTestData().original_data
-        >>> Surrogates.SmallTestData().normalize_time_series_array(ts)
-        >>> r(ts.mean(axis=1))
-        array([ 0., 0., 0., 0., 0., 0.])
-        >>> r(ts.std(axis=1))
-        array([ 1., 1., 1., 1., 1., 1.])
-
-        :type time_series_array: 2D array [index, time]
-        :arg time_series_array: The time series array to be normalized.
-        """
-        mean = time_series_array.mean(axis=1)
-        std = time_series_array.std(axis=1)
-
-        for i in range(time_series_array.shape[0]):
-            # Remove mean value from time series at each node (grid point)
-            time_series_array[i, :] -= mean[i]
-            # Normalize the standard deviation of anomalies to one
-            if std[i] != 0:
-                time_series_array[i, :] /= std[i]
-
-    def embed_time_series_array(self, time_series_array, dimension, delay):
+    def embed_time_series_array(time_series_array, dimension, delay,
+                                silence_level=1):
         """
         Return a :index:`delay embedding` of all time series.
 
-        .. note::
-           Only works for scalar time series!
-
         **Example:**
 
         >>> ts = Surrogates.SmallTestData().original_data
-        >>> Surrogates.SmallTestData().embed_time_series_array(
+        >>> Surrogates.embed_time_series_array(
         ...     time_series_array=ts, dimension=3, delay=2)[0,:6,:]
         array([[ 0.        ,  0.61464833,  1.14988147],
                [ 0.31244015,  0.89680225,  1.3660254 ],
@@ -178,7 +159,7 @@ def embed_time_series_array(self, time_series_array, dimension, delay):
         :rtype: 3D array [index, time, dimension]
         :return: the embedded time series.
         """
-        if self.silence_level <= 1:
+        if silence_level <= 1:
            print(f"Embedding all time series in dimension {dimension} "
                  f"and with lag {delay} ...")
         (N, n_time) = time_series_array.shape
@@ -191,23 +172,60 @@ def embed_time_series_array(self, time_series_array, dimension, delay):
                                  to_cy(time_series_array, DFIELD), embedding)
         return embedding
 
-    # FIXME: I(wb) included the line
-    #        dimension = embedding.shape[1]
-    #        whose missing caused an error. I can't guarantee if it is correct.
-    def recurrence_plot(self, embedding, threshold):
+    def normalize_original_data(self):
+        """
+        :index:`Normalize` the original data to zero mean and unit
+        variance individually for each time series.
+
+        **Examples:**
+
+        >>> ts = Surrogates.SmallTestData()
+        >>> ts.normalize_original_data()
+        >>> r(ts.original_data.mean(axis=1))
+        array([ 0., 0., 0., 0., 0., 0.])
+        >>> r(ts.original_data.std(axis=1))
+        array([ 1., 1., 1., 1., 1., 1.])
+        """
+        mean = self.original_data.mean(axis=1)
+        std = self.original_data.std(axis=1)
+
+        for i in range(self.N):
+            # Remove mean value from time series at each node (grid point)
+            self.original_data[i, :] -= mean[i]
+            # Normalize the standard deviation of anomalies to one
+            if std[i] != 0:
+                self.original_data[i, :] /= std[i]
+
+        self._normalized = True
+
+    @staticmethod
+    def recurrence_plot(embedding, threshold, silence_level=1):
         """
         Return the :index:`recurrence plot` from an embedding
         of a time series.
 
         Uses supremum norm.
 
+        **Example:**
+
+        >>> ts = Surrogates.SmallTestData().original_data
+        >>> embedding = Surrogates.\
+        ...     embed_time_series_array(ts, dimension=3, delay=2)
+        >>> Surrogates.recurrence_plot(embedding[0], threshold=.8)[:5, :5]
+        array([[1, 1, 0, 0, 0],
+               [1, 1, 1, 0, 0],
+               [0, 1, 1, 1, 0],
+               [0, 0, 1, 1, 1],
+               [0, 0, 0, 1, 1]], dtype=int8)
+
         :type embedding: 2D array [time, dimension]
         :arg embedding: The embedded time series.
         :arg float threshold: The recurrence threshold.
         :rtype: 2D array [time, time]
         :return: the recurrence matrix.
""" - if self.silence_level <= 1: + if silence_level <= 1: print("Calculating the recurrence plot...") n_time = embedding.shape[0] @@ -215,13 +233,11 @@ def recurrence_plot(self, embedding, threshold): R = np.ones((n_time, n_time), dtype=ADJ) _recurrence_plot(n_time, dimension, threshold, - to_cy(embedding, FIELD), R) + to_cy(embedding, DFIELD), R) return R - # FIXME: I(wb) included the line - # dimension = embedding_array.shape[2] - # whose missing caused an error. I can't guarantee if it is correct. - def twins(self, embedding_array, threshold, min_dist=7): + @Cached.method(name="twins", attrs=("_mut_embedding", "_normalized")) + def twins(self, threshold, min_dist=7): """ Return list of the :index:`twins ` of each state vector for all time series. @@ -242,9 +258,9 @@ def twins(self, embedding_array, threshold, min_dist=7): if self.silence_level <= 1: print("Finding twins...") - N = embedding_array.shape[0] - n_time = embedding_array.shape[1] - dimension = embedding_array.shape[2] + N = self.embedding.shape[0] + n_time = self.embedding.shape[1] + dimension = self.embedding.shape[2] twins = [] # Initialize the R matrix with ones @@ -253,7 +269,7 @@ def twins(self, embedding_array, threshold, min_dist=7): nR = np.empty(n_time, dtype=DEGREE) _twins_s(N, n_time, dimension, threshold, min_dist, - to_cy(embedding_array, DFIELD), R, nR, twins) + to_cy(self.embedding, DFIELD), R, nR, twins) return twins @@ -261,7 +277,17 @@ def twins(self, embedding_array, threshold, min_dist=7): # Define methods to generate sets of surrogate time series # - def white_noise_surrogates(self, original_data): + @Cached.method(name="original data fft", attrs=("_normalized",)) + def original_data_fft(self): + """ + Return one-dimensional discrete Fourier Transform via numpy.fft.rfft() + + :rtype: 2D array [index, frequency] + :return: The original time series' FFT. + """ + return np.fft.rfft(self.original_data, axis=1) + + def white_noise_surrogates(self): """ Return a shuffled copy of a time series array. @@ -274,13 +300,11 @@ def white_noise_surrogates(self, original_data): >>> ts = Surrogates.SmallTestData().original_data >>> surrogates = Surrogates.\ - SmallTestData().white_noise_surrogates(ts) + ... SmallTestData().white_noise_surrogates() >>> np.allclose(np.histogram(ts[0,:])[0], ... np.histogram(surrogates[0,:])[0]) True - :type original_data: 2D array [index, time] - :arg original_data: The original time series. :rtype: 2D array [index, time] :return: The surrogate time series. """ @@ -290,14 +314,14 @@ def white_noise_surrogates(self, original_data): # Generate reference to shuffle function shuffle = random.shuffle - surrogates = original_data.copy() + surrogates = self.original_data.copy() for i in range(surrogates.shape[0]): shuffle(surrogates[i, :]) return surrogates - def correlated_noise_surrogates(self, original_data): + def correlated_noise_surrogates(self): """ Return Fourier surrogates. @@ -307,11 +331,6 @@ def correlated_noise_surrogates(self, original_data): share their power spectrum and autocorrelation function with the original_data time series. - The Fast Fourier transforms of all time series are cached to facilitate - a faster generation of several surrogates for each time series. Hence, - :meth:`clear_cache` has to be called before generating surrogates from - a different set of time series! - .. note:: The amplitudes are not adjusted here, i.e., the individual amplitude distributions are not conserved! 
@@ -320,11 +339,12 @@ def correlated_noise_surrogates(self, original_data): The power spectrum is conserved up to small numerical deviations: - >>> ts = Surrogates.SmallTestData().original_data - >>> surrogates = Surrogates.\ - SmallTestData().correlated_noise_surrogates(ts) - >>> all(r(np.abs(np.fft.fft(ts, axis=1))[0,1:10]) == \ - r(np.abs(np.fft.fft(surrogates, axis=1))[0,1:10])) + >>> ts = Surrogates.SmallTestData() + >>> surrogates = ts.correlated_noise_surrogates() + >>> all(np.abs(np.fft.fft( + ... ts.original_data, axis=1))[0,1:10]).round(4) == + ... np.abs(np.fft.fft( + ... surrogates, axis=1))[0,1:10]).round(4)) True However, the time series amplitude distributions differ: @@ -341,71 +361,62 @@ def correlated_noise_surrogates(self, original_data): print("Generating correlated noise surrogates...") # Calculate FFT of original_data time series - # The FFT of the original_data data has to be calculated only once, - # so it is stored in self._original_data_fft. - if self._fft_cached: - surrogates = self._original_data_fft - else: - surrogates = np.fft.rfft(original_data, axis=1) - self._original_data_fft = surrogates - self._fft_cached = True + surrogates = self.original_data_fft() # Get shapes - (N, n_time) = original_data.shape len_phase = surrogates.shape[1] # Generate random phases uniformly distributed in the # interval [0, 2*Pi] - phases = random.uniform(low=0, high=2 * np.pi, size=(N, len_phase)) + phases = random.uniform( + low=0, high=2 * np.pi, size=(self.N, len_phase)) # Add random phases uniformly distributed in the interval [0, 2*Pi] surrogates *= np.exp(1j * phases) # Calculate IFFT and take the real part, the remaining imaginary part # is due to numerical errors. - return np.ascontiguousarray(np.real(np.fft.irfft(surrogates, n=n_time, - axis=1))) + return np.ascontiguousarray( + np.real(np.fft.irfft(surrogates, n=self.n_time, axis=1))) - def AAFT_surrogates(self, original_data): + def AAFT_surrogates(self): """ Return surrogates using the amplitude adjusted Fourier transform method. Reference: [Schreiber2000]_ - :type original_data: 2D array [index, time] - :arg original_data: The original time series. :rtype: 2D array [index, time] :return: The surrogate time series. 
""" # Create sorted Gaussian reference series - gaussian = random.randn(original_data.shape[0], original_data.shape[1]) + gaussian = random.randn(self.N, self.n_time) gaussian.sort(axis=1) # Rescale data to Gaussian distribution - ranks = original_data.argsort(axis=1).argsort(axis=1) - rescaled_data = np.zeros(original_data.shape) + ranks = self.original_data.argsort(axis=1).argsort(axis=1) + rescaled_data = np.zeros((self.N, self.n_time)) - for i in range(original_data.shape[0]): + for i in range(self.N): rescaled_data[i, :] = gaussian[i, ranks[i, :]] # Phase randomize rescaled data - phase_randomized_data = \ - self.correlated_noise_surrogates(rescaled_data) + phase_randomized_data = Surrogates( + original_data=rescaled_data, silence_level=2 + ).correlated_noise_surrogates() # Rescale back to amplitude distribution of original data - sorted_original = original_data.copy() + sorted_original = self.original_data.copy() sorted_original.sort(axis=1) ranks = phase_randomized_data.argsort(axis=1).argsort(axis=1) - for i in range(original_data.shape[0]): + for i in range(self.N): rescaled_data[i, :] = sorted_original[i, ranks[i, :]] return rescaled_data - def refined_AAFT_surrogates(self, original_data, n_iterations, - output="true_amplitudes"): + def refined_AAFT_surrogates(self, n_iterations, output="true_amplitudes"): """ Return surrogates using the iteratively refined amplitude adjusted Fourier transform method. @@ -416,8 +427,6 @@ def refined_AAFT_surrogates(self, original_data, n_iterations, Reference: [Schreiber2000]_ - :type original_data: 2D array [index, time] - :arg original_data: The original time series. :arg int n_iterations: Number of iterations / refinement steps :arg str output: Type of surrogate to return. "true_amplitudes": surrogates with correct amplitude distribution, "true_spectrum": @@ -426,27 +435,19 @@ def refined_AAFT_surrogates(self, original_data, n_iterations, :rtype: 2D array [index, time] :return: The surrogate time series. 
""" - # Get size of dimensions - n_time = original_data.shape[1] - - # Get Fourier transform of original data with caching - if self._fft_cached: - fourier_transform = self._original_data_fft - else: - fourier_transform = np.fft.rfft(original_data, axis=1) - self._original_data_fft = fourier_transform - self._fft_cached = True + # Get Fourier transform of original data + fourier_transform = self.original_data_fft() # Get Fourier amplitudes original_fourier_amps = np.abs(fourier_transform) # Get sorted copy of original data - sorted_original = original_data.copy() + sorted_original = self.original_data.copy() sorted_original.sort(axis=1) # Get starting point / initial conditions for R surrogates # (see [Schreiber2000]_) - R = self.AAFT_surrogates(original_data) + R = self.AAFT_surrogates() # Start iteration for _ in range(n_iterations): @@ -456,13 +457,13 @@ def refined_AAFT_surrogates(self, original_data, n_iterations, # Transform back, replacing the actual amplitudes by the desired # ones, but keeping the phases exp(iψ(i) - s = np.fft.irfft(original_fourier_amps * r_phases, n=n_time, - axis=1) + s = np.fft.irfft(original_fourier_amps * r_phases, + n=self.n_time, axis=1) # Rescale to desired amplitude distribution ranks = s.argsort(axis=1).argsort(axis=1) - for j in range(original_data.shape[0]): + for j in range(self.N): R[j, :] = sorted_original[j, ranks[j, :]] if output == "true_amplitudes": @@ -474,8 +475,7 @@ def refined_AAFT_surrogates(self, original_data, n_iterations, else: return (R, s) - def twin_surrogates(self, original_data, dimension, delay, threshold, - min_dist=7): + def twin_surrogates(self, dimension, delay, threshold, min_dist=7): """ Return surrogates using the twin surrogate method. @@ -488,13 +488,6 @@ def twin_surrogates(self, original_data, dimension, delay, threshold, References: [Thiel2006]_ [*], [Marwan2007]_. - The twin lists of all time series are cached to facilitate a faster - generation of several surrogates for each time series. Hence, - :meth:`clear_cache` has to be called before generating twin surrogates - from a different set of time series! - - :type original_data: 2D array [index, time] - :arg original_data: The original time series. :arg int dimension: The embedding dimension. :arg int delay: The embedding delay. :arg float threshold: The recurrence threshold. @@ -511,21 +504,14 @@ def twin_surrogates(self, original_data, dimension, delay, threshold, # 2. Use the algorithm proposed in [*] to find twins # 3. Reconstruct one-dimensional twin surrogate time series - (N, n_time) = original_data.shape - n_time = n_time - (dimension-1)*delay + n_time = self.n_time - (dimension-1)*delay - # Make sure that twins are calculated only once - if self._twins_cached: - twins = self._twins - else: - embedding = self.embed_time_series_array(original_data, - dimension, delay) - twins = self.twins(embedding, threshold, min_dist) - self._twins = twins - self._twins_cached = True + self.embedding = \ + self.embed_time_series_array(self.original_data, dimension, delay) + twins = self.twins(threshold, min_dist) - return _twin_surrogates_s(N, n_time, twins, - to_cy(original_data, DFIELD)) + return _twin_surrogates_s(self.N, n_time, twins, + to_cy(self.original_data, DFIELD)) # # Defines methods to generate correlation measure matrices based on @@ -621,7 +607,7 @@ def test_mutual_information(original_data, surrogates, n_bins=32): # based on surrogates. 
     #
 
-    def original_distribution(self, test_function, original_data,
-                              n_bins=100):
+    def original_distribution(self, test_function, n_bins=100):
         """
         Return a normalized histogram of a similarity measure matrix.
 
         The absolute value of the similarity measure is used.
 
@@ -630,8 +616,6 @@ def original_distribution(self, test_function, original_data,
         :type test_function: Python function
         :arg test_function: The function implementing the similarity measure.
-        :type original_data: 2D array [index, time]
-        :arg original_data: The original time series.
         :arg int n_bins: The number of bins for estimating prob. distributions.
         :rtype: tuple of 1D arrays ([bins],[bins])
         :return: the similarity measure histogram and lower bin boundaries.
@@ -642,11 +626,10 @@ def original_distribution(self, test_function, original_data,
 
         # Normalize original_data time series to zero mean and unit variance
         if not self._normalized:
-            self.normalize_time_series_array(original_data)
-            self._normalized = True
+            self.normalize_original_data()
 
-        correlation_measure = np.abs(test_function(original_data,
-                                                   original_data))
+        correlation_measure = np.abs(test_function(self.original_data,
+                                                   self.original_data))
         (hist, lbb) = np.histogram(correlation_measure, n_bins, density=True)
         # Normalize
         hist /= hist.sum()
@@ -689,17 +672,12 @@ def test_threshold_significance(self, surrogate_function, test_function,
             print(f"Starting significance test based on {realizations} "
                   "realizations of surrogates...")
 
-        original_data = self.original_data
-        self._fft_cached = False
-        self._twins_cached = False
-
-        # Create reference to np.histogram function
-        numpy_hist = np.histogram
+        self.original_data_fft.cache_clear()
+        self.twins.cache_clear()
 
         # Normalize original_data time series to zero mean and unit variance
         if not self._normalized:
-            self.normalize_time_series_array(original_data)
-            self._normalized = True
+            self.normalize_original_data()
 
         # Initialize density estimate
         density_estimate = np.zeros(n_bins)
 
@@ -707,10 +685,10 @@ def test_threshold_significance(self, surrogate_function, test_function,
         for _ in trange(realizations, disable=self.silence_level > 2):
             # Get the surrogate
             # Mean and variance are conserved by all surrogates
-            surrogates = surrogate_function(original_data)
+            surrogates = surrogate_function(self)
 
             # Get the correlation measure test matrix
-            correlation_measure_test = np.abs(test_function(original_data,
+            correlation_measure_test = np.abs(test_function(self.original_data,
                                                             surrogates))
 
             # Test if correlation measure values are outside range
@@ -720,8 +698,8 @@ def test_threshold_significance(self, surrogate_function, test_function,
                 print("Warning! Correlation measure value right of range.")
 
             # Estimate density of current realization
-            (hist, lbb) = numpy_hist(correlation_measure_test, n_bins,
-                                     interval, normed=True)
+            (hist, lbb) = np.histogram(correlation_measure_test, n_bins,
+                                       interval, density=True)
 
             # Add to density estimate over all realizations
             density_estimate += hist
diff --git a/tests/test_core/test_cache.py b/tests/test_core/test_cache.py
index 533972e..fc03821 100644
--- a/tests/test_core/test_cache.py
+++ b/tests/test_core/test_cache.py
@@ -73,6 +73,7 @@ def test_args(cls, capfd: pytest.CaptureFixture):
         r1, o1 = [], []
         for _ in range(m + 1):
             r1.append(X.foo1(1))
+            # pylint: disable-next=no-member
             o1.append(cls.Foo.foo1.__wrapped__(X, 1))
         r1.append(X.foo1(1.0))
         assert all(r == r1[0] for r in r1)
@@ -81,6 +82,7 @@ def test_args(cls, capfd: pytest.CaptureFixture):
         r2, o2 = [], []
         for i in range(2 * m):
             r2.append(X.foo2(*range(i)))
+            # pylint: disable-next=no-member
             o2.append(cls.Foo.foo2.__wrapped__(X, *range(i)))
         assert (np.diff(r2) == range(2 * m - 1)).all()
         assert all(r == o for r, o in zip(r2, o2))
@@ -88,6 +90,7 @@ def test_args(cls, capfd: pytest.CaptureFixture):
         r3, o3 = [], []
         for i in range(5):
             r3.append(X.bar())
+            # pylint: disable-next=no-member
            o3.append(cls.Foo.bar.__wrapped__(X))
         assert all(r and isinstance(r, bool) for r in r3)
         assert all(r == o for r, o in zip(r3, o3))
diff --git a/tests/test_timeseries/test_timeseries.py b/tests/test_timeseries/test_timeseries.py
index ef92247..66551f3 100644
--- a/tests/test_timeseries/test_timeseries.py
+++ b/tests/test_timeseries/test_timeseries.py
@@ -20,7 +20,7 @@
 import numpy as np
 from numpy.testing import assert_array_almost_equal
 
-from pyunicorn.timeseries import CrossRecurrencePlot, \
+from pyunicorn.timeseries import RecurrencePlot, CrossRecurrencePlot, \
     RecurrenceNetwork, JointRecurrenceNetwork, InterSystemRecurrenceNetwork, \
     Surrogates, VisibilityGraph
 from pyunicorn.core.data import Data
@@ -169,22 +169,24 @@ def testInterSystemRecurrenceNetwork(thresh, rr, metric: str):
 # -----------------------------------------------------------------------------
 
 
-def testNormalizeTimeSeriesArray():
-    ts = Surrogates.SmallTestData().original_data
-    Surrogates.SmallTestData().normalize_time_series_array(ts)
-    res = ts.mean(axis=1)
+def testNormalizeOriginalData():
+    ts = Surrogates.SmallTestData()
+    ts.normalize_original_data()
+
+    res = ts.original_data.mean(axis=1)
     exp = np.array([0., 0., 0., 0., 0., 0.])
     assert np.allclose(res, exp, atol=1e-04)
 
-    res = ts.std(axis=1)
+    res = ts.original_data.std(axis=1)
     exp = np.array([1., 1., 1., 1., 1., 1.])
     assert np.allclose(res, exp, atol=1e-04)
 
 
 def testEmbedTimeSeriesArray():
-    ts = Surrogates.SmallTestData().original_data
-    res = Surrogates.SmallTestData().embed_time_series_array(
-        time_series_array=ts, dimension=3, delay=2)[0, :6, :]
+    ts = Surrogates.SmallTestData()
+    res = Surrogates.embed_time_series_array(
+        time_series_array=ts.original_data,
+        dimension=3, delay=2)[0, :6, :]
     exp = np.array([[0., 0.61464833, 1.14988147],
                     [0.31244015, 0.89680225, 1.3660254],
                     [0.61464833, 1.14988147, 1.53884177],
@@ -194,32 +196,58 @@ def testEmbedTimeSeriesArray():
     assert np.allclose(res, exp, atol=1e-04)
 
 
+def testSurrogatesRecurrencePlot():
+    thresh = .2
+    dim = 3
+    tau = 2
+    # calculate Surrogates.recurrence_plot()
+    ts = Surrogates.SmallTestData()
+    embedding = Surrogates.\
+        embed_time_series_array(ts.original_data, dimension=dim, delay=tau)
+    rp1 = Surrogates.recurrence_plot(embedding[0], threshold=thresh)
+    # compare to timeseries.RecurrencePlot
+    rp2 = RecurrencePlot(
+        ts.original_data[0], threshold=thresh, dim=dim, tau=tau).R
+    assert np.array_equal(rp1, rp2)
+
+
 def testWhiteNoiseSurrogates():
-    ts = Surrogates.SmallTestData().original_data
-    surrogates = Surrogates.SmallTestData().white_noise_surrogates(ts)
+    ts = Surrogates.SmallTestData()
+    surrogates = ts.white_noise_surrogates()
 
-    assert np.allclose(np.histogram(ts[0, :])[0],
+    assert np.allclose(np.histogram(ts.original_data[0, :])[0],
                        np.histogram(surrogates[0, :])[0])
 
 
 def testCorrelatedNoiseSurrogates():
-    ts = Surrogates.SmallTestData().original_data
-    surrogates = Surrogates.SmallTestData().correlated_noise_surrogates(ts)
-    assert np.allclose(np.abs(np.fft.fft(ts, axis=1))[0, 1:10],
+    ts = Surrogates.SmallTestData()
+    surrogates = ts.correlated_noise_surrogates()
+    assert np.allclose(np.abs(np.fft.fft(ts.original_data, axis=1))[0, 1:10],
                        np.abs(np.fft.fft(surrogates, axis=1))[0, 1:10])
 
 
 def testTwinSurrogates():
     tdata = create_test_data()
-    n_index = tdata.shape[0]
-    s = Surrogates(tdata)
-    tsurro = s.twin_surrogates(tdata, 1, 0, 0.2)
-    corrcoef = np.corrcoef(tdata, tsurro)[n_index:, :n_index]
-    for i in range(n_index):
+    ts = Surrogates(tdata)
+    tsurro = ts.twin_surrogates(1, 0, 0.2)
+    corrcoef = np.corrcoef(tdata, tsurro)[ts.N:, :ts.N]
+    for i in range(ts.N):
         corrcoef[i, i] = 0.0
     assert (corrcoef >= -1.0).all() and (corrcoef <= 1.0).all()
 
 
+def testAAFTSurrogates():
+    ts = Surrogates.SmallTestData()
+    # also covers Surrogates.AAFT_surrogates(), which is used as starting point
+    surr_R, surr_s = ts.refined_AAFT_surrogates(n_iterations=3, output="both")
+    # assert conserved amplitude distribution
+    assert all(np.histogram(ts.original_data[0, :])[0] ==
+               np.histogram(surr_R[0, :])[0])
+    # assert conserved power spectrum
+    assert np.allclose(np.abs(np.fft.fft(ts.original_data, axis=1))[0, 1:10],
+                       np.abs(np.fft.fft(surr_s, axis=1))[0, 1:10])
+
+
 def testPearsonCorrelation():
     tdata = create_test_data()
     n_index, n_times = tdata.shape
@@ -239,6 +267,29 @@ def testMutualInformation():
                                               n_bins=32)
     assert (test_mi >= -1.0).all() and (test_mi <= 1.0).all()
 
+
+def testOriginalDistribution():
+    nbins = 10
+    ts = Surrogates.SmallTestData()
+    hist, lbb = ts.original_distribution(
+        Surrogates.test_mutual_information, n_bins=nbins)
+
+    assert np.isclose(hist.sum(), 1) and len(lbb) == nbins
+
+
+def testThresholdSignificance():
+    nbins = 10
+    ts = Surrogates.SmallTestData()
+    density_estimate, lbb = ts.test_threshold_significance(
+        Surrogates.white_noise_surrogates,
+        Surrogates.test_mutual_information,
+        realizations=5,
+        interval=[0, 2],
+        n_bins=nbins)
+
+    assert np.isclose(density_estimate.sum(), 1) and len(lbb) == nbins
+
+
 # -----------------------------------------------------------------------------
 # visibility_graph
 # -----------------------------------------------------------------------------
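
Usage sketch of the reviewed `Surrogates` API (a minimal illustration under
the signatures introduced above; `SmallTestData()` is the bundled test data
helper shown in the diff):

    from pyunicorn.timeseries import Surrogates

    ts = Surrogates.SmallTestData()     # wraps a 2D array [index, time]
    ts.normalize_original_data()        # in place: zero mean, unit variance

    white = ts.white_noise_surrogates()          # shuffle each series
    fourier = ts.correlated_noise_surrogates()   # phase randomization; the
                                                 # FFT is cached per instance
    aaft = ts.refined_AAFT_surrogates(n_iterations=10)

    # twin surrogates set `ts.embedding` and reuse the cached twin lists
    twins = ts.twin_surrogates(dimension=3, delay=2, threshold=0.2)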