Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functions for correlation matrix, copula indicator and plotting #103

Merged
merged 17 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dingo/illustrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
from dingo.utils import compute_copula

def plot_copula(data_flux1, data_flux2, n = 5, width = 900 , height = 600, export_format = "svg"):
Expand Down Expand Up @@ -82,3 +83,49 @@ def plot_histogram(reaction_fluxes, reaction, n_bins=40):
plt.axis([np.amin(reaction_fluxes), np.amax(reaction_fluxes), 0, np.amax(n) * 1.2])

plt.show()



def plot_corr_matrix(corr_matrix, reactions, removed_reactions=[], format="svg"):
"""A Python function to plot the heatmap of a model's pearson correlation matrix.

Keyword arguments:
corr_matrix -- A matrix produced from the "correlated_reactions" function
reactions -- A list with the model's reactions
removed_reactions -- A list with the removed reactions in case of a preprocess.
If provided removed reactions are not plotted.
"""

sns_colormap = [[0.0, '#3f7f93'],
[0.1, '#6397a7'],
[0.2, '#88b1bd'],
[0.3, '#acc9d2'],
[0.4, '#d1e2e7'],
[0.5, '#f2f2f2'],
[0.6, '#f6cdd0'],
[0.7, '#efa8ad'],
[0.8, '#e8848b'],
[0.9, '#e15e68'],
[1.0, '#da3b46']]

if removed_reactions != 0:
for reaction in reactions:
index = reactions.index(reaction)
if reaction in removed_reactions:
reactions[index] = None

fig = px.imshow(corr_matrix,
color_continuous_scale = sns_colormap,
x = reactions, y = reactions, origin="upper")

fig.update_layout(
xaxis=dict(tickfont=dict(size=5)),
yaxis=dict(tickfont=dict(size=5)),
width=900, height=900, plot_bgcolor="rgba(0,0,0,0)")

fig.update_traces(xgap=1, ygap=1, hoverongaps=False)

fig.show()

fig_name = "CorrelationMatrix." + format
pio.write_image(fig, fig_name, scale=2)
104 changes: 61 additions & 43 deletions dingo/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import cobra
import cobra.manipulation
from collections import Counter
from dingo import MetabolicNetwork
from dingo import MetabolicNetwork, PolytopeSampler
from dingo.utils import correlated_reactions
import numpy as np


class PreProcess:
Expand Down Expand Up @@ -170,10 +172,11 @@ def reduce(self, extend=False):
When the "extend" parameter is set to True, the function performes
an additional check to remove further reactions. These reactions
are the ones that if knocked-down, they do not affect the value
of the objective function. These reactions are removed simultaneously
from the model. If this simultaneous removal produces an infesible
solution (or a solution of 0) to the objective function,
these reactions are restored with their initial bounds.
of the objective function. Reactions are removed in an ordered way.
The ones with the least overall correlation in a correlation matrix
are removed first. These reactions are removed one by one from the model.
If this removal produces an infeasible solution (or a solution of 0)
to the objective function, these reactions are restored to their initial bounds.

A dingo-type tuple is then created from the cobra model
using the "cobra_dingo_tuple" function.
Expand Down Expand Up @@ -209,50 +212,65 @@ def reduce(self, extend=False):
return self._removed_reactions, self._dingo_model

elif extend == True:

# find additional reactions with a possibility of removal
additional_removed_reactions_list = []
additional_removed_reactions_count = 0
for reaction in remained_reactions:

fba_solution_before = self._model.optimize().objective_value
reduced_dingo_model = MetabolicNetwork.from_cobra_model(self._model)
reactions = reduced_dingo_model.reactions
sampler = PolytopeSampler(reduced_dingo_model)
steady_states = sampler.generate_steady_states()

# calculate correlation matrix with additional filtering from copula indicator
corr_matrix = correlated_reactions(
steady_states,
pearson_cutoff = 0,
indicator_cutoff = 0,
cells = 10,
cop_coeff = 0.3,
lower_triangle = False)

# perform a knock-out and check the output
self._model.reactions.get_by_id(reaction).lower_bound = 0
self._model.reactions.get_by_id(reaction).upper_bound = 0

fba_solution_after = self._model.optimize().objective_value
# convert pearson values to absolute values
abs_array = abs(corr_matrix)
# sum absolute pearson values per row
sum_array = np.sum((abs_array), axis=1)
# get indices of ordered sum values
order_sum_indices = np.argsort(sum_array)

if fba_solution_after != None:
if (abs(fba_solution_after - fba_solution_before) < tol):
self._removed_reactions.append(reaction)
additional_removed_reactions_list.append(reaction)
additional_removed_reactions_count += 1

self._model.reactions.get_by_id(reaction).upper_bound = self._reaction_bounds_dict[reaction][1]
self._model.reactions.get_by_id(reaction).lower_bound = self._reaction_bounds_dict[reaction][0]
fba_solution_before = self._model.optimize().objective_value

# count additional reactions with a possibility of removal
additional_removed_reactions_count = 0

# compare FBA solution before and after the removal of additional reactions
fba_solution_initial = self._model.optimize().objective_value
self._remove_model_reactions()
fba_solution_final = self._model.optimize().objective_value


# if FBA solution after removal is infeasible or altered
# restore the initial reactions bounds
if (fba_solution_final == None) | (abs(fba_solution_final - fba_solution_initial) > tol):
for reaction in additional_removed_reactions_list:
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]
self._removed_reactions.remove(reaction)
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)

else:
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)

# find additional reactions with a possibility of removal
for index in order_sum_indices:
if reactions[index] in remained_reactions:
reaction = reactions[index]

# perform a knock-out and check the output
self._model.reactions.get_by_id(reaction).lower_bound = 0
self._model.reactions.get_by_id(reaction).upper_bound = 0

try:
fba_solution_after = self._model.optimize().objective_value
if (abs(fba_solution_after - fba_solution_before) > tol):
# restore bounds
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

# if system has no solution
except:
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

finally:
if (fba_solution_after != None) and (abs(fba_solution_after - fba_solution_before) < tol):
self._removed_reactions.append(reaction)
additional_removed_reactions_count += 1
else:
# restore bounds
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]


print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz, see my comment below for the printed messages.

"reactions were removed from the model with extend set to", extend)
print(additional_removed_reactions_count, "additional reaction(s) removed")

# call this functon to convert cobra to dingo model
self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
return self._removed_reactions, self._dingo_model
124 changes: 124 additions & 0 deletions dingo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,127 @@ def get_matrices_of_full_dim_polytope(A, b, Aeq, beq):
print("gmscale failed to compute a good scaling.")

return A, b, N, N_shift



def correlated_reactions(steady_states, pearson_cutoff = 0.90, indicator_cutoff = 10,
cells = 10, cop_coeff = 0.3, lower_triangle = True):
"""A Python function to calculate the pearson correlation matrix of a model
and filter values based on the copula's indicator

Keyword arguments:
steady_states -- A numpy array of the generated steady states fluxes
pearson_cutoff -- A cutoff to filter reactions based on pearson coefficient
indicator_cutoff -- A cutoff to filter reactions based on indicator value
cells -- Number of cells to compute the copula
cop_coeff -- A value that narrows or widens the width of the copula's diagonal
lower_triangle -- A boolean variable that if True plots only the lower triangular matrix
"""

if cop_coeff > 0.4 or cop_coeff < 0.2:
raise Exception("Input value to cop_coeff parameter must be between 0.2 and 0.4")

# calculate coefficients to access red and blue copula mass
cop_coeff_1 = cop_coeff
cop_coeff_2 = 1 - cop_coeff
cop_coeff_3 = 1 + cop_coeff

# compute correlation matrix
corr_matrix = np.corrcoef(steady_states, rowvar=True)

# replace not assigned values with 0
corr_matrix[np.isnan(corr_matrix)] = 0

# create a copy of correlation matrix to replace/filter values
filtered_corr_matrix = corr_matrix.copy()

# find indices of correlation matrix where correlation does not occur
no_corr_indices = np.argwhere((filtered_corr_matrix < pearson_cutoff) & (filtered_corr_matrix > -pearson_cutoff))

# replace values from the correlation matrix that do not overcome
# the pearson cutoff with 0
for i in range(0, no_corr_indices.shape[0]):
index1 = no_corr_indices[i][0]
index2 = no_corr_indices[i][1]
filtered_corr_matrix[index1, index2] = 0

# if user does not provide an indicator cutoff then do not proceed
# with the filtering of the correlation matrix
if indicator_cutoff == 0:
if lower_triangle == True:
filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
else:
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
else:

# keep only the lower triangle
corr_matrix = np.tril(corr_matrix)
# replace diagonal values with 0
np.fill_diagonal(corr_matrix, 0)

# find indices of correlation matrix where correlation occurs
corr_indices = np.argwhere((corr_matrix > pearson_cutoff) | (corr_matrix < -pearson_cutoff))

# count reactions with positive or negative correlation based on indicator
positive = 0
negative = 0

# compute copula for each set of correlated reactions
for i in range(0, corr_indices.shape[0]):

index1 = corr_indices[i][0]
index2 = corr_indices[i][1]

flux1 = steady_states[index1]
flux2 = steady_states[index2]

copula = compute_copula(flux1, flux2, cells)
rows, cols = copula.shape

red_mass = 0
blue_mass = 0
indicator = 0

for row in range(rows):
for col in range(cols):
# values in the diagonal
if ((row-col >= -cop_coeff_1*rows) & (row-col <= cop_coeff_1*rows)):
# values near the top left and bottom right corner
if ((row+col < cop_coeff_2*rows) | (row+col > cop_coeff_3*rows)):
red_mass = red_mass + copula[row][col]
else:
# values near the top right and bottom left corner
if ((row+col >= cop_coeff_2*rows-1) & (row+col <= cop_coeff_3*rows-1)):
blue_mass = blue_mass + copula[row][col]

indicator = (red_mass+1e-9) / (blue_mass+1e-9)

# classify specific pair of reactions as positive or negative
# correlated based on indicator cutoff
if indicator > indicator_cutoff:
positive += 1
elif indicator < 1/indicator_cutoff:
negative += 1
# if they do not overcome the cutoff replace their corresponding
# value in the correlation matrix with 0
else:
filtered_corr_matrix[index1, index2] = 0
filtered_corr_matrix[index2, index1] = 0

print("Completed process of",i+1,"from",corr_indices.shape[0],"copulas")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the prints should ideally requested by the user and be off by default.
I'd use a flag verbose: bool as an input variable with False as default value. Then, print any message only if verbose is true.



print(positive, "out of", i+1, "copulas were positive correlated based on copula indicator")
print(negative, "out of", i+1, "copulas were negative correlated based on copula indicator")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prints should be requested by the user I think. These numbers could be part of the output as with a label for each copula: "positive" or "negative"

Copy link
Contributor Author

@SotirisTouliopoulos SotirisTouliopoulos Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a dictionary of this format that is returned

{'ACONTb_ACONTa': {'pearson': 0.9999999999999996, 'indicator': 700000000.9999999, 'classification': 'positive'},
 'ACt2r_ACKr': {'pearson': 1.0, 'indicator': 700000000.9999999, 'classification': 'positive'}}

if lower_triangle == True:
filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix

else:
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
50 changes: 50 additions & 0 deletions tests/correlation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

from dingo.utils import correlated_reactions
from dingo import MetabolicNetwork, PolytopeSampler
import numpy as np
import unittest

class TestCorrelation(unittest.TestCase):

def test_correlation(self):

dingo_model = MetabolicNetwork.from_json('ext_data/e_coli_core.json')
reactions = dingo_model.reactions

sampler = PolytopeSampler(dingo_model)
steady_states = sampler.generate_steady_states()

# calculate correlation matrix without filtering from copula indicator
corr_matrix = correlated_reactions(steady_states,
indicator_cutoff=2,
pearson_cutoff = 0.99999,
lower_triangle = False)

# sum values in the diagonal of the correlation matrix ==> 95*pearson ==> 95*1
self.assertTrue(np.trace(corr_matrix) == len(reactions))
# rows and columns must be equal to model reactions
self.assertTrue(corr_matrix.shape[0] == len(reactions))
self.assertTrue(corr_matrix.shape[1] == len(reactions))


dingo_model = MetabolicNetwork.from_json('ext_data/e_coli_core.json')
reactions = dingo_model.reactions

sampler = PolytopeSampler(dingo_model)
steady_states = sampler.generate_steady_states()

# calculate correlation matrix without filtering from copula indicator
corr_matrix = correlated_reactions(steady_states,
indicator_cutoff=0,
pearson_cutoff = 0,
lower_triangle = True)

# sum values in the diagonal of the correlation matrix ==> 95*pearson ==> 95*1
self.assertTrue(np.trace(corr_matrix) == len(reactions))
# rows and columns must be equal to model reactions
self.assertTrue(corr_matrix.shape[0] == len(reactions))
self.assertTrue(corr_matrix.shape[1] == len(reactions))


if __name__ == "__main__":
unittest.main()
4 changes: 2 additions & 2 deletions tests/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ def test_preprocess(self):

# calculate the count of removed reactions with extend set to True
removed_reactions_count = len(removed_reactions)
self.assertTrue( 47 - removed_reactions_count == 0 )
self.assertTrue( 46 - removed_reactions_count <= 0 )

# calculate the count of reactions with bounds equal to 0
# with extend set to True from the dingo model
dingo_removed_reactions = np.sum((final_dingo_model.lb == 0) & (final_dingo_model.ub == 0))
self.assertTrue( 47 - dingo_removed_reactions == 0 )
self.assertTrue( 46 - dingo_removed_reactions <= 0 )

# perform an FBA to check the result after reactions removal
final_fba_solution = final_dingo_model.fba()[1]
Expand Down
Loading