# helper.py
import json
import math

import numpy as np
import requests
from nltk import sent_tokenize
from scipy.spatial import distance
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler

import constants


def get_sentence_from_text(text):
    """Split raw text into a list of lowercased, stripped sentences.

    Requires the NLTK "punkt" tokenizer data (nltk.download("punkt")).
    """
    sentence_crude_list = sent_tokenize(text.lower())
    sentence_list = [sentence.strip() for sentence in sentence_crude_list]
    return sentence_list


def get_text_vector(text: str):
    """Fetch a contextual embedding for ``text`` from the vectorizer service."""
    payload = json.dumps({"text": text})
    headers = {"Content-Type": "application/json"}
    response = requests.post(
        url=f"http://{constants.INTELLIGENCE_HOST}:{constants.INTELLIGENCE_PORT}/vectorizer",
        headers=headers, data=payload)
    # Surface HTTP errors explicitly rather than failing later on a missing key.
    response.raise_for_status()
    return response.json()["contextual_vector"]


def get_embedding_from_sentence(data, normalise=False):
    """Embed each sentence in ``data``; optionally L2-normalise each row."""
    # Convert to an array so the division below broadcasts and the return type
    # is consistent whether or not normalisation is requested.
    sentence_embeddings = np.array([get_text_vector(sentence) for sentence in data])
    if normalise:
        # Divide each row by its L2 norm so cosine similarity reduces to a dot product.
        return sentence_embeddings / np.linalg.norm(sentence_embeddings, axis=1, keepdims=True)
    return sentence_embeddings


def run_pca_on_embedding(embedding, n_components=None, variance_to_explain=0.95):
    """Reduce ``embedding`` with PCA, keeping the leading components needed to
    explain ``variance_to_explain`` of the variance (``n_components``, if given,
    caps how many components PCA computes in the first place)."""
    pca = PCA(n_components=n_components) if n_components is not None else PCA()
    pca_components = pca.fit_transform(embedding)
    try:
        # First index whose cumulative explained variance crosses the threshold;
        # +1 because the slice below excludes its upper bound.
        n_keep = np.where(np.cumsum(pca.explained_variance_ratio_) > variance_to_explain)[0][0] + 1
    except IndexError:
        # Threshold never reached: keep every component.
        n_keep = pca_components.shape[1]
    return pca_components[:, :n_keep]


def get_optimum_kmeans_cluster_number(embeddings, max_cluster=None):
    """Pick a cluster count via the angle-based elbow method on the inertia curve.

    Reference: https://jwcn-eurasipjournals.springeropen.com/articles/10.1186/s13638-021-01910-w
    """
    if max_cluster is None:
        # Common rule of thumb: try cluster counts up to sqrt(n).
        max_cluster = int(math.sqrt(len(embeddings)))
    max_cluster = max(4, max_cluster)  # need at least three curve points to measure an angle
    inertia_mapping = {}
    for n_cluster in range(1, max_cluster):
        clustering_model = KMeans(n_clusters=n_cluster, random_state=constants.RANDOM_STATE)
        clustering_model.fit(embeddings)
        inertia_mapping[n_cluster] = clustering_model.inertia_ / len(embeddings)
    n_vector = list(inertia_mapping.keys())
    # Rescale inertia to [0, 10] so the angle computation is not dominated by raw magnitude.
    k_vector = list(MinMaxScaler((0, 10)).fit_transform(np.array(
        list(inertia_mapping.values())).reshape(-1, 1)).reshape(len(inertia_mapping)))
    elbow_points = list(zip(k_vector, n_vector))
    result = []
    for i in range(len(elbow_points) - 2):
        p1, p2, p3 = elbow_points[i], elbow_points[i + 1], elbow_points[i + 2]
        a = distance.euclidean(p1, p2)
        b = distance.euclidean(p2, p3)
        c = distance.euclidean(p3, p1)
        # Law of cosines gives the angle at p2; clamp against floating-point drift
        # so math.acos never sees a value outside [-1, 1].
        cos_angle = max(-1.0, min(1.0, (a ** 2 + b ** 2 - c ** 2) / (2 * a * b)))
        result.append(math.acos(cos_angle))
    # result[i] is the angle at cluster count i + 2; the sharpest bend marks the elbow.
    return int(np.argmin(result)) + 2


def cluster_kmeans(embeddings, clustering_model=None, n_cluster=3):
    """Fit K-means (or a caller-supplied model) and return labels and centroids."""
    if clustering_model is None:
        # Fix the seed (as in get_optimum_kmeans_cluster_number) for reproducible labels.
        clustering_model = KMeans(n_clusters=n_cluster, random_state=constants.RANDOM_STATE)
    clustering_model.fit(embeddings)
    cluster_assignment = clustering_model.labels_
    cluster_centroids = clustering_model.cluster_centers_
    return cluster_assignment, cluster_centroids
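

if __name__ == "__main__":
    # Minimal end-to-end sketch (not part of the original module): it assumes the
    # vectorizer service is reachable at constants.INTELLIGENCE_HOST/PORT, that
    # constants.RANDOM_STATE is defined, and that the NLTK "punkt" data has been
    # downloaded. The sample text below is purely illustrative.
    sample_text = (
        "Cats sleep most of the day. Dogs enjoy long walks. "
        "Python is a popular programming language. Compilers translate source code. "
        "Rain is expected tomorrow. The forecast predicts sunshine on Sunday."
    )
    sentences = get_sentence_from_text(sample_text)
    embeddings = get_embedding_from_sentence(sentences, normalise=True)
    reduced = run_pca_on_embedding(embeddings)
    n_clusters = get_optimum_kmeans_cluster_number(reduced)
    labels, centroids = cluster_kmeans(reduced, n_cluster=n_clusters)
    for sentence, label in zip(sentences, labels):
        print(f"[cluster {label}] {sentence}")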