Datawhale X 李宏毅苹果书 AI夏令营(Task2)

梦落花叶萱 2024-09-04 10:31:08 阅读 79

 一、学前概览

        任务内容:critical point并不一定是训练神经网络遇到的最大的阻碍,还有一种叫Adaptive Learning Rate的技术。

        任务目的:了解掌握Learning Rate和分类损失的计算。

        本节出现术语:自适应学习率(root mean square、RMSProp、Adam策略)、学习率调度策略、分类、回归、softmax。

1.学中疑问

        A.训练时候很少卡到saddle point或者local minima,那么上一个task中的图6是怎么画出来的?

        B.lr如何自动根据gradient的大小做调整?

        C.parameter dependent的lr有什么常见的计算方式

        D.分类问题中,为什么cross entropy会比mse常用?

2.学后解答

        A.一般的gradient descent无法做到,多数的training在走到critical point之前其实就已经停止了。

        B.见知识点1

        C.见知识点2

        D.见图10

二、Task2.1 自适应学习率

        大原则:如果在某一方向上,gradient的值非常小,非常平坦,那么lr会调大一点;如果在某一个方向上坡度很大,lr就可以设的小一点。

1.知识点1:不同的参数需要不同的lr

        训练过程中很容易遇到loss无法下降的情况,这种情况从大量的已知实验来看,很少是因为critical point导致的,所以原先的梯度下降的formulation已经不满足目前的需求,需要随着参数进行克制化的learning rate加入。此处以一个参数为例子,如图1所示,然后依次类推出所有参数的表现形式。

图1:加入克制化的learning rate

        那么,parameter dependent的lr有什么常见的计算方式呢?

2.知识点2:parameter dependent的lr常见的计算方式

       方法1:root mean square

        有点基础的话,看图2其实就能看得懂了。

图2:root mean square方式

        使用这种方式的一个前提是同一个参数的gradient大小固定。但是实际情况中,很难做到,因此衍生出第二种方法。

        方法2:RMSProp

        这种方法并没有文献来源,第一步跟方法一是一样的,区别在第二步以后。见图3。

图3:使用RMSProp动态调整参数

         方法3:Adam策略

        Adam策略,实际上就是RMSProp+Momentum

        

3.知识点3:学习率调度

       出现学习率调度的原因:梯度爆炸,在书籍57页展示了原因,还是很容易理解的。这里放出原文,如图4所示。

图4:梯度爆炸的原因

        解决办法也很简单,让η与时间扯上关系,有两种方法,一种是learning rate decay,使得η随着时间的增加作衰减。;另一种是warm up(远古时代就有,bert使用了它),η先递增后衰减。两者的特征如图5所示,但是对于warm up,其可解释性比较差,目前仍在探索。

图5:learning rate decay和warm up的特征

小结

        见图6

图6:公式总结

三、Task2.2:分类

1.知识点4:分类?回归?

        我们已经知道,回归的本质是模型输出一个数值,这个数值与真实标签的数值差距越小越好,那么接下来的目标便是缩小预测值与真实值之间的差距。

        在多分类的情况下,如果每一分类的输出是一个标量的结果,那我们可以变相把它当成回归问题去看。但这种方式存在一个问题,就是标量与标量之间存在有某种关系,当作预测结果时,说服力会很低。因此,比较常用的做法是将class用独热编码去表示。

        从模型输出的角度,回归问题输出的是一个数值,分类问题可以看成回归问题的输出重复n次。但是在做分类问题的时候,往往会把输出通过softmax,将输出限制在0~1之间。如图7所示。

图7:回归和分类的区别

2.知识点5:softmax的运作过程(简单版)

       softmax除了让分类结果限制在0~1之外,还会让大小值之间的差距变大。如图8所示。公式在右上角。

图8:softmax的运作过程

       

3.知识点6:分类损失

        分类损失有两种计算方式,一种是均方根误差,另一种是交叉熵损失。如图9。

图9:损失函数的计算方式

        从优化的角度,交叉熵是被更常用在分类上。解释如下:

        如图10所示:假设有一个三类的分类,网络先输出y1、y2 和y3,在通过softmax以后,产生 y′1、y′2 和y′3。假设y1、y2的变化都是从-10到10,y3固定设成-1000。正确答案是[1, 0, 0]T,因为y3的值很小,通过softmax后,y′3非常趋近于0,与正确答案非常接近,所以此时我们只需要看y1和y2有变化时对损失的影响。

        图10中,左上角损失大,右下角损失小,所以希望最后在训练的时候,参数可以“走” 到右下角的地方。假设参数优化开始的时候,对应的损失都是左上角。

        如果选择交叉熵(图10右),左上角圆圈所在的点有斜率的,所以可以通过梯度,一路往右下的地方“走”。

        如果选均方误差(图10左),左上角圆圈就卡住了,均方误差在这种损失很大的地方, 是非常平坦的,其梯度是趋近于 0 的。如果初始时在圆圈的位置,离目标非常远,其梯度又很小,是无法用梯度下降顺利地“走”到右下角的。

图10:交叉熵和均方根误差的损失比较

三、Task2.3:CNN图像分类实践

       其实在任务教程里也有代码的解析,但是我觉得跟着走一遍把代码copy到这边印象会更深。(不会贴源码,只贴我不懂的代码)

       以下范式适用于广泛的深度学习任务:准备数据——>训练模型——>应用模型。具体如下:

        1.导入所需要的库/工具包

        2.数据准备与预处理

        3.定义模型

        4.定义损失函数和优化器等其他配置

        5.训练模型

        6.评估模型

        7.进行预测

1.导包

<code># 选择随机种子没有特定的规则,它可以是任何整数。有些种子值(如42)因为被知名书籍或文献提及而变得流行。

myseed = 6666

# 确保在使用CUDA时,卷积运算具有确定性,以增强实验结果的可重复性

# 保证了每次运行网络时都会得到相同的结果,代价是牺牲性能

torch.backends.cudnn.deterministic = True

# 设为False时,会告诉CuDNN不进行算法的搜索和选择,而是使用默认的卷积实现。这意味着不会根据当前的GPU和输入数据来选择最快的卷积算法。确保每次运行时都使用相同的算法

torch.backends.cudnn.benchmark = False

2.数据准备与预处理

        分三个部分(预设置、数据加载类和调用读取数据)。

# 图像预设置

# 在测试和验证阶段,通常不需要图像增强。

# 我们所需要的只是调整PIL图像的大小并将其转换为Tensor。

test_tfm = transforms.Compose([transforms.Resize((128, 128)),transforms.ToTensor(),])

# 不过,在测试阶段使用图像增强也是有可能的。

# 你可以使用train_tfm生成多种图像,然后使用集成方法进行测试。

train_tfm = transforms.Compose([

# 将图像调整为固定大小(高度和宽度均为128)

transforms.Resize((128, 128)),

# 可以在此处添加一些图像增强的操作。

# ToTensor()应该是所有变换中的最后一个。

transforms.ToTensor(),

])

# 数据加载类

class FoodDataset(Dataset):

"""

用于加载食品图像数据集的类。

该类继承自Dataset,提供了对食品图像数据集的加载和预处理功能。

它可以自动从指定路径加载所有的jpg图像,并对这些图像应用给定的变换。

"""

def __init__(self, path, tfm=test_tfm, files=None):

"""

参数:

- path: 图像数据所在的目录路径。

- tfm: 应用于图像的变换方法(默认为测试变换)。

- files: 可选参数,用于直接指定图像文件的路径列表(默认为None)。

"""

super(FoodDataset).__init__()

self.path = path

# 列出目录下所有jpg文件,并按顺序排序

self.files = sorted([os.path.join(path, x) for x in os.listdir(path) if x.endswith(".jpg")])

if files is not None:

self.files = files # 如果提供了文件列表,则使用该列表

self.transform = tfm # 图像变换方法

def __len__(self):

"""返回数据集中图像的数量。"""

return len(self.files)

def __getitem__(self, idx):

"""

获取给定索引的图像及其标签。

参数:

idx: 图像在数据集中的索引。

返回:

im: 应用了变换后的图像。

label: 图像对应的标签(如果可用)。

"""

fname = self.files[idx]

im = Image.open(fname)

im = self.transform(im) # 应用图像变换

# 尝试从文件名中提取标签

try:

label = int(fname.split("/")[-1].split("_")[0])

except:

label = -1 # 如果无法提取标签,则设置为-1(测试数据无标签)

return im, label

# 加载数据

# 构建训练和验证数据集

# "loader" 参数定义了torchvision如何读取数据

train_set = FoodDataset("./hw3_data/train", tfm=train_tfm)

# 创建训练数据加载器,设置批量大小、是否打乱数据顺序、是否使用多线程加载以及是否固定内存地址

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)

# 构建验证数据集

# "loader" 参数定义了torchvision如何读取数据

valid_set = FoodDataset("./hw3_data/valid", tfm=test_tfm)

# 创建验证数据加载器,设置批量大小、是否打乱数据顺序、是否使用多线程加载以及是否固定内存地址

valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)

3.定义模型

class Classifier(nn.Module):

"""

定义一个图像分类器类,继承自PyTorch的nn.Module。

该分类器包含卷积层和全连接层,用于对图像进行分类。

"""

def __init__(self):

"""

初始化函数,构建卷积神经网络的结构。

包含一系列的卷积层、批归一化层、激活函数和池化层。

"""

super(Classifier, self).__init__()

# 定义卷积神经网络的序列结构

self.cnn = nn.Sequential(

nn.Conv2d(3, 64, 3, 1, 1), # 输入通道3,输出通道64,卷积核大小3,步长1,填充1

nn.BatchNorm2d(64), # 批归一化,作用于64个通道

nn.ReLU(), # ReLU激活函数

nn.MaxPool2d(2, 2, 0), # 最大池化,池化窗口大小2,步长2,填充0

nn.Conv2d(64, 128, 3, 1, 1), # 输入通道64,输出通道128,卷积核大小3,步长1,填充1

nn.BatchNorm2d(128), # 批归一化,作用于128个通道

nn.ReLU(),

nn.MaxPool2d(2, 2, 0), # 最大池化,池化窗口大小2,步长2,填充0

nn.Conv2d(128, 256, 3, 1, 1), # 输入通道128,输出通道256,卷积核大小3,步长1,填充1

nn.BatchNorm2d(256), # 批归一化,作用于256个通道

nn.ReLU(),

nn.MaxPool2d(2, 2, 0), # 最大池化,池化窗口大小2,步长2,填充0

nn.Conv2d(256, 512, 3, 1, 1), # 输入通道256,输出通道512,卷积核大小3,步长1,填充1

nn.BatchNorm2d(512), # 批归一化,作用于512个通道

nn.ReLU(),

nn.MaxPool2d(2, 2, 0), # 最大池化,池化窗口大小2,步长2,填充0

nn.Conv2d(512, 512, 3, 1, 1), # 输入通道512,输出通道512,卷积核大小3,步长1,填充1

nn.BatchNorm2d(512), # 批归一化,作用于512个通道

nn.ReLU(),

nn.MaxPool2d(2, 2, 0), # 最大池化,池化窗口大小2,步长2,填充0

)

# 定义全连接神经网络的序列结构

self.fc = nn.Sequential(

nn.Linear(512*4*4, 1024), # 输入大小512*4*4,输出大小1024

nn.ReLU(),

nn.Linear(1024, 512), # 输入大小1024,输出大小512

nn.ReLU(),

nn.Linear(512, 11) # 输入大小512,输出大小11,最终输出11个类别的概率

)

def forward(self, x):

"""

前向传播函数,对输入进行处理。

参数:

x -- 输入的图像数据,形状为(batch_size, 3, 128, 128)

返回:

输出的分类结果,形状为(batch_size, 11)

"""

out = self.cnn(x) # 通过卷积神经网络处理输入

out = out.view(out.size()[0], -1) # 展平输出,以适配全连接层的输入要求

return self.fc(out) # 通过全连接神经网络得到最终输出

4.定义损失函数和优化器等其他配置

# 根据GPU是否可用选择设备类型

device = "cuda" if torch.cuda.is_available() else "cpu"

# 初始化模型,并将其放置在指定的设备上

model = Classifier().to(device)

# 定义批量大小

batch_size = 64

# 定义训练轮数

n_epochs = 8

# 如果在'patience'轮中没有改进,则提前停止

patience = 5

# 对于分类任务,我们使用交叉熵作为性能衡量标准

criterion = nn.CrossEntropyLoss()

# 初始化优化器,您可以自行调整一些超参数,如学习率

optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, weight_decay=1e-5)

5.训练模型

# 初始化追踪器,这些不是参数,不应该被更改

stale = 0

best_acc = 0

for epoch in range(n_epochs):

# ---------- 训练阶段 ----------

# 确保模型处于训练模式

model.train()

# 这些用于记录训练过程中的信息

train_loss = []

train_accs = []

for batch in tqdm(train_loader):

# 每个批次包含图像数据及其对应的标签

imgs, labels = batch

# imgs = imgs.half()

# print(imgs.shape,labels.shape)

# 前向传播数据。(确保数据和模型位于同一设备上)

logits = model(imgs.to(device))

# 计算交叉熵损失。

# 在计算交叉熵之前不需要应用softmax,因为它会自动完成。

loss = criterion(logits, labels.to(device))

# 清除上一步中参数中存储的梯度

optimizer.zero_grad()

# 计算参数的梯度

loss.backward()

# 为了稳定训练,限制梯度范数

grad_norm = nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)

# 使用计算出的梯度更新参数

optimizer.step()

# 计算当前批次的准确率

acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

# 记录损失和准确率

train_loss.append(loss.item())

train_accs.append(acc)

train_loss = sum(train_loss) / len(train_loss)

train_acc = sum(train_accs) / len(train_accs)

# 打印信息

print(f"[ 训练 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {train_loss:.5f}, acc = {train_acc:.5f}")

# ---------- 验证阶段 ----------

# 确保模型处于评估模式,以便某些模块如dropout能够正常工作

model.eval()

# 这些用于记录验证过程中的信息

valid_loss = []

valid_accs = []

# 按批次迭代验证集

for batch in tqdm(valid_loader):

# 每个批次包含图像数据及其对应的标签

imgs, labels = batch

# imgs = imgs.half()

# 我们在验证阶段不需要梯度。

# 使用 torch.no_grad() 加速前向传播过程。

with torch.no_grad():

logits = model(imgs.to(device))

# 我们仍然可以计算损失(但不计算梯度)。

loss = criterion(logits, labels.to(device))

# 计算当前批次的准确率

acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

# 记录损失和准确率

valid_loss.append(loss.item())

valid_accs.append(acc)

# break

# 整个验证集的平均损失和准确率是所记录值的平均

valid_loss = sum(valid_loss) / len(valid_loss)

valid_acc = sum(valid_accs) / len(valid_accs)

# 打印信息

print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

# 更新日志

if valid_acc > best_acc:

with open(f"./{_exp_name}_log.txt", "a"):

print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f} -> 最佳")

else:

with open(f"./{_exp_name}_log.txt", "a"):

print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

# 保存模型

if valid_acc > best_acc:

print(f"在第 {epoch} 轮找到最佳模型,正在保存模型")

torch.save(model.state_dict(), f"{_exp_name}_best.ckpt") # 只保存最佳模型以防止输出内存超出错误

best_acc = valid_acc

stale = 0

else:

stale += 1

if stale > patience:

print(f"连续 {patience} 轮没有改进,提前停止")

break

6.评估模型

for epoch in range(n_epochs):

# ---------- 训练阶段 ----------

···

# ---------- 验证阶段 ----------

# 确保模型处于评估模式,以便某些模块如dropout能够正常工作

model.eval()

# 这些用于记录验证过程中的信息

valid_loss = []

valid_accs = []

# 按批次迭代验证集

for batch in tqdm(valid_loader):

# 每个批次包含图像数据及其对应的标签

imgs, labels = batch

# imgs = imgs.half()

# 我们在验证阶段不需要梯度。

# 使用 torch.no_grad() 加速前向传播过程。

with torch.no_grad():

logits = model(imgs.to(device))

# 我们仍然可以计算损失(但不计算梯度)。

loss = criterion(logits, labels.to(device))

# 计算当前批次的准确率

acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

# 记录损失和准确率

valid_loss.append(loss.item())

valid_accs.append(acc)

# break

# 整个验证集的平均损失和准确率是所记录值的平均

valid_loss = sum(valid_loss) / len(valid_loss)

valid_acc = sum(valid_accs) / len(valid_accs)

# 打印信息

print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

# 更新日志

if valid_acc > best_acc:

with open(f"./{_exp_name}_log.txt", "a"):

print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f} -> 最佳")

else:

with open(f"./{_exp_name}_log.txt", "a"):

print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

# 保存模型

if valid_acc > best_acc:

print(f"在第 {epoch} 轮找到最佳模型,正在保存模型")

torch.save(model.state_dict(), f"{_exp_name}_best.ckpt") # 只保存最佳模型以防止输出内存超出错误

best_acc = valid_acc

stale = 0

else:

stale += 1

if stale > patience:

print(f"连续 {patience} 轮没有改进,提前停止")

break

7.预测模型

        分两步(加载测试数据、测试并生成预测)

# 构建测试数据集

# "loader"参数指定了torchvision如何读取数据

test_set = FoodDataset("./hw3_data/test", tfm=test_tfm)

# 创建测试数据加载器,批量大小为batch_size,不打乱数据顺序,不使用多线程,启用pin_memory以提高数据加载效率

test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

# 实例化分类器模型,并将其转移到指定的设备上

model_best = Classifier().to(device)

# 加载模型的最优状态字典

model_best.load_state_dict(torch.load(f"{_exp_name}_best.ckpt"))

# 将模型设置为评估模式

model_best.eval()

# 初始化一个空列表,用于存储所有预测标签

prediction = []

# 使用torch.no_grad()上下文管理器,禁用梯度计算

with torch.no_grad():

# 遍历测试数据加载器

for data, _ in tqdm(test_loader):

# 将数据转移到指定设备上,并获得模型的预测结果

test_pred = model_best(data.to(device))

# 选择具有最高分数的类别作为预测标签

test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)

# 将预测标签添加到结果列表中

prediction += test_label.squeeze().tolist()

# 创建测试csv文件

def pad4(i):

"""

将输入数字i转换为长度为4的字符串,如果长度不足4,则在前面补0。

:param i: 需要转换的数字

:return: 补0后的字符串

"""

return "0" * (4 - len(str(i))) + str(i)

# 创建一个空的DataFrame对象

df = pd.DataFrame()

# 使用列表推导式生成Id列,列表长度等于测试集的长度

df["Id"] = [pad4(i) for i in range(len(test_set))]

# 将预测结果赋值给Category列

df["Category"] = prediction

# 将DataFrame对象保存为submission.csv文件,不保存索引

df.to_csv("submission.csv", index=False)

8.可视化

# 导入必要的库和模块

import torch

import numpy as np

from sklearn.manifold import TSNE

import matplotlib.pyplot as plt

from tqdm import tqdm

import matplotlib.cm as cm

import torch.nn as nn

# 根据CUDA是否可用选择执行设备

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# 加载训练好的模型

model = Classifier().to(device)

# 加载模型保存的参数

state_dict = torch.load(f"{_exp_name}_best.ckpt")

# 将参数加载到模型中

model.load_state_dict(state_dict)

# 设置模型为评估模式

model.eval()

# 打印模型结构

print(model)

from tqdm import tqdm

import numpy as np

import matplotlib.pyplot as plt

from sklearn.manifold import TSNE

import matplotlib.cm as cm

import torch

def forward_to_layer(model, input_tensor, layer_index):

outputs = []

for i, layer in enumerate(model.children()):

input_tensor = layer(input_tensor)

if i == layer_index:

break

outputs.append(input_tensor)

return outputs[-1] # 返回所选层的输出

# 假设model, test_tfm, FoodDataset, DataLoader已经被定义且正确初始化

# 加载由TA定义的验证集

valid_set = FoodDataset("./hw3_data/valid", tfm=test_tfm)

valid_loader = DataLoader(valid_set, batch_size=64, shuffle=False, num_workers=0, pin_memory=True)

# 提取模型特定层的表示

index = 19 # 假设你想提取第19层的特征

features = []

labels = []

# 定义设备

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for batch in tqdm(valid_loader):

imgs, lbls = batch

imgs, lbls = imgs.to(device), lbls.to(device) # 确保数据在正确的设备上

with torch.no_grad():

# 获取特定层的特征

logits = forward_to_layer(model.cnn, imgs, index)

logits = logits.view(logits.size(0), -1)

labels.extend(lbls.cpu().numpy())

features.extend(logits.cpu().numpy())

# 将features和labels列表转换为numpy数组

features = np.array(features)

labels = np.array(labels)

# 应用t-SNE到特征上

features_tsne = TSNE(n_components=2, init='pca', random_state=42).fit_transform(features)code>

# 绘制t-SNE可视化图

plt.figure(figsize=(10, 8))

for label in np.unique(labels):

# 使用布尔索引选择特定标签的数据点

mask = (labels == label)

plt.scatter(features_tsne[mask, 0], features_tsne[mask, 1], label=f'Class {label}', s=5)

plt.legend()

plt.title('All Classes t-SNE Visualization')

plt.show()

# 绘制特定类别的t-SNE可视化图

plt.figure(figsize=(10, 8))

selected_label = 5

mask = (labels == selected_label)

if mask.any(): # 使用 .any() 替代 .sum() 来检查是否有True值

plt.scatter(features_tsne[mask, 0], features_tsne[mask, 1], label=f'Class {selected_label}', s=5)

plt.legend()

plt.title(f'Class {selected_label} t-SNE Visualization')

plt.show()



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。