PPO算法
一句话概述
MAPPO(Multi-Agent PPO)是PPO算法在多智能体场景的自然推广,通过参数共享和集中价值函数实现了令人瞩目的多智能体协同效果。在星际争霸II多智能体挑战赛(SMAC)中,MAPPO在没有值分解等专门设计的情况下,简单且稳定地匹敌甚至超越QMIX等专门的MARL算法。
教学与演示
什么是MAPPO
MAPPO全称Multi-Agent Proximal Policy Optimization,是单智能体PPO在多智能体场景的直接推广。它的核心思想非常简单:每个智能体运行一个PPO实例,但价值函数(Critic)可以选择使用全局信息。
MAPPO的简单性让人惊讶——不像QMIX需要精心设计的混合网络和单调性约束,不像COMA需要昂贵的反事实基线计算,MAPPO就是标准PPO加上「集中价值函数」。
其中优势A_t由集中价值函数估计:V(s, o_1, ..., o_N),s是全局状态。这与单智能体PPO的唯一区别是Critic的输入维度变大了。
大白话 MAPPO的成功有点「反直觉」——在大家都觉得多智能体需要复杂方法(如值分解、反事实基线)的时候,直接把PPO搬过来、换个大一点的Critic输入,居然效果拔群。有时候最简单的方法反而是最好的。
IPPO与MAPPO的对比
在深入MAPPO之前,需要理解IPPO(Independent PPO)——每个智能体独立运行PPO,既不共享参数,也不共享Critic信息。
| 特性 | IPPO | MAPPO |
|---|---|---|
| Critic输入 | 仅局部观测 | 全局状态或联合观测 |
| 参数共享 | 不共享 | 同质智能体共享 |
| 非平稳处理 | 无专门处理 | 集中Critic减轻非平稳 |
| 信用分配 | 隐式(通过个体奖励) | 隐式(通过集中V函数) |
| 实现复杂度 | 最低 | 低(比QMIX简单) |
IPPO在简单任务中可以工作,但在需要协调的复杂任务(如星际争霸微操)中往往失败——因为它没有机制应对非平稳性。
MAPPO通过两个关键设计解决了IPPO的问题:
- undefined
大白话 IPPO就像五个互不交流的球员各踢各的球,MAPPO则给球场装了一套全局监控系统——虽然球员还是只能看到自己周围的画面,但教练(Critic)能从上帝视角告诉他们:「虽然你觉得这球传得好,但从全局看其实传给左边更好。」
MAPPO的训练流程
MAPPO的训练分为三大阶段:
阶段一:数据收集
- undefined
阶段二:优势计算
- undefined
阶段三:参数更新
- undefined
大白话 MAPPO的训练流程和单智能体PPO几乎一样——收集→GAE优势→多轮更新。唯一区别就是多收集了几个智能体的数据、Critic看的多了一点。这种「少即是多」的设计哲学让人惊叹。
集中价值函数的设计选择
MAPPO中集中Critic的设计有三种常见选择:
1. 全局状态Critic:V(s),其中s是环境的全局状态(如所有单位的位置、血量等)。信息最完整,但状态空间大。
2. 拼接观测Critic:V([o_1, o_2, ..., o_N]),将所有智能体的局部观测拼接。更灵活,不依赖环境提供全局状态。
3. 局部观测Critic:V(o_i),即不使用全局信息(退化为IPPO)。
实验表明,使用全局状态通常能得到最好的训练效果,但拼接观测在无法获得全局状态的场景中是不错的替代方案。
大白话 集中Critic选哪种?有全局状态优先用(信息最全),没有就把所有人的眼睛拼在一起(全联盟观测)。实在不行才用局部观测退化成IPPO。
什么用
MAPPO的实际价值:
- undefined
大白话 如果你的多智能体任务不需要专门的信用分配机制,先试试MAPPO。实现成本极低,效果往往出人意料地好。
哪些坑
| 坑点 | 原因 | 解决方案 |
|---|---|---|
| 参数共享不适合异质智能体 | 不同功能的智能体(如射手/坦克)行为差异大 | 仅对同质智能体共享,异质智能体用独立网络或ID编码 |
| 集中Critic维度爆炸 | N个智能体×观测维度可能很大 | 注意网络设计、使用embedding降维 |
| 训练时全局信息泄漏到Actor | 若Actor也能访问全局状态则违反CTDE | 严格确保Actor只接收局部观测 |
| 经验池数据量大 | N智能体×T步×多条轨迹 | mini-batch适当增大、distributed data collection |
| 优势值需按智能体分别归一化 | 不同智能体的奖励尺度可能不同 | 对每个智能体的优势在不同时间步上独立归一化 |
| 序列长度不均衡 | 不同episode长度差异大 | 固定horizon切分、或使用RNN处理变长序列 |
核心代码演示
"""
MAPPO (Multi-Agent PPO) 核心实现
展示了参数共享、集中Critic和多智能体数据收集
"""
import numpy as np
# ===== 参数共享的Actor-Critic网络 =====
class SharedActorCritic:
"""
MAPPO的参数共享网络
同一套参数服务于所有同质智能体
"""
def __init__(self, obs_dim, n_actions, use_global_critic, global_dim=0):
self.obs_dim = obs_dim
self.n_actions = n_actions
self.use_global_critic = use_global_critic
# Actor: 局部观测 → 动作概率
self.actor_w = np.random.randn(obs_dim, n_actions) * 0.1
self.actor_b = np.zeros(n_actions)
# Critic: 局部/全局观测 → 状态值
critic_input_dim = global_dim if use_global_critic else obs_dim
self.critic_w = np.random.randn(critic_input_dim, 1) * 0.1
self.critic_b = np.zeros(1)
def get_action_and_value(self, obs, global_state=None, action=None):
"""
获取动作概率和价值估计
同时返回log_probs和entropy(用于PPO损失计算)
参数:
- obs: 局部观测 [obs_dim]
- global_state: 全局状态 [global_dim](集中Critic用)
- action: 如果给定,返回该动作的log_prob
"""
# Actor前向传播
logits = obs @ self.actor_w + self.actor_b
# 数值稳定的softmax
logits_shifted = logits - np.max(logits)
probs = np.exp(logits_shifted) / np.sum(np.exp(logits_shifted))
# 如果没有指定动作,采样一个
if action is None:
action = np.random.choice(self.n_actions, p=probs)
log_prob = np.log(probs[action] + 1e-8)
entropy = -np.sum(probs * np.log(probs + 1e-8))
# Critic前向传播
critic_input = global_state if self.use_global_critic else obs
value = (critic_input @ self.critic_w + self.critic_b).item()
return action, log_prob, entropy, value, probs
# ===== MAPPO训练器(简化版)=====
class MAPPOTrainer:
"""MAPPO训练器:协调多智能体的数据收集和参数更新"""
def __init__(self, n_agents, obs_dim, n_actions, global_dim=0,
use_shared_params=True, use_global_critic=True):
self.n_agents = n_agents
self.obs_dim = obs_dim
self.n_actions = n_actions
self.use_shared_params = use_shared_params
if use_shared_params:
# 参数共享:所有智能体用同一套AC网络
self.ac = SharedActorCritic(
obs_dim, n_actions, use_global_critic, global_dim
)
else:
# 独立参数:每个智能体有自己的AC网络
self.acs = [
SharedActorCritic(obs_dim, n_actions, use_global_critic, global_dim)
for _ in range(n_agents)
]
self.gamma = 0.99 # 折扣因子
self.lam = 0.95 # GAE λ
self.epsilon = 0.2 # PPO clip参数
def collect_rollout(self, env, steps_per_iter=64):
"""
收集多智能体轨迹数据
返回一个batch包含所有智能体的经验
"""
batch = {
'obs': [], 'actions': [], 'log_probs': [],
'values': [], 'rewards': [], 'dones': [],
'global_states': []
}
obs = env.reset() # [n_agents, obs_dim]
global_state = env.get_global_state() if hasattr(env, 'get_global_state') else None
for step in range(steps_per_iter):
actions = np.zeros(self.n_agents, dtype=int)
log_probs = np.zeros(self.n_agents)
values = np.zeros(self.n_agents)
# 多智能体同时做决策
for agent_id in range(self.n_agents):
ac = self.ac if self.use_shared_params else self.acs[agent_id]
a, lp, _, v, _ = ac.get_action_and_value(obs[agent_id], global_state)
actions[agent_id] = a
log_probs[agent_id] = lp
values[agent_id] = v
# 环境步进
next_obs, rewards, dones, info = env.step(actions)
next_global_state = env.get_global_state() if hasattr(env, 'get_global_state') else None
# 存储经验
batch['obs'].append(obs.copy())
batch['actions'].append(actions.copy())
batch['log_probs'].append(log_probs.copy())
batch['values'].append(values.copy())
batch['rewards'].append(rewards.copy())
batch['dones'].append(np.full(self.n_agents, float(dones)))
if global_state is not None:
batch['global_states'].append(global_state.copy())
obs = next_obs
global_state = next_global_state
if dones:
obs = env.reset()
# 转换为numpy数组
for k in batch:
if len(batch[k]) > 0:
batch[k] = np.array(batch[k])
return batch
def compute_gae(self, rewards, values, dones, last_value):
"""
多智能体GAE计算
对每个智能体独立计算GAE优势
"""
n_agents = rewards.shape[1]
T = len(rewards)
advantages = np.zeros_like(rewards)
returns = np.zeros_like(rewards)
for agent_id in range(n_agents):
gae = 0.0
for t in reversed(range(T)):
if t == T - 1:
next_non_terminal = 1.0 - dones[t, agent_id]
next_value = last_value[agent_id] * next_non_terminal
else:
next_non_terminal = 1.0 - dones[t, agent_id]
next_value = values[t+1, agent_id] * next_non_terminal
# TD误差 δ = r + γV' - V
delta = rewards[t, agent_id] + self.gamma * next_value - values[t, agent_id]
# GAE递推
gae = delta + self.gamma * self.lam * (1.0 - dones[t, agent_id]) * gae
advantages[t, agent_id] = gae
returns[t, agent_id] = gae + values[t, agent_id]
# 优势归一化(每个智能体独立)
for agent_id in range(n_agents):
adv = advantages[:, agent_id]
advantages[:, agent_id] = (adv - np.mean(adv)) / (np.std(adv) + 1e-8)
return advantages, returns
print("=" * 50)
print("MAPPO核心组件就绪")
print("=" * 50)
print("关键特性:")
print("1. 参数共享 → 一套AC网络服务所有同质智能体")
print("2. 集中Critic → 使用全局状态估计更准确的价值")
print("3. GAE独立计算 → 每个智能体的优势按自己的轨迹计算")
print("4. PPO-Clip → 单智能体PPO的clip机制原封不动移植")
"""
MAPPO vs IPPO 对比实验(简化的网格世界)
展示参数共享和集中Critic对多智能体协同的影响
"""
import numpy as np
# ===== 简单协作环境:需要配合才能得高分的网格世界 =====
class CooperationGridWorld:
"""两个智能体需要协同覆盖不同区域"""
def __init__(self):
self.size = 4
self.n_agents = 2
self.max_steps = 20
self.reset()
def reset(self):
# 两个目标位置
self.targets = [np.array([0, 3]), np.array([3, 0])]
# 智能体初始位置
self.positions = [np.array([0, 0]), np.array([3, 3])]
self.steps = 0
self.targets_covered = set()
return self.get_obs()
def get_obs(self):
"""每个智能体的局部观测"""
obs = []
for i in range(self.n_agents):
pos = self.positions[i]
# 观测:自己的位置 + 两个目标的相对位置
o = np.concatenate([
pos / self.size, # 归一化位置
(self.targets[0] - pos) / self.size, # 目标0相对位置
(self.targets[1] - pos) / self.size, # 目标1相对位置
])
obs.append(o)
return np.array(obs)
def step(self, actions):
"""执行动作并计算奖励"""
# 移动(0:上 1:下 2:左 3:右)
deltas = np.array([[-1,0], [1,0], [0,-1], [0,1]])
for i in range(self.n_agents):
new_pos = self.positions[i] + deltas[actions[i]]
new_pos = np.clip(new_pos, 0, self.size - 1)
# 避免碰撞
other = 1 - i
if not np.array_equal(new_pos, self.positions[other]):
self.positions[i] = new_pos
# 检查覆盖
reward = 0.0
for i in range(self.n_agents):
for t_idx, target in enumerate(self.targets):
if np.array_equal(self.positions[i], target):
if t_idx not in self.targets_covered:
self.targets_covered.add(t_idx)
reward += 10.0 # 首次覆盖目标给大奖励
self.steps += 1
reward += -0.1 * self.n_agents # 时间惩罚
done = len(self.targets_covered) == 2 or self.steps >= self.max_steps
# 团队奖励(学习协作的关键)
# IPPO:每个智能体得到团队奖励(缺乏信用分配)
# MAPPO:由集中Critic红利分配(隐式信用分配)
rewards = np.full(self.n_agents, reward / self.n_agents)
return self.get_obs(), rewards, done, {}
# ===== 模拟IPPO和MAPPO的关键差异 =====
env = CooperationGridWorld()
obs = env.reset()
print("=== IPPO vs MAPPO 对比 ===\n")
print(f"智能体数量: {env.n_agents}")
print(f"局部观测维度: {obs.shape[1]}")
print(f"目标位置: {env.targets[0]} 和 {env.targets[1]}")
print(f"\nIPPO (Independent PPO):")
print(" - Critic只看局部观测 → V(o_i)")
print(" - 无法判断「我的动作对团队目标的贡献」")
print(" - 两个智能体容易「撞车」——都跑向同一个目标")
print(" - 复杂协作任务常失败")
print(f"\nMAPPO (Multi-Agent PPO):")
print(" - Critic看全局状态 → V(s, o_1, o_2)")
print(" - 能判断「目标A已被智能体1覆盖,智能体0应该去目标B」")
print(" - 参数共享降低参数量,加速学习")
print(" - SMAC等多种基准上接近或超越QMIX")
# 展示参数共享的优势
n_params_per_agent = 6 * 4 + 4 + 6 + 1 # 简化的参数量估算
print(f"\n参数对比(假设6维观测、4动作、6维全局状态):")
print(f" IPPO (独立参数): {n_params_per_agent * env.n_agents} 个参数")
print(f" MAPPO (参数共享): {n_params_per_agent} 个参数(共享Actor+Critic)")
print(f" 参数共享节省了 {(1 - 1/env.n_agents) * 100:.0f}% 的参数量!")概念关系图谱
| 概念 | 与MAPPO的关系 | 说明 |
|---|---|---|
| PPO | 算法基础 | MAPPO是PPO的多智能体扩展,损失函数和clip机制完全继承 |
| CTDE | 设计范式 | MAPPO是CTDE的一个实例——集中Critic+分散Actor |
| 参数共享 | 核心技巧 | 同质智能体共享AC网络参数,极大降低复杂度和提升效率 |
| 集中价值函数 | 关键创新 | 用全局信息估计V值,提供稳定准确的训练信号 |
| IPPO | 基准对比 | 独立PPO,MAPPO通过集中Critic+参数共享超越它 |
| GAE | 优势估计 | 多智能体GAE对每个智能体独立计算 |
| QMIX | 竞争对手 | 值分解类方法,MAPPO以更简单的设计达到了相近效果 |
| SMAC | 基准环境 | 星际争霸II多智能体挑战赛,MAPPO在此显示出强大能力 |
重点答疑
大白话 MAPPO的成功告诉我们一个道理:在你想用复杂方法(值分解、反事实基线、通信协议)之前,先试试把简单方法正确地用到多智能体场景。好的工程实践(参数共享、GAE、clip)往往比精巧的算法设计更重要。
💡 核心要点:MAPPO的三大杀手锏——(1)PPO的稳定clip机制防止训练崩溃;(2)集中Critic提供全局视角攻克非平稳性;(3)参数共享实现高效学习和泛化。三者合一,简单但强大。
- undefined
章节单词汇总
| 英文 | 音标 | 术语 | 释义 |
|---|---|---|---|
| MAPPO | /mæpəʊ/ | 多智能体近端策略优化 | PPO的多智能体扩展,集中Critic+参数共享 |
| IPPO | /aɪ piː piː əʊ/ | 独立PPO | 每个智能体独立运行PPO,不共享信息或参数 |
| Parameter Sharing | /pəˈræmɪtə ˈʃeərɪŋ/ | 参数共享 | 多个同质智能体使用相同的神经网络参数 |
| Centralized Critic | /ˈsentrəlaɪzd ˈkrɪtɪk/ | 集中评价器 | 使用全局状态或联合观测训练的价值函数 |
| Homogeneous | /ˌhəʊməʊˈdʒiːniəs/ | 同质的 | 智能体具有相同的功能和动作空间 |
| Heterogeneous | /ˌhetərəˈdʒiːniəs/ | 异质的 | 智能体具有不同的功能或动作空间 |
| Rollout | /ˈrəʊlaʊt/ | 轨迹采样 | 将策略在环境中执行以收集经验数据 |
| TD Error | /tiː diː ˈerə/ | 时序差分误差 | 当前估计值与TD目标之间的差异 |
| Global State | /ˈɡləʊbəl steɪt/ | 全局状态 | 包含所有智能体完整信息的环境状态 |
| Advantage Normalization | /ədˈvæntɪdʒ ˌnɔːməlaɪˈzeɪʃən/ | 优势归一化 | 对每个智能体的优势值进行标准化 |
面试练习
- undefined
- undefined