COP_Kmeans.py
import random

import cv2
import numpy as np

# The percent-change convergence test in fit() divides by the previous
# centroid values, which may contain zeros; silence those numpy warnings.
np.seterr(divide='ignore', invalid='ignore')

class COP_KMeans:
    """COP-KMeans: k-means constrained by must-link (ml) and
    cannot-link (cl) relations between data points."""

    def __init__(self, k, ml, cl, f_name, tolerance=0.00001, max_iterations=3):
        self.k = k
        self.ml = ml  # ml[i]: indices that must share a cluster with point i
        self.cl = cl  # cl[i]: indices that must not share a cluster with point i
        self.f_name = f_name  # feature type; "SIFT" switches to descriptor matching
        self.tolerance = tolerance  # percent-change threshold for convergence
        self.max_iterations = max_iterations

    def fit(self, data, initial_method='random'):
        # Initialize the centroids with k elements of the dataset, chosen
        # either uniformly at random or by distance-weighted seeding.
        self.centroids = self.initialize_centers(data, self.k, initial_method)

        for _ in range(self.max_iterations):
            self.is_clustered = [-1] * len(data)  # -1 marks "not yet assigned"
            self.clusters = {c: set() for c in range(self.k)}

            # Compute each point's distance to every centroid and try the
            # centroids from best to worst.
            for x_index in range(len(data)):
                if self.f_name == "SIFT":
                    # For SIFT descriptors, similarity is the number of
                    # brute-force matches, so more is better: sort descending.
                    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)
                    distances = {
                        center_index: len(bf.match(data[x_index], self.centroids[center_index]))
                        for center_index in self.centroids
                    }
                    sorted_distances = sorted(distances.items(),
                                              key=lambda kv: kv[1], reverse=True)
                else:
                    distances = {
                        center_index: np.linalg.norm(data[x_index] - self.centroids[center_index])
                        for center_index in self.centroids
                    }
                    sorted_distances = sorted(distances.items(), key=lambda kv: kv[1])

                # Assign the point to the best centroid that does not violate
                # any must-link / cannot-link constraint.
                empty_flag = True
                for center_index, _dist in sorted_distances:
                    if not self.violate_constraints(x_index, center_index, self.ml, self.cl):
                        self.clusters[center_index].add(x_index)
                        self.is_clustered[x_index] = center_index
                        # Propagate the assignment to must-linked points.
                        for j in self.ml[x_index]:
                            self.is_clustered[j] = center_index
                        empty_flag = False
                        break
                if empty_flag:
                    # No centroid satisfies the constraints for this point.
                    return "Clustering Not Found Exception"

            previous = dict(self.centroids)

            # Re-calculate each centroid as the mean of its cluster members.
            for _center in self.clusters:
                members = [data[index_value] for index_value in self.clusters[_center]]
                if members:
                    self.centroids[_center] = np.average(np.array(members), axis=0)

            # Stop early once every centroid's percent change is within tolerance.
            is_optimal = True
            for centroid in self.centroids:
                original_centroid = previous[centroid]
                curr = self.centroids[centroid]
                shift = np.sum((curr - original_centroid) / original_centroid * 100.0)
                if shift > self.tolerance:
                    is_optimal = False
            if is_optimal:
                break

    def violate_constraints(self, data_index, cluster_index, ml, cl):
        """Return True if placing data_index in cluster_index breaks a constraint."""
        # A must-linked point was already placed in a different cluster.
        for i in ml[data_index]:
            if self.is_clustered[i] != -1 and self.is_clustered[i] != cluster_index:
                return True
        # A cannot-linked point was already placed in this cluster.
        for i in cl[data_index]:
            if self.is_clustered[i] != -1 and self.is_clustered[i] == cluster_index:
                return True
        return False

    def decoder(self, x_data, y_data):
        # Diagnostic helper; x_data and y_data are currently unused and the
        # method only prints the final cluster assignments.
        print(self.clusters)

    def initialize_centers(self, dataset, k, method):
        """Pick k initial centroids, either uniformly at random ('random') or
        by distance-weighted roulette selection in the spirit of k-means++
        ('km++'; note classic k-means++ weights by the squared distance)."""
        if method == 'random':
            c = {}
            index_list = list(range(len(dataset)))
            random.shuffle(index_list)
            for i in range(k):
                c[i] = dataset[index_list[i]]
        elif method == 'km++':
            chances = [1] * len(dataset)
            c = {}
            for i in range(k):
                # Roulette-wheel draw: a point's chance is proportional to its
                # distance from the nearest centroid chosen so far.
                chances = [x / sum(chances) for x in chances]
                r = random.random()
                acc = 0.0
                for index, chance in enumerate(chances):
                    if acc + chance >= r:
                        break
                    acc += chance
                c[i] = dataset[index]
                # Re-weight every point by its distance to the closest centroid.
                for point_index, point in enumerate(dataset):
                    distances = {center_index: np.linalg.norm(point - c[center_index])
                                 for center_index in c}
                    sorted_distances = sorted(distances.items(), key=lambda kv: kv[1])
                    chances[point_index] = sorted_distances[0][1]
        return c

    def pred(self, data):
        """Return the index of the nearest centroid for a single sample."""
        distances = [np.linalg.norm(data - self.centroids[centroid])
                     for centroid in self.centroids]
        return distances.index(min(distances))

    def predict(self, data):
        """Return the nearest-centroid label for each row of data."""
        labels = []
        for row in data:
            dist = [np.linalg.norm(row - self.centroids[centroid])
                    for centroid in self.centroids]
            labels.append(dist.index(min(dist)))
        return labels
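
# A minimal usage sketch. The toy points, the constraint lists, and the
# "raw" feature name are illustrative assumptions, not part of the class
# API; any f_name other than "SIFT" selects the Euclidean distance path.
if __name__ == "__main__":
    points = np.array([[1.0, 1.0], [1.2, 0.8], [5.0, 5.0], [5.1, 4.9]])
    # ml[i] / cl[i]: indices point i must / must not share a cluster with.
    ml = [[1], [0], [3], [2]]
    cl = [[2], [3], [0], [1]]
    model = COP_KMeans(k=2, ml=ml, cl=cl, f_name="raw")
    model.fit(points, initial_method='random')
    print(model.predict(points))  # e.g. [0, 0, 1, 1]; label order may vary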