diff --git a/news/lowercasing.rst b/news/lowercasing.rst new file mode 100644 index 00000000..d23c8549 --- /dev/null +++ b/news/lowercasing.rst @@ -0,0 +1,23 @@ +**Added:** + +* + +**Changed:** + +* + +**Deprecated:** + +* + +**Removed:** + +* + +**Fixed:** + +* Conform variable names to PEP-8 + +**Security:** + +* diff --git a/src/diffpy/snmf/main.py b/src/diffpy/snmf/main.py index e38b5ab9..3378d8d0 100644 --- a/src/diffpy/snmf/main.py +++ b/src/diffpy/snmf/main.py @@ -1,14 +1,20 @@ import numpy as np from snmf_class import SNMFOptimizer -X0 = np.loadtxt("input/X0.txt", dtype=float) -MM = np.loadtxt("input/MM.txt", dtype=float) -A0 = np.loadtxt("input/A0.txt", dtype=float) -Y0 = np.loadtxt("input/W0.txt", dtype=float) -N, M = MM.shape +# Example input files (not provided) +init_components_file = np.loadtxt("input/init_components.txt", dtype=float) +source_matrix_file = np.loadtxt("input/source_matrix.txt", dtype=float) +init_stretch_file = np.loadtxt("input/init_stretch.txt", dtype=float) +init_weights_file = np.loadtxt("input/init_weights.txt", dtype=float) + +my_model = SNMFOptimizer( + source_matrix=source_matrix_file, + init_weights=init_weights_file, + init_components=init_components_file, + init_stretch=init_stretch_file, +) -my_model = SNMFOptimizer(MM=MM, Y0=Y0, X0=X0, A0=A0) print("Done") -np.savetxt("my_norm_X.txt", my_model.X, fmt="%.6g", delimiter=" ") -np.savetxt("my_norm_Y.txt", my_model.Y, fmt="%.6g", delimiter=" ") -np.savetxt("my_norm_A.txt", my_model.A, fmt="%.6g", delimiter=" ") +np.savetxt("my_norm_components.txt", my_model.components, fmt="%.6g", delimiter=" ") +np.savetxt("my_norm_weights.txt", my_model.weights, fmt="%.6g", delimiter=" ") +np.savetxt("my_norm_stretch.txt", my_model.stretch, fmt="%.6g", delimiter=" ") diff --git a/src/diffpy/snmf/snmf_class.py b/src/diffpy/snmf/snmf_class.py index 950da7cd..63bfe9a0 100644 --- a/src/diffpy/snmf/snmf_class.py +++ b/src/diffpy/snmf/snmf_class.py @@ -4,11 +4,11 @@ class SNMFOptimizer: - """A implementation of stretched NMF (sNMF), including sparse stretched NMF. + """An implementation of stretched NMF (sNMF), including sparse stretched NMF. Instantiating the SNMFOptimizer class runs all the analysis immediately. The results matrices can then be accessed as instance attributes - of the class (X, Y, and A). + of the class (components, weights, and stretch). For more information on sNMF, please reference: Gu, R., Rakita, Y., Lan, L. et al. Stretched non-negative matrix factorization. @@ -16,45 +16,45 @@ class SNMFOptimizer: Attributes ---------- - MM : ndarray + source_matrix : ndarray The original, unmodified data to be decomposed and later, compared against. - Shape is (length_of_signal, number_of_conditions). - Y : ndarray + Shape is (length_of_signal, number_of_signals). + stretch : ndarray The best guess (or while running, the current guess) for the stretching factor matrix. - X : ndarray + components : ndarray The best guess (or while running, the current guess) for the matrix of component intensities. - A : ndarray + weights : ndarray The best guess (or while running, the current guess) for the matrix of component weights. rho : float - The stretching factor that influences the decomposition. Zero corresponds to no - stretching present. Relatively insensitive and typically adjusted in powers of 10. + The stretching factor that influences the decomposition. Zero corresponds to no + stretching present. Relatively insensitive and typically adjusted in powers of 10. 
eta : float The sparsity factor that influences the decomposition. Should be set to zero for non-sparse data such as PDF. Can be used to improve results for sparse data such as XRD, but due to instability, should be used only after first selecting the best value for rho. Suggested adjustment is by powers of 2. max_iter : int - The maximum number of times to update each of A, X, and Y before stopping - the optimization. + The maximum number of times to update each of stretch, components, and weights before stopping + the optimization. tol : float The convergence threshold. This is the minimum fractional improvement in the objective function to allow without terminating the optimization. Note that a minimum of 20 updates are run before this parameter is checked. n_components : int - The number of components to extract from MM. Must be provided when and only when + The number of components to extract from source_matrix. Must be provided when and only when Y0 is not provided. random_state : int - The seed for the initial guesses at the matrices (A, X, and Y) created by + The seed for the initial guesses at the matrices (stretch, components, and weights) created by the decomposition. num_updates : int - The total number of times that any of (A, X, and Y) have had their values changed. + The total number of times that any of (stretch, components, and weights) have had their values changed. If not terminated by other means, this value is used to stop when reaching max_iter. objective_function: float - The value corresponding to the minimization of the difference between the MM and the - products of A, X, and Y. For full details see the sNMF paper. Smaller corresponds to + The value corresponding to the minimization of the difference between the source_matrix and the + products of (stretch, components, and weights). For full details see the sNMF paper. Smaller corresponds to better agreement and is desirable. objective_difference : float The change in the objective function value since the last update. A negative value @@ -63,12 +63,12 @@ class SNMFOptimizer: def __init__( self, - MM, - Y0=None, - X0=None, - A0=None, - rho=1e12, - eta=610, + source_matrix, + init_weights=None, + init_components=None, + init_stretch=None, + rho=0, + eta=0, max_iter=500, tol=5e-7, n_components=None, @@ -78,94 +78,104 @@ def __init__( Parameters ---------- - MM : ndarray + source_matrix : ndarray The data to be decomposed. Shape is (length_of_signal, number_of_conditions). - Y0 : ndarray + init_weights : ndarray Optional Default = rng.beta(a=2.5, b=1.5, size=(n_components, n_signals)) The initial guesses for the component weights at each stretching condition. - Shape is (number_of_components, number_of_conditions) Must provide exactly one + Shape is (number_of_components, number_of_signals). Must provide exactly one of this or n_components. - X0 : ndarray + init_components : ndarray Optional Default = rng.random((self.signal_length, self.n_components)) The initial guesses for the intensities of each component per row/sample/angle. Shape is (length_of_signal, number_of_components). - A0 : ndarray + init_stretch : ndarray Optional Default = np.ones((self.n_components, self.n_signals)) + self._rng.normal( + 0, 1e-3, size=(self.n_components, self.n_signals)) The initial guesses for the stretching factor for each component, at each - condition. Shape is (number_of_components, number_of_conditions). + condition (for each signal). Shape is (number_of_components, number_of_signals).
+ rho : float Optional Default = 0 The stretching factor that influences the decomposition. Zero corresponds to no stretching present. Relatively insensitive and typically adjusted in powers of 10. - eta : float + eta : float Optional Default = 0 The sparsity factor that influences the decomposition. Should be set to zero for non-sparse data such as PDF. Can be used to improve results for sparse data such as XRD, but due to instability, should be used only after first selecting the best value for rho. Suggested adjustment is by powers of 2. - max_iter : int + max_iter : int Optional Default = 500 The maximum number of times to update each of A, X, and Y before stopping the optimization. - tol : float + tol : float Optional Default = 5e-7 The convergence threshold. This is the minimum fractional improvement in the objective function to allow without terminating the optimization. Note that a minimum of 20 updates are run before this parameter is checked. - n_components : int - The number of components to extract from MM. Must be provided when and only when + n_components : int Optional Default = None + The number of components to extract from source_matrix. Must be provided when and only when Y0 is not provided. - random_state : int + random_state : int Optional Default = None The seed for the initial guesses at the matrices (A, X, and Y) created by the decomposition. """ - self.MM = MM + self.source_matrix = source_matrix self.rho = rho self.eta = eta # Capture matrix dimensions - self._N, self._M = MM.shape + self.signal_length, self.n_signals = source_matrix.shape self.num_updates = 0 self._rng = np.random.default_rng(random_state) # Enforce exclusive specification of n_components or Y0 - if (n_components is None) == (Y0 is not None): - raise ValueError("Must provide exactly one of Y0 or n_components, but not both.") + if (n_components is None and init_weights is None) or ( + n_components is not None and init_weights is not None + ): + raise ValueError( + "Conflicting source for n_components. Must provide either init_weights or n_components " + "directly, but not both."
+ ) - # Initialize Y0 and determine number of components - if Y0 is None: - self._K = n_components - self.Y = self._rng.beta(a=2.5, b=1.5, size=(self._K, self._M)) + # Initialize weights and determine number of components + if init_weights is None: + self.n_components = n_components + self.weights = self._rng.beta(a=2.5, b=1.5, size=(self.n_components, self.n_signals)) else: - self._K = Y0.shape[0] - self.Y = Y0 + self.n_components = init_weights.shape[0] + self.weights = init_weights - # Initialize A if not provided - if self.A is None: - self.A = np.ones((self._K, self._M)) + self._rng.normal(0, 1e-3, size=(self._K, self._M)) + # Initialize stretching matrix if not provided + if init_stretch is None: + self.stretch = np.ones((self.n_components, self.n_signals)) + self._rng.normal( + 0, 1e-3, size=(self.n_components, self.n_signals) + ) else: - self.A = A0 + self.stretch = init_stretch - # Initialize X0 if not provided - if self.X is None: - self.X = self._rng.random((self._N, self._K)) + # Initialize component matrix if not provided + if init_components is None: + self.components = self._rng.random((self.signal_length, self.n_components)) else: - self.X = X0 + self.components = init_components - # Enforce non-negativity - self.X = np.maximum(0, self.X) - self.Y = np.maximum(0, self.Y) + # Enforce non-negativity in our initial guesses + self.components = np.maximum(0, self.components) + self.weights = np.maximum(0, self.weights) # Second-order spline: Tridiagonal (-2 on diagonal, 1 on sub/superdiagonals) - self.P = 0.25 * diags([1, -2, 1], offsets=[0, 1, 2], shape=(self._M - 2, self._M)) - self.PP = self.P.T @ self.P + self._spline_smooth_operator = 0.25 * diags( + [1, -2, 1], offsets=[0, 1, 2], shape=(self.n_signals - 2, self.n_signals) + ) + self._spline_smooth_penalty = self._spline_smooth_operator.T @ self._spline_smooth_operator # Set up residual matrix, objective function, and history - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() self.objective_difference = None self._objective_history = [self.objective_function] # Set up tracking variables for updateX() - self._preX = None - self._GraX = np.zeros_like(self.X) # Gradient of X (zeros for now) - self._preGraX = np.zeros_like(self.X) # Previous gradient of X (zeros for now) + self._prev_components = None + self.grad_components = np.zeros_like(self.components) # Gradient of components (zeros for now) + self._prev_grad_components = np.zeros_like(self.components) # Previous gradient of components (zeros for now) - regularization_term = 0.5 * rho * np.linalg.norm(self.P @ self.A.T, "fro") ** 2 - sparsity_term = eta * np.sum(np.sqrt(self.X)) # Square root penalty + regularization_term = 0.5 * rho * np.linalg.norm(self._spline_smooth_operator @ self.stretch.T, "fro") ** 2 + sparsity_term = eta * np.sum(np.sqrt(self.components)) # Square root penalty print( f"Start, Objective function: {self.objective_function:.5e}" f", Obj - reg/sparse: {self.objective_function - regularization_term - sparsity_term:.5e}" @@ -175,8 +185,10 @@ def __init__( for iter in range(max_iter): self.optimize_loop() # Print diagnostics - regularization_term = 0.5 * rho * np.linalg.norm(self.P @ self.A.T, "fro") ** 2 - sparsity_term = eta * np.sum(np.sqrt(self.X)) # Square root penalty + regularization_term = ( + 0.5 * rho * np.linalg.norm(self._spline_smooth_operator @ self.stretch.T, "fro") ** 2 + ) + sparsity_term = eta * np.sum(np.sqrt(self.components)) # Square root penalty print(
f"Num_updates: {self.num_updates}, " f"Obj fun: {self.objective_function:.5e}, " @@ -184,29 +196,30 @@ def __init__( f"Iter: {iter}" ) - # Convergence check: Stop if diffun is small and at least 20 iterations have passed + # Convergence check: decide when to terminate for small/no improvement print(self.objective_difference, " < ", self.objective_function * tol) if self.objective_difference < self.objective_function * tol and iter >= 20: break # Normalize our results - Y_row_max = np.max(self.Y, axis=1, keepdims=True) - self.Y = self.Y / Y_row_max - A_row_max = np.max(self.A, axis=1, keepdims=True) - self.A = self.A / A_row_max - # loop to normalize X - # effectively just re-running class with non-normalized X, normalized Y/A as inputs, then only update X - # reset difference trackers and initialize - self._preX = None - self._GraX = np.zeros_like(self.X) # Gradient of X (zeros for now) - self._preGraX = np.zeros_like(self.X) # Previous gradient of X (zeros for now) - self.R = self.get_residual_matrix() + weights_row_max = np.max(self.weights, axis=1, keepdims=True) + stretch_row_max = np.max(self.stretch, axis=1, keepdims=True) + self.weights = self.weights / weights_row_max + self.stretch = self.stretch / stretch_row_max + + # loop to normalize components + # effectively just re-running class with non-normalized components, normalized wts/stretch as inputs, + # then only update components + self._prev_components = None + self.grad_components = np.zeros_like(self.components) + self._prev_grad_components = np.zeros_like(self.components) + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() self.objective_difference = None self._objective_history = [self.objective_function] for norm_iter in range(100): - self.updateX() - self.R = self.get_residual_matrix() + self.update_components() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() print(f"Objective function after normX: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) @@ -218,246 +231,264 @@ def __init__( print("Finished optimization.") def optimize_loop(self): - self._preGraX = self._GraX.copy() - self.updateX() + # Update components first + self._prev_grad_components = self.grad_components.copy() + self.update_components() self.num_updates += 1 - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after updateX: {self.objective_function:.5e}") + print(f"Objective function after update_components: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) if self.objective_difference is None: self.objective_difference = self._objective_history[-1] - self.objective_function - # Now we update Y - self.updateY2() + # Now we update weights + self.update_weights() self.num_updates += 1 - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after updateY2: {self.objective_function:.5e}") + print(f"Objective function after update_weights: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) - self.updateA2() - + # Now we update stretch + self.update_stretch() self.num_updates += 1 - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - 
print(f"Objective function after updateA2: {self.objective_function:.5e}") + print(f"Objective function after update_stretch: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) self.objective_difference = self._objective_history[-2] - self._objective_history[-1] def apply_interpolation(self, a, x, return_derivatives=False): """ Applies an interpolation-based transformation to `x` based on scaling `a`. - Also can compute first (`Tx`) and second (`Hx`) derivatives. + Also can compute first (`d_intr_x`) and second (`dd_intr_x`) derivatives. """ - N = len(x) + x_len = len(x) # Ensure `a` is an array and reshape for broadcasting a = np.atleast_1d(np.asarray(a)) # Ensures a is at least 1D # Compute fractional indices, broadcasting over `a` - ii = np.arange(N)[:, None] / a # Shape (N, M) + fractional_indices = np.arange(x_len)[:, None] / a # Shape (N, M) - II = np.floor(ii).astype(int) # Integer part (still (N, M)) - valid_mask = II < (N - 1) # Ensure indices are within bounds + integer_indices = np.floor(fractional_indices).astype(int) # Integer part (still (N, M)) + valid_mask = integer_indices < (x_len - 1) # Ensure indices are within bounds # Apply valid_mask to keep correct indices - idx_int = np.where(valid_mask, II, N - 2) # Prevent out-of-bounds indexing (previously "I") - idx_frac = np.where(valid_mask, ii, II) # Keep aligned (previously "i") + idx_int = np.where( + valid_mask, integer_indices, x_len - 2 + ) # Prevent out-of-bounds indexing (previously "I") + idx_frac = np.where(valid_mask, fractional_indices, integer_indices) # Keep aligned (previously "i") # Ensure x is a 1D array x = np.asarray(x).ravel() - # Compute Ax (linear interpolation) - Ax = x[idx_int] * (1 - idx_frac + idx_int) + x[np.minimum(idx_int + 1, N - 1)] * (idx_frac - idx_int) + # Compute interpolated_x (linear interpolation) + interpolated_x = x[idx_int] * (1 - idx_frac + idx_int) + x[np.minimum(idx_int + 1, x_len - 1)] * ( + idx_frac - idx_int + ) # Fill the tail with the last valid value - Ax_tail = np.full((N - len(idx_int), Ax.shape[1]), Ax[-1, :]) - Ax = np.vstack([Ax, Ax_tail]) + intr_x_tail = np.full((x_len - len(idx_int), interpolated_x.shape[1]), interpolated_x[-1, :]) + interpolated_x = np.vstack([interpolated_x, intr_x_tail]) if return_derivatives: - # Compute first derivative (Tx) + # Compute first derivative (d_intr_x) di = -idx_frac / a - Tx = x[idx_int] * (-di) + x[np.minimum(idx_int + 1, N - 1)] * di - Tx = np.vstack([Tx, np.zeros((N - len(idx_int), Tx.shape[1]))]) + d_intr_x = x[idx_int] * (-di) + x[np.minimum(idx_int + 1, x_len - 1)] * di + d_intr_x = np.vstack([d_intr_x, np.zeros((x_len - len(idx_int), d_intr_x.shape[1]))]) - # Compute second derivative (Hx) + # Compute second derivative (dd_intr_x) ddi = -di / a + idx_frac * a**-2 - Hx = x[idx_int] * (-ddi) + x[np.minimum(idx_int + 1, N - 1)] * ddi - Hx = np.vstack([Hx, np.zeros((N - len(idx_int), Hx.shape[1]))]) + dd_intr_x = x[idx_int] * (-ddi) + x[np.minimum(idx_int + 1, x_len - 1)] * ddi + dd_intr_x = np.vstack([dd_intr_x, np.zeros((x_len - len(idx_int), dd_intr_x.shape[1]))]) else: # Make placeholders - Tx = np.empty(Ax.shape) - Hx = np.empty(Ax.shape) + d_intr_x = np.empty(interpolated_x.shape) + dd_intr_x = np.empty(interpolated_x.shape) - return Ax, Tx, Hx + return interpolated_x, d_intr_x, dd_intr_x - def get_residual_matrix(self, X=None, Y=None, A=None): - # Initialize residual matrix as negative of MM + def get_residual_matrix(self, components=None, weights=None, stretch=None): + # Initialize 
residual matrix as negative of source_matrix # In MATLAB this is getR - if X is None: - X = self.X - if Y is None: - Y = self.Y - if A is None: - A = self.A - R = -self.MM.copy() - # Compute transformed X for all (k, m) pairs - for k in range(Y.shape[0]): # K - Ax, _, _ = self.apply_interpolation(A[k, :], X[:, k]) # Only calculate Ax - R += Y[k, :] * Ax # Element-wise scaling and sum - return R - - def get_objective_function(self, R=None, A=None): - if R is None: - R = self.R - if A is None: - A = self.A - residual_term = 0.5 * np.linalg.norm(R, "fro") ** 2 - regularization_term = 0.5 * self.rho * np.linalg.norm(self.P @ A.T, "fro") ** 2 - sparsity_term = self.eta * np.sum(np.sqrt(self.X)) # Square root penalty + if components is None: + components = self.components + if weights is None: + weights = self.weights + if stretch is None: + stretch = self.stretch + residuals = -self.source_matrix.copy() + # Compute transformed components for all (k, m) pairs + for k in range(weights.shape[0]): + stretched_components, _, _ = self.apply_interpolation( + stretch[k, :], components[:, k] + ) # Only calculate Ax + residuals += weights[k, :] * stretched_components # Element-wise scaling and sum + return residuals + + def get_objective_function(self, residuals=None, stretch=None): + if residuals is None: + residuals = self.residuals + if stretch is None: + stretch = self.stretch + residual_term = 0.5 * np.linalg.norm(residuals, "fro") ** 2 + regularization_term = 0.5 * self.rho * np.linalg.norm(self._spline_smooth_operator @ stretch.T, "fro") ** 2 + sparsity_term = self.eta * np.sum(np.sqrt(self.components)) # Square root penalty # Final objective function value function = residual_term + regularization_term + sparsity_term return function - def apply_interpolation_matrix(self, X=None, Y=None, A=None, return_derivatives=False): + def apply_interpolation_matrix(self, components=None, weights=None, stretch=None, return_derivatives=False): """ - Applies an interpolation-based transformation to the matrix `X` using `A`, - weighted by `Y`. Optionally computes first (`Tx`) and second (`Hx`) derivatives. + Applies an interpolation-based transformation to the matrix `components` using `stretch`, + weighted by `weights`. Optionally computes first and second derivatives. Equivalent to getAfun_matrix in MATLAB. 
""" - if X is None: - X = self.X - if Y is None: - Y = self.Y - if A is None: - A = self.A - - N, M = self.MM.shape - K = Y.shape[0] + if components is None: + components = self.components + if weights is None: + weights = self.weights + if stretch is None: + stretch = self.stretch # Compute scaled indices (MATLAB: AA = repmat(reshape(A',1,M*K).^-1, N,1)) - A_flat = A.reshape(1, M * K) ** -1 - AA = np.tile(A_flat, (N, 1)) + stretch_flat = stretch.reshape(1, self.n_signals * self.n_components) ** -1 + stretch_tiled = np.tile(stretch_flat, (self.signal_length, 1)) - # Compute `ii` (MATLAB: ii = repmat((0:N-1)',1,K*M).*AA) - ii = np.tile(np.arange(N)[:, None], (1, M * K)) * AA + # Compute `ii` (MATLAB: ii = repmat((0:N-1)',1,K*M).*tiled_stretch) + fractional_indices = ( + np.tile(np.arange(self.signal_length)[:, None], (1, self.n_signals * self.n_components)) + * stretch_tiled + ) # Weighting matrix (MATLAB: YY = repmat(reshape(Y',1,M*K), N,1)) - Y_flat = Y.reshape(1, M * K) - YY = np.tile(Y_flat, (N, 1)) + weights_flat = weights.reshape(1, self.n_signals * self.n_components) + weights_tiled = np.tile(weights_flat, (self.signal_length, 1)) # Bias for indexing into reshaped X (MATLAB: bias = kron((0:K-1)*(N+1),ones(N,M))) # TODO break this up or describe what it does better - bias = np.kron(np.arange(K) * (N + 1), np.ones((N, M), dtype=int)).reshape(N, K * M) + bias = np.kron( + np.arange(self.n_components) * (self.signal_length + 1), + np.ones((self.signal_length, self.n_signals), dtype=int), + ).reshape(self.signal_length, self.n_components * self.n_signals) # Handle boundary conditions for interpolation (MATLAB: X1=[X;X(end,:)]) - X1 = np.vstack([X, X[-1, :]]) # Duplicate last row (like MATLAB) + components_bounded = np.vstack([components, components[-1, :]]) # Duplicate last row (like MATLAB) # Compute floor indices (MATLAB: II = floor(ii); II1=min(II+1,N+1); II2=min(II1+1,N+1)) - II = np.floor(ii).astype(int) + floor_indices = np.floor(fractional_indices).astype(int) - II1 = np.minimum(II + 1, N) - II2 = np.minimum(II1 + 1, N) + floor_ind_1 = np.minimum(floor_indices + 1, self.signal_length) + floor_ind_2 = np.minimum(floor_ind_1 + 1, self.signal_length) # Compute fractional part (MATLAB: iI = ii - II) - iI = ii - II + fractional_floor_indices = fractional_indices - floor_indices # Compute offset indices (MATLAB: II1_ = II1 + bias; II2_ = II2 + bias) - II1_ = II1 + bias - II2_ = II2 + bias + offset_floor_ind_1 = floor_ind_1 + bias + offset_floor_ind_2 = floor_ind_2 + bias # Extract values (MATLAB: XI1 = reshape(X1(II1_), N, K*M); XI2 = reshape(X1(II2_), N, K*M)) # Note: this "-1" corrects an off-by-one error that may have originated in an earlier line - XI1 = X1.flatten(order="F")[(II1_ - 1).ravel()].reshape( - N, K * M - ) # order = F uses FORTRAN, column major order - XI2 = X1.flatten(order="F")[(II2_ - 1).ravel()].reshape(N, K * M) + # order = F uses FORTRAN, column major order + components_val_1 = components_bounded.flatten(order="F")[(offset_floor_ind_1 - 1).ravel()].reshape( + self.signal_length, self.n_components * self.n_signals + ) + components_val_2 = components_bounded.flatten(order="F")[(offset_floor_ind_2 - 1).ravel()].reshape( + self.signal_length, self.n_components * self.n_signals + ) - # Interpolation (MATLAB: Ax2=XI1.*(1-iI)+XI2.*(iI); Ax=Ax2.*YY) - Ax2 = XI1 * (1 - iI) + XI2 * iI - Ax = Ax2 * YY # Apply weighting + # Interpolation (MATLAB: Ax2=XI1.*(1-iI)+XI2.*(iI); stretched_components=Ax2.*YY) + stretch_components2 = ( + components_val_1 * (1 - 
fractional_floor_indices) + components_val_2 * fractional_floor_indices + ) + stretched_components = stretch_components2 * weights_tiled # Apply weighting if return_derivatives: - # Compute first derivative (MATLAB: Tx2=XI1.*(-di)+XI2.*di; Tx=Tx2.*YY) - di = -ii * AA - Tx2 = XI1 * (-di) + XI2 * di - Tx = Tx2 * YY - - # Compute second derivative (MATLAB: Hx2=XI1.*(-ddi)+XI2.*ddi; Hx=Hx2.*YY) - ddi = -di * AA * 2 - Hx2 = XI1 * (-ddi) + XI2 * ddi - Hx = Hx2 * YY + # Compute first derivative (MATLAB: Tx2=XI1.*(-di)+XI2.*di; d_str_cmps=Tx2.*YY) + di = -fractional_indices * stretch_tiled + d_components2 = components_val_1 * (-di) + components_val_2 * di + d_stretch_components = d_components2 * weights_tiled + + # Compute second derivative (MATLAB: Hx2=XI1.*(-ddi)+XI2.*ddi; dd_str_components=Hx2.*YY) + ddi = -di * stretch_tiled * 2 + dd_components2 = components_val_1 * (-ddi) + components_val_2 * ddi + dd_stretch_components = dd_components2 * weights_tiled else: - shape = Ax.shape - Tx = np.empty(shape) - Hx = np.empty(shape) + shape = stretched_components.shape + d_stretch_components = np.empty(shape) + dd_stretch_components = np.empty(shape) - return Ax, Tx, Hx + return stretched_components, d_stretch_components, dd_stretch_components - def apply_transformation_matrix(self, A=None, Y=None, R=None): + def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None): """ - Computes the transformation matrix `AT` for residual `R`, - using scaling matrix `A` and weight coefficients `Y`. + Computes the transformation matrix `stretch_transformed` for `residuals`, + using scaling matrix `stretch` and coefficients `weights`. """ - if A is None: - A = self.A - if Y is None: - Y = self.Y - if R is None: - R = self.R - - N, M = self.MM.shape - K = Y.shape[0] + if stretch is None: + stretch = self.stretch + if weights is None: + weights = self.weights + if residuals is None: + residuals = self.residuals # Compute scaling matrix (MATLAB: AA = repmat(reshape(A,1,M*K).^-1,Nindex,1)) - AA = np.tile(A.reshape(1, M * K, order="F") ** -1, (N, 1)) + stretch_tiled = np.tile( + stretch.reshape(1, self.n_signals * self.n_components, order="F") ** -1, (self.signal_length, 1) + ) # Compute indices (MATLAB: ii = repmat((index-1)',1,K*M).*AA) - ii = np.arange(N)[:, None] * AA # Shape (N, M*K), replacing `index` + indices = np.arange(self.signal_length)[:, None] * stretch_tiled # Shape (N, M*K), replacing `index` # Weighting coefficients (MATLAB: YY = repmat(reshape(Y,1,M*K),Nindex,1)) - YY = np.tile(Y.reshape(1, M * K, order="F"), (N, 1)) + weights_tiled = np.tile( + weights.reshape(1, self.n_signals * self.n_components, order="F"), (self.signal_length, 1) + ) # Compute floor indices (MATLAB: II = floor(ii); II1 = min(II+1,N+1); II2 = min(II1+1,N+1)) - II = np.floor(ii).astype(int) - II1 = np.minimum(II + 1, N) - II2 = np.minimum(II1 + 1, N) - - # Assign directly (MATLAB: II1_ = II1; II2_ = II2) - II1_ = II1 - II2_ = II2 + floor_indices = np.floor(indices).astype(int) + floor_indices_1 = np.minimum(floor_indices + 1, self.signal_length) + floor_indices_2 = np.minimum(floor_indices_1 + 1, self.signal_length) # Compute fractional part (MATLAB: iI = ii - II) - iI = ii - II + fractional_indices = indices - floor_indices # Expand row indices (MATLAB: repm = repmat(1:K, Nindex, M)) - repm = np.tile(np.arange(K), (N, M)) + repm = np.tile(np.arange(self.n_components), (self.signal_length, self.n_signals)) # Compute transformations (MATLAB: kro = kron(R(index,:), ones(1, K))) - kro = np.kron(R, np.ones((1, K))) + 
kron = np.kron(residuals, np.ones((1, self.n_components))) # (MATLAB: kroiI = kro .* (iI); iIYY = (iI-1) .* YY) - kroiI = kro * iI - iIYY = (iI - 1) * YY + fractional_kron = kron * fractional_indices + fractional_weights = (fractional_indices - 1) * weights_tiled # Construct sparse matrices (MATLAB: sparse(II1_,repm,kro.*-iIYY,(N+1),K)) - x2 = coo_matrix(((-kro * iIYY).flatten(), (II1_.flatten() - 1, repm.flatten())), shape=(N + 1, K)).tocsc() - x3 = coo_matrix(((kroiI * YY).flatten(), (II2_.flatten() - 1, repm.flatten())), shape=(N + 1, K)).tocsc() + x2 = coo_matrix( + ((-kron * fractional_weights).flatten(), (floor_indices_1.flatten() - 1, repm.flatten())), + shape=(self.signal_length + 1, self.n_components), + ).tocsc() + x3 = coo_matrix( + ((fractional_kron * weights_tiled).flatten(), (floor_indices_2.flatten() - 1, repm.flatten())), + shape=(self.signal_length + 1, self.n_components), + ).tocsc() # Combine the last row into previous, then remove the last row - x2[N - 1, :] += x2[N, :] - x3[N - 1, :] += x3[N, :] + x2[self.signal_length - 1, :] += x2[self.signal_length, :] + x3[self.signal_length - 1, :] += x3[self.signal_length, :] x2 = x2[:-1, :] x3 = x3[:-1, :] - AT = x2 + x3 + stretch_transformed = x2 + x3 - return AT + return stretch_transformed - def solve_quadratic_program(self, T, m, alg="trust-constr"): + def solve_quadratic_program(self, t, m, alg="trust-constr"): """ Solves the quadratic program for updating y in stretched NMF using scipy.optimize: @@ -469,180 +500,194 @@ def solve_quadratic_program(self, T, m, alg="trust-constr"): constraints. Parameters: - - T: (N, K) ndarray + - t: (N, K) ndarray Matrix computed from getAfun(A(k, m), X[:, k]). - m: int - Index of the current column in MM. + Index of the current column in source_matrix. Returns: - - y: (K,) ndarray + - y: (k,) ndarray Optimal solution for y, clipped to ensure non-negativity. """ - MM_col = self.MM[:, m] - Q = T.T @ T - d = -T.T @ MM_col - K = Q.shape[0] - reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro") - Q += np.eye(K) * reg_factor + source_matrix_col = self.source_matrix[:, m] + q = t.T @ t + d = -t.T @ source_matrix_col + k = q.shape[0] + reg_factor = 1e-8 * np.linalg.norm(q, ord="fro") + q += np.eye(k) * reg_factor def objective(y): - return 0.5 * y @ Q @ y + d @ y + return 0.5 * y @ q @ y + d @ y def grad(y): - return Q @ y + d + return q @ y + d if alg == "trust-constr": def hess(y): - return csc_matrix(Q) # sparse format for efficiency + return csc_matrix(q) # sparse format for efficiency - bounds = [(0, 1)] * K - y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1) + bounds = [(0, 1)] * k + y0 = np.clip(-np.linalg.solve(q + np.eye(k) * 1e-5, d), 0, 1) result = minimize( objective, y0, method="trust-constr", jac=grad, hess=hess, bounds=bounds, options={"verbose": 0} ) elif alg == "L-BFGS-B": - bounds = [(0, 1) for _ in range(K)] # per-variable bounds - y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1) # Initial guess + bounds = [(0, 1) for _ in range(k)] # per-variable bounds + y0 = np.clip(-np.linalg.solve(q + np.eye(k) * 1e-5, d), 0, 1) # Initial guess result = minimize(objective, y0, method="L-BFGS-B", jac=grad, bounds=bounds) return np.maximum(result.x, 0) - def updateX(self): + def update_components(self): """ - Updates `X` using gradient-based optimization with adaptive step size L. + Updates `components` using gradient-based optimization with adaptive step size step_size. 
""" - # Compute `AX` using the interpolation function - AX, _, _ = self.apply_interpolation_matrix() # Skip the other two outputs + # Compute `stretched_components` using the interpolation function + stretched_components, _, _ = self.apply_interpolation_matrix() # Skip the other two outputs (derivatives) # Compute RA and RR - intermediate_RA = AX.flatten(order="F").reshape((self._N * self._M, self._K), order="F") - RA = intermediate_RA.sum(axis=1).reshape((self._N, self._M), order="F") - RR = RA - self.MM + intermediate_reshaped = stretched_components.flatten(order="F").reshape( + (self.signal_length * self.n_signals, self.n_components), order="F" + ) + reshaped_stretched_components = intermediate_reshaped.sum(axis=1).reshape( + (self.signal_length, self.n_signals), order="F" + ) + component_residuals = reshaped_stretched_components - self.source_matrix # Compute gradient `GraX` - self._GraX = self.apply_transformation_matrix( - R=RR - ).toarray() # toarray equivalent of full, make non-sparse - - # Compute initial step size `L0` - L0 = np.linalg.eigvalsh(self.Y.T @ self.Y).max() * np.max([self.A.max(), 1 / self.A.min()]) - # Compute adaptive step size `L` - if self._preX is None: - L = L0 - else: - num = np.sum((self._GraX - self._preGraX) * (self.X - self._preX)) # Element-wise multiplication - denom = np.linalg.norm(self.X - self._preX, "fro") ** 2 # Frobenius norm squared - L = num / denom if denom > 0 else L0 - if L <= 0: - L = L0 + self.grad_components = self.apply_transformation_matrix( + residuals=component_residuals + ).toarray() # toarray equivalent of MATLAB "full", makes non-sparse - # Store our old X before updating because it is used in step selection - self._preX = self.X.copy() - - while True: # iterate updating X - x_step = self._preX - self._GraX / L + # Compute initial step size `initial_step_size` + initial_step_size = np.linalg.eigvalsh(self.weights.T @ self.weights).max() * np.max( + [self.stretch.max(), 1 / self.stretch.min()] + ) + # Compute adaptive step size `step_size` + if self._prev_components is None: + step_size = initial_step_size + else: + num = np.sum( + (self.grad_components - self._prev_grad_components) * (self.components - self._prev_components) + ) # Elem-wise multiply + denom = np.linalg.norm(self.components - self._prev_components, "fro") ** 2 # Frobenius norm squared + step_size = num / denom if denom > 0 else initial_step_size + if step_size <= 0: + step_size = initial_step_size + + # Store our old component matrix before updating because it is used in step selection + self._prev_components = self.components.copy() + + while True: # iterate updating components + components_step = self._prev_components - self.grad_components / step_size # Solve x^3 + p*x + q = 0 for the largest real root - self.X = np.square(cubic_largest_real_root(-x_step, self.eta / (2 * L))) + self.components = np.square(cubic_largest_real_root(-components_step, self.eta / (2 * step_size))) # Mask values that should be set to zero - mask = self.X**2 * L / 2 - L * self.X * x_step + self.eta * np.sqrt(self.X) < 0 - self.X = mask * self.X + mask = ( + self.components**2 * step_size / 2 + - step_size * self.components * components_step + + self.eta * np.sqrt(self.components) + < 0 + ) + self.components = mask * self.components objective_improvement = self._objective_history[-1] - self.get_objective_function( - R=self.get_residual_matrix() + residuals=self.get_residual_matrix() ) # Check if objective function improves if objective_improvement > 0: break - # If not, increase L (step 
size) - L *= 2 + # If not, increase step_size + step_size *= 2 + if np.isinf(step_size): break - def updateY2(self): + def update_weights(self): """ - Updates Y using matrix operations, solving a quadratic program via `solve_mkr_box`. + Updates weights using matrix operations, solving a quadratic program to do so. """ - K = self._K - N = self._N - M = self._M - - for m in range(M): - T = np.zeros((N, K)) # Initialize T as an (N, K) zero matrix + for m in range(self.n_signals): + t = np.zeros((self.signal_length, self.n_components)) # Populate T using apply_interpolation - for k in range(K): - T[:, k] = self.apply_interpolation(self.A[k, m], self.X[:, k], return_derivatives=True)[ - 0 - ].squeeze() + for k in range(self.n_components): + t[:, k] = self.apply_interpolation( + self.stretch[k, m], self.components[:, k], return_derivatives=True + )[0].squeeze() # Solve quadratic problem for y - y = self.solve_quadratic_program(T=T, m=m) + y = self.solve_quadratic_program(t=t, m=m) # Update Y - self.Y[:, m] = y + self.weights[:, m] = y - def regularize_function(self, A=None): + def regularize_function(self, stretch=None): """ Computes the regularization function, gradient, and Hessian for optimization. Returns: - fun: Objective function value (scalar) - - gra: Gradient (same shape as A) + - gra: Gradient (same shape as stretch) """ - if A is None: - A = self.A - - K = self._K - M = self._M - N = self._N + if stretch is None: + stretch = self.stretch # Compute interpolated matrices - AX, TX, HX = self.apply_interpolation_matrix(A=A, return_derivatives=True) + stretched_components, d_stretch_components, dd_stretch_components = self.apply_interpolation_matrix( + stretch=stretch, return_derivatives=True + ) # Compute residual - intermediate_RA = AX.flatten(order="F").reshape((N * M, K), order="F") - RA = intermediate_RA.sum(axis=1).reshape((N, M), order="F") - RA = RA - self.MM + intermediate_diff = stretched_components.flatten(order="F").reshape( + (self.signal_length * self.n_signals, self.n_components), order="F" + ) + stretch_difference = intermediate_diff.sum(axis=1).reshape((self.signal_length, self.n_signals), order="F") + stretch_difference = stretch_difference - self.source_matrix # Compute objective function - fun = self.get_objective_function(RA, A) + reg_func = self.get_objective_function(stretch_difference, stretch) # Compute gradient - tiled_derivative = np.sum(TX * np.tile(RA, (1, K)), axis=0) - der_reshaped = np.asarray(tiled_derivative).reshape((M, K), order="F") - gra = der_reshaped.T + self.rho * A @ self.P.T @ self.P + tiled_derivative = np.sum( + d_stretch_components * np.tile(stretch_difference, (1, self.n_components)), axis=0 + ) + der_reshaped = np.asarray(tiled_derivative).reshape((self.n_signals, self.n_components), order="F") + func_grad = ( + der_reshaped.T + self.rho * stretch @ self._spline_smooth_operator.T @ self._spline_smooth_operator + ) - return fun, gra + return reg_func, func_grad - def updateA2(self): + def update_stretch(self): """ - Updates matrix A using constrained optimization (equivalent to fmincon in MATLAB). + Updates the stretching matrix using constrained optimization (equivalent to fmincon in MATLAB).
""" - # Flatten A for compatibility with the optimizer (since SciPy expects 1D input) - A_initial = self.A.flatten() + # Flatten stretch for compatibility with the optimizer (since SciPy expects 1D input) + stretch_init_vec = self.stretch.flatten() # Define the optimization function - def objective(A_vec): - A_matrix = A_vec.reshape(self.A.shape) # Reshape back to matrix form - fun, gra = self.regularize_function(A_matrix) - gra = gra.flatten() - return fun, gra + def objective(stretch_vec): + stretch_matrix = stretch_vec.reshape(self.stretch.shape) # Reshape back to matrix form + func, grad = self.regularize_function(stretch_matrix) + grad = grad.flatten() + return func, grad # Optimization constraints: lower bound 0.1, no upper bound - bounds = [(0.1, None)] * A_initial.size # Equivalent to 0.1 * ones(K, M) + bounds = [(0.1, None)] * stretch_init_vec.size # Equivalent to 0.1 * ones(K, M) # Solve optimization problem (equivalent to fmincon) result = minimize( - fun=lambda A_vec: objective(A_vec)[0], # Objective function - x0=A_initial, # Initial guess + fun=lambda stretch_vec: objective(stretch_vec)[0], # Objective function + x0=stretch_init_vec, # Initial guess method="trust-constr", # Equivalent to 'trust-region-reflective' - jac=lambda A_vec: objective(A_vec)[1], # Gradient - bounds=bounds, # Lower bounds on A + jac=lambda stretch_vec: objective(stretch_vec)[1], # Gradient + bounds=bounds, # Lower bounds on stretch + # TODO: A Hessian can be incorporated for better convergence. ) - # Update A with the optimized values - self.A = result.x.reshape(self.A.shape) + # Update stretch with the optimized values + self.stretch = result.x.reshape(self.stretch.shape) def cubic_largest_real_root(p, q): @@ -655,9 +700,9 @@ def cubic_largest_real_root(p, q): sqrt_delta = np.sqrt(np.abs(delta)) # When delta >= 0: one real root - A = np.cbrt(-q / 2 + sqrt_delta) - B = np.cbrt(-q / 2 - sqrt_delta) - root1 = A + B + a = np.cbrt(-q / 2 + sqrt_delta) + b = np.cbrt(-q / 2 - sqrt_delta) + root1 = a + b # When delta < 0: three real roots, use trigonometric method phi = np.arccos(-q / (2 * np.sqrt(-((p / 3) ** 3) + 1e-12)))