preprocess_data.py
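
# Crops labeled objects out of YOLO-annotated pothole images, pads each crop with a
# configurable strategy, and writes the crops into train/val/test folders organized
# by label.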
import os
import cv2
import pickle
import random
import traceback
import argparse
import glob
import numpy as np
from enum import Enum
from tqdm import tqdm
from pathlib import Path
from collections import defaultdict, namedtuple
from concurrent.futures import ProcessPoolExecutor
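
# RawData describes one annotated object: the dataset root (or sub-dataset name),
# its label folder, the image path, the annotation line index, the class id and the
# normalized YOLO bbox. CropData is a bbox-free variant for pre-cropped images.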
RawData = namedtuple('RawData', 'data_set, label, image_path, idx, class_id, bbox')
CropData = namedtuple('CropData', 'data_set, label, image_path')
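
# Padding strategies for enlarging a crop around its bounding box:
#   BBOX  - pad by a fraction of the bbox width/height
#   PIXEL - pad by a fixed number of pixels
#   IMAGE - pad by a fraction of the full image width/height
#   FIX   - pad the bbox out to a fixed target size in pixels (never shrinks it)
#   FIX2  - like FIX, but the target size is raised to the bbox's larger side
#           when the bbox already exceeds padding_size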
class Padding(Enum):
BBOX = 'BBOX'
PIXEL = 'PIXEL'
IMAGE = 'IMAGE'
FIX = 'FIX'
FIX2 = 'FIX2'
def __str__(self):
return self.value
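
# Convert a normalized YOLO box (cx, cy, w, h in [0, 1]) into clipped pixel
# coordinates (x1, y1, x2, y2) for an image of the given width and height.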
def xywh2xyxy(bbox, width, height):
x, y, w, h = bbox
x1 = int(np.clip(x - w / 2, 0, 1) * width)
y1 = int(np.clip(y - h / 2, 0, 1) * height)
x2 = int(np.clip(x + w / 2, 0, 1) * width)
y2 = int(np.clip(y + h / 2, 0, 1) * height)
return x1, y1, x2, y2
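
# Crop a single object out of an image. Optionally draws the bbox, pads the crop
# according to the chosen Padding mode and, with use_shift, slides the padded window
# back inside the image instead of clipping it. Returns True (or the crop itself
# when imsave=False) on success, and False on any error.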
def crop_image(image_path, bbox, padding, padding_size, use_shift, use_bbox, output_path=None, imsave=True):
try:
image = cv2.imread(str(image_path))
height, width, _ = image.shape
x1, y1, x2, y2 = xywh2xyxy(bbox, width, height)
if use_bbox:
image = cv2.rectangle(
image,
(x1 - 1, y1 - 1),
(x2, y2),
(0, 0, 255),
1
)
if padding_size > 0:
            if padding == Padding.BBOX or padding == 'BBOX':
                pad_x = int(padding_size * (x2 - x1))
                pad_y = int(padding_size * (y2 - y1))
            elif padding == Padding.PIXEL or padding == 'PIXEL':
                pad_x = int(padding_size)
                pad_y = int(padding_size)
            elif padding == Padding.IMAGE or padding == 'IMAGE':
                pad_x = int(padding_size * width)
                pad_y = int(padding_size * height)
            elif padding == Padding.FIX or padding == 'FIX':
                pad_x = max(int((padding_size - (x2 - x1)) / 2), 0)
                pad_y = max(int((padding_size - (y2 - y1)) / 2), 0)
            elif padding == Padding.FIX2 or padding == 'FIX2':
                padding_size = max(padding_size, x2 - x1, y2 - y1)
                pad_x = max(int((padding_size - (x2 - x1)) / 2), 0)
                pad_y = max(int((padding_size - (y2 - y1)) / 2), 0)
            else:
                raise Exception(f'Unknown padding: {padding}')
if use_shift:
x1 -= pad_x
y1 -= pad_y
x2 += pad_x
y2 += pad_y
if x1 < 0:
x2 -= x1
x1 = 0
if width < x2:
x1 -= x2 - width
x2 = width
if y1 < 0:
y2 -= y1
y1 = 0
if height < y2:
y1 -= y2 - height
y2 = height
else:
x1 = np.clip(x1 - pad_x, 0, width)
y1 = np.clip(y1 - pad_y, 0, height)
x2 = np.clip(x2 + pad_x, 0, width)
y2 = np.clip(y2 + pad_y, 0, height)
image = image[y1:y2, x1:x2]
if imsave:
cv2.imwrite(str(output_path), image)
return True
else:
return image
    except Exception:
        traceback.print_exc()
        return False
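
# Build the path for a sample: is_output prefixes the annotation index to the file
# name, and is_train flips the layout from <data_set>/<label> to <label>/<data_set>.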
def get_image_path(data_root, data, is_train, is_output):
if is_output:
image_path = data.image_path.with_name(f'{data.idx}_{data.image_path.name}')
else:
image_path = data.image_path
if is_train:
return data_root / data.label / data.data_set / image_path
else:
return data_root / data.data_set / data.label / image_path
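
# Locate the image (.jpg or .png) that shares a stem with the annotation file.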
def find_image(anno):
ext_list = ['.jpg', '.png']
for ext in ext_list:
image = anno.with_suffix(ext)
if image.exists():
return image
raise Exception(f'image not found: {anno}')
def find_annotation(img):
    # Locate the .txt annotation that shares a stem with the image file.
    annot = Path(img).with_suffix('.txt')
    if annot.exists():
        return annot
    raise Exception(f'annotation not found: {img}')
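
# Yield (line_index, class_id, bbox) for every non-empty line of a YOLO-format
# annotation file.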
def read_annotation(annotation):
with annotation.open('r', encoding='utf-8') as rf:
for line_idx, line in enumerate(rf.readlines()):
line = line.strip()
data = line.split()
if len(data) == 0:
continue
class_id = int(data[0])
bbox = [float(v) for v in data[1:5]]
yield line_idx, class_id, bbox
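
# Read a text file listing image paths and build RawData entries by pairing each
# image with its annotation under the sibling 'annotations' directory (the label is
# the annotation's grand-parent directory).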
def make_dataset_file(dataset_file):
data_list = []
with open(dataset_file, 'r') as rf:
data_lines = [i.strip() for i in rf.readlines()]
for image_path in data_lines:
data_root = '/'.join(image_path.split('/')[:-3])
annot_path = image_path.replace('images', 'annotations')
annot_path = find_annotation(annot_path)
image_path = Path(image_path)
image_path = image_path.relative_to(data_root)
for line_idx, class_id, bbox in read_annotation(annot_path):
label = annot_path.parts[-3]
raw_data = RawData(
data_root, label, image_path,
line_idx, class_id, bbox
)
data_list.append(raw_data)
return data_list
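
# Same idea for pre-cropped images: no annotation is read, so a dummy class id and
# an empty bbox are stored.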
def make_crop_dataset_file(dataset_file):
data_list = []
with open(dataset_file, 'r') as rf:
data_lines = [i.strip() for i in rf.readlines()]
for image_path in data_lines:
data_root = '/'.join(image_path.split('/')[:-3])
image_path = Path(image_path)
image_path = image_path.relative_to(data_root)
label = image_path.parts[-3]
raw_data = RawData(data_root, label, image_path, 0, 0, [])
data_list.append(raw_data)
return data_list
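
# Group pre-cropped images by their split tag from split_info, again storing a
# dummy class id and an empty bbox.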
def make_crop_dataset_list(data_root, label_list=None, split_info=None):
data_list = defaultdict(list)
data_root = Path(data_root).resolve()
for image_path in split_info.keys():
image_path = Path(image_path)
label = image_path.parts[-3]
        try:
            split_tag = split_info[image_path]
        except KeyError:
            print(f'{image_path} has no split assignment, skipping.')
            continue
# print(data_root, label, image_path, split_tag)
# crop_data = CropData(data_root, label, image_path)
# data_list[split_tag].append(crop_data)
# RawData = namedtuple('RawData', 'data_set, label, image_path, idx, class_id, bbox')
raw_data = RawData(data_root, label, image_path, 0, 0, [])
data_list[split_tag].append(raw_data)
return data_list
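
# Walk annotation files under data_root and group RawData entries by split tag.
# Three layouts are supported: a flat directory (no split_info), a nested layout
# looked up in split_info, and an explicit <dataset>/<label> layout driven by
# label_list (which also prints per-class split counts).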
def make_list(data_root, label_list=None, split_info=None):
data_list = defaultdict(list)
# print('dataset\tlabel\tclass_id\ttest_cnt\tval_cnt\ttrain_cnt')
if split_info is None:
for annotation in data_root.glob('*.txt'):
image_path = find_image(annotation)
image_path = image_path.relative_to(data_root)
annotation = annotation.relative_to(data_root)
for line_idx, class_id, bbox in read_annotation(annotation):
raw_data = (
data_root, image_path,
line_idx, class_id, bbox
)
data_list['data'].append(raw_data)
elif label_list is None:
for annotation in data_root.glob('**/*.txt'):
# annotation = annotation.relative_to(data_root)
            try:
                image_path = find_image(annotation)
            except Exception:
                # Annotations may live under 'annotations/' with images under 'images/'.
                newanno = Path(str(annotation).replace('annotations', 'images'))
                image_path = find_image(newanno)
            image_path = image_path.relative_to(data_root)
            try:
                split_tag = split_info[image_path]
            except KeyError:
                print(f'{image_path} has no split assignment, skipping.')
                continue
# image_path = image_path.relative_to(data_root)
# split_tag = split_info[annotation.relative_to(data_root)]
# annotation = annotation.relative_to(data_root)
for line_idx, class_id, bbox in read_annotation(annotation):
label = annotation.parts[-3]
raw_data = RawData(
data_root, label, image_path,
line_idx, class_id, bbox
)
data_list[split_tag].append(raw_data)
else:
for data_sub_root in data_root.glob('*'):
if not data_sub_root.is_dir():
continue
for label in label_list:
annotation_root = data_sub_root / label
count_dict = defaultdict(lambda: defaultdict(int))
for annotation in annotation_root.glob('**/*.txt'):
image_path = find_image(annotation)
image_path = image_path.relative_to(annotation_root)
split_tag = split_info[annotation.relative_to(data_root)]
for line_idx, class_id, bbox in read_annotation(annotation):
raw_data = RawData(
data_sub_root.name, label, image_path,
line_idx, class_id, bbox
)
count_dict[raw_data.class_id][split_tag] += 1
data_list[split_tag].append(raw_data)
for class_id, count in count_dict.items():
test_cnt = count['test']
val_cnt = count['val']
train_cnt = count['train']
print(f'{data_sub_root.name}\t{label}\t{class_id}\t{test_cnt}\t{val_cnt}\t{train_cnt}')
return data_list
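
# Randomly assign each image under data_root to 'test'/'val'/'train' with the
# requested ratios (drawn independently per image, within each top-level class
# folder). When use_cropimg is False the assignment can be loaded from or written
# to a pickle so the same split is reused across runs. Returns the grouped lists.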
def split_data(data_root, test_ratio, val_ratio, label_list=None, file_write=False, use_cropimg=False):
split_info_path = data_root / f'split_info_{test_ratio}_{val_ratio}'
train_ratio = 1 - (test_ratio + val_ratio)
split_population = 'test', 'val', 'train'
split_weights = test_ratio, val_ratio, train_ratio
if use_cropimg:
split_info = []
types = ('*.jpg', '*.jpeg', '*.png') # the tuple of file types
for files in types:
# glob("./**/*.jpg", recursive=True)
split_info.extend(glob.glob(str(data_root/ '**' / files), recursive=True))
dict_val = []
cls_lists = []
for cls in os.listdir(data_root):
cls_list = [Path(i).relative_to(data_root) for i in split_info if Path(i).relative_to(data_root).parts[0]==cls]
dict_val.extend([random.choices(split_population, split_weights)[0] for i in range(len(cls_list))])
cls_lists.extend(cls_list)
split_info = dict(zip(cls_lists, dict_val))
data_list = make_crop_dataset_list(data_root, label_list, split_info)
else:
if split_info_path.exists():
print(f'load {split_info_path}')
with split_info_path.open('rb') as rf:
split_info = pickle.load(rf)
else:
split_info = []
types = ('*.jpg', '*.jpeg', '*.png') # the tuple of file types
for files in types:
# glob("./**/*.jpg", recursive=True)
split_info.extend(glob.glob(str(data_root/ '**' / files), recursive=True))
dict_val = []
cls_lists = []
for cls in os.listdir(data_root):
cls_list = [Path(i).relative_to(data_root) for i in split_info if Path(i).relative_to(data_root).parts[0]==cls]
dict_val.extend([random.choices(split_population, split_weights)[0] for i in range(len(cls_list))])
cls_lists.extend(cls_list)
split_info = dict(zip(cls_lists, dict_val))
# split_info = defaultdict(lambda: random.choices(split_population, split_weights)[0], split_info)
data_list = make_list(data_root, label_list, split_info)
if file_write:
print(f'make {split_info_path}')
with split_info_path.open('wb') as wf:
split_info = dict(split_info)
pickle.dump(split_info, wf)
return data_list
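
# Crop every object whose class id is in use_class, in parallel, writing the crops
# under output_root with the index-prefixed file names from get_image_path.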
def preprocess_data(data_list, padding, padding_size, use_shift, use_bbox,
use_class, data_root, output_root, is_train):
print(f'preprocess {output_root}')
with ProcessPoolExecutor() as executor:
future_list = []
for data in tqdm(data_list):
if data.class_id not in use_class:
continue
image_path = get_image_path(data_root, data, False, False)
output_path = get_image_path(output_root, data, is_train, True)
output_path.parent.mkdir(parents=True, exist_ok=True)
image_future = executor.submit(
crop_image, image_path, data.bbox,
padding, padding_size, use_shift, use_bbox,
output_path
)
future_list.append(image_future)
for future in tqdm(future_list):
future.result()
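
# CLI entry point: parse arguments, split the dataset, then crop each split
# (every split except 'test' is written with the training layout).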
def main():
    parser = argparse.ArgumentParser(description='preprocess pothole data')
parser.add_argument('--name', default='test', type=str)
parser.add_argument('--data_root', default='/data/pothole_data/raw', type=str)
parser.add_argument('--output_root', default='/data/pothole_data/out', type=str)
parser.add_argument('--label_list', default=['positive', 'negative'], nargs='+', type=str)
parser.add_argument('--padding', default=Padding.BBOX, choices=list(Padding), type=Padding)
parser.add_argument('--padding_size', default=0.0, type=float)
parser.add_argument("--use_bbox", action='store_true')
parser.add_argument("--use_shift", action='store_true')
parser.add_argument('--use_class', default=[0, 1, 2, 3], nargs='+', type=int)
parser.add_argument('--val_ratio', default=0.1, type=float)
parser.add_argument('--test_ratio', default=0.1, type=float)
args = parser.parse_args()
data_root = Path(args.data_root)
output_root = Path(args.output_root) / args.name
label_list = args.label_list
padding = args.padding
padding_size = args.padding_size
use_bbox = args.use_bbox
use_shift = args.use_shift
use_class = args.use_class
val_ratio = args.val_ratio
test_ratio = args.test_ratio
data_list = split_data(
data_root, test_ratio, val_ratio, label_list)
for data_name, data_sub_list in data_list.items():
is_train = data_name != 'test'
preprocess_data(
data_sub_list,
padding, padding_size, use_shift, use_bbox, use_class,
data_root,
output_root / data_name,
is_train
)
if __name__ == "__main__":
main()
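
# Example invocation (name and paths are illustrative; --padding accepts
# BBOX, PIXEL, IMAGE, FIX or FIX2):
#   python preprocess_data.py --name bbox_pad02 \
#       --data_root /data/pothole_data/raw --output_root /data/pothole_data/out \
#       --padding BBOX --padding_size 0.2 --use_shift \
#       --val_ratio 0.1 --test_ratio 0.1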