最新大模型架构TTT模型代码解析(一)
蓝砖教育 2024-10-15 09:31:06 阅读 57
这项来自斯坦福大学、加州大学伯克利分校、加州大学圣迭戈分校和 Meta 的研究提出了一个新颖的序列建模方法,称为测试时训练(Test-Time Training, TTT)层。TTT 层通过用机器学习模型取代 RNN 的隐藏状态,并使用输入 token 的实际梯度下降来压缩上下文。研究表明,这种方法在性能上可以优于现有的 Transformer 和 Mamba 架构。
TTT 层的设计亮点包括:
1. 替代自注意力机制:TTT 层直接取代了 Transformer 中的自注意力层,采用了线性模型(TTT-Linear)和两层 MLP(TTT-MLP)作为隐藏状态。
2. 线性复杂度架构:通过表达性记忆解锁线性复杂性架构,能够在上下文中训练具有数百万甚至数十亿个 token 的大语言模型(LLM)。
3. 更高效的上下文利用:TTT 层通过更高效的上下文压缩和模型记忆机制,相较于 RNN 层能更好地处理长上下文。
4. 更快的实际运行时间:尽管自注意力机制的复杂度是线性的,研究表明 TTT 层在实际运行时间上更快。
实验结果表明,TTT-Linear 和 TTT-MLP 在较大的上下文长度(如 8k token)下明显优于 Mamba 和 Transformer。具体观察到的优点包括:
- TTT 层在长上下文中的表现优于 Mamba,更具表现力并且更高效。
- 在相同 FLOPs 成本下,TTT-MLP 的困惑度(perplexity)比 TTT-Linear 更低,但由于 FLOPs 的额外成本,TTT-Linear 在效率方面有优势。
研究团队还在 JAX 和 PyTorch 上提供了代码,用于模型训练和测试,可以在以下链接中获取:
- JAX 代码:[https://github.com/test-time-training/ttt-lm-jax](https://github.com/test-time-training/ttt-lm-jax)
- PyTorch 推理代码:[https://github.com/test-time-training/ttt-lm-pytorch](https://github.com/test-time-training/ttt-lm-pytorch)
接下来我将对模型前半部分代码进行解析
定义配置类 <code>TTTConfig
定义基础模块如 rotate_half
, permute_qk
等
定义 RMSNorm
, SwiGluMLP
, RotaryEmbedding
等组件类
定义 Conv
卷积层
定义 TTTCache
缓存机制
TTTConfig
类
定义了一个用于配置 TTT 模型的类 `TTTConfig`,还包含了一些辅助函数,用于处理查询和键张量的旋转位置嵌入。这些函数和类主要用于模型的配置和预处理步骤,就不详细给出实现了
辅助函数
rotate_half
函数
这个函数将输入张量 x
一分为二,然后交换这两部分的位置,并返回结果。
permute_qk
函数
这个函数对查询(q
)和键(k
)张量进行重新排序,以匹配 JAX 实现中的维度顺序。
undo_permute_qk
函数
def undo_permute_qk(q, k):
bsz, num_head, seq_len, head_dim = q.shape
q = q.reshape(bsz, num_head, seq_len, 2, head_dim // 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim)
k = k.reshape(bsz, num_head, seq_len, 2, head_dim // 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim)
return q, k
这个函数将 permute_qk
函数的操作逆转,恢复原始维度顺序。
apply_rotary_pos_emb
函数
def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1):
cos = cos.unsqueeze(unsqueeze_dim)
sin = sin.unsqueeze(unsqueeze_dim)
q_embed = (q * cos) + (rotate_half(q) * sin)
k_embed = (k * cos) + (rotate_half(k) * sin)
return q_embed, k_embed
这个函数将旋转位置嵌入应用于查询(q
)和键(k
)张量。通过对 cos
和 sin
张量进行适当的扩展,以便它们可以正确地广播到 q
和 k
的维度上。
RMSNorm类:
主要用于对输入张量进行Root Mean Square Layer Normalization。初始化方法定义了权重和防止除零的小常数。前向传播方法对输入张量进行归一化处理,并乘以权重
SwiGluMLP类:
这是一个使用SiLU激活函数和Gated Linear Units (GLU)的多层感知机。初始化方法定义了相关的线性层和激活函数。前向传播方法根据配置中的pretraining_tp
值,对输入张量进行切片和线性变换操作。
import torch
from torch import nn
import torch.nn.functional as F
# 激活函数字典,假设在其他地方定义
ACT2FN = {
"silu": torch.nn.SiLU(),
# 其他激活函数可以在这里添加
}
class RMSNorm(nn.Module):
def __init__(self, hidden_size, eps=1e-6):
"""初始化RMSNorm层。
Args:
hidden_size (int): 隐藏层维数。
eps (float): 防止除零的小常数。
"""
super().__init__()
self.weight = nn.Parameter(torch.ones(hidden_size)) # 初始化权重参数为全1
self.variance_epsilon = eps # 设置epsilon值
def forward(self, hidden_states):
"""前向传播函数。
Args:
hidden_states (torch.Tensor): 输入隐藏状态张量。
Returns:
torch.Tensor: 正则化后的隐藏状态张量。
"""
input_dtype = hidden_states.dtype # 保存输入张量的dtype
hidden_states = hidden_states.to(torch.float32) # 将输入转换为float32
variance = hidden_states.pow(2).mean(-1, keepdim=True) # 计算方差
hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) # 进行方差归一化
return self.weight * hidden_states.to(input_dtype) # 恢复原始dtype并乘以权重
class SwiGluMLP(nn.Module):
def __init__(self, config):
"""初始化SwiGluMLP层。
Args:
config (TTTConfig): 模型配置对象。
"""
super().__init__()
self.config = config
self.hidden_size = config.hidden_size # 从配置中获取隐藏层大小
self.intermediate_size = config.intermediate_size # 从配置中获取中间层大小
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # 定义线性层
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # 定义线性层
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) # 定义线性层
self.act_fn = ACT2FN[config.hidden_act] # 从激活函数字典中获取激活函数
def forward(self, x):
"""前向传播函数。
Args:
x (torch.Tensor): 输入张量。
Returns:
torch.Tensor: 输出张量。
"""
if self.config.pretraining_tp > 1:
# 如果预训练tp值大于1,进行切分操作
slice = self.intermediate_size // self.config.pretraining_tp # 计算切分大小
gate_proj_slices = self.gate_proj.weight.split(slice, dim=0) # 切分gate_proj权重
up_proj_slices = self.up_proj.weight.split(slice, dim=0) # 切分up_proj权重
down_proj_slices = self.down_proj.weight.split(slice, dim=1) # 切分down_proj权重
# 分别对各个切片进行线性变换,然后拼接
gate_proj = torch.cat(
[F.linear(x, gate_proj_slices[i]) for i in range(self.config.pretraining_tp)],
dim=-1,
)
up_proj = torch.cat(
[F.linear(x, up_proj_slices[i]) for i in range(self.config.pretraining_tp)],
dim=-1,
)
# 计算中间状态,并将其切分
intermediate_states = (self.act_fn(gate_proj) * up_proj).split(slice, dim=2)
# 对每个切片进行线性变换,然后相加
down_proj = [
F.linear(intermediate_states[i], down_proj_slices[i]) for i in range(self.config.pretraining_tp)
]
down_proj = sum(down_proj) # 将所有切片相加
else:
# 如果预训练tp值不大于1,直接进行线性变换和激活函数计算
down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
return down_proj # 返回输出张量
class RotaryEmbedding
import torch
from torch import nn
class RotaryEmbedding(nn.Module):
def __init__(
self,
dim,
max_position_embeddings=16,
base=10000,
device=None,
scaling_factor=1.0,
):
"""初始化RotaryEmbedding层。
Args:
dim (int): 位置嵌入的维度。
max_position_embeddings (int, optional): 最大位置嵌入数。默认值为16。
base (int, optional): 基数。默认值为10000。
device (torch.device, optional): 设备。默认值为None。
scaling_factor (float, optional): 缩放因子。默认值为1.0。
"""
super().__init__()
self.scaling_factor = scaling_factor # 设置缩放因子
self.dim = dim # 位置嵌入的维度
self.max_position_embeddings = max_position_embeddings # 最大位置嵌入数
self.base = base # 基数
# 计算逆频率
inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim))
# 注册为buffer,以便在模型保存和加载时保持不变
self.register_buffer("inv_freq", inv_freq, persistent=False)
@torch.no_grad()
def forward(self, x, position_ids):
"""前向传播函数。
Args:
x (torch.Tensor): 输入张量,形状为[bs, num_attention_heads, seq_len, head_size]。
position_ids (torch.Tensor): 位置ID张量。
Returns:
tuple: 返回余弦和正弦位置嵌入张量。
"""
# 扩展逆频率张量的维度,以适应输入张量的形状
inv_freq_expanded = self.inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1)
# 扩展位置ID张量的维度
position_ids_expanded = position_ids[:, None, :].float()
# 强制使用float32,以避免bfloat16在长上下文中失去精度
# 参见https://github.com/huggingface/transformers/pull/29285
device_type = x.device.type
device_type = device_type if isinstance(device_type, str) and device_type != "mps" else "cpu"
# 使用不自动混合精度的上下文进行计算
with torch.autocast(device_type=device_type, enabled=False):
# 计算频率嵌入
freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2)
# 将频率嵌入沿最后一个维度拼接
emb = torch.cat((freqs, freqs), dim=-1)
# 计算余弦和正弦位置嵌入
cos = emb.cos()
sin = emb.sin()
# 返回余弦和正弦位置嵌入张量,并将它们的dtype设置为输入张量的dtype
return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype)
class Conv
import torch
from torch import nn
import torch.nn.functional as F
# 假设我们已经定义了RMSNorm类
class Conv(nn.Module):
def __init__(self, config, layer_idx):
"""初始化Conv层。
Args:
config (TTTConfig): 模型配置对象。
layer_idx (int): 当前层的索引。
"""
super().__init__()
self.config = config
self.layer_idx = layer_idx
# 定义RMSNorm层
self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
# 定义一维卷积层
self.conv = nn.Conv1d(
config.hidden_size, # 输入通道数
config.hidden_size, # 输出通道数
bias=True, # 是否使用偏置
kernel_size=config.conv_kernel, # 卷积核大小
groups=config.hidden_size, # 组数(组卷积)
padding=config.conv_kernel - 1, # 填充大小
)
def __call__(self, hidden_states, cache_params=None):
"""前向传播函数。
Args:
hidden_states (torch.Tensor): 输入隐藏状态张量。
cache_params (dict, optional): 缓存参数。默认值为None。
Returns:
torch.Tensor: 输出隐藏状态张量。
"""
seq_len = hidden_states.shape[1]
hidden_states = self.norm(hidden_states) # 归一化
hidden_states = hidden_states.transpose(1, 2) # 转换形状以适应Conv1d
# 如果没有定义causal_conv1d_fn
if causal_conv1d_fn is None:
if cache_params is not None:
if cache_params.seqlen_offset > 0:
# 处理缓存状态
conv_state = cache_params.conv_states_dic["pre_conv"][self.layer_idx]
conv_state = torch.roll(conv_state, shifts=-1, dims=-1)
conv_state[:, :, -1] = hidden_states[:, :, 0]
cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_state)
hidden_states = torch.sum(conv_state * self.conv.weight[:, 0, :], dim=-1)
hidden_states += self.conv.bias
hidden_states = hidden_states.unsqueeze(-1)
else:
conv_state = nn.functional.pad(
hidden_states,
(self.config.conv_kernel - hidden_states.shape[-1], 0),
)
cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_state)
hidden_states = self.conv(hidden_states)[..., :seq_len]
else:
hidden_states = self.conv(hidden_states)[..., :seq_len]
else:
conv_weights = self.conv.weight.view(self.conv.weight.size(0), self.conv.weight.size(2))
if cache_params is not None and cache_params.seqlen_offset > 0:
hidden_states = causal_conv1d_update(
hidden_states.squeeze(-1),
cache_params.conv_states_dic["pre_conv"][self.layer_idx],
conv_weights,
self.conv.bias,
None,
)
hidden_states = hidden_states.unsqueeze(-1)
else:
if cache_params is not None:
conv_states = nn.functional.pad(
hidden_states,
(self.config.conv_kernel - hidden_states.shape[-1], 0),
)
cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_states)
hidden_states = causal_conv1d_fn(hidden_states, conv_weights, self.conv.bias, activation=None)
hidden_states = hidden_states.transpose(1, 2) # 转换形状以适应后续层
return hidden_states
#########################
### TTT Layer Modules ###
#########################
def scan(f, init, xs, out, checkpoint_group=0):
"""模拟jax.lax.scan函数。
Args:
f (function): 扫描函数。
init (any): 初始化值。
xs (list or dict): 输入数据。
out (list): 输出数据。
checkpoint_group (int, optional): 检查点组的大小。默认值为0。
Returns:
any: 最终的carry值。
list: 输出数据。
"""
carry = init
if isinstance(xs, dict):
num_items = len(next(iter(xs.values())))
else:
num_items = len(xs[0])
def scan_fn(carry, i_start, i_end):
for i in range(i_start, i_end):
if isinstance(xs, dict):
x = {key: tensor[i] for key, tensor in xs.items()}
else:
x = [x[i] for x in xs]
carry, y = f(carry, x)
out[i] = y
return carry
if checkpoint_group > 0:
ckpt_every_n = num_items // checkpoint_group
for k in range(0, num_items, ckpt_every_n):
carry = torch.utils.checkpoint.checkpoint(
scan_fn, carry, k, min(k + ckpt_every_n, num_items), use_reentrant=False
)
else:
carry = scan_fn(carry, 0, num_items)
return carry, out
def ln_fwd(x, gamma, beta, eps=1e-6):
"""层归一化的前向传播。
Args:
x (torch.Tensor): 输入张量。
gamma (torch.Tensor): 缩放参数。
beta (torch.Tensor): 平移参数。
eps (float, optional): 防止除零的小常数。默认值为1e-6。
Returns:
torch.Tensor: 归一化后的张量。
"""
mu = x.mean(dim=-1, keepdim=True) # 计算均值
var = x.var(dim=-1, keepdim=True, unbiased=False) # 计算方差
std = torch.sqrt(var + eps) # 计算标准差
x_hat = (x - mu) / std # 标准化
y = gamma * x_hat + beta # 缩放和平移
return y
def ln_fused_l2_bwd(x, l2_target, gamma, beta, eps=1e-6):
"""融合L2损失的层归一化的反向传播。
Args:
x (torch.Tensor): 输入张量。
l2_target (torch.Tensor): L2目标张量。
gamma (torch.Tensor): 缩放参数。
beta (torch.Tensor): 平移参数。
eps (float, optional): 防止除零的小常数。默认值为1e-6。
Returns:
torch.Tensor: 梯度张量。
"""
D = x.shape[-1]
mu = x.mean(dim=-1, keepdim=True) # 计算均值
var = x.var(dim=-1, keepdim=True, unbiased=False) # 计算方差
std = torch.sqrt(var + eps) # 计算标准差
x_hat = (x - mu) / std # 标准化
y = gamma * x_hat + beta # 缩放和平移
grad_output = y - l2_target # 计算输出的梯度
grad_x_hat = grad_output * gamma # 计算标准化后的梯度
z = (
(1.0 / D)
* (
D * grad_x_hat
- grad_x_hat.sum(dim=-1, keepdim=True)
- x_hat * (grad_x_hat * x_hat).sum(dim=-1, keepdim=True)
)
/ std
) # 计算最终的梯度
return z
# 修改自 https://github.com/NVIDIA/Megatron-LM/blob/e33c8f78a35765d5aa37475a144da60e8a2349d1/megatron/core/fusions/fused_bias_gelu.py#L26
def gelu_bwd(x):
"""GELU激活函数的反向传播。
Args:
x (torch.Tensor): 输入张量。
Returns:
torch.Tensor: 梯度张量。
"""
tanh_out = torch.tanh(0.79788456 * x * (1 + 0.044715 * x * x)) # 计算tanh输出
ff = 0.5 * x * ((1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)) + 0.5 * (1 + tanh_out) # 计算梯度
return ff
Conv
类:
初始化方法定义了RMSNorm层和一维卷积层。前向传播方法根据是否存在缓存参数及其状态,对输入张量进行归一化、卷积操作。
scan
函数:
模拟了 jax.lax.scan
函数的行为,通过逐步应用函数 f
处理输入数据,并支持检查点机制以节省内存。
ln_fwd
函数:
实现了层归一化的前向传播,计算均值、方差,并进行标准好的
Class TTTCache
import torch
from torch import nn
from collections import defaultdict
import logging
logger = logging.getLogger(__name__)
class TTTCache:
"""
TTTCache 是一个用于存储 TTT 层的最后隐藏状态和梯度的数据结构。
Arguments:
model (TTTModel): 用于初始化缓存的模型。
batch_size (int): 批处理大小。
Attributes:
seqlen_offset (int): 序列长度偏移量。
mini_batch_size (int): 微批处理大小。
params_dict (Dict[str, Dict[int, torch.Tensor]]): 以字典形式保存的参数,*_states, *_grad -> 层索引 -> [批处理大小, ...]
conv_states_dic (Dict[str, Dict[int, torch.Tensor]]): 以字典形式保存的卷积状态,*_states -> 层索引 -> [批处理大小, ...]
"""
def __init__(self, model, batch_size: int):
config = model.config
self.seqlen_offset = 0 # 初始化序列长度偏移量
self.mini_batch_size = config.mini_batch_size # 获取微批处理大小
# 初始化TTT参数字典
self.ttt_params_dict = defaultdict(dict)
if "linear" in config.ttt_layer_type:
# 如果TTT层类型是线性,则使用W1和b1参数
self.ttt_param_names = ["W1", "b1"]
elif "mlp" in config.ttt_layer_type:
# 如果TTT层类型是MLP,则使用W1, b1, W2, b2参数
self.ttt_param_names = ["W1", "b1", "W2", "b2"]
else:
raise ValueError(f"TTT层类型 {config.ttt_layer_type} 目前不支持")
# 初始化卷积状态字典
self.conv_states_dic = defaultdict(dict)
logger.info(f"Creating cache of size: {batch_size}")
for layer_idx in range(config.num_hidden_layers):
for name in self.ttt_param_names:
weight = getattr(model.layers[layer_idx].seq_modeling_block, name)
# 将权重扩展至批处理大小
tiled_weight = torch.tile(weight.unsqueeze(0), (batch_size,) + (1,) * weight.dim()).to(model.device)
self.ttt_params_dict[f"{name}_states"][layer_idx] = tiled_weight
# 对于解码,需要存储梯度
self.ttt_params_dict[f"{name}_grad"][layer_idx] = torch.zeros_like(tiled_weight)
if config.pre_conv:
self.conv_states_dic["pre_conv"][layer_idx] = torch.zeros(
batch_size,
config.hidden_size,
config.conv_kernel,
device=model.device,
)
if config.share_qk:
self.conv_states_dic["ttt_conv_q"][layer_idx] = torch.zeros(
batch_size,
config.hidden_size,
config.conv_kernel,
device=model.device,
)
self.conv_states_dic["ttt_conv_k"][layer_idx] = torch.zeros(
batch_size,
config.hidden_size,
config.conv_kernel,
device=model.device,
)
def update(self, py_tree, layer_idx, seq_len):
"""更新缓存中的参数。
Args:
py_tree (dict): 参数树。
layer_idx (int): 层索引。
seq_len (int): 序列长度。
"""
if seq_len % self.mini_batch_size == 0:
# 复制最后一个微批处理的状态,清空梯度
for name in self.ttt_param_names:
self.ttt_params_dict[f"{name}_states"][layer_idx].copy_(py_tree[f"{name}_states"])
self.ttt_params_dict[f"{name}_grad"][layer_idx].zero_()
elif seq_len < self.mini_batch_size:
if seq_len != 1 and self.seqlen_offset > 0 and self.seqlen_offset % self.mini_batch_size != 0:
raise ValueError("不支持分段更新")
if (seq_len + self.seqlen_offset) % self.mini_batch_size == 0:
# 复制最后一个微批处理的状态,清空梯度
for name in self.ttt_param_names:
self.ttt_params_dict[f"{name}_states"][layer_idx].copy_(py_tree[f"{name}_states"])
self.ttt_params_dict[f"{name}_grad"][layer_idx].zero_()
else:
# 复制梯度以供下次更新使用
for name in self.ttt_param_names:
self.ttt_params_dict[f"{name}_grad"][layer_idx].copy_(py_tree[f"{name}_grad"])
else:
raise ValueError(f"序列长度 {seq_len} 是不支持的部分更新")
def ttt_params_to_dict(self, layer_idx):
"""将特定层的TTT参数转化为字典格式。
Args:
layer_idx (int): 层索引。
Returns:
dict: 包含特定层TTT参数的字典。
"""
return {name: self.ttt_params_dict[name][layer_idx] for name in self.ttt_params_dict}
__init__
方法:
初始化了各种参数及其默认值,包括序列长度偏移量、微批处理大小等。根据配置中的TTT层类型(线性或MLP),初始化相应的参数字典。初始化卷积状态字典,并将权重扩展至批处理大小。
update
方法:
更新缓存中的参数,如果序列长度是微批处理大小的整数倍,则复制状态并清空梯度。如果序列长度小于微批处理大小,检查偏移量是否支持分段更新,并进行相应操作。如果序列长度不满足以上条件,抛出错误。
ttt_params_to_dict
方法:
将特定层的TTT参数转化为字典格式,便于后续处理。
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。