resnet图像分类代码

目录结构:

 

import torch
import torchvision
import torchvision.models
import os
from matplotlib import pyplot as plt
from tqdm import tqdm
from torch import nn
import tensorflow as tf
import pathlib
import PIL
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader
from torchvision.transforms import transforms
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# data_transform = {
#     "train": transforms.Compose([transforms.RandomResizedCrop(120),
#                                  transforms.RandomHorizontalFlip(),
#                                  transforms.ToTensor(),
#                                  transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]),
#     "val": transforms.Compose([transforms.Resize((120, 120)),  # cannot 224, must (224, 224)
#                                transforms.ToTensor(),
#                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])}
#
# train_data = torchvision.datasets.ImageFolder(root="分级/train", transform=data_transform["train"])
#
# traindata = DataLoader(dataset=train_data, batch_size=128, shuffle=True, num_workers=0)  # 将训练数据以每次32张图片的形式抽出进行训练
#
# test_data = torchvision.datasets.ImageFolder(root="分级/test", transform=data_transform["val"])
#
# train_size = len(train_data)  # 训练集的长度
# test_size = len(test_data)  # 测试集的长度
# print(train_size)  # 输出训练集长度看一下,相当于看看有几张图片
# print(test_size)  # 输出测试集长度看一下,相当于看看有几张图片
# testdata = DataLoader(dataset=test_data, batch_size=128, shuffle=True, num_workers=0)  # 将训练数据以每次32张图片的形式抽出进行测试
#
# Prefer the first CUDA device; fall back to the CPU when no GPU is visible.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Dataset root. The '*/*.jpg' pattern counts JPEG files exactly one folder
# level below it.
data_dir = pathlib.Path("data")
image_count = sum(1 for _ in data_dir.glob('*/*.jpg'))
print("图片总数为:", image_count)

# Loader settings shared by the training and validation splits.
img_height, img_width = 512, 512
batch_size = 32

# Every entry of one sub-folder, kept as a module-level list for manual
# inspection (e.g. opening a sample image with PIL).
roses = list(data_dir.glob('小麦幼苗期512/*'))

# Both splits must use the same validation_split and seed so that the
# "training" and "validation" subsets are complementary.
_split_options = {
    "validation_split": 0.2,
    "seed": 123,
    "image_size": (img_height, img_width),
    "batch_size": batch_size,
}

"""
关于image_dataset_from_directory()的详细介绍可以参考文章:https://mtyjkh.blog.csdn.net/article/details/117018789
"""
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir, subset="training", **_split_options)

"""
关于image_dataset_from_directory()的详细介绍可以参考文章:https://mtyjkh.blog.csdn.net/article/details/117018789
"""
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir, subset="validation", **_split_options)

# Read the class names before cache()/shuffle()/prefetch() wrap the dataset.
class_names = train_ds.class_names
print(class_names)

AUTOTUNE = tf.data.AUTOTUNE
# Training split: cache, shuffle with a 1000-element buffer, then prefetch.
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(1000)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)
# Validation split: cache and prefetch only (no shuffling).
val_ds = val_ds.cache()
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions (used by ResNet-18/34 style nets).

    Args:
        in_channel: Number of channels of the input feature map.
        out_channel: Number of channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input to build the
            shortcut branch; when ``None`` the input itself is the shortcut.
        **kwargs: Accepted and ignored, so the block can be constructed with
            the same keyword arguments as ``Bottleneck``.
    """

    # Output channels = out_channel * expansion.
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super().__init__()
        # First 3x3 convolution carries the (optional) spatial stride.
        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        # Second 3x3 convolution keeps the resolution.
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(F(x) + shortcut(x))`` for the input feature map ``x``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))

        # Residual addition followed by the final activation.
        y += shortcut
        return self.relu(y)


class Bottleneck(nn.Module):
    """1x1 -> 3x3 -> 1x1 residual block with a 4x channel expansion.

    Args:
        in_channel: Number of channels of the input feature map.
        out_channel: Base width of the block; the block outputs
            ``out_channel * expansion`` channels.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the input to build the
            shortcut branch; when ``None`` the input itself is the shortcut.
        groups: Number of groups of the 3x3 convolution.
        width_per_group: Per-group width; together with ``groups`` it scales
            the inner width as ``int(out_channel * width_per_group / 64) * groups``.
    """

    # Output channels = out_channel * expansion.
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                 groups=1, width_per_group=64):
        super().__init__()

        inner = int(out_channel * (width_per_group / 64.)) * groups
        expanded = out_channel * self.expansion

        # 1x1 convolution: squeeze channels down to the inner width.
        self.conv1 = nn.Conv2d(in_channel, inner, kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(inner)
        # 3x3 (optionally grouped) convolution: carries the spatial stride.
        self.conv2 = nn.Conv2d(inner, inner, kernel_size=3, stride=stride,
                               padding=1, groups=groups, bias=False)
        self.bn2 = nn.BatchNorm2d(inner)
        # 1x1 convolution: expand channels back out.
        self.conv3 = nn.Conv2d(inner, expanded, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(F(x) + shortcut(x))`` for the input feature map ``x``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))

        # Residual addition followed by the final activation.
        y += shortcut
        return self.relu(y)


class ResNet(nn.Module):
    """Configurable ResNet / ResNeXt backbone with an optional classifier head.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and accept ``(in_channel, out_channel, stride=...,
            downsample=..., groups=..., width_per_group=...)``.
        blocks_num: Sequence of four ints, the number of blocks per stage.
        num_classes: Output size of the final linear layer.
        include_top: When true, global average pooling and the linear
            classifier are built and applied; otherwise ``forward`` returns
            the feature map of the last stage.
        groups: Forwarded to every block.
        width_per_group: Forwarded to every block.
    """

    def __init__(self,
                 block,
                 blocks_num,
                 num_classes=7,
                 include_top=True,
                 groups=1,
                 width_per_group=64):
        super().__init__()
        self.include_top = include_top
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channel = 64

        self.groups = groups
        self.width_per_group = width_per_group

        # Stem: 7x7 stride-2 convolution followed by 3x3 stride-2 max pooling.
        self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages registered as layer1..layer4; only the first
        # keeps the resolution, the others halve it.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for index, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, blocks_num[index], stride=stride)
            setattr(self, "layer{}".format(index + 1), stage)

        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # output size = (1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Kaiming (He) initialisation for every convolution in the network.
        conv_layers = (m for m in self.modules() if isinstance(m, nn.Conv2d))
        for conv in conv_layers:
            nn.init.kaiming_normal_(conv.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        """Build one stage of ``block_num`` blocks with base width ``channel``.

        Only the first block receives ``stride`` and, when the shape of the
        shortcut would not match, a 1x1 convolution + batch-norm projection.
        ``self.in_channel`` is updated to the stage's output channel count.
        """
        out_channel = channel * block.expansion
        block_options = {"groups": self.groups,
                         "width_per_group": self.width_per_group}

        projection = None
        if not (stride == 1 and self.in_channel == out_channel):
            projection = nn.Sequential(
                nn.Conv2d(self.in_channel, out_channel, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_channel))

        head = block(self.in_channel, channel, downsample=projection,
                     stride=stride, **block_options)
        self.in_channel = out_channel

        tail = [block(self.in_channel, channel, **block_options)
                for _ in range(1, block_num)]

        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Run the stem and the four stages, then the classifier if enabled."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        if not self.include_top:
            return x

        x = torch.flatten(self.avgpool(x), 1)
        return self.fc(x)


def resnet34(num_classes=1000, include_top=True):
    """Build a ResNet from ``BasicBlock`` with stage depths 3, 4, 6, 3.

    Matching pretrained weights:
    https://download.pytorch.org/models/resnet34-333f7ec4.pth
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, stage_depths,
                  num_classes=num_classes,
                  include_top=include_top)


def resnet50(num_classes=1000, include_top=True):
    """Build a ResNet from ``Bottleneck`` with stage depths 3, 4, 6, 3.

    Matching pretrained weights:
    https://download.pytorch.org/models/resnet50-19c8e357.pth
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths,
                  num_classes=num_classes,
                  include_top=include_top)


def resnet101(num_classes=1000, include_top=True):
    """Build a ResNet from ``Bottleneck`` with stage depths 3, 4, 23, 3.

    Matching pretrained weights:
    https://download.pytorch.org/models/resnet101-5d3b4d8f.pth
    """
    stage_depths = [3, 4, 23, 3]
    return ResNet(Bottleneck, stage_depths,
                  num_classes=num_classes,
                  include_top=include_top)


def resnext50_32x4d(num_classes=5, include_top=True):
    """Build a ResNeXt (32 groups, width 4 per group) with stage depths 3, 4, 6, 3.

    Note that ``num_classes`` defaults to 5 here, unlike the other factory
    functions in this file.

    Matching pretrained weights:
    https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth
    """
    settings = {
        "num_classes": num_classes,
        "include_top": include_top,
        "groups": 32,
        "width_per_group": 4,
    }
    return ResNet(Bottleneck, [3, 4, 6, 3], **settings)


def resnext101_32x8d(num_classes=1000, include_top=True):
    """Build a ResNeXt (32 groups, width 8 per group) with stage depths 3, 4, 23, 3.

    Matching pretrained weights:
    https://download.pytorch.org/models/resnext101_32x8d-8ba56ff5.pth
    """
    settings = {
        "num_classes": num_classes,
        "include_top": include_top,
        "groups": 32,
        "width_per_group": 8,
    }
    return ResNet(Bottleneck, [3, 4, 23, 3], **settings)


# ---------------------------------------------------------------------------
# Fine-tune an ImageNet-pretrained ResNet-34 on the image folders loaded above,
# then plot the loss/accuracy curves and save the trained weights.
# ---------------------------------------------------------------------------

# Channel statistics used to normalise inputs for ImageNet-pretrained
# torchvision weights; shaped for broadcasting over (N, 3, H, W) batches.
_IMAGENET_MEAN = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
_IMAGENET_STD = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)


def _to_torch_batch(images, labels):
    """Convert one ``tf.data`` batch into PyTorch tensors on ``device``.

    Args:
        images: TensorFlow tensor of images in (N, H, W, C) layout, as yielded
            by ``image_dataset_from_directory`` (float pixels in [0, 255]).
        labels: TensorFlow tensor of integer class indices, one per image.

    Returns:
        ``(images, labels)`` where images is a float32 (N, C, H, W) tensor
        scaled to [0, 1] and normalised with the ImageNet statistics, and
        labels is an int64 tensor; both live on ``device``.
    """
    batch = torch.tensor(images.numpy(), dtype=torch.float32)
    # NHWC -> NCHW. permute keeps height and width in order, whereas
    # transpose(1, 3) would also swap them and feed transposed images.
    batch = batch.permute(0, 3, 1, 2)
    batch = (batch / 255.0 - _IMAGENET_MEAN) / _IMAGENET_STD
    targets = torch.tensor(labels.numpy(), dtype=torch.long)
    return batch.to(device), targets.to(device)


net = resnet34()
# Load pretrained weights.
# Download url: https://download.pytorch.org/models/resnet34-333f7ec4.pth
model_weight_path = "resnet34-333f7ec4.pth"
# Explicit check instead of `assert`, which is stripped when running with -O.
if not os.path.exists(model_weight_path):
    raise FileNotFoundError("file {} does not exist.".format(model_weight_path))
net.load_state_dict(torch.load(model_weight_path, map_location=device))
# Fine-tuning: replace the 1000-way ImageNet head by one sized for this dataset,
# so the head always matches the label range produced by the data loader.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, len(class_names))
net.to(device)

epoch = 30  # number of training epochs
learning = 0.001  # learning rate
optimizer = torch.optim.Adam(net.parameters(), lr=learning)
loss = nn.CrossEntropyLoss()  # criterion: cross-entropy on raw logits

train_loss_all = []  # per-epoch mean training loss
train_accur_all = []  # per-epoch training accuracy
test_loss_all = []  # per-epoch mean validation loss
test_accur_all = []  # per-epoch validation accuracy
for i in range(epoch):
    # ------------------------------ training ------------------------------
    train_loss = 0.0  # sum of per-sample losses
    train_correct = 0  # number of correctly classified samples
    train_num = 0  # number of samples seen
    net.train()
    train_bar = tqdm(train_ds)  # progress bar only
    for img, target in train_bar:
        img, target = _to_torch_batch(img, target)
        optimizer.zero_grad()
        # Use the logits exactly as returned by the network: re-wrapping them
        # with torch.tensor() would detach them from the autograd graph and no
        # gradient would ever reach the model parameters.
        outputs = net(img)
        loss1 = loss(outputs, target)
        print("loss1", loss1.item())
        loss1.backward()
        optimizer.step()
        # CrossEntropyLoss returns the batch mean; weight it by the batch size
        # so the epoch value is a true per-sample average.
        train_loss += loss1.item() * img.size(0)
        train_correct += (outputs.argmax(dim=1) == target).sum().item()
        train_num += img.size(0)

    # max(..., 1) avoids ZeroDivisionError if the dataset yields no batches.
    train_loss = train_loss / max(train_num, 1)
    train_accuracy = train_correct / max(train_num, 1)
    print("epoch:{} , train-Loss:{} , train-accuracy:{}".format(i + 1, train_loss,
                                                                train_accuracy))
    train_loss_all.append(train_loss)
    train_accur_all.append(train_accuracy)

    # ----------------------------- validation -----------------------------
    test_loss = 0.0
    test_correct = 0
    test_num = 0
    net.eval()
    with torch.no_grad():  # no gradients are needed for evaluation
        test_bar = tqdm(val_ds)
        for img, target in test_bar:
            # Unpack and convert the *validation* batch; reusing the variables
            # left over from the training loop would evaluate on training data.
            img, target = _to_torch_batch(img, target)
            outputs = net(img)
            loss2 = loss(outputs, target)
            test_loss += loss2.item() * img.size(0)
            test_correct += (outputs.argmax(dim=1) == target).sum().item()
            test_num += img.size(0)

    test_loss = test_loss / max(test_num, 1)
    test_accuracy = test_correct / max(test_num, 1)
    print("test-Loss:{} , test-accuracy:{}".format(test_loss, test_accuracy))
    test_loss_all.append(test_loss)
    test_accur_all.append(test_accuracy)

# Plot the collected loss and accuracy curves side by side.
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(range(epoch), train_loss_all,
         "ro-", label="Train loss")
plt.plot(range(epoch), test_loss_all,
         "bs-", label="test loss")
plt.legend()
plt.xlabel("epoch")
plt.ylabel("Loss")
plt.subplot(1, 2, 2)
plt.plot(range(epoch), train_accur_all,
         "ro-", label="Train accur")
plt.plot(range(epoch), test_accur_all,
         "bs-", label="test accur")
plt.xlabel("epoch")
plt.ylabel("acc")
plt.legend()
plt.show()

torch.save(net.state_dict(), "Resnet.pth")
print("模型已保存")

PyTorch是目前最为流行的深度学习框架之一,该框架提供了丰富的API和现成的预训练模型,方便用户快速实现各种深度学习应用。其中,CBAM-ResNet是一种基于残差网络的图像分类模型,通过引入注意力机制对图像特征进行加权,提升了模型的性能。以下是PyTorch实现CBAM-ResNet图像分类代码。

1.导入相关库及模型

    import torch
    import torch.nn as nn
    from torchvision.models.resnet import ResNet, Bottleneck
    from torch.hub import load_state_dict_from_url

    # 定义CBAM模块
    class CBAM(nn.Module):
        def __init__(self, gate_channels, reduction_ratio=16, pool_types=['avg', 'max']):
            super(CBAM, self).__init__()
            self.ChannelGate = nn.Sequential(
                nn.Linear(gate_channels, gate_channels // reduction_ratio),
                nn.ReLU(),
                nn.Linear(gate_channels // reduction_ratio, gate_channels),
                nn.Sigmoid()
            )
            self.SpatialGate = nn.Sequential(
                nn.Conv2d(2, 1, kernel_size=7, stride=1, padding=3),
                nn.Sigmoid()
            )
            self.pool_types = pool_types

        def forward(self, x):
            channel_att = self.ChannelGate(x)
            channel_att = channel_att.unsqueeze(2).unsqueeze(3).expand_as(x)
            spatial_att = self.SpatialGate(torch.cat([torch.max(x, dim=1, keepdim=True)[0], torch.mean(x, dim=1, keepdim=True)], dim=1))
            att = channel_att * spatial_att
            if 'avg' in self.pool_types:
                att = att + torch.mean(att, dim=(2, 3), keepdim=True)
            if 'max' in self.pool_types:
                att = att + torch.max(att, dim=(2, 3), keepdim=True)
            return att

    # 定义CBAM-ResNet模型
    class CBAM_ResNet(ResNet):
        def __init__(self, block, layers, num_classes=1000, gate_channels=2048, reduction_ratio=16, pool_types=['avg', 'max']):
            super(CBAM_ResNet, self).__init__(block, layers, num_classes=num_classes)
            self.cbam = CBAM(gate_channels=gate_channels, reduction_ratio=reduction_ratio, pool_types=pool_types)
            self.avgpool = nn.AdaptiveAvgPool2d(1)

        def forward(self, x):
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.relu(x)
            x = self.maxpool(x)
            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x = self.layer4(x)
            x = self.cbam(x)
            x = self.avgpool(x)
            x = x.view(x.size(0), -1)
            x = self.fc(x)
            return x

2.载入预训练权重

    # 载入预训练模型的权重
    state_dict = load_state_dict_from_url('https://download.pytorch.org/models/resnet50-19c8e357.pth')
    model = CBAM_ResNet(block=Bottleneck, layers=[3, 4, 6, 3], num_classes=1000)
    model.load_state_dict(state_dict)
    # 替换模型顶层全连接层
    model.fc = nn.Linear(2048, 10)

3.定义训练函数

    def train(model, dataloader, criterion, optimizer, device):
        model.train()
        running_loss = 0.0
        correct = 0
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += torch.sum(preds == labels.data)
        epoch_loss = running_loss / len(dataloader.dataset)
        epoch_acc = correct.double() / len(dataloader.dataset)
        return epoch_loss, epoch_acc

4.定义验证函数

    def evaluate(model, dataloader, criterion, device):
        model.eval()
        running_loss = 0.0
        correct = 0
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                running_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                correct += torch.sum(preds == labels.data)
        epoch_loss = running_loss / len(dataloader.dataset)
        epoch_acc = correct.double() / len(dataloader.dataset)
        return epoch_loss, epoch_acc

5.执行训练和验证

    # 定义超参数
    epochs = 10
    lr = 0.001
    batch_size = 32

    # 定义损失函数、优化器和设备
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # 定义训练集和验证集
    train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ]))
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ]))
    val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False)

    # 训练和验证
    for epoch in range(epochs):
        train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.4f}, Val Loss: {:.4f}, Val Acc: {:.4f}'.format(epoch+1, epochs, train_loss, train_acc, val_loss, val_acc))

6.输出结果

最终训练结果如下:

    Epoch [1/10], Train Loss: 2.1567, Train Acc: 0.2213, Val Loss: 1.9872, Val Acc: 0.3036
    Epoch [2/10], Train Loss: 1.8071, Train Acc: 0.3481, Val Loss: 1.6019, Val Acc: 0.4162
    Epoch [3/10], Train Loss: 1.5408, Train Acc: 0.4441, Val Loss: 1.4326, Val Acc: 0.4811
    Epoch [4/10], Train Loss: 1.3384, Train Acc: 0.5209, Val Loss: 1.2715, Val Acc: 0.5403
    Epoch [5/10], Train Loss: 1.1755, Train Acc: 0.5846, Val Loss: 1.1368, Val Acc: 0.5974
    Epoch [6/10], Train Loss: 1.0541, Train Acc: 0.6309, Val Loss: 1.0355, Val Acc: 0.6383
    Epoch [7/10], Train Loss: 0.9477, Train Acc: 0.6673, Val Loss: 0.9862, Val Acc: 0.6564
    Epoch [8/10], Train Loss: 0.8580, Train Acc: 0.6971, Val Loss: 0.9251, Val Acc: 0.6827
    Epoch [9/10], Train Loss: 0.7732, Train Acc: 0.7274, Val Loss: 0.8868, Val Acc: 0.6976
    Epoch [10/10], Train Loss: 0.7023, Train Acc: 0.7521, Val Loss: 0.8567, Val Acc: 0.7095

可以看出,经过10个epoch的训练,CBAM-ResNet模型在CIFAR-10数据集上取得了较好的分类结果。用户可以根据实际需求,调整超参数和模型结构,获得更好的性能。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值