Skip to content

Commit

Permalink
new methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ctrlaltaf committed Sep 5, 2024
1 parent 5a6b4cb commit a44a4e9
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 44 deletions.
171 changes: 171 additions & 0 deletions classes/hypergeometric_distribution_class_V3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from classes.base_algorithm_class import BaseAlgorithm
import networkx as nx
import pandas as pd
from colorama import init as colorama_init
from colorama import Fore, Back, Style
from pathlib import Path
import math
from tools.helper import print_progress, normalize, import_graph_from_pickle
from tools.workflow import get_datasets


class HypergeometricDistributionV3(BaseAlgorithm):
    """
    Scores protein-to-GO-term associations with a hypergeometric distribution.

    For each candidate (protein, GO term) pair the score is one minus the
    hypergeometric point probability of the observed overlap between the
    protein's network neighbors and the proteins annotated to the GO term.
    The protein of interest itself is excluded from the annotated count.
    """

    def __init__(self):
        # Filled in via the setters below; predict() also returns them.
        self.y_score = []
        self.y_true = []

    def get_y_score(self):
        return self.y_score

    def get_y_true(self):
        return self.y_true

    def set_y_score(self, y_score):
        self.y_score = y_score

    def set_y_true(self, y_true):
        self.y_true = y_true

    def predict(
        self,
        input_directory_path,
        graph_file_path,
        output_path,
        rep_num,
        name,
    ):
        """
        Uses a Hypergeometric distribution to calculate a confidence value for
        the relationship between a protein of interest and a GO term. Does not
        include the protein of interest in calculations.

        Writes the per-pair results to
        ``<output_path>/hypergeometric_distribution.csv`` (tab-separated) and
        returns ``(y_score, y_true)`` sorted by normalized score, descending.
        """
        colorama_init()

        # The data is balanced: 50% of the pairs are proteins annotated to a
        # GO term (positives) and 50% are proteins that are not (negatives).
        data = {
            "protein": [],
            "go_term": [],
            "protein_neighbor": [],
            "go_neighbor": [],
            "go_annotated_protein_neighbors": [],
            "score": [],
            "norm_score": [],
            "true_label": [],
        }

        positive_dataset, negative_dataset = get_datasets(
            input_directory_path, rep_num, name
        )
        G = import_graph_from_pickle(graph_file_path)

        # Use the same edge types for the positive and the negative member of
        # each row. The negative side previously passed bare strings, which
        # the helper matched by *substring* ("x in str"), silently excluding
        # "regulatory" edges from the negative neighborhoods only.
        protein_edge_types = ["protein_protein", "regulatory"]
        go_edge_types = ["protein_go_term"]

        # Total number of protein nodes in the entire graph. Loop-invariant,
        # so it is computed once instead of once per pair.
        N = len([x for x, y in G.nodes(data=True) if y["type"] == "protein"])

        i = 1
        for positive_protein, positive_go, negative_protein, negative_go in zip(
            positive_dataset["protein"],
            positive_dataset["go"],
            negative_dataset["protein"],
            negative_dataset["go"],
        ):

            # --- score for the positive pair ---
            positive_protein_neighbor = get_neighbors(
                G, positive_protein, protein_edge_types
            )
            positive_go_neighbor = get_neighbors(G, positive_go, go_edge_types)
            positive_go_annotated_protein_neighbor_count = (
                get_go_annotated_protein_neighbor_count(
                    G, positive_protein_neighbor, positive_go
                )
            )

            # n: number of protein neighbors the protein of interest has.
            pos_n = len(positive_protein_neighbor)
            # K: number of protein neighbors the GO term of interest has,
            # excluding the protein of interest (does not change
            # significantly if the protein is included).
            K = len(positive_go_neighbor) - 1
            # k: overlap between the GO term's proteins and the neighbors of
            # the protein of interest.
            pos_k = positive_go_annotated_protein_neighbor_count

            if pos_n == 0:
                # No neighbors -> no evidence either way.
                positive_score = 0
            else:
                # One minus the hypergeometric pmf at pos_k;
                # math.comb(n, k) is the "n choose k" function.
                positive_score = 1 - (
                    (math.comb(K, pos_k) * math.comb(N - K, pos_n - pos_k))
                    / math.comb(N, pos_n)
                )

            # --- score for the negative pair ---
            negative_protein_neighbor = get_neighbors(
                G, negative_protein, protein_edge_types
            )
            negative_go_neighbor = get_neighbors(G, negative_go, go_edge_types)
            negative_go_annotated_protein_neighbor_count = (
                get_go_annotated_protein_neighbor_count(
                    G, negative_protein_neighbor, negative_go
                )
            )

            # Negative protein-of-interest neighbor count.
            neg_n = len(negative_protein_neighbor)
            # Overlap between GO neighbors and protein neighbors (expected to
            # be smaller for negatives than for positives).
            neg_k = negative_go_annotated_protein_neighbor_count

            if neg_n == 0:
                negative_score = 0
            else:
                # K + 1 re-includes the protein excluded on the positive side.
                # NOTE(review): K is derived from the *positive* GO term,
                # which is only valid if each row pairs the same GO term; and
                # "N - K + 1" reads like it may have been intended as
                # N - (K + 1) -- confirm against the method's derivation.
                negative_score = 1 - (
                    (math.comb(K + 1, neg_k) * math.comb(N - K + 1, neg_n - neg_k))
                    / math.comb(N, neg_n)
                )

            # Record the positive and the negative result for this row.
            data["protein"].append(positive_protein)
            data["go_term"].append(positive_go)
            data["protein_neighbor"].append(len(positive_protein_neighbor))
            data["go_neighbor"].append(len(positive_go_neighbor))
            data["go_annotated_protein_neighbors"].append(
                positive_go_annotated_protein_neighbor_count
            )
            data["score"].append(positive_score)
            data["true_label"].append(1)

            data["protein"].append(negative_protein)
            data["go_term"].append(negative_go)
            data["protein_neighbor"].append(len(negative_protein_neighbor))
            data["go_neighbor"].append(len(negative_go_neighbor))
            data["go_annotated_protein_neighbors"].append(
                negative_go_annotated_protein_neighbor_count
            )
            data["score"].append(negative_score)
            data["true_label"].append(0)

            print_progress(i, len(positive_dataset["protein"]))
            i += 1

        # Normalize the raw scores (tools.helper.normalize) for ranking.
        normalized_data = normalize(data["score"])
        for item in normalized_data:
            data["norm_score"].append(item)

        df = pd.DataFrame(data)
        df = df.sort_values(by="norm_score", ascending=False)

        df.to_csv(
            Path(output_path, "hypergeometric_distribution.csv"),
            index=False,
            sep="\t",
        )

        y_score = df["norm_score"].to_list()
        y_true = df["true_label"].to_list()

        return y_score, y_true


def get_neighbors(G: "nx.DiGraph", node, edgeTypes):
    """Return ``[neighbor, edge_attrs]`` pairs for edges of *node* whose
    ``"type"`` attribute is one of *edgeTypes*.

    *edgeTypes* may be a single edge-type string or an iterable of strings.
    A bare string used to be tested with ``in`` directly, i.e. by
    *substring* match, which silently mis-filtered (e.g. ``"regulatory"``
    never matches inside ``"protein_protein"``); a string is now treated as
    a one-element set and matched exactly.
    """
    if isinstance(edgeTypes, str):
        wanted = {edgeTypes}
    else:
        wanted = set(edgeTypes)

    neighbors = []
    for _, neighbor, attrs in G.edges(node, data=True):
        if attrs["type"] in wanted:
            neighbors.append([neighbor, attrs])

    return neighbors


def get_go_annotated_protein_neighbor_count(G: nx.Graph, nodeList, goTerm):
    """Count how many entries of *nodeList* (``[neighbor, attrs]`` pairs, as
    produced by ``get_neighbors``) share an edge with *goTerm* in *G*, i.e.
    are annotated to that GO term."""
    return sum(1 for element in nodeList if G.has_edge(element[0], goTerm))
167 changes: 167 additions & 0 deletions classes/one_hop_go_degree_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from classes.base_algorithm_class import BaseAlgorithm
import networkx as nx
import pandas as pd
from colorama import init as colorama_init
from colorama import Fore, Back, Style
from pathlib import Path
from tools.helper import print_progress, normalize, import_graph_from_pickle
from tools.workflow import get_datasets


class OneHopGODegree(BaseAlgorithm):
    """
    Scores a (protein, GO term) pair by the number of the protein's one-hop
    network neighbors that are already annotated to the GO term.
    """

    def __init__(self):
        # Filled in via the setters below; predict() also returns them.
        self.y_score = []
        self.y_true = []

    def get_y_score(self):
        return self.y_score

    def get_y_true(self):
        return self.y_true

    def set_y_score(self, y_score):
        self.y_score = y_score

    def set_y_true(self, y_true):
        self.y_true = y_true

    def predict(
        self,
        input_directory_path,
        graph_file_path,
        output_path,
        rep_num,
        name,
    ):
        """
        Evaluate the one-hop GO-degree method on a protein-protein
        interaction network with GO-term annotations.

        The score for each pair is the raw count of the protein's one-hop
        neighbors annotated to the GO term. Writes the per-pair results to
        ``<output_path>/overlapping_neighbor_data.csv`` (tab-separated) and
        returns ``(y_score, y_true)`` sorted by normalized score, descending.
        """
        colorama_init()

        # The data is balanced: 50% of the pairs are proteins annotated to a
        # GO term (positives) and 50% are proteins that are not (negatives).
        data = {
            "protein": [],
            "go_term": [],
            "protein_neighbor": [],
            "go_neighbor": [],
            "go_annotated_protein_neighbors": [],
            "score": [],
            "norm_score": [],
            "true_label": [],
        }

        positive_dataset, negative_dataset = get_datasets(
            input_directory_path, rep_num, name
        )
        G = import_graph_from_pickle(graph_file_path)

        # Use the same edge types for positives and negatives. The negative
        # side previously passed bare strings, which the helper matched by
        # *substring*, silently excluding "regulatory" edges from the
        # negative neighborhoods only.
        protein_edge_types = ["protein_protein", "regulatory"]
        go_edge_types = ["protein_go_term"]

        i = 1
        for positive_protein, positive_go in zip(
            positive_dataset["protein"],
            positive_dataset["go"],
        ):
            # --- score for the positive pair ---
            positive_protein_neighbor = get_neighbors(
                G, positive_protein, protein_edge_types
            )

            positive_go_neighbor = get_neighbors(G, positive_go, go_edge_types)
            positive_go_annotated_protein_neighbor_count = (
                get_go_annotated_protein_neighbor_count(
                    G, positive_protein_neighbor, positive_go
                )
            )

            # With no neighbors the annotated-neighbor count is already 0,
            # so the score is the count directly.
            positive_score = positive_go_annotated_protein_neighbor_count

            data["protein"].append(positive_protein)
            data["go_term"].append(positive_go)
            data["protein_neighbor"].append(len(positive_protein_neighbor))
            data["go_neighbor"].append(len(positive_go_neighbor))
            data["go_annotated_protein_neighbors"].append(
                positive_go_annotated_protein_neighbor_count
            )
            data["score"].append(positive_score)
            data["true_label"].append(1)

            print_progress(i, len(positive_dataset["protein"]))
            i += 1

        # Restart the progress counter: previously i carried over from the
        # positive loop, so print_progress was called with i greater than
        # the negative dataset's length.
        i = 1
        for negative_protein, negative_go in zip(
            negative_dataset["protein"],
            negative_dataset["go"],
        ):

            # --- score for the negative pair ---
            negative_protein_neighbor = get_neighbors(
                G, negative_protein, protein_edge_types
            )
            negative_go_neighbor = get_neighbors(G, negative_go, go_edge_types)
            negative_go_annotated_protein_neighbor_count = (
                get_go_annotated_protein_neighbor_count(
                    G, negative_protein_neighbor, negative_go
                )
            )

            negative_score = negative_go_annotated_protein_neighbor_count

            data["protein"].append(negative_protein)
            data["go_term"].append(negative_go)
            data["protein_neighbor"].append(len(negative_protein_neighbor))
            data["go_neighbor"].append(len(negative_go_neighbor))
            data["go_annotated_protein_neighbors"].append(
                negative_go_annotated_protein_neighbor_count
            )
            data["score"].append(negative_score)
            data["true_label"].append(0)

            print_progress(i, len(negative_dataset["protein"]))
            i += 1

        # Normalize the raw scores (tools.helper.normalize) for ranking.
        normalized_data = normalize(data["score"])
        for item in normalized_data:
            data["norm_score"].append(item)

        df = pd.DataFrame(data)
        df = df.sort_values(by="norm_score", ascending=False)

        # NOTE(review): filename appears inherited from the overlapping-
        # neighbors method -- confirm downstream consumers before renaming.
        df.to_csv(
            Path(output_path, "overlapping_neighbor_data.csv"),
            index=False,
            sep="\t",
        )

        y_score = df["norm_score"].to_list()
        y_true = df["true_label"].to_list()

        return y_score, y_true


def get_neighbors(G: "nx.DiGraph", node, edgeTypes):
    """Return ``[neighbor, edge_attrs]`` pairs for edges of *node* whose
    ``"type"`` attribute is one of *edgeTypes*.

    *edgeTypes* may be a single edge-type string or an iterable of strings.
    A bare string used to be tested with ``in`` directly, i.e. by
    *substring* match, which silently mis-filtered (e.g. ``"regulatory"``
    never matches inside ``"protein_protein"``); a string is now treated as
    a one-element set and matched exactly.
    """
    if isinstance(edgeTypes, str):
        wanted = {edgeTypes}
    else:
        wanted = set(edgeTypes)

    neighbors = []
    for _, neighbor, attrs in G.edges(node, data=True):
        if attrs["type"] in wanted:
            neighbors.append([neighbor, attrs])

    return neighbors


def get_go_annotated_protein_neighbor_count(G: nx.DiGraph, nodeList, goTerm):
    """Count how many entries of *nodeList* (``[neighbor, attrs]`` pairs, as
    produced by ``get_neighbors``) have an edge to *goTerm* in *G*, i.e. are
    annotated to that GO term."""
    return sum(1 for element in nodeList if G.has_edge(element[0], goTerm))
34 changes: 0 additions & 34 deletions classes/random_walk_class_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,40 +107,6 @@ def predict(
print_progress(i, len(negative_dataset["protein"]))
i += 1

# for positive_protein, positive_go, negative_protein, negative_go in zip(
# positive_dataset["protein"],
# positive_dataset["go"],
# negative_dataset["protein"],
# negative_dataset["go"],
# ):
# go_neighbors = get_neighbors(G, positive_go, "protein_go_term")

# go_neighbor_dict = {}
# for j in go_neighbors:
# if j[0] != positive_protein:
# go_neighbor_dict[j[0]] = 1
# G.remove_edge(j[0], positive_go)
# if len(go_neighbor_dict) != 0:
# p = nx.pagerank(G, alpha=0.7, personalization=go_neighbor_dict)
# data["walk"].append(p[positive_protein])
# data["walk"].append(p[negative_protein])
# else:
# data["walk"].append(0)
# data["walk"].append(0)

# data["protein"].append(positive_protein)
# data["go_term"].append(positive_go)
# data["true_label"].append(1)

# data["protein"].append(negative_protein)
# data["go_term"].append(negative_go)
# data["true_label"].append(0)
# for j in go_neighbors:
# G.add_edge(j[0], positive_go, type="protein_go_term")

# print_progress(i, len(positive_dataset["protein"]))
# i += 1

normalized_data = normalize(data["walk"])
for item in normalized_data:
data["norm_score"].append(item)
Expand Down
Loading

0 comments on commit a44a4e9

Please sign in to comment.