Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functions for correlation matrix, copula indicator and plotting #103

Merged
merged 17 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dingo/illustrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
from dingo.utils import compute_copula

def plot_copula(data_flux1, data_flux2, n = 5, width = 900 , height = 600, export_format = "svg"):
Expand Down Expand Up @@ -82,3 +83,49 @@ def plot_histogram(reaction_fluxes, reaction, n_bins=40):
plt.axis([np.amin(reaction_fluxes), np.amax(reaction_fluxes), 0, np.amax(n) * 1.2])

plt.show()



def plot_corr_matrix(corr_matrix, reactions, removed_reactions=[], format="svg"):
"""A Python function to plot the heatmap of a model's pearson correlation matrix.

Keyword arguments:
corr_matrix -- A matrix produced from the "correlated_reactions" function
reactions -- A list with the model's reactions
removed_reactions -- A list with the removed reactions in case of a preprocess.
If provided removed reactions are not plotted.
"""

sns_colormap = [[0.0, '#3f7f93'],
[0.1, '#6397a7'],
[0.2, '#88b1bd'],
[0.3, '#acc9d2'],
[0.4, '#d1e2e7'],
[0.5, '#f2f2f2'],
[0.6, '#f6cdd0'],
[0.7, '#efa8ad'],
[0.8, '#e8848b'],
[0.9, '#e15e68'],
[1.0, '#da3b46']]

if removed_reactions != 0:
for reaction in reactions:
index = reactions.index(reaction)
if reaction in removed_reactions:
reactions[index] = None

fig = px.imshow(corr_matrix,
color_continuous_scale = sns_colormap,
x = reactions, y = reactions, origin="upper")

fig.update_layout(
xaxis=dict(tickfont=dict(size=5)),
yaxis=dict(tickfont=dict(size=5)),
width=900, height=900, plot_bgcolor="rgba(0,0,0,0)")

fig.update_traces(xgap=1, ygap=1, hoverongaps=False)

fig.show()

fig_name = "CorrelationMatrix." + format
pio.write_image(fig, fig_name, scale=2)
133 changes: 79 additions & 54 deletions dingo/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,39 @@
import cobra
import cobra.manipulation
from collections import Counter
from dingo import MetabolicNetwork
from dingo import MetabolicNetwork, PolytopeSampler
from dingo.utils import correlated_reactions
import numpy as np


class PreProcess:

def __init__(self, model, tol=1e-6, open_exchanges=False):
def __init__(self, model, tol = 1e-6, open_exchanges = False, verbose = False):

"""
model parameter gets a cobra model as input
model -- parameter gets a cobra model as input

tol parameter gets a cutoff value used to classify
zero-flux and mle reactions and compare FBA solutions
before and after reactions removal
tol -- parameter gets a cutoff value used to classify
zero-flux and mle reactions and compare FBA solutions
before and after reactions removal

open_exchanges parameter is used in the function that identifies blocked reactions
It controls whether or not to open all exchange reactions to very high flux ranges
open_exchanges -- parameter is used in the function that identifies blocked reactions
It controls whether or not to open all exchange reactions
to very high flux ranges.

verbose -- A boolean type variable that if True
additional information for preprocess is printed.
"""

self._model = model
self._tol = tol

if self._tol > 1e-6:
print("Tolerance value set to",self._tol,"while default value is 1e-6. A looser check will be performed")


self._open_exchanges = open_exchanges
self._verbose = verbose

if self._tol > 1e-6 and verbose == True:
print("Tolerance value set to",self._tol,"while default value is 1e-6. A looser check will be performed")

self._objective = self._objective_function()
self._initial_reactions = self._initial()
self._reaction_bounds_dict = self._reaction_bounds_dictionary()
Expand Down Expand Up @@ -170,10 +177,11 @@ def reduce(self, extend=False):
When the "extend" parameter is set to True, the function performes
an additional check to remove further reactions. These reactions
are the ones that if knocked-down, they do not affect the value
of the objective function. These reactions are removed simultaneously
from the model. If this simultaneous removal produces an infesible
solution (or a solution of 0) to the objective function,
these reactions are restored with their initial bounds.
of the objective function. Reactions are removed in an ordered way.
The ones with the least overall correlation in a correlation matrix
are removed first. These reactions are removed one by one from the model.
If this removal produces an infeasible solution (or a solution of 0)
to the objective function, these reactions are restored to their initial bounds.

A dingo-type tuple is then created from the cobra model
using the "cobra_dingo_tuple" function.
Expand Down Expand Up @@ -201,58 +209,75 @@ def reduce(self, extend=False):

elif extend == False:

print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)
if self._verbose == True:
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)

# call this functon to convert cobra to dingo model
self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
return self._removed_reactions, self._dingo_model

elif extend == True:

# find additional reactions with a possibility of removal
additional_removed_reactions_list = []
additional_removed_reactions_count = 0
for reaction in remained_reactions:

fba_solution_before = self._model.optimize().objective_value
reduced_dingo_model = MetabolicNetwork.from_cobra_model(self._model)
reactions = reduced_dingo_model.reactions
sampler = PolytopeSampler(reduced_dingo_model)
steady_states = sampler.generate_steady_states()

# calculate correlation matrix with additional filtering from copula indicator
corr_matrix = correlated_reactions(
steady_states,
pearson_cutoff = 0,
indicator_cutoff = 0,
cells = 10,
cop_coeff = 0.3,
lower_triangle = False)

# perform a knock-out and check the output
self._model.reactions.get_by_id(reaction).lower_bound = 0
self._model.reactions.get_by_id(reaction).upper_bound = 0

fba_solution_after = self._model.optimize().objective_value
# convert pearson values to absolute values
abs_array = abs(corr_matrix)
# sum absolute pearson values per row
sum_array = np.sum((abs_array), axis=1)
# get indices of ordered sum values
order_sum_indices = np.argsort(sum_array)

if fba_solution_after != None:
if (abs(fba_solution_after - fba_solution_before) < tol):
self._removed_reactions.append(reaction)
additional_removed_reactions_list.append(reaction)
additional_removed_reactions_count += 1

self._model.reactions.get_by_id(reaction).upper_bound = self._reaction_bounds_dict[reaction][1]
self._model.reactions.get_by_id(reaction).lower_bound = self._reaction_bounds_dict[reaction][0]
fba_solution_before = self._model.optimize().objective_value

# count additional reactions with a possibility of removal
additional_removed_reactions_count = 0

# compare FBA solution before and after the removal of additional reactions
fba_solution_initial = self._model.optimize().objective_value
self._remove_model_reactions()
fba_solution_final = self._model.optimize().objective_value


# if FBA solution after removal is infeasible or altered
# restore the initial reactions bounds
if (fba_solution_final == None) | (abs(fba_solution_final - fba_solution_initial) > tol):
for reaction in additional_removed_reactions_list:
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]
self._removed_reactions.remove(reaction)
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)
# find additional reactions with a possibility of removal
for index in order_sum_indices:
if reactions[index] in remained_reactions:
reaction = reactions[index]

# perform a knock-out and check the output
self._model.reactions.get_by_id(reaction).lower_bound = 0
self._model.reactions.get_by_id(reaction).upper_bound = 0

try:
fba_solution_after = self._model.optimize().objective_value
if (abs(fba_solution_after - fba_solution_before) > tol):
# restore bounds
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

# if system has no solution
except:
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

finally:
if (fba_solution_after != None) and (abs(fba_solution_after - fba_solution_before) < tol):
self._removed_reactions.append(reaction)
additional_removed_reactions_count += 1
else:
# restore bounds
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

else:

if self._verbose == True:
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)


print(additional_removed_reactions_count, "additional reaction(s) removed")
# call this functon to convert cobra to dingo model
self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
return self._removed_reactions, self._dingo_model
140 changes: 140 additions & 0 deletions dingo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,143 @@ def get_matrices_of_full_dim_polytope(A, b, Aeq, beq):
print("gmscale failed to compute a good scaling.")

return A, b, N, N_shift



def correlated_reactions(steady_states, reactions=[], pearson_cutoff = 0.90, indicator_cutoff = 10,
cells = 10, cop_coeff = 0.3, lower_triangle = True, verbose = False):
"""A Python function to calculate the pearson correlation matrix of a model
and filter values based on the copula's indicator

Keyword arguments:
steady_states -- A numpy array of the generated steady states fluxes
reactions -- A list with the model's reactions
pearson_cutoff -- A cutoff to filter reactions based on pearson coefficient
indicator_cutoff -- A cutoff to filter reactions based on indicator value
cells -- Number of cells to compute the copula
cop_coeff -- A value that narrows or widens the width of the copula's diagonal
lower_triangle -- A boolean variable that if True plots only the lower triangular matrix
verbose -- A boolean variable that if True additional information is printed as an output.
"""

if cop_coeff > 0.4 or cop_coeff < 0.2:
raise Exception("Input value to cop_coeff parameter must be between 0.2 and 0.4")

# calculate coefficients to access red and blue copula mass
cop_coeff_1 = cop_coeff
cop_coeff_2 = 1 - cop_coeff
cop_coeff_3 = 1 + cop_coeff

# compute correlation matrix
corr_matrix = np.corrcoef(steady_states, rowvar=True)

# replace not assigned values with 0
corr_matrix[np.isnan(corr_matrix)] = 0

# create a copy of correlation matrix to replace/filter values
filtered_corr_matrix = corr_matrix.copy()

# find indices of correlation matrix where correlation does not occur
no_corr_indices = np.argwhere((filtered_corr_matrix < pearson_cutoff) & (filtered_corr_matrix > -pearson_cutoff))

# replace values from the correlation matrix that do not overcome
# the pearson cutoff with 0
for i in range(0, no_corr_indices.shape[0]):
index1 = no_corr_indices[i][0]
index2 = no_corr_indices[i][1]

filtered_corr_matrix[index1, index2] = 0

# if user does not provide an indicator cutoff then do not proceed
# with the filtering of the correlation matrix
if indicator_cutoff == 0:
if lower_triangle == True:
filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
else:
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
else:
# a dictionary that will store for each filtered reaction combination,
# the pearson correlation value, the copula's indicator value
# and the correlation classification
indicator_dict = {}

# keep only the lower triangle
corr_matrix = np.tril(corr_matrix)
# replace diagonal values with 0
np.fill_diagonal(corr_matrix, 0)

# find indices of correlation matrix where correlation occurs
corr_indices = np.argwhere((corr_matrix > pearson_cutoff) | (corr_matrix < -pearson_cutoff))

# compute copula for each set of correlated reactions
for i in range(0, corr_indices.shape[0]):

index1 = corr_indices[i][0]
index2 = corr_indices[i][1]

reaction1 = reactions[index1]
reaction2 = reactions[index2]

flux1 = steady_states[index1]
flux2 = steady_states[index2]

copula = compute_copula(flux1, flux2, cells)
rows, cols = copula.shape

red_mass = 0
blue_mass = 0
indicator = 0

for row in range(rows):
for col in range(cols):
# values in the diagonal
if ((row-col >= -cop_coeff_1*rows) & (row-col <= cop_coeff_1*rows)):
# values near the top left and bottom right corner
if ((row+col < cop_coeff_2*rows) | (row+col > cop_coeff_3*rows)):
red_mass = red_mass + copula[row][col]
else:
# values near the top right and bottom left corner
if ((row+col >= cop_coeff_2*rows-1) & (row+col <= cop_coeff_3*rows-1)):
blue_mass = blue_mass + copula[row][col]

indicator = (red_mass+1e-9) / (blue_mass+1e-9)

# classify specific pair of reactions as positive or negative
# correlated based on indicator cutoff
if indicator > indicator_cutoff:
pearson = filtered_corr_matrix[index1, index2]
indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
'indicator': indicator,
'classification': "positive"}

elif indicator < 1/indicator_cutoff:
pearson = filtered_corr_matrix[index1, index2]
indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
'indicator': indicator,
'classification': "negative"}

# if they do not overcome the cutoff replace their corresponding
# value in the correlation matrix with 0
else:
filtered_corr_matrix[index1, index2] = 0
filtered_corr_matrix[index2, index1] = 0
pearson = filtered_corr_matrix[index1, index2]
indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
'indicator': indicator,
'classification': "no correlation"}

if verbose == True:
print("Completed process of",i+1,"from",corr_indices.shape[0],"copulas")

if lower_triangle == True:
filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix, indicator_dict

else:
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix, indicator_dict

Loading
Loading