Question: Describe in detail the complete implementation of the PPO (Proximal Policy Optimization) algorithm, covering each stage: initialization, data collection, advantage estimation, and network optimization.
PPO (Proximal Policy Optimization) is an improved variant of the policy-gradient family. It constrains how far each update can move the policy away from the policy that collected the data, which stabilizes training, and it reuses each batch of experience for several gradient steps. Its core ideas, formalized in the clipped objective below, are:

- Limit the size of each policy update
- Balance exploration and exploitation (encouraged via an entropy bonus)
- Reuse samples to improve data efficiency
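For reference, the clipped surrogate objective that PPO maximizes, with probability ratio $r_t(\theta) = \pi_\theta(a_t \mid s_t) / \pi_{\theta_\text{old}}(a_t \mid s_t)$ and advantage estimate $\hat{A}_t$, is:

$$
L^{\text{CLIP}}(\theta) = \mathbb{E}_t\!\left[\min\!\big(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big)\right]
$$

The full training loss additionally includes a value-function term weighted by c₁ and an entropy bonus weighted by c₂, matching the coefficients in the hyperparameter table below.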
| Parameter | Symbol | Typical value | Role |
|---|---|---|---|
| Clipping parameter | ε | 0.1-0.2 | Controls the size of policy updates |
| Learning rate | α | 1e-4 to 3e-4 | Gradient step size |
| Discount factor | γ | 0.99-0.999 | Trades off immediate vs. long-term reward |
| GAE parameter | λ | 0.9-0.95 | Bias-variance trade-off |
| Rollout length | T | 2048-8192 | Environment steps collected per iteration |
| Update epochs | K | 3-10 | Passes over the same batch of data |
| Entropy coefficient | c₂ | 0.01-0.1 | Encourages exploration |
| Value-loss coefficient | c₁ | 0.5-1.0 | Weight of the value loss |
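A minimal sketch of how these hyperparameters might be gathered into a single configuration object; the dictionary and its key names are illustrative, not taken from any specific library:

```python
# Hypothetical hyperparameter configuration used by the loops below
ppo_config = {
    "clip_epsilon": 0.2,      # ε: clipping range for the probability ratio
    "learning_rate": 3e-4,    # α: Adam step size
    "gamma": 0.99,            # γ: discount factor
    "gae_lambda": 0.95,       # λ: GAE bias-variance trade-off
    "rollout_length": 2048,   # T: environment steps collected per iteration
    "update_epochs": 10,      # K: passes over each collected batch
    "entropy_coef": 0.01,     # c₂: weight of the entropy bonus
    "value_coef": 0.5,        # c₁: weight of the value loss
    "num_mini_batches": 32,   # mini-batches per update epoch
    "max_grad_norm": 0.5,     # gradient-clipping threshold
}
```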
```python
# Rollout buffers for one iteration of data collection
states, actions, rewards, log_probs, values, dones = [], [], [], [], [], []
collected_steps = 0

state = env.reset()
done = False
episode_reward = 0

while not done and collected_steps < T:
    # Sample an action from the current policy
    action_probs = policy_network(state)
    action = sample_from_distribution(action_probs)
    log_prob = log_probability(action, action_probs)

    # Step the environment
    next_state, reward, done, info = env.step(action)

    # Store the transition
    states.append(state)
    actions.append(action)
    rewards.append(reward)
    log_probs.append(log_prob)
    dones.append(done)
    values.append(value_network(state))

    state = next_state
    episode_reward += reward
    collected_steps += 1
```
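Because the mini-batch code further below indexes these buffers with index tensors, one common approach (assuming a PyTorch setup and that each stored log-probability and value is a scalar tensor) is to stack the Python lists into tensors once the rollout is finished:

```python
import torch

# Convert rollout lists to tensors so they can be sliced by index tensors later
states = torch.stack([torch.as_tensor(s, dtype=torch.float32) for s in states])
actions = torch.stack([torch.as_tensor(a) for a in actions])
rewards = torch.as_tensor(rewards, dtype=torch.float32)
old_log_probs = torch.stack(log_probs).detach()
values = torch.stack(values).squeeze(-1).detach()
dones = torch.as_tensor(dones, dtype=torch.float32)
```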
Compute the TD errors (one-step value residuals) with the current value network:
```python
deltas = torch.zeros(T)
for t in range(T):
    # Bootstrap from V(s_{t+1}); for the final step, use the state observed after the rollout
    next_value = values[t + 1] if t < T - 1 else value_network(next_state)
    deltas[t] = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
```
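In equation form, with $V$ the current value network and $d_t$ the episode-termination flag, the residual computed above is:

$$
\delta_t = r_t + \gamma\,V(s_{t+1})\,(1 - d_t) - V(s_t)
$$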
Use GAE (Generalized Advantage Estimation) to turn these TD residuals into advantage estimates, balancing bias and variance:
```python
advantages = torch.zeros(T)
advantages[T - 1] = deltas[T - 1]
for t in reversed(range(T - 1)):
    # The recursion is cut off at episode boundaries by the (1 - dones[t]) factor
    advantages[t] = deltas[t] + gamma * lambda_ * (1 - dones[t]) * advantages[t + 1]
```
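Equivalently, within an episode GAE can be written as a recursion or as an exponentially weighted sum of TD residuals:

$$
\hat{A}_t = \delta_t + \gamma\lambda\,\hat{A}_{t+1} = \sum_{l=0}^{T-t-1} (\gamma\lambda)^l\,\delta_{t+l}
$$

Setting λ = 0 recovers the one-step TD advantage (low variance, higher bias), while λ = 1 recovers the Monte Carlo return minus the value baseline (low bias, higher variance).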
Compute the regression targets used to train the value function:

```python
returns = advantages + values
```
```python
batch_size = T // num_mini_batches
indices = torch.randperm(T)

for i in range(num_mini_batches):
    start_idx = i * batch_size
    end_idx = (i + 1) * batch_size
    batch_indices = indices[start_idx:end_idx]

    # Slice the mini-batch (assumes the rollout buffers have been stacked into tensors)
    batch_states = states[batch_indices]
    batch_actions = actions[batch_indices]
    batch_old_log_probs = old_log_probs[batch_indices]
    batch_advantages = advantages[batch_indices]
    batch_returns = returns[batch_indices]

    # Re-evaluate the current policy and value function on the mini-batch
    action_dists = policy_network(batch_states)
    batch_new_log_probs = action_dists.log_prob(batch_actions)
    entropy = action_dists.entropy().mean()
    batch_new_values = value_network(batch_states).squeeze(-1)

    # Probability ratio between the new and old policy
    ratio = torch.exp(batch_new_log_probs - batch_old_log_probs)

    # Clipped surrogate objective
    surr1 = ratio * batch_advantages
    surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * batch_advantages
    policy_loss = -torch.min(surr1, surr2).mean()

    # Value regression loss and entropy bonus
    value_loss = F.mse_loss(batch_new_values, batch_returns)
    entropy_loss = -entropy
    total_loss = policy_loss + c1 * value_loss + c2 * entropy_loss

    # Gradient step with gradient clipping
    optimizer.zero_grad()
    total_loss.backward()
    torch.nn.utils.clip_grad_norm_(policy_network.parameters(), max_grad_norm)
    torch.nn.utils.clip_grad_norm_(value_network.parameters(), max_grad_norm)
    optimizer.step()
```
The mini-batch loop above is run for K update epochs over the same batch of data, which improves sample efficiency:
```python
for epoch in range(K):
    for mini_batch in mini_batches:
        # Run all of the computations from steps 4.2-4.5:
        # forward pass → probability ratio → losses → backward pass
        pass
```
If the KL-penalty variant of PPO is used instead of clipping:
```python
# KL divergence between the old policy and the current policy
kl_divergence = (old_action_probs * (old_action_probs.log() - new_action_probs.log())).sum(-1)
kl_mean = kl_divergence.mean()

# Adapt the penalty coefficient: loosen it when KL is small, tighten it when KL is large
if kl_mean < target_kl / 1.5:
    beta /= 2
elif kl_mean > target_kl * 1.5:
    beta *= 2

# The penalized surrogate objective replaces the clipped loss
policy_loss = -(ratio * advantages - beta * kl_divergence).mean()
```
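For reference, the objective maximized by this adaptive-KL formulation is:

$$
L^{\text{KLPEN}}(\theta) = \mathbb{E}_t\!\left[ r_t(\theta)\,\hat{A}_t - \beta\,\mathrm{KL}\!\left[\pi_{\theta_\text{old}}(\cdot \mid s_t)\,\|\,\pi_\theta(\cdot \mid s_t)\right] \right]
$$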
Metrics worth monitoring during training (a sketch for the last two follows this list):

- Average reward over the most recent N episodes
- The value of policy_loss
- The value of value_loss
- The value of entropy_loss
- The L2 norm of the parameter gradients
- KL divergence between the policy before and after the update
- Fraction of samples whose probability ratio was clipped
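A minimal sketch of how the last two diagnostics are often computed inside the mini-batch loop, assuming the `ratio`, `batch_new_log_probs`, `batch_old_log_probs`, and `epsilon` variables from the update code above:

```python
with torch.no_grad():
    # Approximate KL between old and new policy from the sampled log-probability gap
    approx_kl = (batch_old_log_probs - batch_new_log_probs).mean()
    # Fraction of samples whose ratio fell outside the clipping range
    clip_fraction = ((ratio - 1.0).abs() > epsilon).float().mean()
```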
```python
# Linearly decay the learning rate; `progress` runs from 0 (start) to 1 (end of training)
def linear_schedule(initial_lr, final_lr):
    def schedule(progress):
        return final_lr + (initial_lr - final_lr) * max(0.0, 1.0 - progress)
    return schedule
```
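One way to apply the schedule, assuming the `iteration` and `max_iterations` variables from the training loop below:

```python
lr_schedule = linear_schedule(initial_lr=3e-4, final_lr=1e-5)

# Inside the training loop: refresh the optimizer's learning rate each iteration
current_lr = lr_schedule(progress=iteration / max_iterations)
for param_group in optimizer.param_groups:
    param_group["lr"] = current_lr
```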
```python
# Save the best checkpoint so training can be resumed or the model deployed later
torch.save({
    'policy_state_dict': policy_network.state_dict(),
    'value_state_dict': value_network.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'training_stats': training_stats
}, 'best_model.pth')
```
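The counterpart for restoring a saved run uses the standard PyTorch loading calls:

```python
# Restore networks and optimizer state from the saved checkpoint
checkpoint = torch.load('best_model.pth')
policy_network.load_state_dict(checkpoint['policy_state_dict'])
value_network.load_state_dict(checkpoint['value_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
training_stats = checkpoint['training_stats']
```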
```python
def evaluate_model(model, env, num_episodes=100):
    """Run the policy for num_episodes and report the mean and std of episode rewards."""
    total_rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        episode_reward = 0
        while not done:
            # Deterministic (greedy) action for evaluation, no exploration noise
            action = model.get_greedy_action(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
        total_rewards.append(episode_reward)
    return np.mean(total_rewards), np.std(total_rewards)
```
```python
def PPO_algorithm():
    # Initialization: separate policy and value networks sharing one optimizer
    policy_network = PolicyNetwork()
    value_network = ValueNetwork()
    optimizer = Adam(list(policy_network.parameters()) + list(value_network.parameters()))

    for iteration in range(max_iterations):
        # --- Data collection phase ---
        states, actions, rewards, log_probs, values, dones = [], [], [], [], [], []
        state = env.reset()
        for t in range(batch_size):
            # Run the current policy
            action_dist = policy_network(state)
            action = action_dist.sample()
            log_prob = action_dist.log_prob(action)
            value = value_network(state)

            # Interact with the environment
            next_state, reward, done, _ = env.step(action)

            # Store the transition
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            log_probs.append(log_prob)
            values.append(value)
            dones.append(done)

            state = next_state
            if done:
                state = env.reset()

        # --- Advantage estimation ---
        advantages, returns = compute_gae(rewards, values, dones, gamma, lambda_)

        # --- Policy update phase ---
        # Stack the rollout lists into tensors so they can be indexed by mini-batch
        states = torch.stack(states)
        actions = torch.stack(actions)
        old_log_probs = torch.stack(log_probs).detach()

        for epoch in range(update_epochs):
            # Shuffle the batch at the start of every epoch
            indices = torch.randperm(batch_size)
            for start in range(0, batch_size, mini_batch_size):
                end = start + mini_batch_size
                batch_indices = indices[start:end]

                # Slice the mini-batch
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_returns = returns[batch_indices]

                # Re-evaluate the policy and value function on the mini-batch
                action_dist = policy_network(batch_states)
                new_log_probs = action_dist.log_prob(batch_actions)
                new_values = value_network(batch_states).squeeze(-1)

                # Probability ratio between new and old policy
                ratio = torch.exp(new_log_probs - batch_old_log_probs)

                # Clipped policy loss
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * batch_advantages
                policy_loss = -torch.min(surr1, surr2).mean()

                # Value loss
                value_loss = F.mse_loss(new_values, batch_returns)

                # Entropy bonus (negated so that minimizing the loss increases entropy)
                entropy_loss = -action_dist.entropy().mean()

                # Total loss
                total_loss = policy_loss + c1 * value_loss + c2 * entropy_loss

                # Parameter update
                optimizer.zero_grad()
                total_loss.backward()
                optimizer.step()

        # Log training statistics
        log_training_stats(iteration, policy_loss, value_loss, entropy_loss)

        # Periodic evaluation
        if iteration % eval_freq == 0:
            avg_reward, _ = evaluate_model(policy_network, env)
            print(f"Iteration {iteration}: Average Reward = {avg_reward:.2f}")
```
```python
# GAE computation: returns advantage estimates and value-function targets
def compute_gae(rewards, values, dones, gamma, lambda_):
    advantages = []
    returns = []
    gae = 0
    for t in reversed(range(len(rewards))):
        if t == len(rewards) - 1:
            # Simplification: no bootstrapping past the end of the rollout;
            # a truncated rollout could instead bootstrap with V of the final next state
            next_value = 0
        else:
            next_value = values[t + 1]
        delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
        gae = delta + gamma * lambda_ * (1 - dones[t]) * gae
        advantages.insert(0, gae)
        returns.insert(0, gae + values[t])
    return torch.tensor(advantages), torch.tensor(returns)
```
The PPO algorithm achieves stable and efficient policy optimization through seven carefully designed stages:

1. Initialization: set up the network architecture and hyperparameters
2. Data collection: interact with the environment to gather experience
3. Advantage estimation: compute advantages with GAE
4. Policy update: improve the policy via the clipped objective
5. Multi-epoch updates: reuse each batch for several epochs to improve sample efficiency
6. Monitoring: track performance metrics and adjust training
7. Evaluation: assess the model and save the best version