11.06课堂作业2:PPO算法详细步骤

2024年11月6日 策略优化

作业题目

题目:详细阐述PPO(Proximal Policy Optimization)算法的完整实现步骤,包括初始化、数据收集、优势函数计算、网络优化等各个阶段。

PPO算法概述

PPO是一种近端策略优化算法,属于策略梯度方法的一种改进版本。它通过限制每次策略更新的幅度来平衡探索与利用,提高训练的稳定性。

稳定性

限制策略更新幅度

平衡性

探索与利用的权衡

高效性

样本复用提高效率

第一阶段:初始化设置

1.1 网络架构初始化

策略网络 πθ(a|s)

  • 输入:状态s(可以是图像、向量等)
  • 输出:动作概率分布(离散动作)或动作均值和方差(连续动作)
  • 网络结构:通常使用MLP或CNN,包含输入层、隐藏层、输出层
  • 激活函数:隐藏层使用ReLU或tanh,输出层根据任务选择

价值网络 Vφ(s)

  • 输入:状态s
  • 输出:状态价值标量
  • 网络结构:可与策略网络共享特征提取层
  • 激活函数:隐藏层使用ReLU,输出层通常不使用激活函数

1.2 超参数设置

参数名称 符号 典型值 作用说明
裁剪参数 ε 0.1-0.2 控制策略更新幅度
学习率 α 1e-4到3e-4 梯度下降步长
折扣因子 γ 0.99-0.999 权衡即时奖励和长期奖励
GAE参数 λ 0.9-0.95 偏差-方差权衡
批次大小 T 2048-8192 每次收集的数据量
更新轮数 K 3-10 同一批数据的更新次数
熵系数 c₂ 0.01-0.1 鼓励探索
价值函数系数 c₁ 0.5-1.0 价值损失权重

第二阶段:数据收集过程

2.1 环境交互

state = env.reset()
done = False
episode_reward = 0

while not done and collected_steps < T:
    # 使用策略网络选择动作
    action_probs = policy_network(state)
    action = sample_from_distribution(action_probs)
    log_prob = log_probability(action, action_probs)
    
    # 执行动作
    next_state, reward, done, info = env.step(action)
    
    # 存储数据
    states.append(state)
    actions.append(action)
    rewards.append(reward)
    log_probs.append(log_prob)
    dones.append(done)
    values.append(value_network(state))
    
    state = next_state
    episode_reward += reward
    collected_steps += 1

2.2 数据预处理

数据格式转换

  • 将列表转换为numpy数组或torch张量
  • 确保数据类型和设备一致性

数据归一化(可选)

  • 状态归一化:计算均值和方差,标准化输入
  • 奖励归一化:计算均值和方差,标准化奖励
  • 优势函数归一化:零均值单位方差

第三阶段:优势函数计算

3.1 计算时间差分误差

使用当前价值网络计算TD误差:

for t in range(T):
    if not done[t]:
        bootstrap_value = value_network(states[t+1])
    else:
        bootstrap_value = 0
    
    delta[t] = rewards[t] + gamma * bootstrap_value * (1 - done[t]) - values[t]

3.2 计算GAE优势函数

使用GAE(Generalized Advantage Estimation)计算优势函数,平衡偏差和方差:

$$A_t^{GAE} = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l}$$
advantage[T-1] = delta[T-1]
for t in reversed(range(T-1)):
    if not done[t]:
        advantage[t] = delta[t] + gamma * lambda * advantage[t+1] * (1 - done[t])
    else:
        advantage[t] = delta[t]

3.3 计算目标价值

计算用于价值函数训练的目标值:

$$\text{returns} = \text{advantages} + \text{values}$$
returns = advantages + values

第四阶段:网络优化过程

4.1 数据批处理

batch_size = T // num_mini_batches
indices = random.permutation(T)

for i in range(num_mini_batches):
    start_idx = i * batch_size
    end_idx = (i + 1) * batch_size
    batch_indices = indices[start_idx:end_idx]
    
    batch_states = states[batch_indices]
    batch_actions = actions[batch_indices]
    batch_old_log_probs = old_log_probs[batch_indices]
    batch_advantages = advantages[batch_indices]
    batch_returns = returns[batch_indices]

4.2 前向传播计算

策略网络前向传播

action_dists = policy_network(batch_states)
batch_new_log_probs = action_dists.log_prob(batch_actions)
entropy = action_dists.entropy().mean()

价值网络前向传播

batch_new_values = value_network(batch_states)

4.3 计算重要性采样比率

$$r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} = \exp(\log\pi_\theta(a_t|s_t) - \log\pi_{\theta_{old}}(a_t|s_t))$$
ratio = torch.exp(batch_new_log_probs - batch_old_log_probs)

4.4 计算各项损失函数

策略损失(裁剪项)

$$L^{CLIP}(\theta) = \hat{\mathbb{E}}_t\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right]$$
surr1 = ratio * batch_advantages
surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * batch_advantages
policy_loss = -torch.min(surr1, surr2).mean()

价值函数损失

$$L^{VF} = \frac{1}{N}\sum_i (V_\phi(s_i) - \text{returns}_i)^2$$
value_loss = F.mse_loss(batch_new_values, batch_returns)

熵损失

$$L^{ENT} = -\mathbb{E}[H(\pi)]$$
entropy_loss = -entropy

总损失

$$L^{TOTAL} = L^{CLIP} + c_1 L^{VF} + c_2 L^{ENT}$$
total_loss = policy_loss + c1 * value_loss + c2 * entropy_loss

4.5 反向传播和参数更新

optimizer.zero_grad()
total_loss.backward()
torch.nn.utils.clip_grad_norm_(policy_network.parameters(), max_grad_norm)
torch.nn.utils.clip_grad_norm_(value_network.parameters(), max_grad_norm)
optimizer.step()

第五阶段:重复更新过程

5.1 多轮更新

在同一批数据上进行K轮更新,提高样本效率:

for epoch in range(K):
    for mini_batch in mini_batches:
        # 执行步骤4.2-4.5的所有计算
        # 前向传播 → 计算比率 → 计算损失 → 反向传播
        pass

5.2 策略更新约束(PPO-Penalty变体)

如果使用KL散度惩罚版本:

# 计算当前策略和旧策略的KL散度
kl_divergence = (old_action_probs * (old_action_probs.log() - new_action_probs.log())).sum(-1)
kl_mean = kl_divergence.mean()

# 动态调整惩罚系数
if kl_mean < target_kl / 1.5:
    beta *= 2
elif kl_mean > target_kl * 1.5:
    beta /= 2

# 修改损失函数
policy_loss = -(ratio * advantages - beta * kl_divergence).mean()

第六阶段:训练监控和调整

6.1 性能指标监控

平均奖励

最近N个episode的平均奖励

策略损失

policy_loss的值

价值损失

value_loss的值

熵损失

entropy_loss的值

梯度范数

参数梯度的L2范数

KL散度

策略更新前后的KL散度

裁剪比例

被裁剪的样本比例

6.2 学习率调度

# 线性衰减学习率
def linear_schedule(initial_lr, final_lr, total_timesteps):
    def schedule(progress):
        return final_lr + (initial_lr - final_lr) * max(0, 1 - progress)
    return schedule

6.3 早停策略

  • 如果平均奖励在连续M轮中没有提升,降低学习率
  • 如果性能持续下降,回滚到之前的最佳模型

第七阶段:训练完成和评估

7.1 模型保存

torch.save({
    'policy_state_dict': policy_network.state_dict(),
    'value_state_dict': value_network.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'training_stats': training_stats
}, 'best_model.pth')

7.2 最终评估

def evaluate_model(model, env, num_episodes=100):
    total_rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        episode_reward = 0
        
        while not done:
            action = model.get_greedy_action(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
        
        total_rewards.append(episode_reward)
    
    return np.mean(total_rewards), np.std(total_rewards)

PPO算法完整伪代码

def PPO_algorithm():
    # 初始化
    policy_network = PolicyNetwork()
    value_network = ValueNetwork()
    optimizer = Adam(list(policy_network.parameters()) + list(value_network.parameters()))
    
    for iteration in range(max_iterations):
        # 数据收集阶段
        states, actions, rewards, log_probs, values, dones = [], [], [], [], [], []
        state = env.reset()
        
        for t in range(batch_size):
            # 策略执行
            action_dist = policy_network(state)
            action = action_dist.sample()
            log_prob = action_dist.log_prob(action)
            value = value_network(state)
            
            # 环境交互
            next_state, reward, done, _ = env.step(action)
            
            # 存储数据
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            log_probs.append(log_prob)
            values.append(value)
            dones.append(done)
            
            state = next_state
            if done:
                state = env.reset()
        
        # 计算优势函数
        advantages, returns = compute_gae(rewards, values, dones, gamma, lambda_)
        
        # 策略更新阶段
        old_log_probs = log_probs.clone()
        
        for epoch in range(update_epochs):
            # 随机打乱数据
            indices = torch.randperm(batch_size)
            
            for start in range(0, batch_size, mini_batch_size):
                end = start + mini_batch_size
                batch_indices = indices[start:end]
                
                # 计算新的策略概率和价值
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_returns = returns[batch_indices]
                
                action_dist = policy_network(batch_states)
                new_log_probs = action_dist.log_prob(batch_actions)
                new_values = value_network(batch_states)
                
                # 计算比率
                ratio = torch.exp(new_log_probs - batch_old_log_probs)
                
                # 计算策略损失
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * batch_advantages
                policy_loss = -torch.min(surr1, surr2).mean()
                
                # 计算价值损失
                value_loss = F.mse_loss(new_values, batch_returns)
                
                # 计算熵损失
                entropy_loss = -action_dist.entropy().mean()
                
                # 总损失
                total_loss = policy_loss + c1 * value_loss + c2 * entropy_loss
                
                # 更新参数
                optimizer.zero_grad()
                total_loss.backward()
                optimizer.step()
        
        # 记录训练统计信息
        log_training_stats(iteration, policy_loss, value_loss, entropy_loss)
        
        # 评估模型
        if iteration % eval_freq == 0:
            avg_reward = evaluate_model(policy_network, env)
            print(f"Iteration {iteration}: Average Reward = {avg_reward:.2f}")


# GAE计算函数
def compute_gae(rewards, values, dones, gamma, lambda_):
    advantages = []
    returns = []
    gae = 0
    
    for t in reversed(range(len(rewards))):
        if t == len(rewards) - 1:
            next_value = 0
        else:
            next_value = values[t + 1]
        
        delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
        gae = delta + gamma * lambda_ * (1 - dones[t]) * gae
        advantages.insert(0, gae)
        returns.insert(0, gae + values[t])
    
    return torch.tensor(advantages), torch.tensor(returns)

总结

PPO算法通过七个阶段的精心设计,实现了稳定高效的策略优化:

1. 初始化

设置网络架构和超参数

2. 数据收集

与环境交互收集经验

3. 优势计算

使用GAE计算优势函数

4. 网络优化

通过裁剪目标函数更新策略

5. 重复更新

多轮更新提高样本效率

6. 训练监控

跟踪性能指标并调整

7. 评估保存

评估模型并保存最佳版本