【人工智能项目】深度学习实现胸腔X光肺炎检测
本次的主要任务:训练模型正确识别肺炎X光图片,标签 0=正常,1=肺炎。
那么代码走起!!!
导包
# 导包
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from glob import glob
from keras.models import *
from keras.layers import *
from keras.preprocessing.image import *
from keras.utils import *
from keras.optimizers import *
from keras.applications import *
from keras.applications import imagenet_utils
from keras.callbacks import EarlyStopping,ReduceLROnPlateau,ModelCheckpoint,LearningRateScheduler
from efficientnet.keras import EfficientNetB3
EDA探索数据
# Exploratory data analysis: compare how many training files each class has.
import matplotlib.pyplot as plt
import seaborn as sns
import os

train_dir_path = "./xray_dataset/train"
train_normal_path = os.path.join(train_dir_path, "NORMAL")
train_pneumonia_path = os.path.join(train_dir_path, "PNEUMONIA")

# Every directory entry is counted, whatever its type.
train_normal_length = len(os.listdir(train_normal_path))
train_pneumonia_length = len(os.listdir(train_pneumonia_path))

# Bar chart of the per-class counts.
sns.set_style("whitegrid")
sns.barplot(x=["NORMAL", "PNEUMONIA"],
            y=np.array([train_normal_length, train_pneumonia_length]))

# The pie chart gets its own figure; without this it would be drawn on top
# of the bar chart because both calls target the current axes.
plt.figure()
labels = "Normal", "Pneumonia"
sizes = [train_normal_length, train_pneumonia_length]
colors = ["green", "red"]
explode = (0.1, 0)  # pull the first ("Normal") wedge slightly out
plt.pie(sizes, labels=labels, colors=colors, explode=explode)
plt.axis("equal")  # keep the pie circular
plt.show()
读取数据
# Image size requested from the generators and the mini-batch size they yield.
img_rows, img_cols = 224, 224
batch_size = 2

# Stream the images from disk with ImageDataGenerator.
from keras.preprocessing.image import ImageDataGenerator
import os

train_path = "C:\\Users\\LvChaoZhang\\00contest\\09 ai研习社胸腔X光肺炎检测\\xray_dataset\\train"

# Pixel values are multiplied by 1/255 and 20% of the files are reserved for
# the "validation" subset. An augmented variant (shear/zoom 0.2, horizontal
# and vertical flips, fill_mode="nearest") was tried and is currently disabled.
train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)

# Options shared by the training and the validation iterator.
flow_options = dict(
    target_size=(img_rows, img_cols),
    batch_size=batch_size,
    color_mode="rgb",
    class_mode="binary",
)

# Training subset: shuffled with a fixed seed.
train_set = train_datagen.flow_from_directory(
    train_path,
    shuffle=True,
    seed=2019,
    subset="training",
    **flow_options
)

# Validation subset: kept in directory order (no shuffling).
val_set = train_datagen.flow_from_directory(
    train_path,
    shuffle=False,
    subset="validation",
    **flow_options
)
Found 3280 images belonging to 2 classes.
Found 819 images belonging to 2 classes.
train_set.class_indices
{'NORMAL': 0, 'PNEUMONIA': 1}
模型
# Baseline model: three stages of (two 3x3 ReLU convolutions -> 2x2 max-pool
# -> dropout 0.2) with 32, 64 and 128 filters, then one 256-unit dense layer
# and a single sigmoid output unit.
model = Sequential([
    # stage 1
    Conv2D(32, (3, 3), input_shape=(img_rows, img_cols, 3), activation='relu'),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.2),
    # stage 2
    Conv2D(64, (3, 3), activation='relu'),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.2),
    # stage 3
    Conv2D(128, (3, 3), activation='relu'),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.2),
    # classifier head
    Flatten(),
    Dense(units=256, activation='relu'),
    Dropout(0.2),
    Dense(units=1, activation='sigmoid'),
])
# Model "VGG16 1.0": VGG16 with ImageNet weights and no top classifier,
# with every base layer frozen, plus a small new classification head.
base_model = VGG16(weights="imagenet",
                   include_top=False,
                   input_shape=(img_rows, img_cols, 3))

# Freeze the whole base so only the new head below is trained.
for layer in base_model.layers:
    layer.trainable = False

# Head: global average pooling -> Dense(128, relu) -> Dropout(0.5) -> sigmoid.
x = GlobalAveragePooling2D()(base_model.output)
x = Dropout(0.5)(Dense(128, activation="relu")(x))
out = Dense(1, activation="sigmoid")(x)

model = Model(base_model.input, out)
# Model 4.0: eight 3x3 convolutions whose filter counts are multiples of
# `scale`, LeakyReLU(0.1) activations, five 2x2 max-pool + dropout(0.1) steps,
# three 15*scale dense layers with dropout(0.5), and a sigmoid output unit.
scale = 32

model = Sequential([
    # convolution 1
    Conv2D(scale, (3, 3), input_shape=(img_rows, img_cols, 3)),
    LeakyReLU(alpha=0.1),
    # convolution 2
    Conv2D(2 * scale, (3, 3)),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.1),
    # convolution 3
    Conv2D(3 * scale, (3, 3)),
    LeakyReLU(alpha=0.1),
    # convolution 4
    Conv2D(4 * scale, (3, 3)),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.1),
    # convolution 5
    Conv2D(5 * scale, (3, 3)),
    LeakyReLU(alpha=0.1),
    # convolution 6
    # NOTE(review): unlike every other convolution here, this one is not
    # followed by a LeakyReLU - confirm whether that is intentional.
    Conv2D(3 * scale, (3, 3)),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.1),
    # convolution 7
    Conv2D(6 * scale, (3, 3)),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.1),
    # convolution 8
    Conv2D(7 * scale, (3, 3)),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.1),
    # flatten layer
    Flatten(),
    # first dense layer
    Dense(units=15 * scale),
    LeakyReLU(alpha=0.1),
    Dropout(0.5),
    # second dense layer
    Dense(units=15 * scale),
    LeakyReLU(alpha=0.1),
    Dropout(0.5),
    # third dense layer
    Dense(units=15 * scale),
    LeakyReLU(alpha=0.1),
    Dropout(0.5),
    # output layer
    Dense(1, activation="sigmoid"),
])
# Experimental model: EfficientNetB3 with ImageNet weights and no top
# classifier; all base layers are left trainable (full fine-tuning).
base_model = EfficientNetB3(weights="imagenet",
                            include_top=False,
                            input_shape=(img_rows, img_cols, 3))

# Explicitly mark every base layer as trainable.
for layer in base_model.layers:
    layer.trainable = True

# Head: global average pooling -> Dense(10, relu) -> Dropout(0.5) -> sigmoid.
x = GlobalAveragePooling2D()(base_model.output)
x = Dropout(0.5)(Dense(10, activation="relu")(x))
out = Dense(1, activation="sigmoid")(x)

model = Model(base_model.input, out)
模型编译
# 模型编译
import keras.backend as K
# Focal loss
def focal_loss(alpha=0.25, gamma=2.0):
    """Return a binary focal-loss function suitable for ``model.compile``.

    The returned function scales the per-element binary cross-entropy by
    ``alpha_factor * (1 - p_t) ** gamma``, where ``p_t`` is the predicted
    probability of the true class.

    Args:
        alpha: Weight applied where ``y_true`` is 1; ``1 - alpha`` is applied
            where ``y_true`` is 0.
        gamma: Exponent of the modulating factor ``(1 - p_t)``.

    Returns:
        A function ``focal_crossentropy(y_true, y_pred)`` that returns the
        weighted cross-entropy averaged over the last axis.
    """
    def focal_crossentropy(y_true, y_pred):
        # Cross-entropy is taken on the raw predictions, before the clip below.
        bce = K.binary_crossentropy(y_true, y_pred)

        # Keep predictions strictly inside (0, 1) for the factors below.
        y_pred = K.clip(y_pred, K.epsilon(), 1. - K.epsilon())

        # Probability the model assigns to the true class.
        p_t = (y_true * y_pred) + ((1 - y_true) * (1 - y_pred))

        # Class-balancing weight and the (1 - p_t)^gamma focusing term.
        alpha_factor = y_true * alpha + ((1 - alpha) * (1 - y_true))
        modulating_factor = K.pow((1 - p_t), gamma)

        # compute the final loss and return
        return K.mean(alpha_factor * modulating_factor * bce, axis=-1)

    return focal_crossentropy
# Compile with the focal loss defined above and Adam at a learning rate of 1e-4.
model.compile(loss=focal_loss(), metrics=["accuracy"], optimizer=Adam(lr=0.0001))

# Callbacks, all driven by the validation loss:
# - checkpoint: write the model to disk whenever val_loss reaches a new minimum
checkpoint = ModelCheckpoint("chest_xray_cnn3.h5",
                             monitor="val_loss",
                             mode="min",
                             save_best_only=True,
                             verbose=1)
# - earlystop: stop after 5 epochs without improvement and restore the best weights
earlystop = EarlyStopping(monitor='val_loss',
                          min_delta=0,
                          patience=5,
                          verbose=1,
                          restore_best_weights=True)
# - reduce_lr: multiply the learning rate by 0.2 after 3 epochs without improvement
reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                              factor=0.2,
                              patience=3,
                              verbose=1)
callbacks = [earlystop, checkpoint, reduce_lr]

# Training. Step counts come from the iterators themselves rather than being
# hard-coded, so they stay correct if the dataset or the split changes.
history = model.fit_generator(train_set,
                              steps_per_epoch=train_set.samples // batch_size,
                              epochs=25,
                              validation_data=val_set,
                              validation_steps=val_set.samples // batch_size,
                              shuffle=True,
                              callbacks=callbacks,
                              # class 0 is weighted more heavily than class 1
                              class_weight={0: 1.84307554, 1: 0.68613994})
# 训练曲线
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
#Accuracy
plt.plot(history.history["acc"])
plt.plot(history.history["val_acc"])
plt.title("Model Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend(["Training Set","Validation Set"],loc="upper left")
# Loss
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.title("Model loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend(["Training set","Test set"],loc="upper left")
plt.show()
预测
import cv2
import numpy as np
import pandas as pd
# Read one image file and convert it into a network-ready array.
def get_img(file_path, img_rows, img_cols):
    """Load an image, resize it, convert BGR to RGB and scale by 1/255.

    Args:
        file_path: Path of the image file to read.
        img_rows: Target height in pixels.
        img_cols: Target width in pixels.

    Returns:
        A float32 array of shape ``(img_rows, img_cols, 3)`` in RGB channel
        order, with the original pixel values divided by 255.

    Raises:
        OSError: If OpenCV cannot read or decode ``file_path``.
    """
    img = cv2.imread(file_path)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError("could not read image: {!r}".format(file_path))

    # cv2.resize takes dsize as (width, height), i.e. (columns, rows).
    img = cv2.resize(img, (img_cols, img_rows), interpolation=cv2.INTER_CUBIC)

    # OpenCV loads images as BGR; convert to RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img.astype(np.float32) / 255.
# Load every image of the test directory in numeric file-name order.
def load_test_data(test_path, img_rows, img_cols):
    """Read all images in ``test_path`` into one array.

    File names must consist of an integer plus an extension (for example
    ``12.jpg``); files are processed in ascending order of that integer.

    Args:
        test_path: Directory containing the test images.
        img_rows: Target height passed on to ``get_img``.
        img_cols: Target width passed on to ``get_img``.

    Returns:
        A tuple ``(x_test, x_test_id)``: ``x_test`` is a NumPy array stacking
        the result of ``get_img`` for every file, and ``x_test_id`` is the
        list of file names in the same order.

    Raises:
        ValueError: If a file name (without extension) is not an integer.
    """
    # splitext copes with extensions of any length (".png", ".jpeg", ...),
    # unlike slicing off a fixed number of characters.
    x_test_id = sorted(
        os.listdir(test_path),
        key=lambda name: int(os.path.splitext(name)[0]),
    )
    x_test = np.array([
        get_img(os.path.join(test_path, img_name), img_rows, img_cols)
        for img_name in x_test_id
    ])
    return x_test, x_test_id
# Alternative location of the test images: "./xray_dataset/test/"
test_dir_path = "D:\\01\\05CT\\ct\\test\\"
x_test, x_id = load_test_data(test_dir_path, img_rows, img_cols)

# NOTE(review): the training checkpoint is written to "chest_xray_cnn3.h5",
# while the weights loaded here come from "chest_xray_cnn1.h5" - confirm this
# file matches the architecture currently bound to `model`.
model.load_weights("chest_xray_cnn1.h5")
y_pred = model.predict(x_test)
y_pred[:10]  # notebook preview of the first raw predictions

# Threshold at 0.5 in one vectorized step: 1 above 0.5, otherwise 0.
# The original dtype is kept so the CSV output is unchanged.
y_pred = (y_pred > 0.5).astype(y_pred.dtype)

# Submission rows are (running index, predicted label), written without
# index column and without header.
sample_ids = np.arange(len(x_test))
df = pd.DataFrame({"id": sample_ids, "predict": y_pred[:, 0]})
df.to_csv("submit1.csv", index=None, header=None)
小结
那么本次到此结束!下次见瓷!