学习通信

一句话概述

多智能体通信研究智能体之间如何通过发送和接收消息来协同决策。与预设通信协议不同,学习通信让智能体在训练中自动「发明」自己的通信语言——这带来了通信效率、带宽限制、说话时机等新挑战。DIAL和RIAL是早期经典算法,现代方法引入了注意力机制和通信图谱来高效管理信息流。

教学与演示

什么是多智能体通信

人类通过语言相互沟通协作——消防员用喊话协调救援方向,篮球队员用眼神和手势传递战术意图。多智能体通信研究的就是:RL智能体之间应该说什么、什么时候说、怎么说?

通信的内容是「消息」——一个由发送方智能体产生的向量,经过一个通信信道传送给接收方智能体。这个消息可以是连续的(向量值),也可以被量化(离散符号)。

大白话 想象两个人在一个漆黑的大迷宫里,各自只能看到身边一小块地方。他们通过喊话来协调:「我在左边第三个拐角!」「我看到出口了,朝你的方向走!」——这就是多智能体通信的本质。

为什么需要学习通信

为什么不直接用人类设好的通信协议?

第一,在复杂环境中,人类不知道什么信息是最有用的——该报坐标还是报偏转角?该用高精度向量还是压缩成几个离散符号?

第二,通信是有代价的——带宽有限、延迟存在、发送消息消耗能量。智能体需要学会在「通信开销」和「信息收益」之间权衡。

第三,最妙的:智能体可以发明出人类意想不到的通信方式。比如在某个实验中,两个智能体自发学会了用消息的「频率」而不是「内容」来编码信息——人完全看不懂,但机器之间「心有灵犀」。

大白话 不要什么都替AI想好。让智能体自己学会什么时候说话、说什么、怎么理解别人说的话——它们可能会发明出比人类设计更高效的「火星语」。

通信的基本架构

多智能体通信系统包含三个关键组件:

1. 消息生成(发送方):智能体根据自己的局部观测和收到的历史消息,生成要发送的消息:

消息生成函数\(m_i^t = f_{\text{send}}(o_i^t, h_i^t, m_{-i}^{t-1})\)

2. 消息聚合(接收方):智能体汇总自己收到的所有消息:

消息聚合函数\(c_i^t = \text{Aggregate}(\{m_j^t\}_{j \in \mathcal{N}(i)})\)

3. 决策整合:将聚合消息与自身观测结合,做出动作决策:

基于通信的动作选择\(a_i^t \sim \pi_i(\cdot | o_i^t, c_i^t)\)
大白话 通信流程就像群聊——你看到什么就说出来(消息生成),看到别人的消息后综合判断(消息聚合),最后结合自己的观察和收到的信息做决定(决策整合)。

DIAL与RIAL算法

RIAL(Reinforced Inter-Agent Learning) 是早期的学习通信方法。它的思路是:通信动作和物理动作一样,由Q网络选择。发送什么消息被当作一种「元动作」——除了上下左右这些物理动作之外,智能体还可以选择「发送消息1」「发送消息2」等。

DIAL(Differentiable Inter-Agent Learning) 则更进一步:它让消息「可微分」地通过信道。在集中训练阶段,一个智能体的消息输出直接作为另一个智能体的网络输入,梯度通过消息通道反向传播。

DIAL的梯度传播\(\frac{\partial Q_2}{\partial \theta_1} = \frac{\partial Q_2}{\partial m_1} \cdot \frac{\partial m_1}{\partial \theta_1}\)

这意味着智能体1可以「看到」它的消息如何影响了智能体2的Q值,从而学习发送更有用的消息。这就像老师告诉你:「你说的这句话,让队友理解错了,下次换种说法吧。」

大白话 DIAL的精髓是建立了消息通道上的「反馈机制」——接收方不理解的地方,梯度会反向传播告诉发送方「你说得不对,重来」。这种反馈在真实通信中不存在(你说话时不知道对方脑子里是怎么理解的),但训练时我们可以开挂。

学习型通信协议

当让智能体自由通信时,它们会自发演化出通信协议:

    undefined

涌现语言的研究发现:智能体发明的语言类似人类语言的某些特征——有组合性(简单元素组合成复杂含义)、有模因性(从简单到复杂逐渐演变)。

大白话 你给两个智能体一个任务,不给它们规定「该说什么」。它们很可能发明出一种你完全不懂的「机器黑话」。更神奇的是,这种语言会自然演化出类似人类语言的「语法」结构。

什么用

    undefined
大白话 通信让「我知道的你不知道」变成「你知道的我也知道」。在部分可观测环境中,这是从各自为政变成协同作战的关键飞跃。

哪些坑

坑点原因解决方案
通信爆炸所有智能体给所有人发消息,消息量O(n²)通信图谱剪枝、Top-K筛选邻居
消息无意义无梯度或稀疏奖励导致通信退化DIAL的跨智能体梯度、辅助通信损失
延迟与异步消息传输有延迟,旧消息可能误导时序编码、消息时间戳、注意力机制
带宽限制连续通信占用太多带宽消息量化、离散通信、变分信息瓶颈
环境过简单不需要通信也能解决,导致通信通道退化设计必须通信才能解决的基准任务
泛化失败训练时的通信协议在测试时失效通信正则化、噪声注入增强通信鲁棒性

核心代码演示

"""
多智能体通信基础实现 - 消息生成、聚合和决策
"""
import numpy as np

class CommunicatingAgent:
    """
    支持通信的多智能体
    沟通流程:观测→消息生成→消息聚合→决策
    """
    def __init__(self, agent_id, obs_dim, msg_dim, n_actions,
                 n_agents):
        self.id = agent_id
        self.obs_dim = obs_dim
        self.msg_dim = msg_dim
        self.n_actions = n_actions
        self.n_agents = n_agents
        
        # 消息生成网络:观测 → 消息
        self.msg_weights = np.random.randn(obs_dim, msg_dim) * 0.1
        # 决策网络:观测 + 聚合消息 → 动作概率
        self.action_weights = np.random.randn(obs_dim + msg_dim, n_actions) * 0.1
        
        # 存储收到的消息
        self.received_msgs = None
    
    def generate_message(self, obs):
        """根据自身观测生成消息(发送方)"""
        msg = obs @ self.msg_weights
        # 可选:限制消息范数(模拟带宽限制)
        msg = msg / (np.linalg.norm(msg) + 1e-8)  # 归一化
        return msg
    
    def receive_messages(self, all_messages):
        """接收所有智能体的消息(接收方)"""
        # 排除自己的消息
        other_msgs = [m for i, m in enumerate(all_messages) if i != self.id]
        if len(other_msgs) == 0:
            return np.zeros(self.msg_dim)
        
        # 简单聚合:取平均
        self.received_msgs = np.mean(other_msgs, axis=0)
        return self.received_msgs
    
    def decide(self, obs):
        """综合观测和消息做决策"""
        # 拼接观测和聚合消息
        joint_input = np.concatenate([obs, self.received_msgs])
        # 计算动作概率
        logits = joint_input @ self.action_weights
        probs = np.exp(logits - np.max(logits))
        probs = probs / np.sum(probs)
        return probs

# ===== 简单通信环境:两个智能体需要共享信息 =====
class CommunicationEnv:
    """
    两个智能体,每个只能看到目标的一部分信息
    必须通过通信共享信息才能找到最优动作
    """
    def __init__(self):
        self.reset()
    
    def reset(self):
        # 目标特征(只有两个智能体的观测拼接才能完整还原)
        self.target_feature = np.random.randn(4)
        # 智能体0看到的: 前2维,智能体1看到的: 后2维
        self.obs0 = np.concatenate([self.target_feature[:2], np.zeros(2)])
        self.obs1 = np.concatenate([np.zeros(2), self.target_feature[2:]])
        return self.obs0, self.obs1
    
    def step(self, action0, action1):
        # 正确动作 = argmax of target_feature(需要两方信息才能判断)
        correct_action = np.argmax(self.target_feature)
        # 智能体只能看到自己的一半
        r0 = 1.0 if action0 == correct_action else 0.0
        r1 = 1.0 if action1 == correct_action else 0.0
        return r0, r1, True  # 单步episode

# ===== 演示通信流程 =====
n_agents, msg_dim, n_actions = 2, 4, 4
agent_0 = CommunicatingAgent(0, 8, msg_dim, n_actions, n_agents)
agent_1 = CommunicatingAgent(1, 8, msg_dim, n_actions, n_agents)

env = CommunicationEnv()
obs0, obs1 = env.reset()

# 通信步骤
msg0 = agent_0.generate_message(obs0)
msg1 = agent_1.generate_message(obs1)
print(f"智能体0的消息: {msg0[:2]}...")
print(f"智能体1的消息: {msg1[:2]}...")

# 接收消息
all_msgs = [msg0, msg1]
agg0 = agent_0.receive_messages(all_msgs)
agg1 = agent_1.receive_messages(all_msgs)

# 决策
probs0 = agent_0.decide(obs0)
probs1 = agent_1.decide(obs1)
print(f"智能体0动作概率: {probs0}")
print(f"智能体1动作概率: {probs1}")
print("每个智能体通过通信获得了对方视角的信息!")
"""
DIAL (Differentiable Inter-Agent Learning) 的核心:跨智能体梯度传播
一个智能体的消息输出影响另一个智能体的Q值,梯度通过消息通道反向传播
"""
import numpy as np

def dial_demo():
    """
    DIAL的核心机制演示
    智能体1发送消息m1 → 智能体2使用m1做决策 → 
    智能体2的Q值梯度反向传播到智能体1的消息生成网络
    """
    obs_dim, msg_dim, n_actions = 4, 3, 2
    
    # 智能体1:消息生成网络参数
    w_msg = np.random.randn(obs_dim, msg_dim) * 0.1
    
    # 智能体2:决策网络参数
    w_act = np.random.randn(obs_dim + msg_dim, n_actions) * 0.1
    
    # ===== 前向传播 =====
    # 智能体1生成消息
    obs1 = np.random.randn(obs_dim)
    msg1 = obs1 @ w_msg  # 消息 = f(观测)
    
    # 智能体2使用消息做决策
    obs2 = np.random.randn(obs_dim)
    joint_input = np.concatenate([obs2, msg1])
    logits = joint_input @ w_act
    probs = np.exp(logits) / np.sum(np.exp(logits))
    
    # 智能体2的Q值(模拟)
    q2_target = 5.0
    q2_predicted = np.max(logits)
    loss = 0.5 * (q2_predicted - q2_target) ** 2
    
    # ===== 反向传播(DIAL的关键)=====
    # 梯度从智能体2传到智能体1的消息
    dloss_dq2 = q2_predicted - q2_target  # 标量
    
    # ∂loss/∂logits(只对最大logits有梯度,简化)
    dloss_dlogits = np.zeros(n_actions)
    dloss_dlogits[np.argmax(logits)] = dloss_dq2
    
    # ∂loss/∂joint_input = dloss_dlogits @ w_act^T
    dloss_djoint = dloss_dlogits @ w_act.T
    
    # ∂loss/∂msg1 = ∂loss/∂joint_input的后半部分(对应msg1的维度)
    dloss_dmsg1 = dloss_djoint[obs_dim:]  # 后msg_dim维=消息梯度
    
    # ∂loss/∂w_msg = obs1^T @ dloss_dmsg1(智能体1的消息生成网络梯度!)
    dloss_dw_msg = np.outer(obs1, dloss_dmsg1)
    
    print(f"智能体1的消息: {msg1[:2]}...")
    print(f"智能体1的消息网络梯度范数: {np.linalg.norm(dloss_dw_msg):.4f}")
    print()
    print("这正是DIAL的精髓:")
    print("1. 智能体2的Q值「不满意」→ 梯度反向传播")
    print("2. 梯度通过消息通道传回智能体1")
    print("3. 智能体1知道「我说的内容导致队友表现差」")
    print("4. 于是调整消息生成网络,下次发送更有帮助的消息")
    
    return dloss_dw_msg

dial_demo()
print("\n注意:在实际部署(分散执行)阶段,梯度不再反向传播")
print("DIAL只在训练(集中训练)阶段使用跨智能体梯度")
"""
基于注意力的通信聚合 - 选择性关注重要消息
解决「所有消息一视同仁」的问题
"""
import numpy as np

def attention_aggregate(agent_obs, all_messages, attn_weights):
    """
    注意力聚合:根据自身观测,决定更关注哪个智能体的消息
    
    参数:
    - agent_obs: 当前智能体的观测 [obs_dim]
    - all_messages: 所有智能体(包括自己)的消息 [n_agents, msg_dim]
    - attn_weights: 注意力权重矩阵 [obs_dim + msg_dim, 1]
    
    返回:
    - aggregated: 加权聚合后的消息 [msg_dim]
    - attention_scores: 注意力分布 [n_agents]
    """
    n_agents = len(all_messages)
    
    # 计算每个消息的注意力分数
    scores = []
    for i in range(n_agents):
        # 拼接自身观测和对方消息
        query_key = np.concatenate([agent_obs, all_messages[i]])
        score = np.tanh(query_key @ attn_weights).item()
        scores.append(score)
    
    # Softmax归一化
    scores = np.array(scores)
    scores = scores - np.max(scores)  # 数值稳定
    attention_scores = np.exp(scores) / np.sum(np.exp(scores))
    
    # 加权聚合
    aggregated = np.sum(all_messages * attention_scores[:, np.newaxis], axis=0)
    
    return aggregated, attention_scores

# ===== 演示 =====
np.random.seed(42)
n_agents, msg_dim, obs_dim = 4, 3, 5

# 智能体0的观测和收到的消息
agent_obs = np.random.randn(obs_dim)
all_messages = np.random.randn(n_agents, msg_dim)

# 注意力权重
attn_weights = np.random.randn(obs_dim + msg_dim, 1) * 0.1

# 聚合
aggregated, attn_scores = attention_aggregate(agent_obs, all_messages, attn_weights)

print(f"注意力分布: {attn_scores}")
print(f"最关注智能体{np.argmax(attn_scores)}的消息(得分: {attn_scores[np.argmax(attn_scores)]:.2f})")
print("注意力机制让智能体能区分哪些「话」值得听——不是每条消息都同等重要!")

概念关系图谱

概念与学习通信的关系说明
DIAL经典算法利用跨智能体梯度让消息可微分传播,实现通信学习
RIAL经典算法将通信动作嵌入Q-Learning框架,用量化的离散消息
注意力通信现代方法使用注意力机制智能聚合多源消息,选择性地关注相关信息
涌现语言研究现象智能体在无预设语义下自发形成具有组合性的通信协议
信息瓶颈理论工具平衡通信的信息量和压缩率,在带宽有限下优化通信效率
通信图谱结构约束定义谁能和谁通信,剪枝不必要的通信链路
Gumbel-Softmax技术工具让离散消息可微分,实现离散通信的梯度传播
部分可观测应用场景通信的核心价值场景,通过分享信息弥补观测不完整

重点答疑

大白话 多智能体通信最妙的地方在于:你不用规定「该说什么」,只给一个需要协作才能完成的任务和通信通道,让它们自己「发明语言」。训练好的智能体之间可能有你完全看不懂的「黑话」,但它们就是能靠这个协同得特别好。
💡 核心要点:DIAL的核心创新在于让消息「可微分」——通过跨智能体的梯度流动,发消息的智能体能直接「感受到」收消息的智能体是如何被影响的,从而实现端到端通信优化。
    undefined

章节单词汇总

英文音标术语释义
Inter-Agent Communication/ɪnˈtɜː ˈeɪdʒənt kəˌmjuːnɪˈkeɪʃən/智能体间通信多个智能体之间通过消息传递信息协调行为
Differentiable/ˌdɪfəˈrenʃəbəl/可微分的允许梯度通过运算反向传播的性质
Emergent Language/ɪˈmɜːdʒənt ˈlæŋɡwɪdʒ/涌现语言智能体在没有人工监督下自发形成的通信系统
Gumbel-Softmax/ˈɡʌmbəl ˈsɒftmæks/Gumbel-Softmax技巧让离散采样可微分的重参数化技术
Attention Mechanism/əˈtenʃən ˈmekənɪzəm/注意力机制根据相关性对不同信息赋予不同权重的聚合方法
Communication Graph/kəˌmjuːnɪˈkeɪʃən ɡræf/通信图谱定义智能体间允许通信的拓扑结构
Information Bottleneck/ˌɪnfəˈmeɪʃən ˈbɒtəlnek/信息瓶颈在信息压缩和信息保留之间取得平衡的理论框架
Bandwidth/ˈbændwɪdθ/带宽通信信道单位时间能传输的最大信息量
Multi-Agent Coordination/ˈmʌlti ˈeɪdʒənt kəʊˌɔːdɪˈneɪʃən/多智能体协调多个智能体通过信息共享达到协同目标
Discretization/dɪsˌkriːtaɪˈzeɪʃən/离散化将连续消息转化为有限个离散符号的过程