卷积神经网络——LeNet——FashionMNIST
学习BigData 2024-07-17 08:07:03 阅读 76
目录
一、文件结构二、model.py三、model_train.py四、model_test.py
一、文件结构
GitHub
二、model.py
<code>import torch
from torch import nn
from torchsummary import summary
class LeNet(nn.Module):
def __init__(self):
super(LeNet,self).__init__()
self.c1 = nn.Conv2d(in_channels=1,out_channels=6,kernel_size=5,padding=2)
self.sig = nn.Sigmoid()
self.s2 = nn.AvgPool2d(kernel_size=2,stride=2)
self.c3 = nn.Conv2d(in_channels=6,out_channels=16,kernel_size=5)
self.s4 = nn.AvgPool2d(kernel_size=2,stride=2)
self.flatten = nn.Flatten()
self.f5 = nn.Linear(in_features=5*5*16,out_features=120)
self.f6 = nn.Linear(in_features=120,out_features=84)
self.f7 = nn.Linear(in_features=84,out_features=10)
def forward(self,x):
x = self.sig(self.c1(x))
x = self.s2(x)
x = self.sig(self.c3(x))
x = self.s4(x)
x = self.flatten(x)
x = self.f5(x)
x = self.f6(x)
x = self.f7(x)
return x
# if __name__ =="__main__":
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#
# model = LeNet().to(device)
#
# print(summary(model,input_size=(1,28,28)))
三、model_train.py
# 导入所需的Python库
from torchvision.datasets import FashionMNIST
from torchvision import transforms
import torch.utils.data as Data
import torch
from torch import nn
import time
import copy
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from model import LeNet # model.py中定义了LeNet模型
from tqdm import tqdm # 导入tqdm库,用于显示进度条
# 定义数据加载和处理函数
def train_val_data_process():
# 加载FashionMNIST数据集,Resize到28x28尺寸,并转换为Tensor
train_data = FashionMNIST(root="./data",code>
train=True,
transform=transforms.Compose([transforms.Resize(size=28), transforms.ToTensor()]),
download=True)
# 将加载的数据集分为80%的训练数据和20%的验证数据
train_data, val_data = Data.random_split(train_data, lengths=[round(0.8 * len(train_data)), round(0.2 * len(train_data))])
# 为训练数据和验证数据创建DataLoader,设置批量大小为32,洗牌,2个进程加载数据
train_dataloader = Data.DataLoader(dataset=train_data,
batch_size=32,
shuffle=True,
num_workers=2)
val_dataloader = Data.DataLoader(dataset=val_data,
batch_size=32,
shuffle=True,
num_workers=2)
# 返回训练和验证的DataLoader
return train_dataloader, val_dataloader
# 定义模型训练和验证过程的函数
def train_model_process(model, train_dataloader, val_dataloader, num_epochs):
# 设置使用CUDA如果可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 打印使用的设备
dev = "cuda" if torch.cuda.is_available() else "cpu"
print(f'当前模型训练设备为: {dev}')
# 初始化Adam优化器和交叉熵损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# 将模型移动到选定的设备上
model = model.to(device)
# 复制模型权重用于后续更新最佳模型
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0 # 初始化最佳准确度
# 初始化用于记录训练和验证过程中损失和准确度的列表
train_loss_all = []
val_loss_all = []
train_acc_all = []
val_acc_all = []
# 记录训练开始时间
start_time = time.time()
# 迭代指定的训练轮数
for epoch in range(1, num_epochs + 1):
# 记录每个epoch开始的时间
since = time.time()
# 打印分隔符和当前epoch信息
print("-" * 10)
print(f"Epoch: {epoch}/{num_epochs}")
# 初始化训练和验证过程中的损失和正确预测数量
train_loss = 0.0
train_corrects = 0
val_loss = 0.0
val_corrects = 0
# 初始化批次计数器
train_num = 0
val_num = 0
# 创建训练进度条
progress_train_bar = tqdm(total=len(train_dataloader), desc=f'Training {epoch}', unit='batch')code>
# 训练数据集的遍历
for step, (b_x, b_y) in enumerate(train_dataloader):
# 将数据移动到相应的设备上
b_x = b_x.to(device)
b_y = b_y.to(device)
# 训练模型
model.train()
# 前向传播
output = model(b_x)
# 计算预测标签
pre_label = torch.argmax(output, dim=1)
# 计算损失
loss = criterion(output, b_y)
# 清空梯度
optimizer.zero_grad()
# 反向传播
loss.backward()
# 更新权重
optimizer.step()
# 累加损失和正确预测数量
train_loss += loss.item() * b_x.size(0)
train_corrects += torch.sum(pre_label == b_y.data)
# 更新批次计数器
train_num += b_x.size(0)
# 更新训练进度条
progress_train_bar.update(1)
# 关闭训练进度条
progress_train_bar.close()
# 创建验证进度条
progress_val_bar = tqdm(total=len(val_dataloader), desc=f'Validation {epoch}', unit='batch')code>
# 验证数据集的遍历
for step, (b_x, b_y) in enumerate(val_dataloader):
# 将数据移动到相应的设备上
b_x = b_x.to(device)
b_y = b_y.to(device)
# 评估模型
model.eval()
# 前向传播
output = model(b_x)
# 计算预测标签
pre_label = torch.argmax(output, dim=1)
# 计算损失
loss = criterion(output, b_y)
# 累加损失和正确预测数量
val_loss += loss.item() * b_x.size(0)
val_corrects += torch.sum(pre_label == b_y.data)
# 更新批次计数器
val_num += b_x.size(0)
# 更新验证进度条
progress_val_bar.update(1)
# 关闭验证进度条
progress_val_bar.close()
# 计算并记录epoch的平均损失和准确度
train_loss_all.append(train_loss / train_num)
train_acc_all.append(train_corrects.double().item() / train_num)
val_loss_all.append(val_loss / val_num)
val_acc_all.append(val_corrects.double().item() / val_num)
# 打印训练和验证的损失与准确度
print(f'{epoch} Train Loss: {train_loss_all[-1]:.4f} Train Acc: {train_acc_all[-1]:.4f}')
print(f'{epoch} Val Loss: {val_loss_all[-1]:.4f} Val Acc: {val_acc_all[-1]:.4f}')
# 计算并打印epoch训练耗费的时间
time_use = time.time() - since
print(f'第 {epoch} 个 epoch 训练耗费时间: {time_use // 60:.0f}m {time_use % 60:.0f}s')
# 若当前epoch的验证准确度为最佳,则更新最佳模型权重
if val_acc_all[-1] > best_acc:
best_acc = val_acc_all[-1]
best_model_wts = copy.deepcopy(model.state_dict())
# 训练结束,保存最佳模型权重
torch.save(best_model_wts, 'D:/Pycharm/deepl/LeNet/weight/best_model.pth')
# 如果当前epoch为总epoch数,则保存最终模型权重
if epoch == num_epochs:
torch.save(model.state_dict(), f'D:/Pycharm/deepl/LeNet/weight/{num_epochs}_model.pth')
# 将训练过程中的统计数据整理成DataFrame
train_process = pd.DataFrame(data={
"epoch": range(1, num_epochs + 1),
"train_loss_all": train_loss_all,
"val_loss_all": val_loss_all,
"train_acc_all": train_acc_all,
"val_acc_all": val_acc_all
})
# 打印总训练时间
consume_time = time.time() - start_time
print(f'总耗时:{consume_time // 60:.0f}m {consume_time % 60:.0f}s')
# 返回包含训练过程统计数据的DataFrame
return train_process
# 定义绘制训练和验证过程中损失与准确度的函数
def matplot_acc_loss(train_process):
# 创建图形和子图
plt.figure(figsize=(12, 4))
# 绘制训练和验证损失
plt.subplot(1, 2, 1)
plt.plot(train_process["epoch"], train_process["train_loss_all"], 'ro-', label="train_loss")code>
plt.plot(train_process["epoch"], train_process["val_loss_all"], 'bs-', label="val_loss")code>
plt.legend()
plt.xlabel("epoch")
plt.ylabel("loss")
# 保存损失图像
plt.savefig('./result_picture/training_loss_accuracy.png', bbox_inches='tight')code>
# 绘制训练和验证准确度
plt.subplot(1, 2, 2)
plt.plot(train_process["epoch"], train_process["train_acc_all"], 'ro-', label="train_acc")code>
plt.plot(train_process["epoch"], train_process["val_acc_all"], 'bs-', label="val_acc")code>
plt.legend()
plt.xlabel("epoch")
plt.ylabel("accuracy")
# 保存准确率曲线图
plt.savefig('./result_picture/training_accuracy.png', bbox_inches='tight')code>
plt.show()
if __name__ == "__main__":
model = LeNet()
train_dataloader, val_dataloader = train_val_data_process()
train_process = train_model_process(model, train_dataloader, val_dataloader, num_epochs=20)
matplot_acc_loss(train_process)
四、model_test.py
import torch
import torch.utils.data as Data
from torchvision import transforms
from torchvision.datasets import FashionMNIST
from model import LeNet
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
# t代表test
def t_data_process():
test_data = FashionMNIST(root="./data",code>
train=False,
transform=transforms.Compose([transforms.Resize(size=28), transforms.ToTensor()]),
download=True)
test_dataloader = Data.DataLoader(dataset=test_data,
batch_size=1,
shuffle=True,
num_workers=0)
return test_dataloader
def t_model_process(model, test_dataloader):
if model is not None:
print('Successfully loaded the model.')
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
# 初始化参数
test_corrects = 0.0
test_num = 0
all_preds = [] # 存储所有预测标签
all_labels = [] # 存储所有实际标签
# 只进行前向传播,不计算梯度
with torch.no_grad():
for test_x, test_y in test_dataloader:
test_x = test_x.to(device)
test_y = test_y.to(device)
# 设置模型为验证模式
model.eval()
# 前向传播得到一个batch的结果
output = model(test_x)
# 查找最大值对应的行标
pre_lab = torch.argmax(output, dim=1)
# 收集预测和实际标签
all_preds.extend(pre_lab.tolist())
all_labels.extend(test_y.tolist())
# 计算准确率
test_corrects += torch.sum(pre_lab == test_y.data)
# 将所有的测试样本进行累加
test_num += test_x.size(0)
# 计算准确率
test_acc = test_corrects.double().item() / test_num
print(f'测试的准确率:{test_acc}')
# 绘制混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')code>
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()
plt.savefig('./result_picture/Confusion_Matrix.png', bbox_inches='tight')code>
if __name__=="__main__":
# 加载模型
model = LeNet()
print('loading model')
# 加载权重
model.load_state_dict(torch.load('D:/Pycharm/deepl/LeNet/weight/best_model.pth'))
# 加载测试数据
test_dataloader = t_data_process()
# 加载模型测试的函数
t_model_process(model,test_dataloader)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
classes = ['T-shirt/top','Trouser','Pullover','Dress','coat','Sandal','Shirt','Sneaker','Bag','Ankle boot']
with torch.no_grad():
for b_x,b_y in test_dataloader:
b_x = b_x.to(device)
b_y = b_y.to(device)
model.eval()
output = model(b_x)
pre_lab = torch.argmax(output,dim=1)
result = pre_lab.item()
label = b_y.item()
print(f'预测值:{classes[result]}',"-----------",f'真实值:{classes[label]}')
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。