-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunctions.py
369 lines (284 loc) · 11.4 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
import pandas as pd
import json
import numpy as np
import matplotlib.pyplot as plt
import pygame
import time
import tensorflow as tf
from pandas import DataFrame
from pandas import concat
from constants import *
import json
file = open('data.json')
json_data = json.load(file)
json_lstm = json_data['models']['lstm']
'''
def findMissingFrames(df):
missing = []
prev = -1
for x in df.frame_number.unique():
x = int(x)
if x != prev + 1:
for i in range(prev+1, x):
missing.append(i)
prev = x
return missing
'''
# Load the data from .txt and return & store csv file
def getData(path: str, type = "obsmat"):
"""
Load the data from .txt and return & store csv file
Args:
- path (string) -- path of the folder of the data to be processed
- type (string) -- type of data (eg. obsmat)
Returns:
Preprocessed pandas DataFrame
"""
if type == "obsmat":
obsmat = path + "/" + type + ".txt"
name = path.split("/")[-1]
df = pd.read_csv(obsmat, delimiter=r"\s+", header = None)
df.columns = ["frame_number", "pedestrian_ID", "pos_x", "pos_z", "pos_y", "v_x", "v_z", "v_y"]
# Dropping irrelevent columns
df = df.drop(["pos_z", "v_z"], axis = 1)
# Converting the frames column into a proper range
for i in range(len(df)):
df.iloc[i,0] -= 1
df.iloc[i,0] = df.iloc[i,0]/10
# Columns Type Casting
df = df.astype({'frame_number': int, 'pedestrian_ID' : int})
df.to_csv(f"datasets/csvs/{name}.csv", index=False)
return df
else:
print("Invalid Type")
return None
# Convert normal series into a series suitable for LSTMs
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
"""
Convert normal series into a series suitable for LSTMs
Args:
- data (Pandas DataFrame) -- Sequence of observations as a list or NumPy array.
- n_in (int) -- Number of lag observations as input (X).
- n_out (int) -- Number of observations as output (y).
- dropnan (int) -- Boolean whether or not to drop rows with NaN values.
Returns:
Pandas DataFrame of series framed for supervised learning.
"""
n_vars = 1 if type(data) is list else data.shape[1]
df = DataFrame(data)
cols, names = list(), list()
# input sequence (t-n, ... t-1)
for i in range(n_in, 0, -1):
cols.append(df.shift(i))
names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
# forecast sequence (t, t+1, ... t+n)
for i in range(0, n_out):
cols.append(df.shift(-i))
if i == 0:
names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
else:
names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
# put it all together
agg = concat(cols, axis=1)
agg.columns = names
# drop rows with NaN values
if dropnan:
agg.dropna(inplace=True)
return agg
# Get data that can be fed into a machine learning model
def getModelData(df, lstm_data):
"""
Get data that can be fed into a machine learning model
Args:
- df (Pandas DataFrame) -- df of the position coords
Returns:
Pandas DataFrame of series framed for supervised learning.
"""
# Grouping the DataFrame and dropping peds with less data
pos_df = df.drop(['v_x', 'v_y'], axis = 1)
pos_grp = pos_df.groupby('pedestrian_ID')
for name, group in pos_grp:
if len(group) <= lstm_data['window_size'] + lstm_data['no_of_forecasts']:
pos_df = pos_df.drop(pos_grp.get_group(name).index)
pos_grp = pos_df.groupby('pedestrian_ID')
df_lstm = pd.DataFrame()
for name, group in pos_grp:
pos_cols = group[['pos_x', 'pos_y']]
pos_cols_converted = series_to_supervised(
pos_cols,
lstm_data['window_size'],
lstm_data['no_of_forecasts']
)
df_lstm = pd.concat([df_lstm, pos_cols_converted])
df_lstm = df_lstm.dropna()
df_lstm.to_csv(f"datasets/csvs/lstms/seq.csv", index=False)
return df_lstm
# Creates a forecast of the trajectory of the pedestrian
def getForecast(model, df, window_size=5):
"""
Creates a forecast of the trajectory of the pedestria
Args:
- model (tensorflow sequential) -- the trained model with 2 output tensors
- df (pandas dataframe) -- the part of the dataframe on which the predictions are to be made
- size_of_prediction (int) -- dimension of the prediction
- window_size (int) -- size of the window
Returns:
Numpy array with the predictions of the data
Note:
the prediction is expected to have only one pair of coordinates
"""
series = []
forecasts = []
len_of_df = len(df)
size_of_prediction = json_lstm["size_of_prediction"]
try:
for i in range(window_size):
for j in range(size_of_prediction):
series.append(df.iloc[i][j])
except IndexError:
print(f"Length of the dataframe={len(df)} is smaller than the window_size={window_size}. Add more data or reduce the window_size")
for i in range(len_of_df-window_size):
predict = np.array(series[-window_size*size_of_prediction:])[np.newaxis]
forecast = model.predict(predict[-window_size*size_of_prediction:][np.newaxis])[0][:2][np.newaxis]
forecasts.append(forecast[0])
for j in range(size_of_prediction):
series.append(forecast[0][j])
return np.array(forecasts)
# Draws the predictions of the data
def drawPredictions(df, forecasts, window_size):
"""
Draws the predictions of the data
Args:
- df (pandas dataframe) -- the part of the dataframe on which the predictions were made
- forecasts (array of int) -- all the predictions from make_forecast()
"""
x_val = [val[0] for val in forecasts]
y_val = [val[1] for val in forecasts]
b = plt.scatter(np.array(df.iloc[:window_size,0]), np.array(df.iloc[:window_size,1]), c='b')
c = plt.scatter(np.array(df.iloc[window_size:,0]), np.array(df.iloc[window_size:,1]), c='c')
o = plt.scatter(x_val, y_val, c='orange')
plt.legend((b, c, o), ('before', 'after', 'prediction'))
plt.show()
# Draws the prediction on pygame
# Draws the already traversed trajectory
def drawFastPath(screen, pastCoords):
"""
Draws the already covered trajectory
Args:
- screen (pygame screen) -- the screen of the pygame UI
- pastCoords (numpy array) -- the coords of the trajectory that was already covered
"""
for x, y in pastCoords:
pygame.draw.rect(screen, kObjColor, pygame.Rect(x, y, kObjSize[0], kObjSize[1]))
pygame.display.flip()
# Draws the predicted and original trajectory for the next timestamps
def drawSlowPath(screen, predictedCoords, futureCoords):
"""
Draws the predicted and original trajectory for the next timestamps
Args:
- screen (pygame screen) -- the screen of the pygame UI
- predictedCoords (numpy array) -- the coords of the predicted trajectory
- originalCoords (numpy array) -- the coords of the original trajectory
"""
for pred, fut in zip(predictedCoords, futureCoords):
time.sleep(0.3)
pygame.draw.rect(screen, kOgObjColor, pygame.Rect(fut[0], fut[1], kObjSize[0], kObjSize[1]))
pygame.draw.rect(screen, kPredObjColor, pygame.Rect(pred[0], pred[1], kObjSize[0], kObjSize[1]))
pygame.display.flip()
# Main function for drawing the complete trajectory
def drawPredictionPath(screen, model, originalCoords):
"""
Draws the trajectory using pygame
Args:
- screen (pygame screen) -- the screen of the pygame UI
- model (TensorFlow model) -- the model that would be used to make the predictions
- originalCoords (numpy array) -- the coords of the original trajectory
PS: only draws the prediction of the trajectories with more than 5 steps, others are just returned
"""
if(len(originalCoords) > 5):
for i in range(5, len(originalCoords)):
# multiply the values by a constant to show on the pygame
pastCoords = np.array([[c*30+200 for c in coord] for coord in originalCoords[:i]])
futureCoords = np.array([[c*30+200 for c in coord] for coord in originalCoords[i:]])
predictedCoords = getForecast(model, pd.DataFrame(originalCoords[i-5:]))
predictedCoords = np.array([[c*30+200 for c in coord] for coord in predictedCoords])
if(len(predictedCoords) > 5):
futureCoords = futureCoords[:5]
predictedCoords = predictedCoords[:5]
drawFastPath(screen, pastCoords)
drawSlowPath(screen, predictedCoords, futureCoords)
screen.fill(kBgColor)
# Get the the distance and angle of the pedestrains near the current pedestrian
def angle_between(p1, p2):
"""
Calculates the angle between two points
Args:
- p1 (tuple) -- Point 1
- p2 (tuple) -- Point 2
Returns:
Angle between the 2 points
"""
ang1 = np.arctan2(*p1[::-1])
ang2 = np.arctan2(*p2[::-1])
return np.rad2deg((ang1 - ang2) % (2 * np.pi))
def getSectorRange(angle):
res = []
if angle + 3*kSectorRange/2 < 360:
res.append(angle + 3*kSectorRange/2)
else:
res.append(360 - angle + 3*kSectorRange/2)
if angle + kSectorRange/2 < 360:
res.append(angle + kSectorRange/2)
else:
res.append(360 - angle + kSectorRange/2)
if angle - kSectorRange/2 > 0:
res.append(angle - kSectorRange/2)
else:
res.append(360 + angle - kSectorRange/2)
if angle - 3*kSectorRange/2 > 0:
res.append(angle - 3*kSectorRange/2)
else:
res.append(360 + angle - 3*kSectorRange/2)
return res
def getPedestrianROI(currData, coords):
"""
Counts the number of other pedestrians in a pedestrian's ROI
Args:
- currData (array) -- the coords of the pedestrian in focus -> [pos_x, pos_y, v_x, v_y]
- coords (array) -- the list of coords of other pedestrians -> [pos_x, pos_y]
Returns:
3 arrays with the number of pedestrians in the 3 sectors (Left, Front, Right)
"""
velVectorAngle = np.arctan(currData[3]/currData[2])
res = [0,0,0]
sectorRange = getSectorRange(velVectorAngle)
for coord in coords:
# Rejecting the point if the distance is more than the threshold
distanceBetween = np.linalg.norm(currData[:2]-coord)
if distanceBetween > kMaxDistance:
continue
# Finding which sector the point belongs in
angleBetween = angle_between(currData[:2], coord)
delta = angleBetween - velVectorAngle
if delta > 180:
delta = delta - 360
elif delta < -180:
delta = delta + 360
if(delta >= 0 and delta <= 3*kSectorRange/2):
if delta > kSectorRange/2:
res[0] += 1
else:
res[1] += 1
elif(delta < 0 and delta >= -3*sectorRange/2):
if delta < -kSectorRange/2:
res[2] += 1
else:
res[1] += 1
return res
if __name__ == "__main__":
file = open('data.json')
json_data = json.load(file)
# dfRaw = getData(json_data['datasets']['seq_hotel'])
# dfLSTM = getModelData(dfRaw, json_data)
good = [0,5,6,7,8,9,10,]
drop = [1,2,3,4,14,15]