PPO算法

一句话概述

MAPPO(Multi-Agent PPO)是PPO算法在多智能体场景的自然推广,通过参数共享和集中价值函数实现了令人瞩目的多智能体协同效果。在星际争霸II多智能体挑战赛(SMAC)中,MAPPO在没有值分解等专门设计的情况下,简单且稳定地匹敌甚至超越QMIX等专门的MARL算法。

教学与演示

什么是MAPPO

MAPPO全称Multi-Agent Proximal Policy Optimization,是单智能体PPO在多智能体场景的直接推广。它的核心思想非常简单:每个智能体运行一个PPO实例,但价值函数(Critic)可以选择使用全局信息。

MAPPO的简单性让人惊讶——不像QMIX需要精心设计的混合网络和单调性约束,不像COMA需要昂贵的反事实基线计算,MAPPO就是标准PPO加上「集中价值函数」。

MAPPO目标函数\(\mathcal{L}^{MAPPO}(\theta) = \mathbb{E} \left[ \min(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\varepsilon, 1+\varepsilon) A_t) \right]\)

其中优势A_t由集中价值函数估计:V(s, o_1, ..., o_N),s是全局状态。这与单智能体PPO的唯一区别是Critic的输入维度变大了。

大白话 MAPPO的成功有点「反直觉」——在大家都觉得多智能体需要复杂方法(如值分解、反事实基线)的时候,直接把PPO搬过来、换个大一点的Critic输入,居然效果拔群。有时候最简单的方法反而是最好的。

IPPO与MAPPO的对比

在深入MAPPO之前,需要理解IPPO(Independent PPO)——每个智能体独立运行PPO,既不共享参数,也不共享Critic信息。

特性IPPOMAPPO
Critic输入仅局部观测全局状态或联合观测
参数共享不共享同质智能体共享
非平稳处理无专门处理集中Critic减轻非平稳
信用分配隐式(通过个体奖励)隐式(通过集中V函数)
实现复杂度最低低(比QMIX简单)

IPPO在简单任务中可以工作,但在需要协调的复杂任务(如星际争霸微操)中往往失败——因为它没有机制应对非平稳性。

MAPPO通过两个关键设计解决了IPPO的问题:

    undefined
大白话 IPPO就像五个互不交流的球员各踢各的球,MAPPO则给球场装了一套全局监控系统——虽然球员还是只能看到自己周围的画面,但教练(Critic)能从上帝视角告诉他们:「虽然你觉得这球传得好,但从全局看其实传给左边更好。」

MAPPO的训练流程

MAPPO的训练分为三大阶段:

阶段一:数据收集

    undefined

阶段二:优势计算

    undefined
MAPPO的GAE计算\(\hat{A}_t = \sum_{l=0}^{T-t-1} (\gamma\lambda)^l \delta_{t+l}, \quad \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)\)

阶段三:参数更新

    undefined
大白话 MAPPO的训练流程和单智能体PPO几乎一样——收集→GAE优势→多轮更新。唯一区别就是多收集了几个智能体的数据、Critic看的多了一点。这种「少即是多」的设计哲学让人惊叹。

集中价值函数的设计选择

MAPPO中集中Critic的设计有三种常见选择:

1. 全局状态Critic:V(s),其中s是环境的全局状态(如所有单位的位置、血量等)。信息最完整,但状态空间大。

2. 拼接观测Critic:V([o_1, o_2, ..., o_N]),将所有智能体的局部观测拼接。更灵活,不依赖环境提供全局状态。

3. 局部观测Critic:V(o_i),即不使用全局信息(退化为IPPO)。

实验表明,使用全局状态通常能得到最好的训练效果,但拼接观测在无法获得全局状态的场景中是不错的替代方案。

大白话 集中Critic选哪种?有全局状态优先用(信息最全),没有就把所有人的眼睛拼在一起(全联盟观测)。实在不行才用局部观测退化成IPPO。

什么用

MAPPO的实际价值:

    undefined
大白话 如果你的多智能体任务不需要专门的信用分配机制,先试试MAPPO。实现成本极低,效果往往出人意料地好。

哪些坑

坑点原因解决方案
参数共享不适合异质智能体不同功能的智能体(如射手/坦克)行为差异大仅对同质智能体共享,异质智能体用独立网络或ID编码
集中Critic维度爆炸N个智能体×观测维度可能很大注意网络设计、使用embedding降维
训练时全局信息泄漏到Actor若Actor也能访问全局状态则违反CTDE严格确保Actor只接收局部观测
经验池数据量大N智能体×T步×多条轨迹mini-batch适当增大、distributed data collection
优势值需按智能体分别归一化不同智能体的奖励尺度可能不同对每个智能体的优势在不同时间步上独立归一化
序列长度不均衡不同episode长度差异大固定horizon切分、或使用RNN处理变长序列

核心代码演示

"""
MAPPO (Multi-Agent PPO) 核心实现
展示了参数共享、集中Critic和多智能体数据收集
"""
import numpy as np

# ===== 参数共享的Actor-Critic网络 =====
class SharedActorCritic:
    """
    MAPPO的参数共享网络
    同一套参数服务于所有同质智能体
    """
    def __init__(self, obs_dim, n_actions, use_global_critic, global_dim=0):
        self.obs_dim = obs_dim
        self.n_actions = n_actions
        self.use_global_critic = use_global_critic
        
        # Actor: 局部观测 → 动作概率
        self.actor_w = np.random.randn(obs_dim, n_actions) * 0.1
        self.actor_b = np.zeros(n_actions)
        
        # Critic: 局部/全局观测 → 状态值
        critic_input_dim = global_dim if use_global_critic else obs_dim
        self.critic_w = np.random.randn(critic_input_dim, 1) * 0.1
        self.critic_b = np.zeros(1)
    
    def get_action_and_value(self, obs, global_state=None, action=None):
        """
        获取动作概率和价值估计
        同时返回log_probs和entropy(用于PPO损失计算)
        
        参数:
        - obs: 局部观测 [obs_dim]
        - global_state: 全局状态 [global_dim](集中Critic用)
        - action: 如果给定,返回该动作的log_prob
        """
        # Actor前向传播
        logits = obs @ self.actor_w + self.actor_b
        # 数值稳定的softmax
        logits_shifted = logits - np.max(logits)
        probs = np.exp(logits_shifted) / np.sum(np.exp(logits_shifted))
        
        # 如果没有指定动作,采样一个
        if action is None:
            action = np.random.choice(self.n_actions, p=probs)
        
        log_prob = np.log(probs[action] + 1e-8)
        entropy = -np.sum(probs * np.log(probs + 1e-8))
        
        # Critic前向传播
        critic_input = global_state if self.use_global_critic else obs
        value = (critic_input @ self.critic_w + self.critic_b).item()
        
        return action, log_prob, entropy, value, probs

# ===== MAPPO训练器(简化版)=====
class MAPPOTrainer:
    """MAPPO训练器:协调多智能体的数据收集和参数更新"""
    def __init__(self, n_agents, obs_dim, n_actions, global_dim=0,
                 use_shared_params=True, use_global_critic=True):
        self.n_agents = n_agents
        self.obs_dim = obs_dim
        self.n_actions = n_actions
        self.use_shared_params = use_shared_params
        
        if use_shared_params:
            # 参数共享:所有智能体用同一套AC网络
            self.ac = SharedActorCritic(
                obs_dim, n_actions, use_global_critic, global_dim
            )
        else:
            # 独立参数:每个智能体有自己的AC网络
            self.acs = [
                SharedActorCritic(obs_dim, n_actions, use_global_critic, global_dim)
                for _ in range(n_agents)
            ]
        
        self.gamma = 0.99  # 折扣因子
        self.lam = 0.95    # GAE λ
        self.epsilon = 0.2  # PPO clip参数
    
    def collect_rollout(self, env, steps_per_iter=64):
        """
        收集多智能体轨迹数据
        
        返回一个batch包含所有智能体的经验
        """
        batch = {
            'obs': [], 'actions': [], 'log_probs': [],
            'values': [], 'rewards': [], 'dones': [],
            'global_states': []
        }
        
        obs = env.reset()  # [n_agents, obs_dim]
        global_state = env.get_global_state() if hasattr(env, 'get_global_state') else None
        
        for step in range(steps_per_iter):
            actions = np.zeros(self.n_agents, dtype=int)
            log_probs = np.zeros(self.n_agents)
            values = np.zeros(self.n_agents)
            
            # 多智能体同时做决策
            for agent_id in range(self.n_agents):
                ac = self.ac if self.use_shared_params else self.acs[agent_id]
                a, lp, _, v, _ = ac.get_action_and_value(obs[agent_id], global_state)
                actions[agent_id] = a
                log_probs[agent_id] = lp
                values[agent_id] = v
            
            # 环境步进
            next_obs, rewards, dones, info = env.step(actions)
            next_global_state = env.get_global_state() if hasattr(env, 'get_global_state') else None
            
            # 存储经验
            batch['obs'].append(obs.copy())
            batch['actions'].append(actions.copy())
            batch['log_probs'].append(log_probs.copy())
            batch['values'].append(values.copy())
            batch['rewards'].append(rewards.copy())
            batch['dones'].append(np.full(self.n_agents, float(dones)))
            if global_state is not None:
                batch['global_states'].append(global_state.copy())
            
            obs = next_obs
            global_state = next_global_state
            if dones:
                obs = env.reset()
        
        # 转换为numpy数组
        for k in batch:
            if len(batch[k]) > 0:
                batch[k] = np.array(batch[k])
        
        return batch
    
    def compute_gae(self, rewards, values, dones, last_value):
        """
        多智能体GAE计算
        对每个智能体独立计算GAE优势
        """
        n_agents = rewards.shape[1]
        T = len(rewards)
        advantages = np.zeros_like(rewards)
        returns = np.zeros_like(rewards)
        
        for agent_id in range(n_agents):
            gae = 0.0
            for t in reversed(range(T)):
                if t == T - 1:
                    next_non_terminal = 1.0 - dones[t, agent_id]
                    next_value = last_value[agent_id] * next_non_terminal
                else:
                    next_non_terminal = 1.0 - dones[t, agent_id]
                    next_value = values[t+1, agent_id] * next_non_terminal
                
                # TD误差 δ = r + γV' - V
                delta = rewards[t, agent_id] + self.gamma * next_value - values[t, agent_id]
                # GAE递推
                gae = delta + self.gamma * self.lam * (1.0 - dones[t, agent_id]) * gae
                advantages[t, agent_id] = gae
                returns[t, agent_id] = gae + values[t, agent_id]
        
        # 优势归一化(每个智能体独立)
        for agent_id in range(n_agents):
            adv = advantages[:, agent_id]
            advantages[:, agent_id] = (adv - np.mean(adv)) / (np.std(adv) + 1e-8)
        
        return advantages, returns


print("=" * 50)
print("MAPPO核心组件就绪")
print("=" * 50)
print("关键特性:")
print("1. 参数共享 → 一套AC网络服务所有同质智能体")
print("2. 集中Critic → 使用全局状态估计更准确的价值")
print("3. GAE独立计算 → 每个智能体的优势按自己的轨迹计算")
print("4. PPO-Clip → 单智能体PPO的clip机制原封不动移植")
"""
MAPPO vs IPPO 对比实验(简化的网格世界)
展示参数共享和集中Critic对多智能体协同的影响
"""
import numpy as np

# ===== 简单协作环境:需要配合才能得高分的网格世界 =====
class CooperationGridWorld:
    """两个智能体需要协同覆盖不同区域"""
    def __init__(self):
        self.size = 4
        self.n_agents = 2
        self.max_steps = 20
        self.reset()
    
    def reset(self):
        # 两个目标位置
        self.targets = [np.array([0, 3]), np.array([3, 0])]
        # 智能体初始位置
        self.positions = [np.array([0, 0]), np.array([3, 3])]
        self.steps = 0
        self.targets_covered = set()
        return self.get_obs()
    
    def get_obs(self):
        """每个智能体的局部观测"""
        obs = []
        for i in range(self.n_agents):
            pos = self.positions[i]
            # 观测:自己的位置 + 两个目标的相对位置
            o = np.concatenate([
                pos / self.size,  # 归一化位置
                (self.targets[0] - pos) / self.size,  # 目标0相对位置
                (self.targets[1] - pos) / self.size,  # 目标1相对位置
            ])
            obs.append(o)
        return np.array(obs)
    
    def step(self, actions):
        """执行动作并计算奖励"""
        # 移动(0:上 1:下 2:左 3:右)
        deltas = np.array([[-1,0], [1,0], [0,-1], [0,1]])
        for i in range(self.n_agents):
            new_pos = self.positions[i] + deltas[actions[i]]
            new_pos = np.clip(new_pos, 0, self.size - 1)
            # 避免碰撞
            other = 1 - i
            if not np.array_equal(new_pos, self.positions[other]):
                self.positions[i] = new_pos
        
        # 检查覆盖
        reward = 0.0
        for i in range(self.n_agents):
            for t_idx, target in enumerate(self.targets):
                if np.array_equal(self.positions[i], target):
                    if t_idx not in self.targets_covered:
                        self.targets_covered.add(t_idx)
                        reward += 10.0  # 首次覆盖目标给大奖励
        
        self.steps += 1
        reward += -0.1 * self.n_agents  # 时间惩罚
        done = len(self.targets_covered) == 2 or self.steps >= self.max_steps
        
        # 团队奖励(学习协作的关键)
        # IPPO:每个智能体得到团队奖励(缺乏信用分配)
        # MAPPO:由集中Critic红利分配(隐式信用分配)
        rewards = np.full(self.n_agents, reward / self.n_agents)
        
        return self.get_obs(), rewards, done, {}

# ===== 模拟IPPO和MAPPO的关键差异 =====
env = CooperationGridWorld()
obs = env.reset()
print("=== IPPO vs MAPPO 对比 ===\n")
print(f"智能体数量: {env.n_agents}")
print(f"局部观测维度: {obs.shape[1]}")
print(f"目标位置: {env.targets[0]} 和 {env.targets[1]}")

print(f"\nIPPO (Independent PPO):")
print("  - Critic只看局部观测 → V(o_i)")
print("  - 无法判断「我的动作对团队目标的贡献」")
print("  - 两个智能体容易「撞车」——都跑向同一个目标")
print("  - 复杂协作任务常失败")

print(f"\nMAPPO (Multi-Agent PPO):")
print("  - Critic看全局状态 → V(s, o_1, o_2)")
print("  - 能判断「目标A已被智能体1覆盖,智能体0应该去目标B」")
print("  - 参数共享降低参数量,加速学习")
print("  - SMAC等多种基准上接近或超越QMIX")

# 展示参数共享的优势
n_params_per_agent = 6 * 4 + 4 + 6 + 1  # 简化的参数量估算
print(f"\n参数对比(假设6维观测、4动作、6维全局状态):")
print(f"  IPPO (独立参数): {n_params_per_agent * env.n_agents} 个参数")
print(f"  MAPPO (参数共享): {n_params_per_agent} 个参数(共享Actor+Critic)")
print(f"  参数共享节省了 {(1 - 1/env.n_agents) * 100:.0f}% 的参数量!")

概念关系图谱

概念与MAPPO的关系说明
PPO算法基础MAPPO是PPO的多智能体扩展,损失函数和clip机制完全继承
CTDE设计范式MAPPO是CTDE的一个实例——集中Critic+分散Actor
参数共享核心技巧同质智能体共享AC网络参数,极大降低复杂度和提升效率
集中价值函数关键创新用全局信息估计V值,提供稳定准确的训练信号
IPPO基准对比独立PPO,MAPPO通过集中Critic+参数共享超越它
GAE优势估计多智能体GAE对每个智能体独立计算
QMIX竞争对手值分解类方法,MAPPO以更简单的设计达到了相近效果
SMAC基准环境星际争霸II多智能体挑战赛,MAPPO在此显示出强大能力

重点答疑

大白话 MAPPO的成功告诉我们一个道理:在你想用复杂方法(值分解、反事实基线、通信协议)之前,先试试把简单方法正确地用到多智能体场景。好的工程实践(参数共享、GAE、clip)往往比精巧的算法设计更重要。
💡 核心要点:MAPPO的三大杀手锏——(1)PPO的稳定clip机制防止训练崩溃;(2)集中Critic提供全局视角攻克非平稳性;(3)参数共享实现高效学习和泛化。三者合一,简单但强大。
    undefined

章节单词汇总

英文音标术语释义
MAPPO/mæpəʊ/多智能体近端策略优化PPO的多智能体扩展,集中Critic+参数共享
IPPO/aɪ piː piː əʊ/独立PPO每个智能体独立运行PPO,不共享信息或参数
Parameter Sharing/pəˈræmɪtə ˈʃeərɪŋ/参数共享多个同质智能体使用相同的神经网络参数
Centralized Critic/ˈsentrəlaɪzd ˈkrɪtɪk/集中评价器使用全局状态或联合观测训练的价值函数
Homogeneous/ˌhəʊməʊˈdʒiːniəs/同质的智能体具有相同的功能和动作空间
Heterogeneous/ˌhetərəˈdʒiːniəs/异质的智能体具有不同的功能或动作空间
Rollout/ˈrəʊlaʊt/轨迹采样将策略在环境中执行以收集经验数据
TD Error/tiː diː ˈerə/时序差分误差当前估计值与TD目标之间的差异
Global State/ˈɡləʊbəl steɪt/全局状态包含所有智能体完整信息的环境状态
Advantage Normalization/ədˈvæntɪdʒ ˌnɔːməlaɪˈzeɪʃən/优势归一化对每个智能体的优势值进行标准化