Datawhale AI 夏令营——Deepfake_task1
Junk269 2024-07-25 15:01:13 阅读 62
本次挑战是为了应对各种类型的Deepfake攻击,而设计的一个检测模型,task1的任务就是跑通预设模型。我将学习预设模型的代码,并附上对应理解。
Step1:安装必要包、导入配件、处理数据
<code>pip install timm # timm是一个包含多种预训练模型的库,常用于计算机视觉任务,有了这个包,我们便可以继续接下来的任务。
from PIL import Image
Image.open('/kaggle/input/deepfake/phase1/trainset/63fee8a89581307c0b4fd05a48e0ff79.jpg')
# 通过此步选择一张照片,进行分析
import torch # 导入PyTorch库,它提供了大量的机器学习和深度学习功能。
torch.manual_seed(0) #设置了一个随机数生成的种子
torch.backends.cudnn.deterministic = False # 这个设置与CUDA卷积神经网络(cuDNN)有关,False意味着允许cuDNN使用可能不完全确定但通常更快的算法。
torch.backends.cudnn.benchmark = True # 这个设置与CUDA卷积神经网络(cuDNN)有关,当设为True时,cuDNN会搜索最适合当前硬件和特定输入大小的卷积算法,然后缓存该算法以供后续使用,提供更好的性能
# 导入了一些针对视觉模块的库
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataset import Dataset
import timm
import time
import pandas as pd
import numpy as np
import cv2
from PIL import Image
from tqdm import tqdm_notebook
train_label = pd.read_csv('/kaggle/input/deepfake/phase1/trainset_label.txt')
val_label = pd.read_csv('/kaggle/input/deepfake/phase1/valset_label.txt')
# 读取训练集和测试集,被记为DataFrame格式,再在下一步给每个DataFrame增加了一个新的列path,用于存储每个图像的完整路径,方便后续的图像加载和处理。
train_label['path'] = '/kaggle/input/deepfake/phase1/trainset/' + train_label['img_name']
val_label['path'] = '/kaggle/input/deepfake/phase1/valset/' + val_label['img_name']
# 统计计数训练集和验证集的标签,会分别返回标签为1和0的个数。可以用来数据集中各类别样本的数量是否均衡,防止数据集不平衡导致模型在训练时偏向于多数类,从而影响模型的泛化能力
train_label['target'].value_counts()
val_label['target'].value_counts()
# 显示前十行数据,看看每一行包含哪些列以及这些列的数据类型和值:image_name target path
train_label.head(10)
Step2:模型训练和验证
首先先设置三个类,AverageMeter
、ProgressMeter和FFDIDataset,
前两个用于监控和报告训练过程中的指标的实用工具。后者用于处理和加载深度伪造检测(DeepFake Detection,简称DFD或FFDI)任务的数据集。
class AverageMeter(object):
"""Computes and stores the average and current value"""
# AverageMeter类用于计算和存储某个指标(如损失、准确率等)的当前值、平均值、总和以及计数值。可以实时监控模型的学习动态。
def __init__(self, name, fmt=':f'):code>
self.name = name
self.fmt = fmt
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def __str__(self):
fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
return fmtstr.format(**self.__dict__)
class ProgressMeter(object):
# ProgressMeter类用于管理多个AverageMeter实例,并以友好的格式打印出整个训练过程的进度和指标。
def __init__(self, num_batches, *meters):
self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
self.meters = meters
self.prefix = ""
def pr2int(self, batch):
entries = [self.prefix + self.batch_fmtstr.format(batch)]
entries += [str(meter) for meter in self.meters]
print('\t'.join(entries))
def _get_batch_fmtstr(self, num_batches):
num_digits = len(str(num_batches // 1))
fmt = '{:' + str(num_digits) + 'd}'
return '[' + fmt + '/' + fmt.format(num_batches) + ']'
class FFDIDataset(Dataset):
# 这个数据集类的用途是在深度学习模型训练或验证阶段,提供一个接口用于迭代地获取图像及其标签。torch.utils.data.DataLoader通常会与这个类一起使用,以高效地批量加载和预处理数据。这样可以将数据集分割成小批量,同时利用多线程或进程来并行加载数据,从而加速训练过程。
def __init__(self, img_path, img_label, transform=None):
self.img_path = img_path
self.img_label = img_label
if transform is not None:
self.transform = transform
else:
self.transform = None
def __getitem__(self, index):
img = Image.open(self.img_path[index]).convert('RGB')
if self.transform is not None:
img = self.transform(img)
return img, torch.from_numpy(np.array(self.img_label[index]))
def __len__(self):
return len(self.img_path)
随后再设置validate
、predict
和train
三个函数,用于模型的验证、预测和训练。
validate
函数
这个函数接收数据加载器、模型、损失函数作为参数。它使用AverageMeter
和ProgressMeter
来监控和报告验证过程中的时间、损失和精度。模型被设置为评估模式(model.eval()
),禁用梯度计算以节省内存和加速处理(with torch.no_grad():
)。遍历验证集的每一个批次,计算模型的输出和损失,更新精度和损失的统计数据。计算每个批次的耗时并更新时间计量器。最终,函数返回验证集上的平均精度。
predict
函数
这个函数用于模型预测,特别是在测试集上。它也接收数据加载器和模型作为参数,并可选地接受一个tta
参数,即测试时增强(Test Time Augmentation)的次数。模型同样被设置为评估模式。对于每一次增强,遍历测试集的所有批次,计算模型的输出,并应用softmax函数以获得概率分布。输出结果被累积,最终返回平均预测结果。
train
函数
这个函数用于模型的训练过程,接收训练数据加载器、模型、损失函数、优化器和当前训练轮次作为参数。它同样使用AverageMeter
和ProgressMeter
来监控训练过程中的时间、损失和精度。模型被设置为训练模式(model.train()
),允许模型内部的如dropout和BN层正常工作。遍历训练集的每一个批次,计算模型的输出和损失,更新精度和损失的统计数据。计算梯度,执行优化器的参数更新步骤(optimizer.step()
)。计算每个批次的耗时并更新时间计量器。每100个批次,使用ProgressMeter
打印当前的训练状态。
三个函数的对应代码:
def validate(val_loader, model, criterion):
batch_time = AverageMeter('Time', ':6.3f')
losses = AverageMeter('Loss', ':.4e')
top1 = AverageMeter('Acc@1', ':6.2f')
progress = ProgressMeter(len(val_loader), batch_time, losses, top1)
# switch to evaluate mode
model.eval()
with torch.no_grad():
end = time.time()
for i, (input, target) in tqdm_notebook(enumerate(val_loader), total=len(val_loader)):
input = input.cuda()
target = target.cuda()
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
acc = (output.argmax(1).view(-1) == target.float().view(-1)).float().mean() * 100
losses.update(loss.item(), input.size(0))
top1.update(acc, input.size(0))
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
# TODO: this should also be done with the ProgressMeter
print(' * Acc@1 {top1.avg:.3f}'
.format(top1=top1))
return top1
def predict(test_loader, model, tta=10):
# switch to evaluate mode
model.eval()
test_pred_tta = None
for _ in range(tta):
test_pred = []
with torch.no_grad():
end = time.time()
for i, (input, target) in tqdm_notebook(enumerate(test_loader), total=len(test_loader)):
input = input.cuda()
target = target.cuda()
# compute output
output = model(input)
output = F.softmax(output, dim=1)
output = output.data.cpu().numpy()
test_pred.append(output)
test_pred = np.vstack(test_pred)
if test_pred_tta is None:
test_pred_tta = test_pred
else:
test_pred_tta += test_pred
return test_pred_tta
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter('Time', ':6.3f')
losses = AverageMeter('Loss', ':.4e')
top1 = AverageMeter('Acc@1', ':6.2f')
progress = ProgressMeter(len(train_loader), batch_time, losses, top1)
# switch to train mode
model.train()
end = time.time()
for i, (input, target) in enumerate(train_loader):
input = input.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
losses.update(loss.item(), input.size(0))
acc = (output.argmax(1).view(-1) == target.float().view(-1)).float().mean() * 100
top1.update(acc, input.size(0))
# compute gradient and do SGD step
optimizer.zero_grad()
loss.backward()
optimizer.step()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
if i % 100 == 0:
progress.pr2int(i)
Step3:加载模型
import timm
model = timm.create_model('resnet18', pretrained=True, num_classes=2)
model = model.cuda()
#从timm库中创建一个预训练的resnet18模型,并将其分类头的输出单元数设置为2(真实和伪造两类),然后将模型转移到GPU上
train_loader = torch.utils.data.DataLoader(
FFDIDataset(train_label['path'].head(1000), train_label['target'].head(1000),
transforms.Compose([
transforms.Resize((256, 256)),
transforms.RandomHorizontalFlip(),
transforms.RandomVerticalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
), batch_size=40, shuffle=True, num_workers=4, pin_memory=True
)
# 使用训练集的前一千条数据,使用随机水平翻转和垂直翻转等数据增强技术,并进行标准化和尺寸调整。
val_loader = torch.utils.data.DataLoader(
FFDIDataset(val_label['path'].head(1000), val_label['target'].head(1000),
transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
), batch_size=40, shuffle=False, num_workers=4, pin_memory=True
)
# 使用测试集的前一千条数据,并进行标准化和尺寸调整
criterion = nn.CrossEntropyLoss().cuda() #移动到GPU上
optimizer = torch.optim.Adam(model.parameters(), 0.005) # 使用Adam优化器,学习率为0.005。
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.85)
best_acc = 0.0
for epoch in range(2): #只训练了2个epoch,这可能不足以让模型收敛
scheduler.step() # 学习率在每个epoch开始时通过scheduler.step()进行更新。
print('Epoch: ', epoch)
train(train_loader, model, criterion, optimizer, epoch)
val_acc = validate(val_loader, model, criterion)
if val_acc.avg.item() > best_acc:
best_acc = round(val_acc.avg.item(), 2)
torch.save(model.state_dict(), f'./model_{best_acc}.pt')
#在2个epoch内训练模型,每个epoch结束后在验证集上评估模型,并根据验证集上的准确率决定是否保存模型的权重
test_loader = torch.utils.data.DataLoader(
FFDIDataset(val_label['path'], val_label['target'],
transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
), batch_size=40, shuffle=False, num_workers=4, pin_memory=True
)
val_label['y_pred'] = predict(test_loader, model, 1)[:, 1]
val_label[['img_name', 'y_pred']].to_csv('submit.csv', index=None)
在这部分主要的运行代码中,影响loss大小的主要就是训练集和测试集的训练数和epoch的学习轮数,以及学习率等,更多的数据和减小学习率的值会增大运算时间,增加训练轮数可能会导致过拟合,所以每一步修改都要谨而慎之,既要尽可能减小loss,提高分数,又要防止过拟合,导致模型不可用。
这这一次中,我将训练集和测试集的数量提升至2000,再增加epoch的轮数到10轮,最终在kaggle上获得的分数由初始的0.539提升到0.751,说明了增加样本数和训练次数对提升模型精度是由帮助的。
之后的日子里,我也会继续跟进学习进阶技术,尽快提升自己的能力。
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。