#注入所需库
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import random
import numpy as np
import time
from sklearn.svm import SVC #支持向量机分类器
# from sklearn.neighbors import KNeighborsClassifier #K近邻分类器
# from sklearn.linear_model import LogisticRegression #逻辑回归分类器
import xgboost as xgb #XGBoost分类器
import lightgbm as lgb #LightGBM分类器
from sklearn.ensemble import RandomForestClassifier #随机森林分类器
# from catboost import CatBoostClassifier #CatBoost分类器
# from sklearn.tree import DecisionTreeClassifier #决策树分类器
# from sklearn.naive_bayes import GaussianNB #高斯朴素贝叶斯分类器
from skopt import BayesSearchCV
from skopt.space import Integer
from deap import base, creator, tools, algorithms
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score # 用于评估分类器性能的指标
from sklearn.metrics import classification_report, confusion_matrix #用于生成分类报告和混淆矩阵
from sklearn.metrics import make_scorer#定义函数
import warnings #用于忽略警告信息
warnings.filterwarnings("ignore") # 忽略所有警告信息
#设置中文字体及负号正确显示
plt.rcParams['font.sans-serif']=['STHeiti']
plt.rcParams['axes.unicode_minus']=True
plt.rcParams['figure.dpi']=120
#查看基本信息&读取数据
data=pd.read_csv(r'data.csv')
print(f'{data.info()}\n{data.isnull().sum()}\n{data.head()}\n{data.columns}')
#绘制图像
# plt.figure(figsize=(6,4))
# sns.boxplot(x=data['Annual Income'])
# plt.title('aa')
# plt.xlabel('年收入')
# plt.tight_layout()
# plt.show()
# sns.boxplot(x='Credit Default',y='Annual Income',data=data)
# plt.title('aa')
# plt.xlabel('credit default')
# plt.ylabel('count')
# plt.xticks([0,1],['n','y'])
# plt.tight_layout()
# plt.show()
# sns.histplot(x='Annual Income',
# hue='Credit Default',
# hue_order=[0,1],
# data=data,
# kde=True,
# element='bars')
# plt.title('aa')
# plt.xlabel('年收入')
# plt.ylabel('金额')
# plt.legend(labels=['否','是'])
# plt.tight_layout()
# plt.show()
#数据填补
for i in data.columns:
if data[i].dtype!='object':
if data[i].isnull().sum()>0:
data[i].fillna(data[i].mean(),inplace=True)
else:
if data[i].isnull().sum()>0:
data[i].fillna(data[i].mode()[0],inplace=True)
print(data.isnull().sum())
#数据编码
mapping={
'10+ years':0,
'9 years':1,
'8 years':2,
'7 years':3,
'6 years':4,
'5 years':5,
'4 years':6,
'3 years':7,
'2 years':8,
'1 year':9,
'< 1 year':10
}
data['Years in current job']=data['Years in current job'].map(mapping)
dummies_list=[]
data=pd.get_dummies(data=data,drop_first=True)
data2=pd.read_csv(r'data.csv')
for i in data.columns:
if i not in data2.columns:
dummies_list.append(i)
for i in dummies_list:
data[i]=data[i].astype(int)
# print(f'{data.info()}\n{data.isnull().sum()}\n{data.head()}\n{data.columns}')
# continuous_features=['Annual Income', 'Years in current job', 'Tax Liens',
# 'Number of Open Accounts', 'Years of Credit History',
# 'Maximum Open Credit', 'Number of Credit Problems',
# 'Months since last delinquent', 'Bankruptcies', 'Current Loan Amount',
# 'Current Credit Balance', 'Monthly Debt', 'Credit Score',
# 'Credit Default']
# correlation_matrix=data[continuous_features].corr()
# plt.figure(figsize=(12,10))
# sns.heatmap(correlation_matrix,annot=True,cmap='coolwarm',vmin=-1,vmax=1)
# plt.title('相关热力图')
# plt.xticks(rotation=45,ha='right')
# plt.show()
features=['Annual Income','Years in current job','Tax Liens','Number of Open Accounts']
# fig,axes=plt.subplots(2,2,figsize=(6,4))
# for i in range(len(features)):
# row=i//2
# col=i%2
# feature=features[i]
# axes[row,col].boxplot(data[feature].dropna())
# axes[row,col].set_title(f'boxplot of {feature}')
# axes[row,col].set_ylabel(feature)
# plt.tight_layout()
# plt.show()
# fig,axes=plt.subplots(2,2,figsize=(6,4))
# for i,feature in enumerate(features):
# row=i//2
# col=i%2
# axes[row,col].boxplot(data[feature])
# axes[row,col].set_title(f'box of {feature}')
# axes[row,col].set_ylabel(feature)
# plt.tight_layout()
# plt.show()
# fig,axes=plt.subplots(2,2,figsize=(6,4))
# for i,feature in enumerate(features):
# row=i//2
# col=i%2
# sns.histplot(
# x=feature,
# hue='Credit Default',
# data=data,
# kde=True,
# ax=axes[row,col]
# )
# axes[row,col].set_title(f'aa')
# axes[row,col].set_xlabel(feature)
# axes[row,col].set_ylabel(f'font')
# plt.tight_layout()
# plt.show()
# fig,axes=plt.subplots(2,2,figsize=(6,4))
# for i,feature in enumerate(features):
# row=i//2
# col=i%2
# sns.histplot(
# x=feature,
# hue='Credit Default',
# data=data,
# element='step',
# ax=axes[row,col]
# )
# axes[row,col].set_title(f'aa')
# axes[row,col].set_xlabel(feature)
# axes[row,col].set_ylabel(f'count')
# plt.tight_layout()
# plt.show()
from sklearn.model_selection import train_test_split
x=data.drop(['Credit Default'],axis=1)
y=data['Credit Default']
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.2,random_state=42)
print(f'train:{x_train.shape}\ntest:{x_test.shape}')
# #SVM
# print("--- 1. 默认参数随机森林 (训练集 -> 测试集) ---")
# start_time=time.time()
# svm_model=SVC(random_state=42)
# svm_model.fit(x_train,y_train)
# svm_pred=svm_model.predict(x_test)
# end_time=time.time()
# print(f'训练与预测耗时:{end_time - start_time:.4f}')
# print('\n SVM分类报告')
# print(classification_report(y_test,svm_pred))
# print('\nSVM混淆矩阵')
# print(confusion_matrix(y_test,svm_pred))
# #randomforest
# start_time=time.time()
# rf_model=RandomForestClassifier(random_state=42)
# rf_model.fit(x_train,y_train)
# rf_pred=rf_model.predict(x_test)
# end_time=time.time()
# print(f'训练与预测耗时:{end_time - start_time:.4f}')
# print('\n随机森林分类报告')
# print(classification_report(y_test,rf_pred))
# print('\n随机森林混淆矩阵')
# print(confusion_matrix(y_test,rf_pred))
# #XGboost
# print("--- 1. 默认参数随机森林 (训练集 -> 测试集) ---")
# start_time=time.time()
# xgb_model=xgb.XGBClassifier(random_state=42)
# xgb_model.fit(x_train,y_train)
# xgb_pred=xgb_model.predict(x_test)
# end_time=time.time()
# print(f'训练与预测耗时:{end_time - start_time:.4f}')
# print('\n XGBOOST分类报告')
# print(classification_report(y_test,xgb_pred))
# print('\n XGBOOST混淆矩阵')
# print(confusion_matrix(y_test,xgb_pred))
#定义约登指数
def youden_score(y_true,y_pred):
tn,fp,fn,tp=confusion_matrix(y_true,y_pred).ravel()
sensitivity=tp/(tp+fn)
spcificity=tn/(tn+fp)
return sensitivity+spcificity-1
youden_scorer=make_scorer(youden_score)
# #网格搜索
# print("\n--- 2. 网格搜索优化随机森林 (训练集 -> 测试集) ---")
# from sklearn.model_selection import GridSearchCV
# param_grid={
# 'n_estimators':[50,100,200],
# 'max_depth':[None,10,20,30],
# 'min_samples_split':[2,5,10],
# 'min_samples_leaf':[1,2,4]
# }
# grid_search=GridSearchCV(estimator=RandomForestClassifier(random_state=42),
# param_grid=param_grid,
# cv=5,
# n_jobs=-1,
# scoring=youden_scorer)
# start_time=time.time()
# grid_search.fit(x_train,y_train)
# end_time=time.time()
# print(f'网格搜索耗时:{end_time-start_time:.4f}秒')
# print('最佳参数:',grid_search.best_params_)
# best_model=grid_search.best_estimator_
# best_pred=best_model.predict(x_test)
# print('\n网格搜索优化后的随机森林在测试集上的分类报告')
# print(classification_report(y_test,best_pred))
# print('网格搜索优化后的随机森林在测试集上的混淆矩阵')
# print(confusion_matrix(y_test,best_pred))
# #贝叶斯优化
# search_space={
# 'n_estimators':Integer(20,200),
# 'max_depth':Integer(1,30),
# 'min_samples_split':Integer(2,10),
# 'min_samples_leaf':Integer(1,5)
# }
# bayes_search=BayesSearchCV(
# estimator=RandomForestClassifier(random_state=42),
# search_spaces=search_space,
# n_iter=32,
# cv=5,
# n_jobs=-1,
# scoring=youden_scorer
# )
# start_time=time.time()
# bayes_search.fit(x_train,y_train)
# end_time=time.time()
# print(f'贝叶斯优化耗时:{end_time-start_time}')
# print('最佳参数',bayes_search.best_params_)
# best_model=bayes_search.best_estimator_
# best_pred=best_model.predict(x_test)
# print('\n贝叶斯优化后的随机森林在测试集上的分类报告')
# print(classification_report(y_test,best_pred))
# print('\n贝叶斯优化后的随机森林在测试集上的混淆矩阵')
# print(confusion_matrix(y_test,best_pred))
# --- 2. 遗传算法优化随机森林 ---
print("\n--- 2. 遗传算法优化随机森林 (训练集 -> 测试集) ---")
# 定义适应度函数和个体类型
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
# 定义超参数范围
n_estimators_range = (50, 200)
max_depth_range = (10, 30)
min_samples_split_range = (2, 10)
min_samples_leaf_range = (1, 4)
# 初始化工具盒
toolbox = base.Toolbox()
# 定义基因生成器
toolbox.register("attr_n_estimators", random.randint, *n_estimators_range)
toolbox.register("attr_max_depth", random.randint, *max_depth_range)
toolbox.register("attr_min_samples_split", random.randint, *min_samples_split_range)
toolbox.register("attr_min_samples_leaf", random.randint, *min_samples_leaf_range)
# 定义个体生成器
toolbox.register("individual", tools.initCycle, creator.Individual,
(toolbox.attr_n_estimators, toolbox.attr_max_depth,
toolbox.attr_min_samples_split, toolbox.attr_min_samples_leaf), n=1)
# 定义种群生成器
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
# 定义评估函数 - 使用约登指数替代准确率
def evaluate(individual):
n_estimators, max_depth, min_samples_split, min_samples_leaf = individual
model = RandomForestClassifier(n_estimators=n_estimators,
max_depth=max_depth,
min_samples_split=min_samples_split,
min_samples_leaf=min_samples_leaf,
random_state=42)
model.fit(x_train, y_train)
y_pred = model.predict(x_test)
# 使用约登指数替代准确率
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
sensitivity = tp / (tp + fn)
specificity = tn / (tn + fp)
youden_index = sensitivity + specificity - 1
return youden_index,
# 注册评估函数
toolbox.register("evaluate", evaluate)
# 注册遗传操作
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutUniformInt, low=[n_estimators_range[0], max_depth_range[0],
min_samples_split_range[0], min_samples_leaf_range[0]],
up=[n_estimators_range[1], max_depth_range[1],
min_samples_split_range[1], min_samples_leaf_range[1]], indpb=0.1)
toolbox.register("select", tools.selTournament, tournsize=3)
# 初始化种群
pop = toolbox.population(n=20)
# 遗传算法参数
NGEN = 10 #迭代数量由10代修改为300代
CXPB = 0.5
MUTPB = 0.2
start_time = time.time()
# 运行遗传算法
for gen in range(NGEN):
offspring = algorithms.varAnd(pop, toolbox, cxpb=CXPB, mutpb=MUTPB)
fits = toolbox.map(toolbox.evaluate, offspring)
for fit, ind in zip(fits, offspring):
ind.fitness.values = fit
pop = toolbox.select(offspring, k=len(pop))
end_time = time.time()
# 找到最优个体
best_ind = tools.selBest(pop, k=1)[0]
best_n_estimators, best_max_depth, best_min_samples_split, best_min_samples_leaf = best_ind
print(f"遗传算法优化耗时: {end_time - start_time:.4f} 秒")
print("最佳参数: ", {
'n_estimators': best_n_estimators,
'max_depth': best_max_depth,
'min_samples_split': best_min_samples_split,
'min_samples_leaf': best_min_samples_leaf
})
# 使用最佳参数的模型进行预测
best_model = RandomForestClassifier(n_estimators=best_n_estimators,
max_depth=best_max_depth,
min_samples_split=best_min_samples_split,
min_samples_leaf=best_min_samples_leaf,
random_state=42)
best_model.fit(x_train, y_train)
best_pred = best_model.predict(x_test)
print("\n遗传算法优化后的随机森林 在测试集上的分类报告:")
print(classification_report(y_test, best_pred))
print("遗传算法优化后的随机森林 在测试集上的混淆矩阵:")
print(confusion_matrix(y_test, best_pred))
# --- 2. 粒子群优化算法优化随机森林 ---
print("\n--- 2. 粒子群优化算法优化随机森林 (训练集 -> 测试集) ---")
# 定义适应度函数,本质就是构建了一个函数实现 参数--> 评估指标的映射
def fitness_function(params):
n_estimators, max_depth, min_samples_split, min_samples_leaf = params # 序列解包,允许你将一个可迭代对象(如列表、元组、字符串等)中的元素依次赋值给多个变量。
model = RandomForestClassifier(n_estimators=int(n_estimators),
max_depth=int(max_depth),
min_samples_split=int(min_samples_split),
min_samples_leaf=int(min_samples_leaf),
random_state=42)
model.fit(x_train, y_train)
y_pred = model.predict(x_test)
# 使用约登指数替代准确率
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
sensitivity = tp / (tp + fn) # 真阳性率
specificity = tn / (tn + fp) # 真阴性率
youden_index = sensitivity + specificity - 1 # 约登指数
return youden_index
# 粒子群优化算法实现
def pso(num_particles, num_iterations, c1, c2, w, bounds): # 粒子群优化算法核心函数
# num_particles:粒子的数量,即算法中用于搜索最优解的个体数量。
# num_iterations:迭代次数,算法运行的最大循环次数。
# c1:认知学习因子,用于控制粒子向自身历史最佳位置移动的程度。
# c2:社会学习因子,用于控制粒子向全局最佳位置移动的程度。
# w:惯性权重,控制粒子的惯性,影响粒子在搜索空间中的移动速度和方向。
# bounds:超参数的取值范围,是一个包含多个元组的列表,每个元组表示一个超参数的最小值和最大值。
num_params = len(bounds)
particles = np.array([[random.uniform(bounds[i][0], bounds[i][1]) for i in range(num_params)] for _ in
range(num_particles)])
velocities = np.array([[0] * num_params for _ in range(num_particles)])
personal_best = particles.copy()
personal_best_fitness = np.array([fitness_function(p) for p in particles])
global_best_index = np.argmax(personal_best_fitness)
global_best = personal_best[global_best_index]
global_best_fitness = personal_best_fitness[global_best_index]
for _ in range(num_iterations):
r1 = np.array([[random.random() for _ in range(num_params)] for _ in range(num_particles)])
r2 = np.array([[random.random() for _ in range(num_params)] for _ in range(num_particles)])
velocities = w * velocities + c1 * r1 * (personal_best - particles) + c2 * r2 * (
global_best - particles)
particles = particles + velocities
for i in range(num_particles):
for j in range(num_params):
if particles[i][j] < bounds[j][0]:
particles[i][j] = bounds[j][0]
elif particles[i][j] > bounds[j][1]:
particles[i][j] = bounds[j][1]
fitness_values = np.array([fitness_function(p) for p in particles])
improved_indices = fitness_values > personal_best_fitness
personal_best[improved_indices] = particles[improved_indices]
personal_best_fitness[improved_indices] = fitness_values[improved_indices]
current_best_index = np.argmax(personal_best_fitness)
if personal_best_fitness[current_best_index] > global_best_fitness:
global_best = personal_best[current_best_index]
global_best_fitness = personal_best_fitness[current_best_index]
return global_best, global_best_fitness
# 超参数范围
bounds = [(50, 200), (10, 30), (2, 10), (1, 4)] # n_estimators, max_depth, min_samples_split, min_samples_leaf
# 粒子群优化算法参数
num_particles = 20
num_iterations = 10
c1 = 1.5
c2 = 1.5
w = 0.5
start_time = time.time()
best_params, best_fitness = pso(num_particles, num_iterations, c1, c2, w, bounds)
end_time = time.time()
print(f"粒子群优化算法优化耗时: {end_time - start_time:.4f} 秒")
print("最佳参数: ", {
'n_estimators': int(best_params[0]),
'max_depth': int(best_params[1]),
'min_samples_split': int(best_params[2]),
'min_samples_leaf': int(best_params[3])
})
print(f"最佳约登指数: {best_fitness:.4f}")
# 使用最佳参数的模型进行预测
best_model = RandomForestClassifier(n_estimators=int(best_params[0]),
max_depth=int(best_params[1]),
min_samples_split=int(best_params[2]),
min_samples_leaf=int(best_params[3]),
random_state=42)
best_model.fit(x_train, y_train)
best_pred = best_model.predict(x_test)
print("\n粒子群优化算法优化后的随机森林 在测试集上的分类报告:")
print(classification_report(y_test, best_pred))
print("粒子群优化算法优化后的随机森林 在测试集上的混淆矩阵:")
print(confusion_matrix(y_test, best_pred))
# --- 2. 模拟退火算法优化随机森林 ---
print("\n--- 2. 模拟退火算法优化随机森林 (训练集 -> 测试集) ---")
# 定义适应度函数
def fitness_function(params):
n_estimators, max_depth, min_samples_split, min_samples_leaf = params
model = RandomForestClassifier(n_estimators=int(n_estimators),
max_depth=int(max_depth),
min_samples_split=int(min_samples_split),
min_samples_leaf=int(min_samples_leaf),
random_state=42)
model.fit(x_train, y_train)
y_pred = model.predict(x_test)
# 使用约登指数替代准确率
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
sensitivity = tp / (tp + fn) # 真阳性率
specificity = tn / (tn + fp) # 真阴性率
youden_index = sensitivity + specificity - 1 # 约登指数
return youden_index
# 模拟退火算法实现
def simulated_annealing(initial_solution, bounds, initial_temp, final_temp, alpha):
current_solution = initial_solution
current_fitness = fitness_function(current_solution)
best_solution = current_solution
best_fitness = current_fitness
temp = initial_temp
while temp > final_temp:
# 生成邻域解
neighbor_solution = []
for i in range(len(current_solution)):
new_val = current_solution[i] + random.uniform(-1, 1) * (bounds[i][1] - bounds[i][0]) * 0.1
new_val = max(bounds[i][0], min(bounds[i][1], new_val))
neighbor_solution.append(new_val)
neighbor_fitness = fitness_function(neighbor_solution)
delta_fitness = neighbor_fitness - current_fitness
if delta_fitness > 0 or random.random() < np.exp(delta_fitness / temp):
current_solution = neighbor_solution
current_fitness = neighbor_fitness
if current_fitness > best_fitness:
best_solution = current_solution
best_fitness = current_fitness
temp *= alpha
return best_solution, best_fitness
# 超参数范围
bounds = [(50, 200), (10, 30), (2, 10), (1, 4)] # n_estimators, max_depth, min_samples_split, min_samples_leaf
# 模拟退火算法参数
initial_temp = 100 # 初始温度
final_temp = 0.1 # 终止温度
alpha = 0.95 # 温度衰减系数
# 初始化初始解
initial_solution = [random.uniform(bounds[i][0], bounds[i][1]) for i in range(len(bounds))]
start_time = time.time()
best_params, best_fitness = simulated_annealing(initial_solution, bounds, initial_temp, final_temp, alpha)
end_time = time.time()
print(f"模拟退火算法优化耗时: {end_time - start_time:.4f} 秒")
print("最佳参数: ", {
'n_estimators': int(best_params[0]),
'max_depth': int(best_params[1]),
'min_samples_split': int(best_params[2]),
'min_samples_leaf': int(best_params[3])
})
print(f"最佳约登指数: {best_fitness:.4f}")
# 使用最佳参数的模型进行预测
best_model = RandomForestClassifier(n_estimators=int(best_params[0]),
max_depth=int(best_params[1]),
min_samples_split=int(best_params[2]),
min_samples_leaf=int(best_params[3]),
random_state=42)
best_model.fit(x_train, y_train)
best_pred = best_model.predict(x_test)
print("\n模拟退火算法优化后的随机森林 在测试集上的分类报告:")
print(classification_report(y_test, best_pred))
print("模拟退火算法优化后的随机森林 在测试集上的混淆矩阵:")
print(confusion_matrix(y_test, best_pred))