
Recurrent Neural Networks (RNN)

Jul 9

Overview

Recurrent Neural Networks (RNNs) are a class of neural networks designed for sequential data. Unlike conventional feed-forward networks, an RNN has a form of "memory": information from earlier steps can influence the current output.

Core Advantages of RNNs

  • Sequence modeling: handles variable-length sequences
  • Temporal dependencies: captures dependencies across time steps
  • Parameter sharing: all time steps share the same parameters
  • Memory mechanism: the hidden state stores information about the history

Basic Principles

The Hidden State

The key idea of an RNN is the hidden state h_t, which serves as the network's "memory" (a small numeric sketch follows the definitions below):

P(x_t \mid x_{t-1}, \ldots, x_1) \approx P(x_t \mid h_{t-1}), \qquad h_t = f(x_t, h_{t-1})

where:

  • h_t is the hidden state at time step t
  • x_t is the input at time step t
  • f is the state-update function
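
To make the recurrence concrete, here is a minimal toy sketch (the scalar weights w_x and w_h are made-up values, not part of any trained model) showing how the hidden state carries information from one step to the next:

import torch

w_x, w_h = 0.5, 0.8                      # toy scalar weights (illustrative only)
h = torch.tensor(0.0)                    # h_0: no history yet
for t, x in enumerate(torch.tensor([1.0, 0.0, -1.0]), start=1):
    h = torch.tanh(w_x * x + w_h * h)    # h_t = f(x_t, h_{t-1})
    print(f"t={t}, x_t={x.item():+.1f}, h_t={h.item():+.4f}")

Note that at t = 2 the input is 0, yet h_2 is non-zero, because the state still carries information from x_1 through h_1.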

RNN Architecture Comparison

graph TD
    A[Traditional neural network] --> B[Input layer]
    B --> C[Hidden layer]
    C --> D[Output layer]

    E[Recurrent neural network] --> F[Input sequence]
    F --> G[Recurrent layer]
    G --> H[Output sequence]
    G --> G

    classDef traditional fill:#E91E63,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef recurrent fill:#00BCD4,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef feedforward fill:#FF5722,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef sequence fill:#4CAF50,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef loop fill:#FF9800,stroke:#ffffff,stroke-width:4px,color:#fff

    class A traditional
    class E recurrent
    class B,C,D feedforward
    class F,H sequence
    class G loop

Mathematical Definition

Neural Networks without Hidden States

For a traditional multilayer perceptron:

Hidden layer

H = \phi(X W_{xh} + b_h)

Output layer

O = H W_{hq} + b_q

Recurrent Neural Networks with Hidden States

At each time step, the RNN computes:

Hidden state update

H_t = \phi(X_t W_{xh} + H_{t-1} W_{hh} + b_h)

Output computation

O_t = H_t W_{hq} + b_q

where:

  • W_{xh} \in \mathbb{R}^{d \times h}: input-to-hidden weights
  • W_{hh} \in \mathbb{R}^{h \times h}: hidden-to-hidden weights
  • W_{hq} \in \mathbb{R}^{h \times q}: hidden-to-output weights
  • b_h, b_q: bias terms
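
As a quick sanity check of these shapes, the following sketch (with assumed toy sizes d = 4, h = 3, q = 2 and batch size n = 5) evaluates one step of the update:

import torch

n, d, h, q = 5, 4, 3, 2                                # assumed toy sizes
X_t = torch.randn(n, d)                                # inputs at time step t
H_prev = torch.zeros(n, h)                             # H_{t-1}
W_xh, W_hh, W_hq = torch.randn(d, h), torch.randn(h, h), torch.randn(h, q)
b_h, b_q = torch.zeros(h), torch.zeros(q)

H_t = torch.tanh(X_t @ W_xh + H_prev @ W_hh + b_h)     # hidden state update
O_t = H_t @ W_hq + b_q                                 # output computation
print(H_t.shape, O_t.shape)                            # torch.Size([5, 3]) torch.Size([5, 2])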

Unrolling the RNN over Time

graph LR
    X1[X₁] --> H1[H₁]
    X2[X₂] --> H2[H₂]
    X3[X₃] --> H3[H₃]

    H1 --> H2
    H2 --> H3

    H1 --> O1[O₁]
    H2 --> O2[O₂]
    H3 --> O3[O₃]

    classDef input fill:#FF1744,stroke:#ffffff,stroke-width:4px,color:#fff
    classDef hidden fill:#00E676,stroke:#ffffff,stroke-width:4px,color:#fff
    classDef output fill:#FF9800,stroke:#ffffff,stroke-width:4px,color:#fff

    class X1,X2,X3 input
    class H1,H2,H3 hidden
    class O1,O2,O3 output

RNN structure diagram

Character-Level Language Model

Model Structure

An RNN can be used to build a character-level language model that predicts the next character from the preceding ones:

Character-level language model
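
The training pairs are built by shifting the text one character to the left: the model reads "machin" and is asked to produce "achine". A small illustrative snippet (the string here is just an example, not the post's dataset):

text = "machine"
inputs, targets = text[:-1], text[1:]        # target is the input shifted by one character
for x, y in zip(inputs, targets):
    print(f"input: {x!r} -> target: {y!r}")
# input: 'm' -> target: 'a'
# input: 'a' -> target: 'c'
# ...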

Training Process

graph TD
    A["输入序列: machin"] --> B[独热编码]
    B --> C[RNN前向传播]
    C --> D[输出概率分布]
    D --> E[计算损失]
    E --> F[反向传播]
    F --> G[参数更新]
    G --> H[下一批次]

    I["目标序列: achine"] --> E

    classDef input fill:#FF1744,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef process fill:#00E676,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef output fill:#FF9500,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef loss fill:#9C27B0,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef optimization fill:#2196F3,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef target fill:#FF5722,stroke:#ffffff,stroke-width:3px,color:#fff

    class A input
    class B,C process
    class D output
    class E loss
    class F,G,H optimization
    class I target

Complete RNN Workflow

End-to-End Flowchart

graph TB
    A[Text data] --> B[Data preprocessing]
    B --> C[Vocabulary construction]
    C --> D[Sequence splitting]
    D --> E[One-hot encoding]

    E --> F[RNN model]
    F --> G[Hidden state propagation]
    G --> H[Output prediction]
    H --> I[Loss computation]

    I --> J[Backpropagation]
    J --> K[Gradient clipping]
    K --> L[Parameter update]
    L --> M{Training finished?}

    M -->|No| F
    M -->|Yes| N[Model evaluation]
    N --> O[Text generation]

    classDef data fill:#E91E63,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef preprocess fill:#9C27B0,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef model fill:#2196F3,stroke:#ffffff,stroke-width:4px,color:#fff
    classDef training fill:#FF9800,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef optimization fill:#4CAF50,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef evaluation fill:#FF5722,stroke:#ffffff,stroke-width:3px,color:#fff

    class A,E data
    class B,C,D preprocess
    class F,G,H model
    class I,J training
    class K,L,M optimization
    class N,O evaluation

Implementation from Scratch

One-Hot Encoding

import torch
import torch.nn.functional as F

def one_hot_encode(indices, vocab_size):
    """将索引转换为独热向量"""
    return F.one_hot(indices, vocab_size).float()

# 示例
vocab_size = 28
indices = torch.tensor([0, 2])
one_hot = one_hot_encode(indices, vocab_size)
print(f"输入索引: {indices}")
print(f"独热编码形状: {one_hot.shape}")

One-hot encoding example

Input indices: tensor([0, 2])
One-hot shape: torch.Size([2, 28])

Notes

  • Each token is represented as a vector whose length equals the vocabulary size
  • Only the position corresponding to the token index is 1; every other position is 0
  • This makes discrete token indices easy for the network to consume

Parameter Initialization

def get_rnn_params(vocab_size, num_hiddens, device):
    """初始化RNN参数"""
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    # 隐藏层参数
    W_xh = normal((num_inputs, num_hiddens))   # 输入到隐状态
    W_hh = normal((num_hiddens, num_hiddens))  # 隐状态到隐状态
    b_h = torch.zeros(num_hiddens, device=device)

    # 输出层参数
    W_hq = normal((num_hiddens, num_outputs))  # 隐状态到输出
    b_q = torch.zeros(num_outputs, device=device)

    # 启用梯度
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)

    return params

# 使用示例
vocab_size = 28
num_hiddens = 512
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
params = get_rnn_params(vocab_size, num_hiddens, device)
print(f"参数数量: {sum(p.numel() for p in params)}")

Parameter count

Number of parameters: 291,356

Parameter breakdown

  • W_xh: (28, 512) = 14,336 parameters
  • W_hh: (512, 512) = 262,144 parameters
  • W_hq: (512, 28) = 14,336 parameters
  • Biases: 512 + 28 = 540 parameters

These sum to 291,356, as the quick check below confirms.
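
A quick check of this arithmetic (same sizes as above, vocab_size = 28 and num_hiddens = 512):

vocab_size, num_hiddens = 28, 512
total = (vocab_size * num_hiddens      # W_xh: 14,336
         + num_hiddens * num_hiddens   # W_hh: 262,144
         + num_hiddens                 # b_h: 512
         + num_hiddens * vocab_size    # W_hq: 14,336
         + vocab_size)                 # b_q: 28
print(total)  # 291356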

RNN Forward Propagation

def init_rnn_state(batch_size, num_hiddens, device):
    """初始化RNN隐状态"""
    return (torch.zeros((batch_size, num_hiddens), device=device),)

初始化时返回隐藏状态,因为在 0 时刻的时候, 没有上一个时刻的隐藏状态

def rnn(inputs, state, params):
    """RNN前向传播"""
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state # 当前的隐藏状态
    outputs = []

    # 按时间步处理输入
    for X in inputs:  # inputs 是一个三维的tensor 沿着第一个维度(时间)遍历,X形状: (batch_size, vocab_size)
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)

    return torch.cat(outputs, dim=0), (H,) # 此次行数发生了变化,tensor形状:(batch_size(批量大小) * num_steps(时间长度), num_outputs) ,同时也输出了更新后的隐藏状态

定义了如何在一个时间步内计算隐藏状态并输出

class RNNModelScratch:
    """从零实现的RNN模型"""
    def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn):
        self.vocab_size = vocab_size
        self.num_hiddens = num_hiddens
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state = init_state
        self.forward_fn = forward_fn

    def __call__(self, X, state):
        X = F.one_hot(X.T, self.vocab_size).float()
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        return self.init_state(batch_size, self.num_hiddens, device)

This class wraps the model so that different parameter initializers, state initializers, and forward functions can be plugged in.

# Instantiate the model
net = RNNModelScratch(vocab_size, num_hiddens, device,
                      get_rnn_params, init_rnn_state, rnn)

Forward pass example

# Test the forward pass
batch_size, num_steps = 2, 5
X = torch.randint(0, vocab_size, (batch_size, num_steps))
state = net.begin_state(batch_size, device)
Y, new_state = net(X.to(device), state)

print(f"Input shape: {X.shape}")
print(f"Output shape: {Y.shape}")
print(f"Hidden state shape: {new_state[0].shape}")

Output

Input shape: torch.Size([2, 5])
Output shape: torch.Size([10, 28])
Hidden state shape: torch.Size([2, 512])

Prediction

def predict_rnn(prefix, num_preds, net, vocab, device):
    """使用RNN生成文本"""
    state = net.begin_state(batch_size=1, device=device) # 生成初始的隐藏状态
    outputs = [vocab[prefix[0]]] # 存储每个字符串在vocab的下标

    def get_input():
        return torch.tensor([outputs[-1]], device=device).reshape((1, 1)) # 把最近预测的值转换为下一次预测的输入 形状为(1, 1)

    # 预热期:处理前缀
    for y in prefix[1:]: # 从第二个字符开始进行处理
        _, state = net(get_input(), state) # 把前缀信息存入state中
        outputs.append(vocab[y]) # 存储当前字符的下标

    # 预测期:生成新字符
    for _ in range(num_preds):
        y, state = net(get_input(), state) # 每次把前一个时刻的输入与隐藏状态放入,得到下一个时刻的输出和新的隐藏状态
        outputs.append(int(y.argmax(dim=1).reshape(1))) # 获取预测结果,并转换为下标

    return ''.join([vocab.idx_to_token[i] for i in outputs]) # 索引转换为字符串

# 预测示例(未训练模型)
# result = predict_rnn('time traveller ', 10, net, vocab, device)
# 输出:'time traveller fqendfqend'

Prediction workflow

  1. Warm-up: the model reads the prefix "time traveller " and updates its internal state
  2. Prediction: subsequent characters are generated from the current state
  3. Iterative generation: each generated character updates the state used for the next prediction

Training and Optimization

Gradient Clipping

How gradient clipping works

g \leftarrow \min\left(1, \frac{\theta}{\|g\|}\right) g

Purpose

  • Limits the gradient norm so it never exceeds the threshold θ
  • Leaves the gradient direction unchanged
  • Prevents exploding gradients from destabilizing training

import math

def grad_clipping(net, theta):
    """梯度裁剪防止梯度爆炸"""
    if isinstance(net, torch.nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params

    # 计算梯度范数
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))

    # 裁剪梯度
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm
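
A small sanity check of the function above (the toy linear model and the scaled-up loss are illustrative only): after clipping with theta = 1, the global gradient norm should not exceed 1.

toy = torch.nn.Linear(4, 4)
loss = (toy(torch.randn(8, 4)) ** 2).sum() * 100   # deliberately large loss
loss.backward()
grad_clipping(toy, 1)                              # clip with theta = 1
norm = torch.sqrt(sum(torch.sum(p.grad ** 2) for p in toy.parameters()))
print(norm)                                        # at most 1.0 after clipping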

Next, define a function that trains the model for one epoch.

def train_epoch_rnn(net, train_iter, loss, updater, device, use_random_iter):
    """Train the model for one epoch."""
    state, timer = None, Timer()  # initialize the state and the timer (Timer and Accumulator are helper classes, e.g. from d2l)
    metric = Accumulator(2)  # sum of training loss, number of tokens

    for X, Y in train_iter:
        if state is None or use_random_iter:
            # Initialize the state
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            # Detach the state from the previous computation graph
            if isinstance(net, torch.nn.Module) and not isinstance(state, tuple):
                state.detach_()
            else:
                for s in state:
                    s.detach_()
        # Transpose and flatten the targets Y so they line up with the model output
        y = Y.T.reshape(-1)

        # Move the inputs X and targets y to the target device (e.g., GPU)
        X, y = X.to(device), y.to(device)

        # Run the inputs and current state through the network to get predictions and the new state
        y_hat, state = net(X, state)

        # Compute the mean loss between the predictions y_hat and the targets y
        l = loss(y_hat, y.long()).mean()

        # If the updater is a torch.optim.Optimizer, zero gradients, backpropagate, clip, and step
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)
            updater.step()
        else:
            # Otherwise backpropagate and clip, then call the custom updater
            l.backward()
            grad_clipping(net, 1)
            # Because mean() was already applied to the loss, update with batch_size=1
            updater(batch_size=1)

        # Accumulate the total loss (mean loss times token count) and the token count
        metric.add(l * y.numel(), y.numel())

    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()  # perplexity and tokens/sec

Training Loop

def train_rnn(net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False):
    """Train the RNN model."""
    loss = torch.nn.CrossEntropyLoss()
    animator = Animator(xlabel='epoch', ylabel='perplexity',
                       legend=['train'], xlim=[10, num_epochs])

    # Set up the optimizer (Animator and sgd are helper utilities, e.g. from d2l)
    if isinstance(net, torch.nn.Module):
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        updater = lambda batch_size: sgd(net.params, lr, batch_size)

    predict = lambda prefix: predict_rnn(prefix, 50, net, vocab, device)

    # Training loop
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_rnn(net, train_iter, loss, updater,
                                   device, use_random_iter)
        if (epoch + 1) % 10 == 0:
            print(predict('time traveller'))
            animator.add(epoch + 1, [ppl])

    print(f'Perplexity {ppl:.1f}, {speed:.1f} tokens/sec on {str(device)}')

Example training output

epoch 10: time traveller the the the the the the the
epoch 50: time traveller and the time machine was a
epoch 100: time traveller for so it will be convenient to speak

Perplexity 1.2, 23500.0 tokens/sec on cuda:0

High-Level API Implementation

PyTorch RNN Module Architecture

graph TD
    A[Input sequence] --> B[Embedding layer]
    B --> C[RNN layer]
    C --> D[Linear output layer]
    D --> E[Probability distribution]

    F[Hidden state] --> C
    C --> G[New hidden state]

    H[LSTM variant] --> C
    I[GRU variant] --> C
    J[Bidirectional RNN] --> C

    classDef input fill:#FF1744,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef embedding fill:#9C27B0,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef rnn fill:#2196F3,stroke:#ffffff,stroke-width:4px,color:#fff
    classDef output fill:#FF9800,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef state fill:#4CAF50,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef variants fill:#00BCD4,stroke:#ffffff,stroke-width:3px,color:#fff

    class A input
    class B embedding
    class C rnn
    class D,E output
    class F,G state
    class H,I,J variants

import torch.nn as nn

class RNNModel(nn.Module):
    """使用PyTorch高级API的RNN模型"""
    def __init__(self, rnn_layer, vocab_size):
        super(RNNModel, self).__init__()
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.num_hiddens = self.rnn.hidden_size

        # Output layer
        if self.rnn.bidirectional:
            self.num_directions = 2
        else:
            self.num_directions = 1
        self.linear = nn.Linear(self.num_hiddens * self.num_directions, vocab_size)

    def forward(self, inputs, state):
        X = F.one_hot(inputs.T.long(), self.vocab_size).float()
        Y, state = self.rnn(X, state)
        # Fully connected output layer
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, device, batch_size=1):
        if not isinstance(self.rnn, nn.LSTM):
            # nn.GRU and nn.RNN: the state is a single tensor
            return torch.zeros((self.rnn.num_layers * self.num_directions,
                              batch_size, self.num_hiddens), device=device)
        else:
            # nn.LSTM
            return (torch.zeros((self.rnn.num_layers * self.num_directions,
                               batch_size, self.num_hiddens), device=device),
                    torch.zeros((self.rnn.num_layers * self.num_directions,
                               batch_size, self.num_hiddens), device=device))

# Usage example
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
net = RNNModel(rnn_layer, vocab_size=len(vocab))

# Training (vocab and train_iter come from the data pipeline shown in the workflow above)
num_epochs, lr = 500, 1
train_rnn(net, train_iter, vocab, lr, num_epochs, device)

Different RNN Variants

# Vanilla RNN
rnn_layer = nn.RNN(input_size=vocab_size, hidden_size=num_hiddens,
                   num_layers=1, batch_first=False)

# LSTM (Long Short-Term Memory)
lstm_layer = nn.LSTM(input_size=vocab_size, hidden_size=num_hiddens,
                     num_layers=1, batch_first=False)

# GRU (Gated Recurrent Unit)
gru_layer = nn.GRU(input_size=vocab_size, hidden_size=num_hiddens,
                   num_layers=1, batch_first=False)

# Bidirectional RNN
bidirectional_rnn = nn.RNN(input_size=vocab_size, hidden_size=num_hiddens,
                           num_layers=1, bidirectional=True, batch_first=False)
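
One practical difference between these variants (toy sizes assumed below) is the shape of the returned state: nn.RNN and nn.GRU return a single hidden-state tensor, while nn.LSTM returns a (hidden state, cell state) tuple, which is why begin_state in RNNModel branches on the layer type:

num_steps, batch_size, vocab_size, num_hiddens = 5, 2, 28, 256   # assumed sizes
X = torch.randn(num_steps, batch_size, vocab_size)

Y, state = nn.RNN(vocab_size, num_hiddens)(X)
print(Y.shape, state.shape)          # (5, 2, 256), (1, 2, 256)

Y, (h, c) = nn.LSTM(vocab_size, num_hiddens)(X)
print(Y.shape, h.shape, c.shape)     # (5, 2, 256), (1, 2, 256), (1, 2, 256)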

Perplexity Evaluation

Perplexity Definition

Perplexity is a key metric for evaluating the quality of a language model:

\mathrm{PPL} = \exp\left(-\frac{1}{n}\sum_{t=1}^{n}\log P(x_t \mid x_1, \ldots, x_{t-1})\right)

Computing Perplexity

def evaluate_perplexity(net, data_iter, device):
    """计算模型困惑度"""
    net.eval()
    total_loss = 0.0
    total_tokens = 0

    with torch.no_grad():
        for X, Y in data_iter:
            state = net.begin_state(device, X.shape[0])
            y = Y.T.reshape(-1)
            X, y = X.to(device), y.to(device)
            y_hat, _ = net(X, state)
            loss = F.cross_entropy(y_hat, y.long(), reduction='sum')
            total_loss += loss.item()
            total_tokens += y.numel()

    return math.exp(total_loss / total_tokens)

# 使用示例
# perplexity = evaluate_perplexity(net, test_iter, device)
# print(f"测试困惑度: {perplexity:.2f}")

Interpreting perplexity

  • PPL = 1: perfect prediction (the theoretical optimum)
  • PPL = vocabulary size: the baseline of random guessing (verified in the quick check below)
  • Lower PPL is better: it means the model's predictions are more accurate
  • In practice, a good model's perplexity often falls between 10 and 100
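
The random-guessing baseline can be checked directly (illustrative sketch, not from the original post): a model that assigns uniform probability over a 28-token vocabulary has perplexity exactly 28.

import math
import torch
import torch.nn.functional as F

vocab_size, n = 28, 1000
logits = torch.zeros(n, vocab_size)                  # uniform prediction for every token
targets = torch.randint(0, vocab_size, (n,))
ppl = math.exp(F.cross_entropy(logits, targets).item())
print(ppl)  # 28.0 (up to floating-point error)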

Strengths and Weaknesses of RNNs

Strengths

graph LR
    A[RNN strengths] --> B[Handles variable-length sequences]
    A --> C[Parameter sharing]
    A --> D[Memory capability]
    A --> E[Conceptually simple]

    classDef main fill:#4CAF50,stroke:#ffffff,stroke-width:4px,color:#fff
    classDef advantage fill:#8BC34A,stroke:#ffffff,stroke-width:3px,color:#fff

    class A main
    class B,C,D,E advantage

  • Sequence modeling: naturally suited to sequential data
  • Parameter efficiency: all time steps share the same parameters
  • Flexibility: supports many input/output configurations
  • Interpretability: the hidden state has a clear meaning as the network's memory

Weaknesses

graph LR
    A[RNN weaknesses] --> B[Vanishing gradients]
    A --> C[Sequential computation]
    A --> D[Long-range dependencies]
    A --> E[Training difficulties]

    classDef main fill:#F44336,stroke:#ffffff,stroke-width:4px,color:#fff
    classDef disadvantage fill:#FF5722,stroke:#ffffff,stroke-width:3px,color:#fff

    class A main
    class B,C,D,E disadvantage

  • Vanishing gradients: long-range dependencies are hard to capture
  • Sequential computation: time steps cannot be fully parallelized
  • Training instability: prone to exploding gradients
  • Limited capacity: performance falls short on complex tasks

Technical Evolution

graph LR
    A[Vanilla RNN] --> B[LSTM]
    B --> C[GRU]
    C --> D[Bidirectional RNN]
    D --> E[Attention]
    E --> F[Transformer]
    F --> G[BERT/GPT]

    classDef old fill:#FF5722,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef improved fill:#FF9800,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef advanced fill:#4CAF50,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef modern fill:#2196F3,stroke:#ffffff,stroke-width:3px,color:#fff
    classDef sota fill:#9C27B0,stroke:#ffffff,stroke-width:4px,color:#fff

    class A old
    class B,C improved
    class D,E advanced
    class F modern
    class G sota

Development Trends

  • LSTM/GRU: mitigate the vanishing-gradient problem
  • Attention mechanisms: improve modeling of long-range dependencies
  • Transformer: an architecture based entirely on attention
  • Pretrained models: large-scale models such as BERT and GPT

Summary

  • The output of a recurrent neural network depends on the current input and the hidden variable from the previous time step
  • When applied to language modeling, an RNN predicts the token at the next time step from the current token
  • Perplexity is the usual measure of how good a language model is

