1.简介
随着机器人形态的日益多样化以及机器人控制策略的不断进步,当前的机器人系统已经能够执行固定任务或根据指令执行任务,为构建现实世界中的交互式智能体提供了无限可能。然而这些机器人系统通常专注于单智能体任务,但在许多现实世界的应用场景中往往需要多个具身智能体协同完成任务。这种协同合作对于应对超出单个智能体能力范围的任务以及通过多智能体部署提高任务效率至关重要。
这篇文章提出了一种创新的框架,旨在通过引入组合约束来解决多智能体系统中的协作问题,并自动生成安全、高效的训练数据。文章的核心贡献在于设计了一种能够有效管理多智能体协作的框架——RoboFactory,该框架通过逻辑、空间和时间三个维度的约束,确保智能体在执行任务时的行为既安全又高效。
论文地址:https://arxiv.org/pdf/2503.16408
项目主页:RoboFactory: Exploring Embodied Agent Collaboration with Compositional Constraints
github地址:https://github.com/MARS-EAI/RoboFactory
-
-
2.论文详解
组合约束
约束条件定义了基于现实世界条件的实用且高效的边界。在多机器人协作场景中,更复杂的决策空间需要各种约束条件来确保安全性和有效性。
以下,作者提出了三类常见的约束条件,这些约束条件对于在多具身智能体决策中建模现实世界的边界至关重要。
- 逻辑约束。逻辑约束定义了智能体允许执行的动作和交互规则,重点关注高级逻辑,例如交互对象、接触点和运动方向,而不是操作的时机或顺序。这些约束编码了任务场景中的结构规则,例如交互对象的使用权限(例如,只有特定工具可用于处理某些材料)、接触点限制(例如,智能体必须从指定点抓取对象)以及方向一致性(例如,当多个智能体搬运一个对象时,它们施加的力必须保持一致)。通过形式化交互关系——例如动作兼容性、交互约束和空间协调要求——逻辑约束确保智能体遵循连贯的操作程序。
- 空间约束。空间约束定义了智能体可以在哪里操作以及物理交互的结构。这些约束包括几何边界(例如,任何智能体不得进入活动机械周围1米范围)、协作工作空间划分(例如,将建筑工地划分为互斥区域以防止碰撞)以及特定任务的放置要求(例如,组件必须放置在其目标坐标2厘米范围内才能进行有效装配)。它们还管理适应性空间行为,例如动态绕过新检测到的障碍物或调整夹具方向以适应狭窄开口。通过将物理可行性与时间和逻辑因素分离,这些约束确保智能体在安全且结构连贯的环境中运行。
- 时间约束。时间约束规定了动作必须何时以及以何种顺序执行,涉及同步、截止时间和顺序依赖性。这些约束确保智能体的行为符合时间敏感性要求,例如强制执行分阶段工作流程(例如,智能体C必须在智能体D完成焊接后等待5秒才能开始检查)或协调具有严格时间窗口的并行动作(例如,两个智能体必须在0.5秒的容差内同时抬起一个物体)。它们还管理动态调整,例如根据环境延迟延长任务持续时间或在先前步骤超时时重新安排动作。与定义决策有效性的逻辑约束不同,时间约束严格关注时间可行性——确保动作既不会过早也不会过晚发生,以维持安全性和效率。
组合约束。多具身智能体的协作依赖于逻辑、时间和空间约束的整合。逻辑约束定义了交互协议和共享目标,时间约束将动作与任务依赖性同步,空间约束编码了几何和语义边界。这些约束共同平衡了去中心化的自主性与全局一致性,实现了冲突解决、资源优化和适应性。这一统一框架确保局部决策汇聚成稳健、高效且可执行的协作行为。
-
RoboFactory框架
RoboBrain负责任务分解、约束生成和轨迹规划,而RoboChecker则确保生成的轨迹符合安全和效率标准。
RoboBrain 是一个复杂任务处理系统,主要负责将全局任务描述分解为可执行的子目标。例如,它可以将“拿起茶壶倒水”这一复杂任务分解为多个步骤,如“靠近茶壶”“提起茶壶”“移动茶壶使壶嘴对准杯口”以及“倾斜茶壶倒水”。同时,RoboBrain 还会根据任务逻辑和环境信息生成逻辑、空间和时间约束,以确保智能体的行为符合任务要求。此外,它通过调用预定义的动作原语,生成每个智能体的无约束轨迹序列,以实现相应的子目标。RoboBrain 还利用大型语言模型(如 GPT-4o)进行全局感知和决策,构建动态时空感知、规划指导和反馈纠错机制,从而实现对整个任务的高效管理和优化。
与此同时,RoboChecker 是一个与 RoboBrain 协同工作的系统,主要负责将 RoboBrain 生成的文本形式的约束转换为具体的约束接口,使其能够与现实世界进行有效交互。在智能体执行轨迹时,RoboChecker 会检查其是否违反了任何逻辑、空间或时间约束。一旦检测到违规行为,它会立即停止轨迹执行,并将违规原因反馈给 RoboBrain,以便重新规划。此外,RoboChecker 还会持续监测智能体的行为,确保其符合预设的安全和协作标准,从而为整个系统的稳定运行提供保障。
-
流程
这个框架生成数据的流程如下:
- 这一流程首先从输入数据准备开始,包括全局任务描述、RGB观察、先前子目标以及约束违规反馈等关键信息。这些输入数据为后续的任务分解和决策提供了基础。
- 随后,RoboBrain作为核心决策模块,根据任务描述、先前子目标和约束违规反馈,生成每个智能体的下一个子目标。这些子目标是完成全局任务的具体步骤。同时,RoboBrain还会生成与子目标相关的逻辑、空间和时间约束,这些约束确保智能体的行为符合任务逻辑、避免碰撞并优化协作效率。基于生成的子目标,RoboBrain调用预定义的动作原语,生成每个智能体的无约束轨迹序列,这些轨迹是智能体完成子目标的具体动作序列。
- 紧接着,约束接口生成环节将RoboBrain生成的文本形式的约束转换为具体的约束接口。逻辑约束接口确保智能体的动作符合任务逻辑;空间约束接口通过深度估计和3D占用表示,确保智能体在物理空间中的安全操作;时间约束接口通过动态占用建模,确保智能体的动作在时间上协调一致。
- 进入RoboChecker验证阶段,该模块根据生成的约束接口和当前多智能体状态,评估每个智能体的轨迹是否违反任何约束。如果检测到违规,RoboChecker立即停止轨迹执行,并将违规原因反馈给RoboBrain,以便重新规划。如果没有检测到违规,则标记子目标为完成,并继续执行下一个子目标。
作者给出了一个例子:
约束接口
约束接口的转换是将RoboBrain生成的文本形式的约束转换为具体的、能够与物理世界交互的表示形式。这一过程确保了约束能够直接应用于智能体的行为规划和轨迹验证中。
逻辑约束(Logical Constraints)的转换:逻辑约束定义了智能体的行为规则,例如交互对象、接触点和运动方向等。这些约束需要转换为具体的交互逻辑,以便在物理世界中执行。
-
交互位置(Interaction Position):为每个3D资产标注交互位置。不同的位置代表不同的交互逻辑。例如,抓取相机和使用相机有不同的交互位置。在RoboChecker中,会检查智能体的抓取位置是否符合预定义的交互位置。
-
交互方向(Interaction Direction):为每个3D资产标记交互方向。不同的交互行为遵循不同的方向逻辑。例如,按下相机快门需要机械臂的夹持器面向快门方向。在RoboChecker中,会检查智能体的运动方向是否符合预定义的交互方向。
空间约束(Spatial Constraints)的转换:空间约束定义了智能体可以在哪里操作以及物理交互的结构。这些约束需要转换为具体的3D占用表示,以便检测碰撞和空间冲突。
-
3D占用表示(3D Occupancy Representation):通过深度估计技术(使用硬件设备如深度相机或深度估计方法)获取当前3D场景的深度信息,并结合机械臂的状态计算每个关节点的绝对坐标,从而获得机械臂和物体的占用信息。为了降低计算成本,采用5cm×5cm×5cm的体素作为基本的离散占用单元。
-
碰撞检测:在RoboChecker中,根据智能体位置变化与其他元素的占用关系,判断是否发生碰撞以及是否违反空间逻辑。如果检测到碰撞,会立即停止轨迹执行,并将违规原因反馈给RoboBrain。
时间约束(Temporal Constraints)的转换:时间约束定义了动作的执行时间和顺序,确保智能体的行为在时间上协调一致。这些约束需要转换为具体的调度验证接口,以便优化任务的执行顺序和时间安排。
-
动态占用建模(Dynamic Occupancy Modeling):对于需要在子目标集合下移动的所有智能体,进行动态占用建模。通过分析智能体在每个时间步的位置变化,生成动态占用信息。
-
调度验证:在RoboChecker中,利用智能体的时间占用信息,检测并防止不合理的调度以及违反时间逻辑的行为。例如,确保需要同步执行的动作能够同时进行,而有顺序依赖的动作能够按正确的顺序执行。
-
约束接口的转换过程涉及以下几个关键步骤:
-
文本解析:RoboBrain生成的文本形式的约束首先被解析为具体的参数和条件。例如,逻辑约束中的“Agent_1的夹持器必须垂直于物体”会被解析为Agent_1、物体和垂直方向等参数。
-
接口生成:根据解析后的参数,生成具体的约束接口。例如,对于逻辑约束,生成交互位置和方向的验证接口;对于空间约束,生成3D占用表示和碰撞检测接口;对于时间约束,生成动态占用建模和调度验证接口。
-
验证协议:基于生成的约束接口,构建验证协议(CheckCode)。验证协议是一个具体的函数或代码片段,用于在每个时间步检查智能体的轨迹是否违反约束。例如,验证协议会检查智能体的运动方向是否符合预定义的交互方向,或者智能体的轨迹是否与其他智能体或物体发生碰撞。
-
反馈与调整:如果验证协议检测到违规,会立即停止轨迹执行,并将违规原因反馈给RoboBrain。RoboBrain根据反馈信息重新规划子目标和轨迹,以确保后续执行符合所有约束。
-
多智能体模仿学习架构
本文设计了四种基于模仿学习的多智能体系统架构,这些架构根据智能体的观察空间和策略共享策略进行分类:
-
全局视图 + 共享策略(Global View + Shared Policy, Arch1):所有智能体共享同一个全局观察,并使用单一共享策略生成联合动作序列,然后将动作分配给相应的智能体。
-
局部视图 + 共享策略(Local View + Shared Policy, Arch2):每个智能体有自己的局部观察,但所有智能体共享同一个策略来生成各自的动作序列。
-
全局视图 + 分离策略(Global View + Separate Policy, Arch3):所有智能体共享同一个全局观察,但每个智能体有自己的独立策略来生成动作序列。
-
局部视图 + 分离策略(Local View + Separate Policy, Arch4):每个智能体有自己的局部观察,并且每个智能体使用自己的独立策略生成动作序列。
-
Diffusion Policy
(论文没有这部分,额外内容)
Diffusion Policy 是一种基于扩散模型(Diffusion Model)的先进策略学习方法,专门用于机器人控制和动作生成任务。它通过利用扩散模型的强大生成能力,能够为机器人在复杂环境中生成高质量的动作序列,从而实现高效的任务完成。
Diffusion Policy 通过逐步向数据中添加噪声,然后学习如何逆转这一过程来生成数据。在机器人控制的背景下,Diffusion Policy 利用这一机制来生成动作序列。具体来说,它从一个完全随机的噪声分布开始,逐步去除噪声,最终生成符合任务要求的动作序列。这一过程类似于从模糊到清晰的图像生成过程,只不过在这里生成的是机器人的动作指令。
动作序列是 Diffusion Policy 的主要输出,它包含了机器人在每个时间步需要执行的动作指令。这些动作指令可以是关节角度、关节速度、力矩控制信号,甚至是末端执行器的开合状态等,具体取决于任务的需求和机器人的类型。例如,在机械臂操作任务中,动作序列可能是一个包含多个关节角度或速度的向量序列;而在移动机器人导航任务中,动作序列可能是一个包含线速度和角速度的向量序列。Diffusion Policy 的优势在于它能够直接生成未来多步的动作序列,而不是仅关注眼前的一两步,这使得动作更加具有连贯性,并且可以在高维连续控制中实现。
在本文的框架中,作者基于diffusion policy进行训练。
-
评估
作者通过在RoboFactory基准测试上评估扩散策略(Diffusion Policy, DP),展示了该框架在多智能体任务中的有效性和挑战。RoboFactory基准测试包含了11个任务,这些任务涉及不同数量的智能体(从1个到4个),旨在评估多智能体系统在复杂任务中的协作能力。
- 模型选择:使用基于模仿学习的扩散策略(Diffusion Policy, DP)作为评估模型。该模型通过学习专家的示范数据来训练智能体的行为策略。
- 数据量影响:分别使用50、100和150个专家示范数据来训练模型,以评估不同数据量对模型性能的影响。
- 任务类型:实验涵盖了单智能体任务(如“抓取肉类”)和多智能体任务(如“抬起障碍物”和“放置食物”)。
- 评估指标:主要使用任务的成功率(Success Rate)作为评估指标,衡量模型在不同任务上的表现。
-
实验结果表明,随着训练数据量的增加,模型在单智能体任务上的成功率普遍提高,这表明更多的训练数据有助于提升模型性能。然而,在多智能体任务中,模型的表现并不总是随着数据量的增加而提升。例如,在“抬起障碍物”任务中,使用100个示范数据时模型的成功率最高,而使用150个示范数据时成功率反而下降。这可能是因为多智能体任务的复杂性增加,导致模型在学习过程中容易过拟合,从而影响了其泛化能力。
此外,随着智能体数量的增加,任务的成功率显著下降。例如,在“长管道传递”任务中,模型的成功率始终为0%,这表明当前的扩散策略在处理复杂多智能体任务时存在局限性,尤其是在需要精确协作和长期时间依赖的任务中。
表3展示了不同多智能体模仿学习架构在两个代表性任务(Lift Barrier 和 Place Food)上的性能比较。关键结论如下:
-
分离策略优于共享策略:分离策略(Arch3 和 Arch4)在多智能体任务中表现优于共享策略(Arch1 和 Arch2)。分离策略允许每个智能体独立学习,更好地适应任务需求。
-
局部视图的重要性:局部视图(Arch4)在某些任务中表现优于全局视图(Arch3),尤其是在需要精细操作的任务中。局部视图提供了更详细的信息,有助于智能体更好地执行任务。
-
任务复杂性的影响:随着任务复杂性的增加(如从单智能体任务到多智能体任务),成功率显著下降。这表明现有的模仿学习方法在处理复杂多智能体任务时存在局限性,需要进一步改进。
表4和表5分别展示了不同组合约束(逻辑、空间、时间约束)对数据生成成功率和数据质量(平均剧集长度)的影响。
这些表通过实验结果验证了组合约束在提高数据生成效率和质量方面的有效性:逻辑、空间和时间约束的组合使用能够显著提高数据生成的成功率和质量。单独使用逻辑约束虽然有一定效果,但组合约束能够更好地优化智能体的行为,减少碰撞和时间浪费。
-
-
3.代码详解
环境搭建
安装虚拟环境,注意需要Python3.9,然后安装依赖包(注意需要在子目录RoboFactory下运行命令)
pip install -r requirements.txt
conda install -c conda-forge networkx=2.5
然后下载3D文件
python script/download_assets.py
测试代码,如果出现大概3秒的动画,说明运行成功
python script/run_task.py configs/table/lift_barrier.yaml
注意,如果出现了以下报错:
AttributeError: partially initialized module 'networkx' has no attribute '_dispatch' (most likely due to a circular import)
请卸载networkx,然后重新安装(最新版本),亲测有效
pip uninstall networkx
pip install networkx
-
如果需要更复杂的场景,如https://github.com/robocasa/robocasa
下载:
python -m mani_skill.utils.download_asset RoboCasa
然后运行:
python script/run_task.py configs/robocasa/lift_barrier.yaml
-
生成数据
- config_path:在table文件夹下有若干配置文件,可以自行尝试其他。
- traj_num:采样几条数据
# Format: python script/generate_data.py {config_path} {traj_num} [--save-video]
python script/generate_data.py configs/table/lift_barrier.yaml 150 --save-video
生成的数据是960*240分辨率的,如果想要更高,需更改config文件的camera/sensor部分的若干个width和height,如原版是320*240,我可以更改为1280*960
cameras:
sensor:
-
uid: head_camera_agent0
pose:
type: look_at
params: [[-0.5, 0.115, 0.4], [0, -0.235, 0.1]]
width: 320
height: 240
fov: 1.5707963268
near: 0.01
far: 10
-
uid: head_camera_agent1
pose:
type: look_at
params: [[0.5, -0.115, 0.4], [0, 0.235, 0.1]]
width: 320
height: 240
fov: 1.5707963268
near: 0.01
far: 10
-
uid: head_camera_global
pose:
type: pose
params: [[0.538897, 0.0328311, 0.7419], [0.00132558, -0.278757, 0.00042972, 0.960361]]
width: 320
height: 240
fov: 1.5707963268
near: 0.01
far: 10
然后我们需要将生成的h5文件和json文件进行转换(注意原文件是以时间为名的,为了方便操作,我统一转换为LiftBarrier-rf.json和LiftBarrier-rf.h5)
# 1. make data folder in the first time.
mkdir data
mkdir -p data/{h5_data,pkl_data,zarr_data}
# 2. move your .h5 and .json file into the data/h5_data folder.
mv LiftBarrier-rf.h5 data/h5_data/LiftBarrier-rf.h5
mv LiftBarrier-rf.json data/h5_data/LiftBarrier-rf.json
# 3. run the script to process the data.
# NOTE: This is the script for default config. If you add the additional camera in config yaml, modify the script to adapt the data.
# Example:
python script/parse_h5_to_pkl_multi.py --task_name LiftBarrier-rf --load_num 150 --agent_num 2
# For 2 agents task, convert 2 .pkl file into .zarr file respectively.
# Example:
python script/parse_pkl_to_zarr_dp.py --task_name LiftBarrier-rf --load_num 150 --agent_id 0
python script/parse_pkl_to_zarr_dp.py --task_name LiftBarrier-rf --load_num 150 --agent_id 1
然后就OK了
-
不想生成数据也可以直接使用官方提供的数据:https://huggingface.co/datasets/FACEONG/RoboFactory_Dataset/tree/main
把他们放在h5_data下面,然后使用第三步的脚本进行转换即可。
-
训练
直接在命令行运行以下代码即可
- task_name:数据集名称
- load_num:要加载的数据集大小
- agent_id:训练第几个agent
- seed:种子
- gpu_id:训练的gpu
# bash policy/Diffusion-Policy/train.sh ${task_name} ${load_num} ${agent_id} ${seed} ${gpu_id}
# Example:
bash policy/Diffusion-Policy/train.sh LiftBarrier-rf 150 0 100 0
bash policy/Diffusion-Policy/train.sh LiftBarrier-rf 150 1 100 0
train.py
@hydra.main( # Hydra 框架的主函数装饰器
version_base=None, # 不启用配置版本控制
config_path=str(pathlib.Path(__file__).parent.joinpath( # 指定配置文件路径为当前文件所在目录下的 'diffusion_policy/config' 文件夹
'diffusion_policy','config'))
)
def main(cfg: OmegaConf):
# resolve immediately so all the ${now:} resolvers
# will use the same time.
OmegaConf.resolve(cfg) # 解析配置中的变量
cls = hydra.utils.get_class(cfg._target_) # 根据配置中的 _target_ 获取对应的类
workspace: BaseWorkspace = cls(cfg) # 创建类的实例 workspace
print(cfg.task.dataset.zarr_path, cfg.task_name)
workspace.run() # 调用其 run() 方法启动任务
其中workspace.run()如下,这段代码定义了一个机器人训练的工作流程 ,主要包括以下功能:
- 恢复训练:根据配置加载之前的检查点继续训练。
- 数据集与模型准备:构建训练和验证数据集及数据加载器,设置归一化器。
- 优化器与学习率调度:配置学习率调度器。
- EMA(指数移动平均)支持:用于提升模型稳定性。
- 训练循环:执行多个训练轮次,包括前向传播、损失计算、反向传播、梯度更新等。
EMA模型通过计算模型参数的指数移动平均值,平滑了参数的更新过程。这有助于减少训练过程中的噪声,使模型参数更加稳定。它在减少训练过程中的波动、提高模型的泛化能力、减少过拟合、提高收敛速度等方面都有显著的优势。EMA模型广泛应用于深度学习、强化学习、生成模型和多智能体系统中,是一种非常实用的技术。
完整代码如下:
class RobotWorkspace(BaseWorkspace):
def run(self):
cfg = copy.deepcopy(self.cfg)
# pdb.set_trace()
# 1.恢复训练:如果配置中指定恢复训练,则加载最新的检查点 resume training
if cfg.training.resume:
lastest_ckpt_path = self.get_checkpoint_path()
if lastest_ckpt_path.is_file():
print(f"Resuming from checkpoint {lastest_ckpt_path}")
self.load_checkpoint(path=lastest_ckpt_path)
# 2.数据集配置:构建训练和验证数据集与数据加载器 configure dataset
dataset: BaseImageDataset
dataset = hydra.utils.instantiate(cfg.task.dataset)
assert isinstance(dataset, BaseImageDataset)
train_dataloader = create_dataloader(dataset, **cfg.dataloader)
normalizer = dataset.get_normalizer() # 获取归一化器
# configure validation dataset
val_dataset = dataset.get_validation_dataset()
val_dataloader = create_dataloader(val_dataset, **cfg.val_dataloader)
self.model.set_normalizer(normalizer) # 将归一化设置给模型
if cfg.training.use_ema:
self.ema_model.set_normalizer(normalizer)
# 3.模型与优化配置 configure lr scheduler
lr_scheduler = get_scheduler(
cfg.training.lr_scheduler,
optimizer=self.optimizer,
num_warmup_steps=cfg.training.lr_warmup_steps,
num_training_steps=(
len(train_dataloader) * cfg.training.num_epochs) \
// cfg.training.gradient_accumulate_every,
# pytorch assumes stepping LRScheduler every epoch
# however huggingface diffusers steps it every batch
last_epoch=self.global_step-1
)
# 启用 EMAconfigure ema
ema: EMAModel = None
if cfg.training.use_ema: # 根据配置实例化 EMAModel。
ema = hydra.utils.instantiate(
cfg.ema,
model=self.ema_model)
env_runner = None
# 创建 TopK 模型检查点管理器,用于保存训练过程中性能最好的若干模型。 configure checkpoint
topk_manager = TopKCheckpointManager(
save_dir=os.path.join(self.output_dir, 'checkpoints'),
**cfg.checkpoint.topk
)
# 4. 设备迁移:将模型和优化器移动到指定设备device transfer
device = torch.device(cfg.training.device)
self.model.to(device)
if self.ema_model is not None:
self.ema_model.to(device)
optimizer_to(self.optimizer, device)
# save batch for sampling
train_sampling_batch = None
if cfg.training.debug:
cfg.training.num_epochs = 2
cfg.training.max_train_steps = 3
cfg.training.max_val_steps = 3
cfg.training.rollout_every = 1
cfg.training.checkpoint_every = 1
cfg.training.val_every = 1
cfg.training.sample_every = 1
# 5. 训练循环 training loop
log_path = os.path.join(self.output_dir, 'logs.json.txt')
with JsonLogger(log_path) as json_logger:
for local_epoch_idx in range(cfg.training.num_epochs):
step_log = dict()
# ========= train for this epoch ==========
if cfg.training.freeze_encoder:
self.model.obs_encoder.eval() # 评估模式
self.model.obs_encoder.requires_grad_(False) # 禁止计算梯度,防止参数更新。
train_losses = list()
with tqdm.tqdm(train_dataloader, desc=f"Training epoch {self.epoch}",
leave=False, mininterval=cfg.training.tqdm_interval_sec) as tepoch: # 创建一个训练进度条,使用训练数据的迭代器进行循环
for batch_idx, batch in enumerate(tepoch):
batch = dataset.postprocess(batch, device)
if train_sampling_batch is None:
train_sampling_batch = batch
# compute loss
# pdb.set_trace()
raw_loss = self.model.compute_loss(batch) # 计算当前批次的原始损失值
loss = raw_loss / cfg.training.gradient_accumulate_every # 将损失除以梯度累积次数,用于累积累积的梯度。
loss.backward() # 执行反向传播
# 梯度累积下的优化器 step optimizer
if self.global_step % cfg.training.gradient_accumulate_every == 0:
self.optimizer.step()
self.optimizer.zero_grad()
lr_scheduler.step()
# update ema
if cfg.training.use_ema: # 启用了EMA(指数移动平均),则对模型参数进行更新。
ema.step(self.model)
# logging
raw_loss_cpu = raw_loss.item()
tepoch.set_postfix(loss=raw_loss_cpu, refresh=False)
train_losses.append(raw_loss_cpu)
step_log = {
'train_loss': raw_loss_cpu,
'global_step': self.global_step,
'epoch': self.epoch,
'lr': lr_scheduler.get_last_lr()[0]
}
is_last_batch = (batch_idx == (len(train_dataloader)-1))
if not is_last_batch:
# log of last step is combined with validation and rollout
json_logger.log(step_log)
self.global_step += 1
if (cfg.training.max_train_steps is not None) \
and batch_idx >= (cfg.training.max_train_steps-1):
break
# at the end of each epoch
# replace train_loss with epoch average
train_loss = np.mean(train_losses) # 计算平均训练损失
step_log['train_loss'] = train_loss
# ========= eval for this epoch ==========
policy = self.model
if cfg.training.use_ema: # 若使用EMA,则切换到EMA模型进行评估。
policy = self.ema_model
policy.eval()
# run rollout
# if (self.epoch % cfg.training.rollout_every) == 0:
# runner_log = env_runner.run(policy)
# # log all
# step_log.update(runner_log)
# 进行一次模型验证 run validation
if (self.epoch % cfg.training.val_every) == 0:
with torch.no_grad():
val_losses = list()
with tqdm.tqdm(val_dataloader, desc=f"Validation epoch {self.epoch}",
leave=False, mininterval=cfg.training.tqdm_interval_sec) as tepoch:
for batch_idx, batch in enumerate(tepoch):
batch = dataset.postprocess(batch, device)
loss = self.model.compute_loss(batch)
val_losses.append(loss)
if (cfg.training.max_val_steps is not None) \
and batch_idx >= (cfg.training.max_val_steps-1):
break
if len(val_losses) > 0:
val_loss = torch.mean(torch.tensor(val_losses)).item()
# log epoch average validation loss
step_log['val_loss'] = val_loss
# run diffusion sampling on a training batch 每隔若干训练轮次(epoch),使用训练集中的一个采样批次数据,通过策略模型预测动作,并计算预测动作与真实动作之间的均方误差(MSE),记录到日志中。
if (self.epoch % cfg.training.sample_every) == 0:
with torch.no_grad():
# sample trajectory from training set, and evaluate difference
batch = train_sampling_batch
obs_dict = batch['obs']
gt_action = batch['action']
result = policy.predict_action(obs_dict) # 调用策略模型 policy.predict_action 预测动作。
pred_action = result['action_pred']
mse = torch.nn.functional.mse_loss(pred_action, gt_action)
step_log['train_action_mse_error'] = mse.item()
del batch
del obs_dict
del gt_action
del result
del pred_action
del mse
# 检查点保存 checkpoint
if ((self.epoch + 1) % cfg.training.checkpoint_every) == 0:
# checkpointing
save_name = pathlib.Path(self.cfg.task.dataset.zarr_path).stem
self.save_checkpoint(f'checkpoints/{save_name}/{self.epoch + 1}.ckpt') # TODO
# ========= eval end for this epoch ==========
policy.train()
# end of epoch
# log of last step is combined with validation and rollout
json_logger.log(step_log)
self.global_step += 1
self.epoch += 1
其余部分我们不多介绍,我们主要介绍第5部分(训练),代码主要功能如下:
- 按轮次(epoch)训练模型
- 使用梯度累积优化显存效率:损失先累加,每隔一定步数再更新参数。
- 在评估时使用EMA模型。
- 每轮结束后计算平均损失并切换到评估模式。
for local_epoch_idx in range(cfg.training.num_epochs): # epoch
step_log = dict()
# ========= train for this epoch ==========
if cfg.training.freeze_encoder:
self.model.obs_encoder.eval() # 评估模式
self.model.obs_encoder.requires_grad_(False) # 禁止计算梯度,防止参数更新。
train_losses = list()
with tqdm.tqdm(train_dataloader, desc=f"Training epoch {self.epoch}",
leave=False, mininterval=cfg.training.tqdm_interval_sec) as tepoch: # 创建一个训练进度条,使用训练数据的迭代器进行循环
for batch_idx, batch in enumerate(tepoch):
batch = dataset.postprocess(batch, device)
if train_sampling_batch is None:
train_sampling_batch = batch
# compute loss
raw_loss = self.model.compute_loss(batch) # 计算当前批次的原始损失值
loss = raw_loss / cfg.training.gradient_accumulate_every # 将损失除以梯度累积次数,用于累积累积的梯度。
loss.backward() # 执行反向传播
# 梯度累积下的优化器 step optimizer
if self.global_step % cfg.training.gradient_accumulate_every == 0:
self.optimizer.step()
self.optimizer.zero_grad()
lr_scheduler.step()
# update ema
if cfg.training.use_ema: # 启用了EMA(指数移动平均),则对模型参数进行更新。
ema.step(self.model)
...
# at the end of each epoch replace train_loss with epoch average
train_loss = np.mean(train_losses) # 计算平均训练损失
step_log['train_loss'] = train_loss
# ========= eval for this epoch ==========
policy = self.model
if cfg.training.use_ema: # 若使用EMA,则切换到EMA模型进行评估。
policy = self.ema_model
policy.eval()
其中dataset.process()如下: 主要用于对图像数据进行归一化处理(除以255)。返回包含观测和动作的数据字典。
class RobotImageDataset(BaseImageDataset):
def postprocess(self, samples, device):
"""
将采样的数据转换到设备并标准化。
:param samples: 采样的数据。
:param device: PyTorch 设备。
:return: 经过标准化后的数据字典
"""
agent_pos = samples['state'].to(device, non_blocking=True) # 状态数据 [batchsize,time,特征维度D]=[64,8,8]
head_cam = samples['head_camera'].to(device, non_blocking=True) / 255.0 # 图像数据 [batchsize,time,3,h,w]=[64,8,3,240,320]
# front_cam = samples['front_camera'].to(device, non_blocking=True) / 255.0
# left_cam = samples['left_camera'].to(device, non_blocking=True) / 255.0
# right_cam = samples['right_camera'].to(device, non_blocking=True) / 255.0
action = samples['action'].to(device, non_blocking=True) # 动作数据 [batchsize,time,特征维度D]=[64,8,8]
return {
'obs': { # 观察
'head_cam': head_cam, # B, T, 3, H, W
# 'front_cam': front_cam, # B, T, 3, H, W
# 'left_cam': left_cam, # B, T, 3, H, W
# 'right_cam': right_cam, # B, T, 3, H, W
'agent_pos': agent_pos, # B, T, D
},
'action': action # B, T, D
}
其中model.compute_loss()如下,该函数实现了一个用于训练扩散模型的损失计算流程,主要包括以下步骤:
- 数据预处理:对观测和动作进行归一化处理;
- 条件编码:根据配置决定是否将观测作为全局或局部条件拼接到动作轨迹中;
- 噪声添加:为轨迹数据添加指定时间步的噪声,模拟扩散过程;
- 条件掩码应用:使用掩码保留原始条件数据部分;
- 噪声预测与损失计算:模型预测噪声残差,并计算均方误差损失。
class DiffusionUnetImagePolicy(BaseImagePolicy):
def compute_loss(self, batch, **kwargs):
# import pdb; pdb.set_trace()
# 数据预处理:对观测(obs)和动作(action)进行归一化 normalize input
assert 'valid_mask' not in batch
nobs = self.normalizer.normalize(batch['obs'])
nactions = self.normalizer['action'].normalize(batch['action'])
batch_size = nactions.shape[0]
horizon = nactions.shape[1]
# handle different ways of passing observation
local_cond = None
global_cond = None
trajectory = nactions
cond_data = trajectory
if self.obs_as_global_cond: # 若是全局条件
# reshape B, T, ... to B*T
this_nobs = dict_apply(nobs, # 取前 self.n_obs_steps 步的观测并展平前两维(B, T,... → B*T)
lambda x: x[:,:self.n_obs_steps,...].reshape(-1,*x.shape[2:]))
nobs_features = self.obs_encoder(this_nobs) # 经 obs_encoder 编码后,再 reshape 成 (B, Do) 作为全局条件
# reshape back to B, Do
global_cond = nobs_features.reshape(batch_size, -1) # 直接作为轨迹数据
else:
# reshape B, T, ... to B*T
this_nobs = dict_apply(nobs, lambda x: x.reshape(-1, *x.shape[2:]))
nobs_features = self.obs_encoder(this_nobs)
# reshape back to B, T, Do
nobs_features = nobs_features.reshape(batch_size, horizon, -1)
cond_data = torch.cat([nactions, nobs_features], dim=-1)
trajectory = cond_data.detach()
# 条件编码:根据配置决定是否将观测作为全局条件输入 generate impainting mask
condition_mask = self.mask_generator(trajectory.shape)
# Sample noise that we'll add to the images
noise = torch.randn(trajectory.shape, device=trajectory.device) # 生成噪声 [64,8,8]
bsz = trajectory.shape[0]
# Sample a random timestep for each image
timesteps = torch.randint( # 随机采样时间步 [64]
0, self.noise_scheduler.config.num_train_timesteps,
(bsz,), device=trajectory.device
).long()
# Add noise to the clean images according to the noise magnitude at each timestep(this is the forward diffusion process)
noisy_trajectory = self.noise_scheduler.add_noise( # 添加噪声得到加噪图像 [64,8,8]
trajectory, noise, timesteps)
# compute loss mask
loss_mask = ~condition_mask
# 应用条件信息 apply conditioning
noisy_trajectory[condition_mask] = cond_data[condition_mask]
# Predict the noise residual
pred = self.model(noisy_trajectory, timesteps, # 预测噪声残差
local_cond=local_cond, global_cond=global_cond)
pred_type = self.noise_scheduler.config.prediction_type
if pred_type == 'epsilon': # 若预测噪声(epsilon),则目标为原始噪声noise
target = noise
elif pred_type == 'sample':
target = trajectory
else:
raise ValueError(f"Unsupported prediction type {pred_type}")
loss = F.mse_loss(pred, target, reduction='none') # 计算损失
loss = loss * loss_mask.type(loss.dtype)
loss = reduce(loss, 'b ... -> b (...)', 'mean')
loss = loss.mean()
if 'output_pred' in kwargs:
return loss, pred
return loss
其中最后采样时policy.predict_action(obs_dict)的过程如下:
class DiffusionUnetImagePolicy(BaseImagePolicy):
def predict_action(self, obs_dict: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
"""
obs_dict: must include "obs" key
result: must include "action" key
"""
assert 'past_action' not in obs_dict # not implemented yet
# normalize input
nobs = self.normalizer.normalize(obs_dict) # 将输入观测 obs_dict 进行归一化处理。
value = next(iter(nobs.values()))
B, To = value.shape[:2]
T = self.horizon
Da = self.action_dim
Do = self.obs_feature_dim
To = self.n_obs_steps
# build input
device = self.device
dtype = self.dtype
# handle different ways of passing observation
local_cond = None
global_cond = None
if self.obs_as_global_cond:
# 将观测编码为全局条件向量 condition through global feature
this_nobs = dict_apply(nobs, lambda x: x[:,:To,...].reshape(-1,*x.shape[2:])) # pos:[192,8] cam:[192,3,240,320]
nobs_features = self.obs_encoder(this_nobs) # [192,520]
# reshape back to B, Do
global_cond = nobs_features.reshape(B, -1) # [64,1560]
# empty data for action
cond_data = torch.zeros(size=(B, T, Da), device=device, dtype=dtype)
cond_mask = torch.zeros_like(cond_data, dtype=torch.bool)
else:
# condition through impainting
this_nobs = dict_apply(nobs, lambda x: x[:,:To,...].reshape(-1,*x.shape[2:]))
nobs_features = self.obs_encoder(this_nobs)
# reshape back to B, T, Do
nobs_features = nobs_features.reshape(B, To, -1)
cond_data = torch.zeros(size=(B, T, Da+Do), device=device, dtype=dtype)
cond_mask = torch.zeros_like(cond_data, dtype=torch.bool)
cond_data[:,:To,Da:] = nobs_features
cond_mask[:,:To,Da:] = True
# run sampling
nsample = self.conditional_sample(
cond_data,
cond_mask,
local_cond=local_cond,
global_cond=global_cond,
**self.kwargs)
# 反归一化输出结果 unnormalize prediction
naction_pred = nsample[...,:Da]
action_pred = self.normalizer['action'].unnormalize(naction_pred)
# 截取指定步长的动作 get action
start = To - 1
end = start + self.n_action_steps
action = action_pred[:,start:end]
result = {
'action': action,
'action_pred': action_pred
}
return result
-
模型
这段代码实现了一个条件化的1D U-Net模型(ConditionalUnet1D
),通常用于处理序列数据,如音频、时间序列等。模型结构包括:
-
扩散步编码器(Diffusion Step Encoder)
- SinusoidalPosEmb:将时间步 t 映射为正弦和余弦形式的位置嵌入,生成与时间步相关的向量表示。
-
Linear 和 Mish:通过线性变换和非线性激活函数进一步处理时间步嵌入,使其能够更好地融入模型的特征空间。
-
下采样模块(Down Modules):下采样层用于减少特征的空间维度,逐步提取更高级别的特征。
-
中间模块(Middle Modules):这些模块在特征空间的最深层次上进行操作,有助于捕捉全局特征。
-
上采样模块(Up Modules):上采样层用于增加特征的空间维度,逐步恢复到原始输入的尺寸。
class ConditionalUnet1D(nn.Module):
def __init__():
....
diffusion_step_encoder = nn.Sequential(
SinusoidalPosEmb(dsed), # 位置编码方法,用于将时间步 t 映射为向量表示。生成与时间步相关的正弦和余弦形式的位置嵌入
nn.Linear(dsed, dsed * 4),
nn.Mish(),
nn.Linear(dsed * 4, dsed),
)
....
if local_cond_dim is not None:
_, dim_out = in_out[0]
dim_in = local_cond_dim
local_cond_encoder = nn.ModuleList([
# down encoder
ConditionalResidualBlock1D(
dim_in, dim_out, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale),
# up encoder
ConditionalResidualBlock1D(
dim_in, dim_out, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale)
])
mid_dim = all_dims[-1]
self.mid_modules = nn.ModuleList([
ConditionalResidualBlock1D(
mid_dim, mid_dim, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale
),
ConditionalResidualBlock1D(
mid_dim, mid_dim, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale
),
])
down_modules = nn.ModuleList([])
for ind, (dim_in, dim_out) in enumerate(in_out):
is_last = ind >= (len(in_out) - 1)
down_modules.append(nn.ModuleList([
ConditionalResidualBlock1D(
dim_in, dim_out, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale),
ConditionalResidualBlock1D(
dim_out, dim_out, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale),
Downsample1d(dim_out) if not is_last else nn.Identity()
]))
up_modules = nn.ModuleList([])
for ind, (dim_in, dim_out) in enumerate(reversed(in_out[1:])):
is_last = ind >= (len(in_out) - 1)
up_modules.append(nn.ModuleList([
ConditionalResidualBlock1D(
dim_out*2, dim_in, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale),
ConditionalResidualBlock1D(
dim_in, dim_in, cond_dim=cond_dim,
kernel_size=kernel_size, n_groups=n_groups,
cond_predict_scale=cond_predict_scale),
Upsample1d(dim_in) if not is_last else nn.Identity()
]))
final_conv = nn.Sequential(
Conv1dBlock(start_dim, start_dim, kernel_size=kernel_size),
nn.Conv1d(start_dim, input_dim, 1),
)
......
def forward(self,
sample: torch.Tensor,
timestep: Union[torch.Tensor, float, int],
local_cond=None, global_cond=None, **kwargs):
"""
x: (B,T,input_dim)
timestep: (B,) or int, diffusion step
local_cond: (B,T,local_cond_dim)
global_cond: (B,global_cond_dim)
output: (B,T,input_dim)
"""
sample = einops.rearrange(sample, 'b h t -> b t h')
# 1. time
timesteps = timestep
if not torch.is_tensor(timesteps):
# TODO: this requires sync between CPU and GPU. So try to pass timesteps as tensors if you can
timesteps = torch.tensor([timesteps], dtype=torch.long, device=sample.device)
elif torch.is_tensor(timesteps) and len(timesteps.shape) == 0: # 若是0维张量,扩展为1维
timesteps = timesteps[None].to(sample.device)
# broadcast to batch dimension in a way that's compatible with ONNX/Core ML
timesteps = timesteps.expand(sample.shape[0]) # 扩展timesteps以匹配sample的批量大小。
global_feature = self.diffusion_step_encoder(timesteps) # 将时间步编码 [64]->[64,128]
if global_cond is not None:
global_feature = torch.cat([ # 将全局特征与局部特征连接起来。[64,1688]
global_feature, global_cond
], axis=-1)
# 处理局部条件输入 encode local features
h_local = list()
if local_cond is not None:
local_cond = einops.rearrange(local_cond, 'b h t -> b t h')
resnet, resnet2 = self.local_cond_encoder
x = resnet(local_cond, global_feature)
h_local.append(x)
x = resnet2(local_cond, global_feature)
h_local.append(x)
x = sample
h = []
for idx, (resnet, resnet2, downsample) in enumerate(self.down_modules): # 下采样
x = resnet(x, global_feature) # [64,256,8]/[64,512,4]/[64,1024,2]
if idx == 0 and len(h_local) > 0:
x = x + h_local[0]
x = resnet2(x, global_feature)
h.append(x)
x = downsample(x)
for mid_module in self.mid_modules: # 中间处理阶段
x = mid_module(x, global_feature) # [64,1024,2]
for idx, (resnet, resnet2, upsample) in enumerate(self.up_modules): # 上采样阶段
x = torch.cat((x, h.pop()), dim=1) # [64,2048,2]
x = resnet(x, global_feature)
# The correct condition should be:
# if idx == (len(self.up_modules)-1) and len(h_local) > 0:
# However this change will break compatibility with published checkpoints.
# Therefore it is left as a comment.
if idx == len(self.up_modules) and len(h_local) > 0:
x = x + h_local[1]
x = resnet2(x, global_feature)
x = upsample(x)
x = self.final_conv(x) # [64,256,8]->[64,8,8]
x = einops.rearrange(x, 'b t h -> b h t')
return x
其中ConditionalResidualBlock1D是一个1D卷积残差块,通过FiLM(Feature-wise Linear Modulation)机制将外部条件信息(cond)融入网络。在对输入的一维信号(如时间序列或音频)进行特征提取时,该模块根据给定的条件向量(conditioning vector预测每个通道的缩放因子(scale)和偏置(bias),然后以out = scale * out + bias的形式将条件嵌入加到特征上。
class ConditionalResidualBlock1D(nn.Module):
def __init__():
super().__init__()
self.blocks = nn.ModuleList([
Conv1dBlock(in_channels, out_channels, kernel_size, n_groups=n_groups),
Conv1dBlock(out_channels, out_channels, kernel_size, n_groups=n_groups),
])
...
def forward(self, x, cond):
out = self.blocks[0](x) # Conv1dBlock
embed = self.cond_encoder(cond) # Mish+Linear+Rearrange [64,1688]->[64,2,256,1]
if self.cond_predict_scale:
embed = embed.reshape(
embed.shape[0], 2, self.out_channels, 1)
scale = embed[:,0,...] # [64,256,1]
bias = embed[:,1,...] # [64,256,1]
out = scale * out + bias
else:
out = out + embed
out = self.blocks[1](out)
out = out + self.residual_conv(x) # Conv1d
return out
-
评估
训练完成后使用以下代码即可进行评估
bash policy/Diffusion-Policy/eval_multi.sh ${config_name} ${DATA_NUM} ${CHECKPOINT_NUM} ${DEBUG_MODE} ${TASK_NAME}
# Example
bash policy/Diffusion-Policy/eval_multi.sh configs/table/lift_barrier.yaml 150 300 1 LiftBarrier-rf
-
-
4.总结
这篇文章提出了一种创新的框架,旨在通过引入组合约束来解决多智能体系统中的协作问题,并自动生成安全、高效的训练数据。文章的核心贡献在于设计了一种能够有效管理多智能体协作的框架——RoboFactory,该框架通过逻辑、空间和时间三个维度的约束,确保智能体在执行任务时的行为既安全又高效。
RoboFactory框架包含两个核心模块:RoboBrain和RoboChecker。RoboBrain负责根据全局任务描述、先前的子目标和约束违规反馈,生成每个智能体的下一个子目标和文本形式的组合约束。这些约束包括逻辑约束(定义智能体的行为规则)、空间约束(规定智能体的操作空间)和时间约束(协调智能体的动作顺序)。然后,RoboBrain调用预定义的动作原语生成每个智能体的无约束轨迹序列。RoboChecker则将文本约束转换为具体的约束接口,并在智能体执行轨迹时检查是否违反约束。如果检测到违规,轨迹执行立即停止,并将违规原因反馈给RoboBrain,以便重新规划。
文章进一步介绍了如何通过RoboFactory框架生成高质量的多智能体协作数据集。该数据集基于ManiSkill模拟器构建,包含11个任务,涉及不同数量的智能体。这些任务不仅需要高阶规划,还需要低阶控制的精确执行。通过随机初始化设置和多样化的任务设计,数据集能够有效支持多智能体系统的训练和评估。
此外,文章还对RoboFactory框架的有效性进行了实验验证。通过在多个任务上部署扩散策略(Diffusion Policy),并评估其性能,文章展示了组合约束在提高数据生成质量和效率方面的显著效果。实验结果表明,引入组合约束可以显著提高任务的成功率,并减少数据的平均长度,从而提高训练和推理的效率。
总的来说,这篇文章不仅提出了一个创新的框架来解决多智能体协作中的复杂问题,还通过实验验证了该框架的有效性。RoboFactory框架和生成的数据集为构建安全、高效的多智能体系统提供了坚实的基础,为未来的研究和应用提供了宝贵的资源和启示。
🌟 感谢您的阅读! 🌟
如果您觉得这篇文章对您有帮助,或者您对多智能体系统和RoboFactory框架感兴趣,请不要吝啬您的支持哦!
👍 点赞:您的点赞是我继续创作的动力!
⭐ 收藏:方便您随时回顾,也让更多人看到这些有价值的内容!
🌞关注:第一时间获取更多前沿技术和研究分享!
让我们一起探索多智能体协作的无限可能,共同进步!🚀
感谢您的支持,祝您有个美好的一天!🌞