-
Notifications
You must be signed in to change notification settings - Fork 0
/
birdPredict.py
executable file
·273 lines (239 loc) · 10.2 KB
/
birdPredict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#settings
import logging
import os
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
logging.getLogger('tensorflow').setLevel(logging.FATAL)
from tensorflow.python.util import deprecation
deprecation._PRINT_DEPRECATION_WARNINGS = False
import warnings
warnings.filterwarnings('ignore')
import tensorflow_addons as tfa
from tensorflow.keras import layers
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [10.0, 6.0]
plt.rcParams['figure.dpi'] = 140
import numpy as np
import seaborn as sb
import random
import PIL
import PIL.Image
import pathlib
import subprocess
import math as m
from itertools import cycle
#NOTE(review): the original `global history` / `global model` statements
#were removed — `global` at module scope is a no-op, and runModel() binds
#its own local `history`/`model` anyway, so they only implied shared
#mutable state that never existed.
#function to run model
def runModel():
    """Build, compile, and train the bird-species CNN for one configuration.

    Reads module-level globals: height, width, classes, trainData,
    testData, epochs, learningRate (a float or a tf LR schedule), label
    (legend text for the current configuration), and linecycler (shared
    line-style cycler for the loss plot).

    Side effects: prints the model summary and the current label, and
    appends the training/validation loss curves to the active matplotlib
    figure. Returns the trained tf.keras Model.
    """
    #augment the dataset with flipped and zoomed images, use convolutional
    #layers to maintain spatial information, max-pool layers to reduce,
    #then flatten and apply dense layers to predict classes
    model = tf.keras.Sequential([
        #rotation is intentionally omitted: bird images are generally
        #taken with specific orientations
        layers.experimental.preprocessing.RandomFlip('horizontal', input_shape=(height, width, 3)),
        layers.experimental.preprocessing.RandomZoom(0.1),
        #input_shape is only required on the first layer, so the
        #duplicate on Rescaling was dropped
        layers.experimental.preprocessing.Rescaling(1./255),
        layers.Conv2D(16, 3, padding='same', activation='relu'),
        layers.MaxPooling2D(),
        layers.Conv2D(32, 3, padding='same', activation='relu'),
        layers.MaxPooling2D(),
        layers.Conv2D(64, 3, padding='same', activation='relu'),
        layers.MaxPooling2D(),
        layers.Conv2D(128, 3, padding='same', activation='relu'),
        layers.MaxPooling2D(),
        layers.Conv2D(256, 3, padding='same', activation='relu'),
        layers.MaxPooling2D(),
        #dropout prevents overtraining by not allowing each node to see each datapoint
        layers.Dropout(0.65),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        #raw logits; the loss below is configured with from_logits=True
        layers.Dense(len(classes))
    ])
    opt = tf.keras.optimizers.Adam(learning_rate=learningRate)
    model.compile(optimizer=opt,
                  loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
    model.summary()
    print(label)
    history = model.fit(
        trainData,
        validation_data=testData,
        epochs=epochs,
    )
    #one shared line style per configuration so the train/val pair match
    current = next(linecycler)
    plt.plot(history.history['loss'], label='Training {0}'.format(label), linestyle=current)
    plt.plot(history.history['val_loss'], label='Validation {0}'.format(label), linestyle=current)
    return model
#settings
#load the IMAGES
dataDirectory = pathlib.Path('./newBirds')
#count the images with pathlib instead of shelling out to `find` with
#shell=True (Unix-only and a shell-string antipattern).
#NOTE(review): the original counted files ending in "jpg" under the
#current working directory; this counts under dataDirectory — confirm
#those are the same set for this project layout.
imageCount = len(list(dataDirectory.glob('**/*jpg')))
#print('Image count: {0}'.format(imageCount))
#set parameters
batchSize = 32
height = 224
width = 224
trainTestSplit = 0.2
#one fresh seed per run, shared by both calls below so the training and
#validation subsets form a consistent partition of the data
randomSeed = random.randint(0, 1000000000)
trainData = tf.keras.preprocessing.image_dataset_from_directory(
    dataDirectory,
    labels='inferred',
    label_mode='categorical',
    validation_split=trainTestSplit,
    subset='training',
    seed=randomSeed,
    image_size=(height, width),
    batch_size=batchSize)
testData = tf.keras.preprocessing.image_dataset_from_directory(
    dataDirectory,
    labels='inferred',
    label_mode='categorical',
    validation_split=trainTestSplit,
    subset='validation',
    seed=randomSeed,
    image_size=(height, width),
    batch_size=batchSize)
#sample additional images
#plt.figure(figsize=(10,10))
# for images, labels in trainData.take(1):
#     for i in range(9):
#         ax = plt.subplot(3, 3, i+1)
#         plt.imshow(images[i].numpy().astype("uint8"))
#         plt.title(classes[labels[i]])
#         plt.axis("off")
# plt.show()
#record the class names here, before the dataset variables are re-bound
#to their cached pipeline views below
classes = trainData.class_names
testClasses = testData.class_names
#cache the decoded images in memory, shuffle the training stream, and
#prefetch batches so input preparation overlaps with training
autotune = tf.data.experimental.AUTOTUNE
trainData = (trainData
             .cache()
             .shuffle(1000)
             .prefetch(buffer_size=autotune))
testData = (testData
            .cache()
            .prefetch(buffer_size=autotune))
#initialLearningRate = [0.0009, 0.00075, 0.0005] #try decreasing this value because the loss increases for the first part of each epoch
finalLearningRate = 0.0001   #floor referenced when sizing decay schedules
maximalLearningRate = 0.001  #ceiling for the cyclical schedule
decayRate = [0.000001]
staircaseFlag = [False]
epochs = 15
#plot cycler: one line style per trained configuration
lines = ['-', '--', '-.', ':']
linecycler = cycle(lines)
#candidate scale functions for the cyclical learning-rate schedule
a = lambda x: x
b = lambda x: 1.0
#BUGFIX: `(b)` without a trailing comma is just `b`, not a tuple, so the
#commented-out `enumerate(functions)` loop would have iterated a bare
#lambda; `(b,)` makes it a one-element tuple as intended
functions = (b,) #, a)
printFunctions = ['lambda x: 1.0'] #, 'lambda x: x']
#range of number of schedulers being tested
#NOTE: `label` and `learningRate` are module-level globals read by
#runModel(); each iteration sets them before training a fresh model
for j in range(2):
    if j == 0:
        #sweep the inverse-time-decay schedule over the candidate lists
        initialLearningRate = [0.0008]
        for k in range(len(initialLearningRate)):
            for l in range(len(decayRate)):
                for n in range(len(staircaseFlag)):
                    #linear decay
                    label = 'ITD ' + str(initialLearningRate[k]) + ' ' + str(decayRate[l]) + ' ' + str(staircaseFlag[n])
                    learningRate = tf.keras.optimizers.schedules.InverseTimeDecay(
                        initial_learning_rate = initialLearningRate[k],
                        decay_steps = 1,
                        decay_rate = decayRate[l], #(imageCount * (1 - trainTestSplit)) / ((initialLearningRate - finalLearningRate) * batchSize),
                        staircase = staircaseFlag[n])
                    model = runModel()
    if j == 1:
        #cyclical
        if True:
            #for c, i in enumerate(functions):
            initialLearningRate = [0.0001]
            for k in range(len(initialLearningRate)):
                method = 'cyclical ' #+ printFunctions[c]
                #NOTE(review): step_size divides the training-set size by
                #the LR range times batch size — verify this against the
                #tfa CyclicalLearningRate step-size semantics
                learningRate = tfa.optimizers.CyclicalLearningRate(
                    initial_learning_rate = initialLearningRate[k],
                    maximal_learning_rate = maximalLearningRate,
                    step_size = (imageCount * (1 - trainTestSplit)) / ((maximalLearningRate - initialLearningRate[k]) * batchSize),
                    scale_fn = lambda x: 1.0, #if multiple functions tested switch to i, change executor from if True to the commented for loop
                    scale_mode = 'iterations')
                label = method + ' ' + str(initialLearningRate[k]) #printed in legend
                model = runModel()
#make some plots of the training process
#BUGFIX: the plotted series are the cross-entropy loss curves recorded by
#runModel(), not MAE — the figure is now labeled accordingly
plt.title('Loss for bird species prediction')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(loc='best', frameon=False, fontsize=8)
#save before show(): show() can clear the active figure, which left the
#saved file empty when savefig came after it
plt.savefig('./learningRateLoss' + str(randomSeed) + '.png')
plt.show()
#collect predicted vs. actual class indices across the whole test set
predictions = np.array([])
labels = np.array([])
for x, y in testData:
    #Sequential.predict_classes was removed in TF 2.6; the argmax of the
    #logits returned by predict() is the equivalent
    predictions = np.concatenate([predictions, np.argmax(model.predict(x), axis=-1)])
    labels = np.concatenate([labels, np.argmax(y.numpy(), axis=-1)])
confusionMatrix = tf.math.confusion_matrix(labels=labels, predictions=predictions).numpy()
#plot the confusion matrix; per-class tick labels omitted (too many classes)
sb.heatmap(confusionMatrix, cmap='vlag') #, xticklabels=labels, yticklabels=labels)
#plt.yticks(rotation=0, fontsize=6)
#plt.xticks(fontsize=6)
plt.xlabel('Predicted Class')
plt.ylabel('Actual Class')
plt.tight_layout()
plt.savefig('ConfusionMatrix.png') #save before show(), see note above
plt.show()
#analysis of model
trace = np.trace(confusionMatrix)
matrixSum = np.sum(confusionMatrix)
diagonal = np.diag(confusionMatrix)
#properties
accuracy = trace / matrixSum
falsePositives = np.sum(confusionMatrix, axis=0) - diagonal  #predicted as class k but were not
falseNegatives = np.sum(confusionMatrix, axis=1) - diagonal  #were class k but predicted otherwise
truePositives = diagonal
#true negatives for class k are all entries outside row k and column k,
#i.e. total - TP - FP - FN. This vectorized identity replaces the
#original per-class row/column-deletion loop (O(c^3)) and yields the
#same values as an ndarray, so lengths always match by construction.
trueNegatives = matrixSum - truePositives - falsePositives - falseNegatives
#descriptive statistics
recall = truePositives / (truePositives + falseNegatives) #how many of the category are recognized as being in that category
precision = truePositives / (truePositives + falsePositives) #how many of those picked for a category are actually that category
specificity = trueNegatives / (trueNegatives + falsePositives) #how many of those not in a category are correctly noted as not in the category
fScore = (2 * precision * recall) / (precision + recall)
#test for incorrect calculations. Sum for all components of each class should be total number of images IN THE TEST SET
print('Images: {0}'.format(imageCount))
testImages = m.floor(imageCount*trainTestSplit)
for k in range(len(classes)):
    images = falsePositives[k] + falseNegatives[k] + truePositives[k] + trueNegatives[k]
    if images != testImages:
        print('Error in counts: {0} {1}'.format(images, testImages))
#print per-class properties sorted by recall (worst first).
#BUGFIX: the original packed classes and numbers into one np.array, which
#stringified every value and sorted recall lexicographically; sorting the
#numeric recall array keeps the comparison numeric. Specificity is
#omitted: it is consistently > 0.98 for every class, so it does not
#yield useful info.
order = np.argsort(recall)
header = ['Class', 'FP', 'FN', 'TP', 'Recall', 'Precision', 'F-1 Score']
print('\n\n{0:27}{1:>4}{2:>4}{3:>4}{4:>8}{5:>12}{6:>12}'.format(*header))
for i in order:
    print('{0:27}{1:>4}{2:>4}{3:>4}{4:>8.4}{5:>12.4}{6:>12.4}'.format(
        classes[i], falsePositives[i], falseNegatives[i], truePositives[i],
        recall[i], precision[i], fScore[i]))
#next steps:
#1 optimize the learning rate for inverse time decay and cyclical, then compare best performance against each other
#2 Add a loop and run 3 times, save the performance matrix each time and rank the classes from worst to best performance to evaluate opportunities for data set augmentation.