-
Notifications
You must be signed in to change notification settings - Fork 0
/
model.py
103 lines (74 loc) · 3.74 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import xarray as xr
import cv2
import numpy as np
import matplotlib.pyplot as plt
import cartopy.crs as ccrs # CRS stands for "coordinate reference system"
from skimage import metrics
from cloudcasting.models import AbstractModel
# We define a new class that inherits from AbstractModel
class OpticalFlowFarneback(AbstractModel):
"""An optical flow model which predicts the next image given the prior two images using the Farneback method"""
def __init__(self, history_steps: int) -> None:
# All models must include `history_steps` as a parameter. This is the number of previous
# frames that the model uses to makes its predictions. This should not be more than 25, i.e.
# 6 hours (inclusive of end points) of 15 minutely data
# You also must inlude the following line in your init function
super().__init__(history_steps)
###### YOUR CODE HERE ######
# Here you can add any other parameters that you need to initialize your model
# You might load your trained ML model or set up an optical flow method here
############################
def forward(self, X):
# This is where you will make predictions with your model
# The input X is a numpy array with shape (batch_size, channels, time, height, width)
###### YOUR CODE HERE ######
# Grab the most recent frame from the input data
samples_list = []
for b in range (X.shape[0]):
channel_list = []
for c in range (X.shape[1]):
previous_image = X[b, c, -2, :, :]
current_image = X[b, c, -1, :, :]
new_image_list = []
for i in range (12):
# convert to uint8 data type
image1 = (previous_image * 255).astype(np.uint8)
image2 = (current_image * 255).astype(np.uint8)
# Predict next frame (image3) for next 12 time steps
# This is the next 3 hours at 15 minute intervals
flow = cv2.calcOpticalFlowFarneback(image1, image2, None, pyr_scale = 0.5, levels = 5, winsize = 11, iterations = 5, poly_n = 5, poly_sigma = 1.1, flags = 0)
# extracting flow components
u = flow[..., 0] # width
v = flow[..., 1] # height
#predict next image
h = flow.shape[0]
w = flow.shape[1]
flow2 = -flow.copy()
flow2[:,:,0] += np.arange(w)
flow2[:,:,1] += np.arange(h)[:,np.newaxis]
new_image = cv2.remap(image2, flow2, None, cv2.INTER_LINEAR)
new_image_list.append(new_image.astype(np.float32)/255)
previous_image = image2.astype(float)/255
current_image = new_image.astype(float)/255
############################
image_stack = np.stack(new_image_list)
channel_list.append(image_stack)
channel_stack = np.stack(channel_list)
samples_list.append(channel_stack)
samples_stack = np.stack(samples_list)
return samples_stack
def hyperparameters_dict(self):
# This function should return a dictionary of hyperparameters for the model
# This is just for your own reference and will be saved with the model scores to wandb
###### YOUR CODE HERE ######
params_dict = {
"pyr_scale": 0.5,
"levels": 5,
"winsize": 11,
"iterations" : 5,
"poly_n" : 5,
"poly_sigma" : 1.1,
"flags" : 0
}
############################
return params_dict