diff --git a/dingo/illustrations.py b/dingo/illustrations.py
index fd37b65..f5a6e23 100644
--- a/dingo/illustrations.py
+++ b/dingo/illustrations.py
@@ -9,10 +9,7 @@
 import matplotlib.pyplot as plt
 import plotly.graph_objects as go
 import plotly.io as pio
-import plotly.express as px
 from dingo.utils import compute_copula
-import plotly.figure_factory as ff
-from scipy.cluster import hierarchy
 
 def plot_copula(data_flux1, data_flux2, n = 5, width = 900 , height = 600, export_format = "svg"):
     """A Python function to plot the copula between two fluxes
@@ -84,124 +81,4 @@ def plot_histogram(reaction_fluxes, reaction, n_bins=40):
     plt.title("Reaction: " + reaction, fontweight="bold", fontsize=18)
     plt.axis([np.amin(reaction_fluxes), np.amax(reaction_fluxes), 0, np.amax(n) * 1.2])
 
-    plt.show()
-
-
-
-def plot_corr_matrix(corr_matrix, reactions, removed_reactions=[], format="svg"):
-    """A Python function to plot the heatmap of a model's pearson correlation matrix.
-
-    Keyword arguments:
-    corr_matrix -- A matrix produced from the "correlated_reactions" function
-    reactions -- A list with the model's reactions
-    removed_reactions -- A list with the removed reactions in case of a preprocess.
-                         If provided removed reactions are not plotted.
-    """
-
-    sns_colormap = [[0.0, '#3f7f93'],
-                    [0.1, '#6397a7'],
-                    [0.2, '#88b1bd'],
-                    [0.3, '#acc9d2'],
-                    [0.4, '#d1e2e7'],
-                    [0.5, '#f2f2f2'],
-                    [0.6, '#f6cdd0'],
-                    [0.7, '#efa8ad'],
-                    [0.8, '#e8848b'],
-                    [0.9, '#e15e68'],
-                    [1.0, '#da3b46']]
-
-    if removed_reactions != 0:
-        for reaction in reactions:
-            index = reactions.index(reaction)
-            if reaction in removed_reactions:
-                reactions[index] = None
-
-    fig = px.imshow(corr_matrix,
-                    color_continuous_scale = sns_colormap,
-                    x = reactions, y = reactions, origin="upper")
-
-    fig.update_layout(
-        xaxis=dict(tickfont=dict(size=5)),
-        yaxis=dict(tickfont=dict(size=5)),
-        width=900, height=900, plot_bgcolor="rgba(0,0,0,0)")
-
-    fig.update_traces(xgap=1, ygap=1, hoverongaps=False)
-
-    fig.show()
-
-    fig_name = "CorrelationMatrix." + format
-    pio.write_image(fig, fig_name, scale=2)
-
-
-
-def plot_dendrogram(dissimilarity_matrix, reactions , plot_labels=False, t=2.0, linkage="ward"):
-    """A Python function to plot the dendrogram of a dissimilarity matrix.
-
-    Keyword arguments:
-    dissimilarity_matrix -- A matrix produced from the "cluster_corr_reactions" function
-    reactions -- A list with the model's reactions
-    plot_labels -- A boolean variable that if True plots the reactions labels in the dendrogram
-    t -- A threshold that defines a threshold that cuts the dendrogram
-         at a specific height and colors the occuring clusters accordingly
-    linkage -- linkage defines the type of linkage.
-               Available linkage types are: single, average, complete, ward.
-    """
-
-    fig = ff.create_dendrogram(dissimilarity_matrix,
-                               labels=reactions,
-                               linkagefun=lambda x: hierarchy.linkage(x, linkage),
-                               color_threshold=t)
-    fig.update_layout(width=800, height=800)
-
-    if plot_labels == False:
-        fig.update_layout(
-            xaxis=dict(
-                showticklabels=False,
-                ticks="") )
-    else:
-        fig.update_layout(
-            xaxis=dict(
-                title_font=dict(size=10),
-                tickfont=dict(size=8) ),
-            yaxis=dict(
-                title_font=dict(size=10),
-                tickfont=dict(size=8) ) )
-
-    fig.show()
-
-
-
-def plot_graph(G, pos):
-    """A Python function to plot a graph created from a correlation matrix.
-
-    Keyword arguments:
-    G -- A graph produced from the "graph_corr_matrix" function.
-    pos -- A layout for the corresponding graph.
-    """
-
-    fig = go.Figure()
-
-    for u, v, data in G.edges(data=True):
-        x0, y0 = pos[u]
-        x1, y1 = pos[v]
-
-        edge_color = 'blue' if data['weight'] > 0 else 'red'
-
-        fig.add_trace(go.Scatter(x=[x0, x1], y=[y0, y1], mode='lines',
-                                 line=dict(width=abs(data['weight']) * 1,
-                                 color=edge_color), hoverinfo='none',
-                                 showlegend=False))
-
-    for node in G.nodes():
-        x, y = pos[node]
-        node_name = G.nodes[node].get('name', f'Node {node}')
-
-        fig.add_trace(go.Scatter(x=[x], y=[y], mode='markers',
-                                 marker=dict(size=10),
-                                 text=[node_name],
-                                 textposition='top center',
-                                 name = node_name,
-                                 showlegend=False))
-
-    fig.update_layout(width=800, height=800)
-    fig.show()
\ No newline at end of file
+    plt.show()
\ No newline at end of file
diff --git a/dingo/preprocess.py b/dingo/preprocess.py
deleted file mode 100644
index 4f711ff..0000000
--- a/dingo/preprocess.py
+++ /dev/null
@@ -1,283 +0,0 @@
-
-import cobra
-import cobra.manipulation
-from collections import Counter
-from dingo import MetabolicNetwork, PolytopeSampler
-from dingo.utils import correlated_reactions
-import numpy as np
-
-
-class PreProcess:
-
-    def __init__(self, model, tol = 1e-6, open_exchanges = False, verbose = False):
-
-        """
-        model -- parameter gets a cobra model as input
-
-        tol -- parameter gets a cutoff value used to classify
-               zero-flux and mle reactions and compare FBA solutions
-               before and after reactions removal
-
-        open_exchanges -- parameter is used in the function that identifies blocked reactions
-                          It controls whether or not to open all exchange reactions
-                          to very high flux ranges.
-
-        verbose -- A boolean type variable that if True
-                   additional information for preprocess is printed.
-        """
-
-        self._model = model
-        self._tol = tol
-
-        self._open_exchanges = open_exchanges
-        self._verbose = verbose
-
-        if self._tol > 1e-6 and verbose == True:
-            print("Tolerance value set to",self._tol,"while default value is 1e-6. A looser check will be performed")
-
-        self._objective = self._objective_function()
-        self._initial_reactions = self._initial()
-        self._reaction_bounds_dict = self._reaction_bounds_dictionary()
-        self._essential_reactions = self._essentials()
-        self._zero_flux_reactions = self._zero_flux()
-        self._blocked_reactions = self._blocked()
-        self._mle_reactions = self._metabolically_less_efficient()
-        self._removed_reactions = []
-
-
-    def _objective_function(self):
-        """
-        A function used to find the objective function of a model
-        """
-
-        objective = str(self._model.summary()._objective)
-        self._objective = objective.split(" ")[1]
-
-        return self._objective
-
-
-    def _initial(self):
-        """
-        A function used to find reaction ids of a model
-        """
-
-        self._initial_reactions = [ reaction.id for reaction in \
-                                    self._model.reactions ]
-
-        return self._initial_reactions
-
-
-    def _reaction_bounds_dictionary(self):
-        """
-        A function used to create a dictionary that maps
-        reactions with their corresponding bounds. It is used to
-        later restore bounds to their wild-type values
-        """
-
-        self._reaction_bounds_dict = { }
-
-        for reaction_id in self._initial_reactions:
-            bounds = self._model.reactions.get_by_id(reaction_id).bounds
-            self._reaction_bounds_dict[reaction_id] = bounds
-
-        return self._reaction_bounds_dict
-
-
-    def _essentials(self):
-        """
-        A function used to find all the essential reactions
-        and append them into a list. Essential reactions are
-        the ones that are required for growth. If removed the
-        objective function gets zeroed.
-        """
-
-        self._essential_reactions = [ reaction.id for reaction in \
-                                      cobra.flux_analysis.find_essential_reactions(self._model) ]
-
-        return self._essential_reactions
-
-
-    def _zero_flux(self):
-        """
-        A function used to find zero-flux reactions.
-        “Zero-flux” reactions cannot carry a flux while maintaining
-        at least 90% of the maximum growth rate.
-        These reactions have both a min and a max flux equal to 0,
-        when running a FVA analysis with the fraction of optimum set to 90%
-        """
-
-        tol = self._tol
-
-        fva = cobra.flux_analysis.flux_variability_analysis(self._model, fraction_of_optimum=0.9)
-        zero_flux = fva.loc[ (abs(fva['minimum']) < tol ) & (abs(fva['maximum']) < tol)]
-        self._zero_flux_reactions = zero_flux.index.tolist()
-
-        return self._zero_flux_reactions
-
-
-    def _blocked(self):
-        """
-        A function used to find blocked reactions.
-        "Blocked" reactions that cannot carry a flux in any condition.
-        These reactions can not have any flux other than 0
-        """
-
-        self._blocked_reactions = cobra.flux_analysis.find_blocked_reactions(self._model, open_exchanges=self._open_exchanges)
-        return self._blocked_reactions
-
-
-    def _metabolically_less_efficient(self):
-        """
-        A function used to find metabolically less efficient reactions.
-        "Metabolically less efficient" require a reduction in growth rate if used
-        These reactions are found when running an FBA and setting the
-        optimal growth rate as the lower bound of the objective function (in
-        this case biomass production. After running an FVA with the fraction of optimum
-        set to 0.95, the reactions that have no flux are the metabolically less efficient.
-        """
-
-        tol = self._tol
-
-        fba_solution = self._model.optimize()
-
-        wt_lower_bound = self._model.reactions.get_by_id(self._objective).lower_bound
-        self._model.reactions.get_by_id(self._objective).lower_bound = fba_solution.objective_value
-
-        fva = cobra.flux_analysis.flux_variability_analysis(self._model, fraction_of_optimum=0.95)
-        mle = fva.loc[ (abs(fva['minimum']) < tol ) & (abs(fva['maximum']) < tol)]
-        self._mle_reactions = mle.index.tolist()
-
-        self._model.reactions.get_by_id(self._objective).lower_bound = wt_lower_bound
-
-        return self._mle_reactions
-
-
-    def _remove_model_reactions(self):
-        """
-        A function used to set the lower and upper bounds of certain reactions to 0
-        (it turns off reactions)
-        """
-
-        for reaction in self._removed_reactions:
-            self._model.reactions.get_by_id(reaction).lower_bound = 0
-            self._model.reactions.get_by_id(reaction).upper_bound = 0
-
-        return self._model
-
-
-    def reduce(self, extend=False):
-        """
-        A function that calls the "remove_model_reactions" function
-        and removes blocked, zero-flux and metabolically less efficient
-        reactions from the model.
-
-        Then it finds the remaining reactions in the model after
-        exclusion of the essential reactions.
-
-        When the "extend" parameter is set to True, the function performes
-        an additional check to remove further reactions. These reactions
-        are the ones that if knocked-down, they do not affect the value
-        of the objective function. Reactions are removed in an ordered way.
-        The ones with the least overall correlation in a correlation matrix
-        are removed first. These reactions are removed one by one from the model.
-        If this removal produces an infeasible solution (or a solution of 0)
-        to the objective function, these reactions are restored to their initial bounds.
-
-        A dingo-type tuple is then created from the cobra model
-        using the "cobra_dingo_tuple" function.
-
-        The outputs are
-        (a) A list of the removed reactions ids
-        (b) A reduced dingo model
-        """
-
-        # create a list from the combined blocked, zero-flux, mle reactions
-        blocked_mle_zero = self._blocked_reactions + self._mle_reactions + self._zero_flux_reactions
-        list_removed_reactions = list(set(blocked_mle_zero))
-        self._removed_reactions = list_removed_reactions
-
-        # remove these reactions from the model
-        self._remove_model_reactions()
-
-        remained_reactions = list((Counter(self._initial_reactions)-Counter(self._removed_reactions)).elements())
-        remained_reactions = list((Counter(remained_reactions)-Counter(self._essential_reactions)).elements())
-
-        tol = self._tol
-
-        if extend != False and extend != True:
-            raise Exception("Wrong Input to extend parameter")
-
-        elif extend == False:
-
-            if self._verbose == True:
-                print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
-                      "reactions were removed from the model with extend set to", extend)
-
-            # call this functon to convert cobra to dingo model
-            self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
-            return self._removed_reactions, self._dingo_model
-
-        elif extend == True:
-
-            reduced_dingo_model = MetabolicNetwork.from_cobra_model(self._model)
-            reactions = reduced_dingo_model.reactions
-            sampler = PolytopeSampler(reduced_dingo_model)
-            steady_states = sampler.generate_steady_states()
-
-            # calculate correlation matrix with additional filtering from copula indicator
-            corr_matrix = correlated_reactions(
-                steady_states,
-                pearson_cutoff = 0,
-                indicator_cutoff = 0,
-                cells = 10,
-                cop_coeff = 0.3,
-                lower_triangle = False)
-
-            # convert pearson values to absolute values
-            abs_array = abs(corr_matrix)
-            # sum absolute pearson values per row
-            sum_array = np.sum((abs_array), axis=1)
-            # get indices of ordered sum values
-            order_sum_indices = np.argsort(sum_array)
-
-            fba_solution_before = self._model.optimize().objective_value
-
-            # count additional reactions with a possibility of removal
-            additional_removed_reactions_count = 0
-
-            # find additional reactions with a possibility of removal
-            for index in order_sum_indices:
-                if reactions[index] in remained_reactions:
-                    reaction = reactions[index]
-
-                    # perform a knock-out and check the output
-                    self._model.reactions.get_by_id(reaction).lower_bound = 0
-                    self._model.reactions.get_by_id(reaction).upper_bound = 0
-
-                    try:
-                        fba_solution_after = self._model.optimize().objective_value
-                        if (abs(fba_solution_after - fba_solution_before) > tol):
-                            # restore bounds
-                            self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]
-
-                    # if system has no solution
-                    except:
-                        self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]
-
-                    finally:
-                        if (fba_solution_after != None) and (abs(fba_solution_after - fba_solution_before) < tol):
-                            self._removed_reactions.append(reaction)
-                            additional_removed_reactions_count += 1
-                        else:
-                            # restore bounds
-                            self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]
-
-
-            if self._verbose == True:
-                print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
-                      "reactions were removed from the model with extend set to", extend)
-                print(additional_removed_reactions_count, "additional reaction(s) removed")
-
-            # call this functon to convert cobra to dingo model
-            self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
-            return self._removed_reactions, self._dingo_model
diff --git a/dingo/utils.py b/dingo/utils.py
index 303840c..c403447 100644
--- a/dingo/utils.py
+++ b/dingo/utils.py
@@ -11,9 +11,6 @@
 from scipy.sparse import diags
 from dingo.scaling import gmscale
 from dingo.nullspace import nullspace_dense, nullspace_sparse
-from scipy.cluster import hierarchy
-from networkx.algorithms.components import connected_components
-import networkx as nx
 
 def compute_copula(flux1, flux2, n):
     """A Python function to estimate the copula between two fluxes
@@ -203,236 +200,4 @@ def get_matrices_of_full_dim_polytope(A, b, Aeq, beq):
     except:
         print("gmscale failed to compute a good scaling.")
 
-    return A, b, N, N_shift
-
-
-
-def correlated_reactions(steady_states, reactions=[], pearson_cutoff = 0.90, indicator_cutoff = 10,
-                         cells = 10, cop_coeff = 0.3, lower_triangle = True, verbose = False):
-    """A Python function to calculate the pearson correlation matrix of a model
-    and filter values based on the copula's indicator
-
-    Keyword arguments:
-    steady_states -- A numpy array of the generated steady states fluxes
-    reactions -- A list with the model's reactions
-    pearson_cutoff -- A cutoff to filter reactions based on pearson coefficient
-    indicator_cutoff -- A cutoff to filter reactions based on indicator value
-    cells -- Number of cells to compute the copula
-    cop_coeff -- A value that narrows or widens the width of the copula's diagonal
-    lower_triangle -- A boolean variable that if True plots only the lower triangular matrix
-    verbose -- A boolean variable that if True additional information is printed as an output.
-    """
-
-    if cop_coeff > 0.4 or cop_coeff < 0.2:
-        raise Exception("Input value to cop_coeff parameter must be between 0.2 and 0.4")
-
-    # calculate coefficients to access red and blue copula mass
-    cop_coeff_1 = cop_coeff
-    cop_coeff_2 = 1 - cop_coeff
-    cop_coeff_3 = 1 + cop_coeff
-
-    # compute correlation matrix
-    corr_matrix = np.corrcoef(steady_states, rowvar=True)
-
-    # replace not assigned values with 0
-    corr_matrix[np.isnan(corr_matrix)] = 0
-
-    # create a copy of correlation matrix to replace/filter values
-    filtered_corr_matrix = corr_matrix.copy()
-
-    # find indices of correlation matrix where correlation does not occur
-    no_corr_indices = np.argwhere((filtered_corr_matrix < pearson_cutoff) & (filtered_corr_matrix > -pearson_cutoff))
-
-    # replace values from the correlation matrix that do not overcome
-    # the pearson cutoff with 0
-    for i in range(0, no_corr_indices.shape[0]):
-        index1 = no_corr_indices[i][0]
-        index2 = no_corr_indices[i][1]
-
-        filtered_corr_matrix[index1, index2] = 0
-
-    # if user does not provide an indicator cutoff then do not proceed
-    # with the filtering of the correlation matrix
-    if indicator_cutoff == 0:
-        if lower_triangle == True:
-            filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
-            np.fill_diagonal(filtered_corr_matrix, 1)
-            return filtered_corr_matrix
-        else:
-            np.fill_diagonal(filtered_corr_matrix, 1)
-            return filtered_corr_matrix
-    else:
-        # a dictionary that will store for each filtered reaction combination,
-        # the pearson correlation value, the copula's indicator value
-        # and the correlation classification
-        indicator_dict = {}
-
-        # keep only the lower triangle
-        corr_matrix = np.tril(corr_matrix)
-        # replace diagonal values with 0
-        np.fill_diagonal(corr_matrix, 0)
-
-        # find indices of correlation matrix where correlation occurs
-        corr_indices = np.argwhere((corr_matrix > pearson_cutoff) | (corr_matrix < -pearson_cutoff))
-
-        # compute copula for each set of correlated reactions
-        for i in range(0, corr_indices.shape[0]):
-
-            index1 = corr_indices[i][0]
-            index2 = corr_indices[i][1]
-
-            reaction1 = reactions[index1]
-            reaction2 = reactions[index2]
-
-            flux1 = steady_states[index1]
-            flux2 = steady_states[index2]
-
-            copula = compute_copula(flux1, flux2, cells)
-            rows, cols = copula.shape
-
-            red_mass = 0
-            blue_mass = 0
-            indicator = 0
-
-            for row in range(rows):
-                for col in range(cols):
-                    # values in the diagonal
-                    if ((row-col >= -cop_coeff_1*rows) & (row-col <= cop_coeff_1*rows)):
-                        # values near the top left and bottom right corner
-                        if ((row+col < cop_coeff_2*rows) | (row+col > cop_coeff_3*rows)):
-                            red_mass = red_mass + copula[row][col]
-                    else:
-                        # values near the top right and bottom left corner
-                        if ((row+col >= cop_coeff_2*rows-1) & (row+col <= cop_coeff_3*rows-1)):
-                            blue_mass = blue_mass + copula[row][col]
-
-            indicator = (red_mass+1e-9) / (blue_mass+1e-9)
-
-            # classify specific pair of reactions as positive or negative
-            # correlated based on indicator cutoff
-            if indicator > indicator_cutoff:
-                pearson = filtered_corr_matrix[index1, index2]
-                indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
-                                                               'indicator': indicator,
-                                                               'classification': "positive"}
-
-            elif indicator < 1/indicator_cutoff:
-                pearson = filtered_corr_matrix[index1, index2]
-                indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
-                                                               'indicator': indicator,
-                                                               'classification': "negative"}
-
-            # if they do not overcome the cutoff replace their corresponding
-            # value in the correlation matrix with 0
-            else:
-                filtered_corr_matrix[index1, index2] = 0
-                filtered_corr_matrix[index2, index1] = 0
-                pearson = filtered_corr_matrix[index1, index2]
-                indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
-                                                               'indicator': indicator,
-                                                               'classification': "no correlation"}
-
-            if verbose == True:
-                print("Completed process of",i+1,"from",corr_indices.shape[0],"copulas")
-
-        if lower_triangle == True:
-            filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
-            np.fill_diagonal(filtered_corr_matrix, 1)
-            return filtered_corr_matrix, indicator_dict
-
-        else:
-            np.fill_diagonal(filtered_corr_matrix, 1)
-            return filtered_corr_matrix, indicator_dict
-
-
-
-def cluster_corr_reactions(correlation_matrix, reactions, linkage="ward",
-                           t = 4.0, correction=True):
-    """A Python function for hierarchical clustering of the correlation matrix
-
-    Keyword arguments:
-    correlation_matrix -- A numpy 2D array of a correlation matrix
-    reactions -- A list with the model's reactions
-    linkage -- linkage defines the type of linkage.
-               Available linkage types are: single, average, complete, ward.
-    t -- A threshold that defines a threshold that cuts the dendrogram
-         at a specific height and produces clusters
-    correction -- A boolean variable that if True converts the values of the
-                  the correlation matrix to absolute values.
-    """
-
-    # function to return a nested list with grouped reactions based on clustering
-    def clusters_list(reactions, labels):
-        clusters = []
-        unique_labels = np.unique(labels)
-        for label in unique_labels:
-            cluster = []
-            label_where = np.where(labels == label)[0]
-            for where in label_where:
-                cluster.append(reactions[where])
-            clusters.append(cluster)
-        return clusters
-
-    if correction == True:
-        dissimilarity_matrix = 1 - abs(correlation_matrix)
-    else:
-        dissimilarity_matrix = 1 - correlation_matrix
-
-    Z = hierarchy.linkage(dissimilarity_matrix, linkage)
-    labels = hierarchy.fcluster(Z, t, criterion='distance')
-
-    clusters = clusters_list(reactions, labels)
-    return dissimilarity_matrix, labels, clusters
-
-
-
-def graph_corr_matrix(correlation_matrix, reactions, correction=True,
-                      clusters=[], subgraph_nodes = 5):
-    """A Python function that creates the main graph and its subgraphs
-    from a correlation matrix.
-
-    Keyword arguments:
-    correlation_matrix -- A numpy 2D array of a correlation matrix.
-    reactions -- A list with the model's reactions.
-    correction -- A boolean variable that if True converts the values of the
-                  the correlation matrix to absolute values.
-    clusters -- A nested list with clustered reactions created from the "" function.
-    subgraph_nodes -- A variable that specifies a cutoff for a graph's nodes.
-                      It filters subgraphs with low number of nodes..
-    """
-
-    graph_matrix = correlation_matrix.copy()
-    np.fill_diagonal(graph_matrix, 0)
-
-    if correction == True:
-        graph_matrix = abs(graph_matrix)
-
-    G = nx.from_numpy_array(graph_matrix)
-    G = nx.relabel_nodes(G, lambda x: reactions[x])
-
-    pos = nx.spring_layout(G)
-    unconnected_nodes = list(nx.isolates(G))
-    G.remove_nodes_from(unconnected_nodes)
-    G_nodes = G.nodes()
-
-    graph_list = []
-    layout_list = []
-
-    graph_list.append(G)
-    layout_list.append(pos)
-
-    subgraphs = [G.subgraph(c) for c in connected_components(G)]
-    H_nodes_list = []
-
-    for i in range(len(subgraphs)):
-        if len(subgraphs[i].nodes()) > subgraph_nodes and len(subgraphs[i].nodes()) != len(G_nodes):
-            H = G.subgraph(subgraphs[i].nodes())
-            for cluster in clusters:
-                if H.has_node(cluster[0]) and H.nodes() not in H_nodes_list:
-                    H_nodes_list.append(H.nodes())
-
-                    pos = nx.spring_layout(H)
-                    graph_list.append(H)
-                    layout_list.append(pos)
-
-    return graph_list, layout_list
\ No newline at end of file
+    return A, b, N, N_shift
\ No newline at end of file
diff --git a/tests/correlation.py b/tests/correlation.py
deleted file mode 100644
index fdb21b7..0000000
--- a/tests/correlation.py
+++ /dev/null
@@ -1,53 +0,0 @@
-
-from dingo.utils import correlated_reactions
-from dingo import MetabolicNetwork, PolytopeSampler
-import numpy as np
-import unittest
-
-class TestCorrelation(unittest.TestCase):
-
-    def test_correlation(self):
-
-        dingo_model = MetabolicNetwork.from_json('ext_data/e_coli_core.json')
-        reactions = dingo_model.reactions
-
-        sampler = PolytopeSampler(dingo_model)
-        steady_states = sampler.generate_steady_states()
-
-        # calculate correlation matrix with filtering from copula indicator
-        corr_matrix, indicator_dict = correlated_reactions(steady_states,
-                                                           reactions = reactions,
-                                                           indicator_cutoff = 5,
-                                                           pearson_cutoff = 0.999999,
-                                                           lower_triangle = False,
-                                                           verbose = False)
-
-        # sum values in the diagonal of the correlation matrix ==> 95*pearson ==> 95*1
-        self.assertTrue(np.trace(corr_matrix) == len(reactions))
-        # rows and columns must be equal to model reactions
-        self.assertTrue(corr_matrix.shape[0] == len(reactions))
-        self.assertTrue(corr_matrix.shape[1] == len(reactions))
-
-
-        dingo_model = MetabolicNetwork.from_json('ext_data/e_coli_core.json')
-        reactions = dingo_model.reactions
-
-        sampler = PolytopeSampler(dingo_model)
-        steady_states = sampler.generate_steady_states()
-
-        # calculate correlation matrix without filtering from copula indicator
-        corr_matrix = correlated_reactions(steady_states,
-                                           indicator_cutoff = 0,
-                                           pearson_cutoff = 0,
-                                           lower_triangle = True,
-                                           verbose = False)
-
-        # sum values in the diagonal of the correlation matrix ==> 95*pearson ==> 95*1
-        self.assertTrue(np.trace(corr_matrix) == len(reactions))
-        # rows and columns must be equal to model reactions
-        self.assertTrue(corr_matrix.shape[0] == len(reactions))
-        self.assertTrue(corr_matrix.shape[1] == len(reactions))
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/tests/preprocess.py b/tests/preprocess.py
deleted file mode 100644
index 64edce9..0000000
--- a/tests/preprocess.py
+++ /dev/null
@@ -1,69 +0,0 @@
-
-from cobra.io import load_json_model
-from dingo import MetabolicNetwork
-from dingo.preprocess import PreProcess
-import unittest
-import numpy as np
-
-class TestPreprocess(unittest.TestCase):
-
-    def test_preprocess(self):
-
-        # load cobra model
-        cobra_model = load_json_model("ext_data/e_coli_core.json")
-
-        # convert cobra to dingo model
-        initial_dingo_model = MetabolicNetwork.from_cobra_model(cobra_model)
-
-        # perform an FBA to find the initial FBA solution
-        initial_fba_solution = initial_dingo_model.fba()[1]
-
-
-        # call the reduce function from the PreProcess class
-        # with extend=False to remove reactions from the model
-        obj = PreProcess(cobra_model, tol=1e-5, open_exchanges=False, verbose=False)
-        removed_reactions, final_dingo_model = obj.reduce(extend=False)
-
-        # calculate the count of removed reactions with extend set to False
-        removed_reactions_count = len(removed_reactions)
-        self.assertTrue( 46 - removed_reactions_count == 0 )
-
-        # calculate the count of reactions with bounds equal to 0
-        # with extend set to False from the dingo model
-        dingo_removed_reactions = np.sum((final_dingo_model.lb == 0) & (final_dingo_model.ub == 0))
-        self.assertTrue( 46 - dingo_removed_reactions == 0 )
-
-        # perform an FBA to check the solution after reactions removal
-        final_fba_solution = final_dingo_model.fba()[1]
-        self.assertTrue(abs(final_fba_solution - initial_fba_solution) < 1e-03)
-
-
-        # load models in cobra and dingo format again to restore bounds
-        cobra_model = load_json_model("ext_data/e_coli_core.json")
-
-        # convert cobra to dingo model
-        initial_dingo_model = MetabolicNetwork.from_cobra_model(cobra_model)
-
-        # call the reduce function from the PreProcess class
-        # with extend=True to remove additional reactions from the model
-        obj = PreProcess(cobra_model, tol=1e-6, open_exchanges=False, verbose=False)
-        removed_reactions, final_dingo_model = obj.reduce(extend=True)
-
-        # calculate the count of removed reactions with extend set to True
-        removed_reactions_count = len(removed_reactions)
-        self.assertTrue( 46 - removed_reactions_count <= 0 )
-
-        # calculate the count of reactions with bounds equal to 0
-        # with extend set to True from the dingo model
-        dingo_removed_reactions = np.sum((final_dingo_model.lb == 0) & (final_dingo_model.ub == 0))
-        self.assertTrue( 46 - dingo_removed_reactions <= 0 )
-
-        # perform an FBA to check the result after reactions removal
-        final_fba_solution = final_dingo_model.fba()[1]
-        self.assertTrue(abs(final_fba_solution - initial_fba_solution) < 1e-03)
-
-
-if __name__ == "__main__":
-    unittest.main()
-
-
\ No newline at end of file