-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
2ea60ee
commit 4a626db
Showing
1 changed file
with
220 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
|
||
import pandas as pd | ||
import numpy as np | ||
import seaborn as sns | ||
import cv2 | ||
import tensorflow as tf | ||
import os | ||
from skimage import io | ||
from PIL import Image | ||
from tensorflow.keras import backend as K | ||
|
||
#creating a custom datagenerator: | ||
|
||
class DataGenerator(tf.keras.utils.Sequence): | ||
def __init__(self, ids , mask, image_dir = './', batch_size = 16, img_h = 256, img_w = 256, shuffle = True): | ||
|
||
self.ids = ids | ||
self.mask = mask | ||
self.image_dir = image_dir | ||
self.batch_size = batch_size | ||
self.img_h = img_h | ||
self.img_w = img_w | ||
self.shuffle = shuffle | ||
self.on_epoch_end() | ||
|
||
def __len__(self): | ||
'Get the number of batches per epoch' | ||
|
||
return int(np.floor(len(self.ids)) / self.batch_size) | ||
|
||
def __getitem__(self, index): | ||
'Generate a batch of data' | ||
|
||
#generate index of batch_size length | ||
indexes = self.indexes[index* self.batch_size : (index+1) * self.batch_size] | ||
|
||
#get the ImageId corresponding to the indexes created above based on batch size | ||
list_ids = [self.ids[i] for i in indexes] | ||
|
||
#get the MaskId corresponding to the indexes created above based on batch size | ||
list_mask = [self.mask[i] for i in indexes] | ||
|
||
|
||
#generate data for the X(features) and y(label) | ||
X, y = self.__data_generation(list_ids, list_mask) | ||
|
||
#returning the data | ||
return X, y | ||
|
||
def on_epoch_end(self): | ||
'Used for updating the indices after each epoch, once at the beginning as well as at the end of each epoch' | ||
|
||
#getting the array of indices based on the input dataframe | ||
self.indexes = np.arange(len(self.ids)) | ||
|
||
#if shuffle is true, shuffle the indices | ||
if self.shuffle: | ||
np.random.shuffle(self.indexes) | ||
|
||
def __data_generation(self, list_ids, list_mask): | ||
'generate the data corresponding the indexes in a given batch of images' | ||
|
||
# create empty arrays of shape (batch_size,height,width,depth) | ||
#Depth is 3 for input and depth is taken as 1 for output becasue mask consist only of 1 channel. | ||
X = np.empty((self.batch_size, self.img_h, self.img_w, 3)) | ||
y = np.empty((self.batch_size, self.img_h, self.img_w, 1)) | ||
|
||
#iterate through the dataframe rows, whose size is equal to the batch_size | ||
for i in range(len(list_ids)): | ||
#path of the image | ||
img_path = './' + str(list_ids[i]) | ||
|
||
#mask path | ||
mask_path = './' + str(list_mask[i]) | ||
|
||
#reading the original image and the corresponding mask image | ||
img = io.imread(img_path) | ||
mask = io.imread(mask_path) | ||
|
||
#resizing and coverting them to array of type float64 | ||
img = cv2.resize(img,(self.img_h,self.img_w)) | ||
img = np.array(img, dtype = np.float64) | ||
|
||
mask = cv2.resize(mask,(self.img_h,self.img_w)) | ||
mask = np.array(mask, dtype = np.float64) | ||
|
||
#standardising | ||
img -= img.mean() | ||
img /= img.std() | ||
|
||
mask -= mask.mean() | ||
mask /= mask.std() | ||
|
||
#Adding image to the empty array | ||
X[i,] = img | ||
|
||
#expanding the dimnesion of the image from (256,256) to (256,256,1) | ||
y[i,] = np.expand_dims(mask, axis = 2) | ||
|
||
#normalizing y | ||
y = (y > 0).astype(int) | ||
|
||
return X, y | ||
|
||
|
||
|
||
|
||
|
||
|
||
def prediction(test, model, model_seg): | ||
''' | ||
Predcition function which takes dataframe containing ImageID as Input and perform 2 type of prediction on the image | ||
Initially, image is passed through the classification network which predicts whether the image has defect or not, if the model | ||
is 99% sure that the image has no defect, then the image is labeled as no-defect, if the model is not sure, it passes the image to the | ||
segmentation network, it again checks if the image has defect or not, if it has defect, then the type and location of defect is found | ||
''' | ||
|
||
#directory | ||
directory = "./" | ||
|
||
#Creating empty list to store the results | ||
mask = [] | ||
image_id = [] | ||
has_mask = [] | ||
|
||
#iterating through each image in the test data | ||
for i in test.image_path: | ||
|
||
path = directory + str(i) | ||
|
||
#reading the image | ||
img = io.imread(path) | ||
|
||
#Normalizing the image | ||
img = img * 1./255. | ||
|
||
#Reshaping the image | ||
img = cv2.resize(img,(256,256)) | ||
|
||
#Converting the image into array | ||
img = np.array(img, dtype = np.float64) | ||
|
||
#reshaping the image from 256,256,3 to 1,256,256,3 | ||
img = np.reshape(img, (1,256,256,3)) | ||
|
||
#making prediction on the image | ||
is_defect = model.predict(img) | ||
|
||
#if tumour is not present we append the details of the image to the list | ||
if np.argmax(is_defect) == 0: | ||
image_id.append(i) | ||
has_mask.append(0) | ||
mask.append('No mask') | ||
continue | ||
|
||
#Read the image | ||
img = io.imread(path) | ||
|
||
#Creating a empty array of shape 1,256,256,1 | ||
X = np.empty((1, 256, 256, 3)) | ||
|
||
#resizing the image and coverting them to array of type float64 | ||
img = cv2.resize(img,(256,256)) | ||
img = np.array(img, dtype = np.float64) | ||
|
||
#standardising the image | ||
img -= img.mean() | ||
img /= img.std() | ||
|
||
#converting the shape of image from 256,256,3 to 1,256,256,3 | ||
X[0,] = img | ||
|
||
#make prediction | ||
predict = model_seg.predict(X) | ||
|
||
#if the sum of predicted values is equal to 0 then there is no tumour | ||
if predict.round().astype(int).sum() == 0: | ||
image_id.append(i) | ||
has_mask.append(0) | ||
mask.append('No mask') | ||
else: | ||
#if the sum of pixel values are more than 0, then there is tumour | ||
image_id.append(i) | ||
has_mask.append(1) | ||
mask.append(predict) | ||
|
||
|
||
return image_id, mask, has_mask | ||
|
||
|
||
|
||
|
||
|
||
''' | ||
We need a custom loss function to train this ResUNet.So, we have used the loss function as it is from https://github.com/nabsabraham/focal-tversky-unet/blob/master/losses.py | ||
@article{focal-unet, | ||
title={A novel Focal Tversky loss function with improved Attention U-Net for lesion segmentation}, | ||
author={Abraham, Nabila and Khan, Naimul Mefraz}, | ||
journal={arXiv preprint arXiv:1810.07842}, | ||
year={2018} | ||
} | ||
''' | ||
def tversky(y_true, y_pred, smooth = 1e-6): | ||
y_true_pos = K.flatten(y_true) | ||
y_pred_pos = K.flatten(y_pred) | ||
true_pos = K.sum(y_true_pos * y_pred_pos) | ||
false_neg = K.sum(y_true_pos * (1-y_pred_pos)) | ||
false_pos = K.sum((1-y_true_pos)*y_pred_pos) | ||
alpha = 0.7 | ||
return (true_pos + smooth)/(true_pos + alpha*false_neg + (1-alpha)*false_pos + smooth) | ||
|
||
def tversky_loss(y_true, y_pred): | ||
return 1 - tversky(y_true,y_pred) | ||
|
||
def focal_tversky(y_true,y_pred): | ||
pt_1 = tversky(y_true, y_pred) | ||
gamma = 0.75 | ||
return K.pow((1-pt_1), gamma) |