-
Notifications
You must be signed in to change notification settings - Fork 35
/
tst_volumentations_type_1.py
104 lines (85 loc) · 3.1 KB
/
tst_volumentations_type_1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# coding: utf-8
__author__ = 'ZFTurbo: https://kaggle.com/zfturbo'
from volumentations import *
import os
import cv2
import urllib.request
import time
# Directory that receives the augmented debug videos written by this script.
OUTPUT_DIR = './debug_videos/'
# Create it at import time if it is missing. exist_ok=True avoids the
# check-then-create race of a separate isdir()/mkdir() pair.
os.makedirs(OUTPUT_DIR, exist_ok=True)
def read_video(f):
    """Decode every frame of a video file into one uint8 array.

    Prints a one-line summary (file base name, frame count, width, height
    and FPS as reported by the capture properties) and then reads frames
    until the decoder stops returning them.

    :param f: path of the video file; passed unchanged to ``cv2.VideoCapture``.
    :return: ``np.ndarray`` with dtype ``uint8`` holding the decoded frames
        stacked along the first axis. When no frame could be read the
        result is an empty array.
    """
    # NOTE(review): ``np`` is not imported explicitly in this file; presumably
    # it arrives through ``from volumentations import *`` - confirm.
    cap = cv2.VideoCapture(f)
    try:
        length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        print('ID: {} Video length: {} Width: {} Height: {} FPS: {}'.format(os.path.basename(f), length, width, height, fps))

        frame_list = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a decode failure): stop reading.
                break
            frame_list.append(frame.copy())
    finally:
        # Always give the file/decoder handle back, even if decoding raised.
        cap.release()

    return np.array(frame_list, dtype=np.uint8)
def get_augmentation_v1(patch_size):
    """Build the multi-step augmentation pipeline used by the demo.

    The transforms are listed in the order they are handed to ``Compose``.
    ``Resize`` is the only step created with ``always_apply=True``; every
    other step is created with a probability ``p`` below 1.

    :param patch_size: target shape forwarded to ``Resize``.
    :return: the ``Compose`` object wrapping all transforms (``p=1.0``).
    """
    # NOTE(review): RandomRotate90 over axes (1, 2) comes after Resize, so the
    # final shape presumably can have those two axes swapped - confirm.
    steps = [
        Rotate((-15, 15), (0, 0), (0, 0), p=0.5),
        RandomCropFromBorders(crop_value=0.1, p=0.5),
        ElasticTransform((0, 0.25), interpolation=2, p=0.1),
        RandomDropPlane(plane_drop_prob=0.1, axes=(0, 1, 2), p=0.5),
        Resize(patch_size, interpolation=1, always_apply=True, p=1.0),
        Flip(0, p=0.5),
        Flip(1, p=0.5),
        Flip(2, p=0.5),
        RandomRotate90((1, 2), p=0.5),
        GaussianNoise(var_limit=(0, 5), p=0.5),
        RandomGamma(gamma_limit=(80, 120), p=0.5),
    ]
    return Compose(steps, p=1.0)
def get_augmentation_v2(patch_size):
    """Build a pipeline that contains a single, always-applied ``Resize``.

    :param patch_size: target shape forwarded to ``Resize``.
    :return: a ``Compose`` object (``p=1.0``) wrapping that one transform.
    """
    # NOTE(review): the meaning of resize_type=1 is defined by the
    # volumentations library and is not visible in this file - confirm.
    resize_step = Resize(
        patch_size,
        interpolation=1,
        resize_type=1,
        always_apply=True,
        p=1.0,
    )
    return Compose([resize_step], p=1.0)
def create_video(image_list, out_file, fps):
    """Write a sequence of frames to a video file using the XVID codec.

    The frame size is taken from the first frame. Two-dimensional
    (single-channel) frames are replicated into three channels, and every
    frame is cast to ``uint8`` before it is written.

    :param image_list: non-empty sequence of frames (arrays with ``.shape``).
    :param out_file: destination path handed to ``cv2.VideoWriter``.
    :param fps: frame rate handed to ``cv2.VideoWriter``.
    :raises IndexError: if ``image_list`` is empty.
    """
    height, width = image_list[0].shape[:2]
    # Other FourCC values that were tried here: 'DIB ', 'H264' and -1.
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    video = cv2.VideoWriter(out_file, fourcc, fps, (width, height), True)
    try:
        for im in image_list:
            if im.ndim == 2:
                # The writer is opened in colour mode, so expand grey frames.
                im = np.stack((im, im, im), axis=2)
            video.write(im.astype(np.uint8))
    finally:
        # Finalise the file even if a frame could not be written.
        # No window teardown is done: this function opens no windows.
        video.release()
def tst_volumentations():
    """Run the v1 augmentation pipeline on a sample video and save results.

    Downloads ``sample.mp4`` into the working directory when it is not
    already there, loads it with ``read_video``, applies the pipeline from
    ``get_augmentation_v1`` ten times and writes each result to
    ``OUTPUT_DIR`` as ``video_test_<i>.avi`` at 24 FPS. Per-run and total
    timings are printed.
    """
    inp_video = 'sample.mp4'
    target_shape = (150, 224, 360)
    runs = 10

    if not os.path.isfile(inp_video):
        print('Downloading sample.mp4...')
        urllib.request.urlretrieve('https://github.com/ZFTurbo/volumentations/releases/download/v1.0/sample.mp4', inp_video)

    volume = read_video(inp_video)
    print('Sample video shape: {}'.format(volume.shape))

    pipeline = get_augmentation_v1(target_shape)

    # The total timer starts after the pipeline has been built.
    t_total = time.time()
    for idx in range(runs):
        t_run = time.time()
        augmented = pipeline(image=volume)['image']
        create_video(augmented, OUTPUT_DIR + 'video_test_{}.avi'.format(idx), 24)
        print('Aug: {} Time: {:.2f} sec'.format(idx, time.time() - t_run))
    print('Total augm time: {:.2f} sec'.format(time.time() - t_total))
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    tst_volumentations()