forked from thibo73800/capsnet-traffic-sign-classifier
-
Notifications
You must be signed in to change notification settings - Fork 0
/
caps_net.py
126 lines (110 loc) · 5.64 KB
/
caps_net.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import numpy as np
import tensorflow as tf
import numpy as np
def conv_caps_layer(input_layer, capsules_size, nb_filters, kernel, stride=2):
"""
Capsule layer for the convolutional inputs
**input:
*input_layer: (Tensor)
*capsule_numbers: (Integer) the number of capsule in this layer.
*kernel_size: (Integer) Size of the kernel for each filter.
*stride: (Integer) 2 by default
"""
# "In convolutional capsule layers each unit in a capsule is a convolutional unit.
# Therefore, each capsule will output a grid of vectors rather than a single vector output."
capsules = tf.contrib.layers.conv2d(
input_layer, nb_filters * capsules_size, kernel, stride, padding="VALID")
# conv shape: [?, kernel, kernel, nb_filters]
shape = capsules.get_shape().as_list()
capsules = tf.reshape(capsules, shape=(-1, np.prod(shape[1:3]) * nb_filters, capsules_size, 1))
# capsules shape: [?, nb_capsules, capsule_size, 1]
return squash(capsules)
def routing(u_hat, b_ij, nb_capsules, nb_capsules_p, iterations=4):
"""
Routing algorithm
**input:
*u_hat: Dot product (weights between previous capsule and current capsule)
*b_ij: the log prior probabilities that capsule i should be coupled to capsule j
*nb_capsules_p: Number of capsule in the previous layer
*nb_capsules: Number of capsule in this layer
"""
# Start the routing algorithm
for it in range(iterations):
with tf.variable_scope('routing_' + str(it)):
# Line 4 of algo
# probabilities that capsule i should be coupled to capsule j.
# c_ij: [nb_capsules_p, nb_capsules, 1, 1]
c_ij = tf.nn.softmax(b_ij, dim=2)
# Line 5 of algo
# c_ij: [ nb_capsules_p, nb_capsules, 1, 1]
# u_hat: [?, nb_capsules_p, nb_capsules, len_v_j, 1]
s_j = tf.multiply(c_ij, u_hat)
# s_j: [?, nb_capsules_p, nb_capsules, len_v_j, 1]
s_j = tf.reduce_sum(s_j, axis=1, keep_dims=True)
# s_j: [?, 1, nb_capsules, len_v_j, 1)
# line 6:
# squash using Eq.1,
v_j = squash(s_j)
# v_j: [1, 1, nb_capsules, len_v_j, 1)
# line 7:
# Frist reshape & tile v_j
# [? , 1, nb_capsules, len_v_j, 1] ->
# [?, nb_capsules_p, nb_capsules, len_v_j, 1]
v_j_tiled = tf.tile(v_j, [1, nb_capsules_p, 1, 1, 1])
# u_hat: [?, nb_capsules_p, nb_capsules, len_v_j, 1]
# v_j_tiled [1, nb_capsules_p, nb_capsules, len_v_j, 1]
u_dot_v = tf.matmul(u_hat, v_j_tiled, transpose_a=True)
# u_produce_v: [?, nb_capsules_p, nb_capsules, 1, 1]
b_ij += tf.reduce_sum(u_dot_v, axis=0, keep_dims=True)
#b_ih: [1, nb_capsules_p, nb_capsules, 1, 1]
return tf.squeeze(v_j, axis=1)
def fully_connected_caps_layer(input_layer, capsules_size, nb_capsules, iterations=4):
"""
Second layer receiving inputs from all capsules of the layer below
**input:
*input_layer: (Tensor)
*capsules_size: (Integer) Size of each capsule
*nb_capsules: (Integer) Number of capsule
*iterations: (Integer) Number of iteration for the routing algorithm
i refer to the layer below.
j refer to the layer above (the current layer).
"""
shape = input_layer.get_shape().as_list()
# Get the size of each capsule in the previous layer and the current layer.
len_u_i = np.prod(shape[2])
len_v_j = capsules_size
# Get the number of capsule in the layer bellow.
nb_capsules_p = np.prod(shape[1])
# w_ij: Used to compute u_hat by multiplying the output ui of a capsule in the layer below
# with this matrix
# [nb_capsules_p, nb_capsules, len_v_j, len_u_i]
_init = tf.random_normal_initializer(stddev=0.01, seed=0)
_shape = (nb_capsules_p, nb_capsules, len_v_j, len_u_i)
w_ij = tf.get_variable('weight', shape=_shape, dtype=tf.float32, initializer=_init)
# Adding one dimension to the input [batch_size, nb_capsules_p, length(u_i), 1] ->
# [batch_size, nb_capsules_p, 1, length(u_i), 1]
# To allow the next dot product
input_layer = tf.reshape(input_layer, shape=(-1, nb_capsules_p, 1, len_u_i, 1))
input_layer = tf.tile(input_layer, [1, 1, nb_capsules, 1, 1])
# Eq.2, calc u_hat
# Prediction uj|i made by capsule i
# w_ij: [ nb_capsules_p, nb_capsules, len_v_j, len_u_i, ]
# input: [batch_size, nb_capsules_p, nb_capsules, len_ui, 1]
# u_hat: [batch_size, nb_capsules_p, nb_capsules, len_v_j, 1]
# Each capsule of the previous layer capsule layer is associated to a capsule of this layer
u_hat = tf.einsum('abdc,iabcf->iabdf', w_ij, input_layer)
# bij are the log prior probabilities that capsule i should be coupled to capsule j
# [nb_capsules_p, nb_capsules, 1, 1]
b_ij = tf.zeros(shape=[nb_capsules_p, nb_capsules, 1, 1], dtype=np.float32)
return routing(u_hat, b_ij, nb_capsules, nb_capsules_p, iterations=iterations)
def squash(vector):
"""
Squashing function corresponding to Eq. 1
**input: **
*vector
"""
vector += 0.00001 # Workaround for the squashing function ...
vec_squared_norm = tf.reduce_sum(tf.square(vector), -2, keep_dims=True)
scalar_factor = vec_squared_norm / (1 + vec_squared_norm) / tf.sqrt(vec_squared_norm)
vec_squashed = scalar_factor * vector # element-wise
return(vec_squashed)