transformer_model.py
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Dropout, Conv1D, LayerNormalization

class Time2Vector(Layer):
    def __init__(self, seq_len, **kwargs):
        # inherit from Layer
        super(Time2Vector, self).__init__()
        self.seq_len = seq_len

    def build(self, input_shape):
        # Initialize weights and biases with shape (seq_len,); they are broadcast
        # across the batch dimension.
        # for the linear pattern
        self.weights_linear = self.add_weight(name='weight_linear',
                                              shape=(int(self.seq_len),),  # one weight per time step
                                              initializer='uniform',       # random uniform initialization
                                              trainable=True)
        self.bias_linear = self.add_weight(name='bias_linear',
                                           shape=(int(self.seq_len),),
                                           initializer='uniform',
                                           trainable=True)
        # for the periodic pattern
        self.weights_periodic = self.add_weight(name='weight_periodic',
                                                shape=(int(self.seq_len),),
                                                initializer='uniform',
                                                trainable=True)
        self.bias_periodic = self.add_weight(name='bias_periodic',
                                             shape=(int(self.seq_len),),
                                             initializer='uniform',
                                             trainable=True)

    def call(self, x):
        # Calculate linear and periodic time features from the mean of the
        # first 4 feature columns
        x = tf.math.reduce_mean(x[:, :, :4], axis=-1)
        time_linear = self.weights_linear * x + self.bias_linear  # linear time feature
        time_linear = tf.expand_dims(time_linear, axis=-1)  # (batch, seq_len, 1)
        time_periodic = tf.math.sin(tf.multiply(x, self.weights_periodic) + self.bias_periodic)
        time_periodic = tf.expand_dims(time_periodic, axis=-1)  # (batch, seq_len, 1)
        return tf.concat([time_linear, time_periodic], axis=-1)  # (batch, seq_len, 2)

    def get_config(self):
        # needed for saving and loading a model with this custom layer
        config = super().get_config().copy()
        config.update({'seq_len': self.seq_len})
        return config
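
# A minimal usage sketch (not part of the original file) showing the shape
# contract of Time2Vector. The batch size, seq_len and feature count below are
# illustrative assumptions; the layer only requires at least 4 feature columns,
# since call() averages x[:, :, :4].
def _example_time2vector():
    t2v = Time2Vector(seq_len=128)
    dummy = tf.random.uniform((32, 128, 5))  # (batch, seq_len, features)
    out = t2v(dummy)
    print(out.shape)  # expected: (32, 128, 2) -> linear + periodic time features
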
class SingleAttention(Layer):
    def __init__(self, d_k, d_v):
        super(SingleAttention, self).__init__()
        self.d_k = d_k
        self.d_v = d_v

    def build(self, input_shape):
        # glorot_uniform weight initialization keeps the activation variance under control
        self.query = Dense(self.d_k,
                           input_shape=input_shape,
                           kernel_initializer='glorot_uniform',
                           bias_initializer='glorot_uniform')
        self.key = Dense(self.d_k,
                         input_shape=input_shape,
                         kernel_initializer='glorot_uniform',
                         bias_initializer='glorot_uniform')
        self.value = Dense(self.d_v,
                           input_shape=input_shape,
                           kernel_initializer='glorot_uniform',
                           bias_initializer='glorot_uniform')

    def call(self, inputs):  # inputs = (in_seq, in_seq, in_seq)
        q = self.query(inputs[0])
        k = self.key(inputs[1])
        attn_weights = tf.matmul(q, k, transpose_b=True)
        attn_weights = attn_weights / np.sqrt(self.d_k)  # scale by sqrt(d_k)
        attn_weights = tf.nn.softmax(attn_weights, axis=-1)
        v = self.value(inputs[2])
        attn_out = tf.matmul(attn_weights, v)
        return attn_out

class MultiAttention(Layer):
    def __init__(self, d_k, d_v, n_heads):
        super(MultiAttention, self).__init__()
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.attn_heads = list()

    def build(self, input_shape):
        for n in range(self.n_heads):
            self.attn_heads.append(SingleAttention(self.d_k, self.d_v))
        # input_shape[0] = (batch, seq_len, 7), so input_shape[0][-1] = 7
        self.linear = Dense(input_shape[0][-1],
                            input_shape=input_shape,
                            kernel_initializer='glorot_uniform',
                            bias_initializer='glorot_uniform')

    def call(self, inputs):
        attn = [self.attn_heads[i](inputs) for i in range(self.n_heads)]
        concat_attn = tf.concat(attn, axis=-1)
        multi_linear = self.linear(concat_attn)  # project back to the input feature dimension
        return multi_linear
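
# A minimal usage sketch (not part of the original file): each SingleAttention
# head computes scaled dot-product attention, and MultiAttention concatenates
# n_heads of them before projecting back to the input feature dimension.
# The dimensions below are illustrative assumptions.
def _example_multi_attention():
    in_seq = tf.random.uniform((32, 128, 7))  # (batch, seq_len, features)
    mha = MultiAttention(d_k=64, d_v=64, n_heads=4)
    out = mha((in_seq, in_seq, in_seq))  # (query, key, value) all from the same sequence
    print(out.shape)  # expected: (32, 128, 7)
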
class TransformerEncoder(Layer):
    def __init__(self, d_k, d_v, n_heads, ff_dim, dropout=0.1, **kwargs):
        super(TransformerEncoder, self).__init__()
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.ff_dim = ff_dim
        self.attn_heads = list()
        self.dropout_rate = dropout

    def build(self, input_shape):
        self.attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
        self.attn_dropout = Dropout(self.dropout_rate)
        self.attn_normalize = LayerNormalization(input_shape=input_shape, epsilon=1e-6)
        self.ff_conv1D_1 = Conv1D(filters=self.ff_dim, kernel_size=1, activation='relu')
        # input_shape[0] = (batch, seq_len, 7), so input_shape[0][-1] = 7
        self.ff_conv1D_2 = Conv1D(filters=input_shape[0][-1], kernel_size=1)
        self.ff_dropout = Dropout(self.dropout_rate)
        self.ff_normalize = LayerNormalization(input_shape=input_shape, epsilon=1e-6)

    def call(self, inputs):  # inputs = (in_seq, in_seq, in_seq)
        # multi-head attention block with residual connection and layer norm
        attn_layer = self.attn_multi(inputs)
        attn_layer = self.attn_dropout(attn_layer)
        attn_layer = self.attn_normalize(inputs[0] + attn_layer)
        # position-wise feed-forward block with residual connection and layer norm
        ff_layer = self.ff_conv1D_1(attn_layer)
        ff_layer = self.ff_conv1D_2(ff_layer)
        ff_layer = self.ff_dropout(ff_layer)
        ff_layer = self.ff_normalize(inputs[0] + ff_layer)
        return ff_layer

    def get_config(self):
        # needed for saving and loading a model with this custom layer
        config = super().get_config().copy()
        config.update({'d_k': self.d_k,
                       'd_v': self.d_v,
                       'n_heads': self.n_heads,
                       'ff_dim': self.ff_dim,
                       'attn_heads': self.attn_heads,
                       'dropout_rate': self.dropout_rate})
        return config
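
# A minimal usage sketch (not part of the original file): the encoder takes a
# (query, key, value) tuple and, because of the residual connections, returns a
# tensor with the same shape as the input sequence. The hyperparameters below
# are illustrative assumptions, not the values of the saved model.
def _example_transformer_encoder():
    in_seq = tf.random.uniform((32, 128, 7))  # (batch, seq_len, features)
    encoder = TransformerEncoder(d_k=64, d_v=64, n_heads=4, ff_dim=256)
    out = encoder((in_seq, in_seq, in_seq))
    print(out.shape)  # expected: (32, 128, 7)
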
def load_model():
    # Load the trained model, registering the custom layers defined above
    path = './model/Transformer+TimeEmbedding.hdf5'
    model = tf.keras.models.load_model(path,
                                       custom_objects={'Time2Vector': Time2Vector,
                                                       'SingleAttention': SingleAttention,
                                                       'MultiAttention': MultiAttention,
                                                       'TransformerEncoder': TransformerEncoder})
    return model
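
# A minimal usage sketch (not part of the original file): it assumes the weights
# file exists at ./model/Transformer+TimeEmbedding.hdf5 and that the saved model
# was trained on windows of shape (seq_len, n_features); the (128, 5) window
# below is an illustrative assumption and must match the saved model's input.
if __name__ == '__main__':
    model = load_model()
    model.summary()
    sample = np.random.rand(1, 128, 5).astype('float32')
    print(model.predict(sample).shape)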