-
Notifications
You must be signed in to change notification settings - Fork 336
/
Copy pathAppFreeLink.py
145 lines (123 loc) · 4.48 KB
/
AppFreeLink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
@Author: Du Yunhao
@Filename: AFLink.py
@Contact: [email protected]
@Time: 2021/12/28 19:55
@Description: Appearance-Free Post Link
"""
from collections import defaultdict
from scipy.optimize import linear_sum_assignment
import numpy as np
INFINITY = 1e5
def fill_or_cut(x, former: bool, input_len: int = 30):
    """Crop or zero-pad a 2-D tracklet array to exactly ``input_len`` rows.

    :param x: input array of shape (rows, columns)
    :param former: True means this tracklet is the former (earlier) one of
        the pair being linked. The former tracklet keeps its LAST rows and is
        padded at the front; the latter tracklet keeps its FIRST rows and is
        padded at the back, so the rows nearest to the junction are preserved.
    :param input_len: number of rows in the returned array (default 30)
    :return: array of shape (input_len, columns)
    """
    rows, cols = x.shape
    if rows >= input_len:
        if former:
            # Explicit start index instead of x[-input_len:], which would
            # return the whole array when input_len is 0.
            return x[rows - input_len:]
        return x[:input_len]
    padding = np.zeros((input_len - rows, cols))
    if former:
        return np.concatenate((padding, x), axis=0)
    return np.concatenate((x, padding), axis=0)
def transform(x1, x2):
    """Prepare a (former, latter) tracklet pair for the link model.

    Both tracklets are brought to a fixed length with ``fill_or_cut``, then
    jointly normalized per column: each column is centered on the midpoint of
    its min/max over both tracklets and divided by half of that range (plus
    1e-5 so the divisor is never zero). A leading axis of size 1 is finally
    added to each array.

    :param x1: former tracklet, 2-D array
    :param x2: latter tracklet, 2-D array
    :return: tuple (x1, x2) of normalized arrays with a new leading axis
    """
    # Fixed-length windows around the junction.
    head = fill_or_cut(x1, True)
    tail = fill_or_cut(x2, False)

    # Column-wise extrema over both windows together.
    stacked = np.concatenate((head, tail), axis=0)
    lo = stacked.min(axis=0)
    hi = stacked.max(axis=0)

    # Min-max normalization.
    center = (hi + lo) / 2
    half_range = (hi - lo) / 2 + 1e-5
    head = (head - center) / half_range
    tail = (tail - center) / half_range

    # Add the channel axis in front.
    return head[np.newaxis, ...], tail[np.newaxis, ...]
class AFLink:
    """Link tracklets of the same target using only motion information.

    A cost is predicted by ``model`` for every ordered pair of tracklets that
    passes the temporal and spatial gates; pairs are then matched with
    ``linear_sum_assignment`` and the matched tracklets receive a common ID.
    """

    def __init__(self, model, thrT: tuple, thrS: int, thrP: float):
        """
        :param model: object exposing ``predict([track1, track2])``
        :param thrT: (low, high) frame-gap gate; a pair is considered only
            when ``low <= first_frame_latter - last_frame_former < high``
        :param thrS: maximum L2 distance between the (x, y) of the former's
            last row and the (x, y) of the latter's first row
        :param thrP: maximum predicted cost for a link to be accepted
        """
        self.thrP = thrP    # prediction threshold
        self.thrT = thrT    # temporal threshold
        self.thrS = thrS    # spatial threshold
        self.model = model  # prediction model

    def compression(self, cost_matrix, ids):
        """Drop rows and columns that contain no cost below ``thrP``.

        :param cost_matrix: square cost matrix indexed by ``ids`` on both axes
        :param ids: array of track IDs
        :return: (compressed matrix, IDs of kept rows, IDs of kept columns)
        """
        # min() over a zero-length axis raises ValueError, so handle the
        # empty matrix explicitly.
        if cost_matrix.size == 0:
            no_ids = ids[:0]
            return cost_matrix[:0, :0], no_ids, no_ids
        # Row compression
        mask_row = cost_matrix.min(axis=1) < self.thrP
        matrix = cost_matrix[mask_row, :]
        ids_row = ids[mask_row]
        # Column compression (mask computed on the uncompressed matrix)
        mask_col = cost_matrix.min(axis=0) < self.thrP
        matrix = matrix[:, mask_col]
        ids_col = ids[mask_col]
        return matrix, ids_row, ids_col

    def predict(self, track1, track2):
        """Predict the cost of linking ``track1`` (former) to ``track2`` (latter).

        :return: ``1 - score`` where ``score = output[0][0, 1]`` of the model
        """
        track1, track2 = transform(track1, track2)
        # Add a leading axis of size 1 to each input.
        track1 = np.expand_dims(track1, axis=0)
        track2 = np.expand_dims(track2, axis=0)
        output = self.model.predict([track1, track2])
        # NOTE(review): presumably index 1 is the "same target" score of a
        # two-class output - confirm against the model definition.
        score = output[0][0, 1]
        return 1 - score

    @staticmethod
    def deduplicate(tracks):
        """Remove duplicates: several boxes with the same frame and the same ID.

        Keeps the first row for each unique (frame, ID) pair; rows come back
        in the sorted order produced by ``np.unique``.
        """
        # Guarantee uniqueness of (frame number, ID)
        _, index = np.unique(tracks[:, :2], return_index=True, axis=0)
        return tracks[index]

    def link(self, track):
        """Main entry point: merge IDs of tracklets judged to be one target.

        :param track: 2-D array whose first six columns are
            frame, id, x, y, w, h
        :return: copy of ``track`` sorted by frame, with linked IDs rewritten
            and duplicate (frame, ID) rows removed; an empty input is
            returned as an empty copy
        """
        # Nothing to link; also avoids reducing over an empty cost matrix.
        if len(track) == 0:
            return track.copy()

        track = track[np.argsort(track[:, 0])]  # sort by frame
        id2info = defaultdict(list)
        for row in track:
            frame, tid, x, y, w, h = row[:6]
            id2info[tid].append([frame, x, y, w, h])
        id2info = {k: np.array(v) for k, v in id2info.items()}
        num = len(id2info)              # number of targets
        ids = np.array(list(id2info))   # target IDs
        cost_matrix = np.ones((num, num)) * INFINITY

        # Compute the cost matrix
        for i, id_i in enumerate(ids):      # former tracklet
            info_i = id2info[id_i]
            fi, bi = info_i[-1][0], info_i[-1][1:3]
            for j, id_j in enumerate(ids):  # latter tracklet
                if id_i == id_j:            # never link a tracklet to itself
                    continue
                info_j = id2info[id_j]
                fj, bj = info_j[0][0], info_j[0][1:3]
                # Temporal gate
                if not self.thrT[0] <= fj - fi < self.thrT[1]:
                    continue
                # Spatial gate (L2 distance)
                dist = np.sqrt((bi[0] - bj[0]) ** 2 + (bi[1] - bj[1]) ** 2)
                if self.thrS < dist:
                    continue
                cost = self.predict(info_i, info_j)
                if cost <= self.thrP:
                    cost_matrix[i, j] = cost

        # Optimal bipartite matching
        direct_links = {}   # temporary result: former ID -> latter ID
        merged_ids = {}     # final result: ID to rewrite -> ID to rewrite it to
        cost_matrix, ids_row, ids_col = self.compression(cost_matrix, ids)
        indices = linear_sum_assignment(cost_matrix)
        for i, j in zip(indices[0], indices[1]):
            if cost_matrix[i, j] < self.thrP:
                direct_links[ids_row[i]] = ids_col[j]
        for k, v in direct_links.items():
            # If k was itself linked to an earlier ID, v inherits that ID.
            if k in merged_ids:
                merged_ids[v] = merged_ids[k]
            else:
                merged_ids[v] = k

        # Store the result
        res = track.copy()
        for k, v in merged_ids.items():
            res[res[:, 1] == k, 1] = v
        res = self.deduplicate(res)
        return res