强化学习 Policy 的 Tracking 能力全解析,以Legged_gym为例解说Policy的训练流程

强化学习 Policy 的 Tracking 能力全解析

从运动跟踪到多模态控制的统一框架


概述:Policy 作为通用 Tracker

Tracking 的核心思想

Legged Gym 的强化学习 Policy 本质上是一个通用的多模态跟踪系统。通过设计不同的观测空间和奖励函数,Policy 可以学习跟踪各种目标:

🎯 Tracking = 观测 (Observation) + 目标 (Target) + 奖励 (Reward)

训练的本质是让 Policy 学会:

  1. 感知当前状态 (Where am I?)
  2. 理解目标 (Where should I be?)
  3. 计算偏差 (How far am I from the goal?)
  4. 输出控制 (What should I do?)

为什么 RL Policy 擅长 Tracking?

优势传统控制器RL Policy
非线性系统需要精确建模✅ 从数据学习
高维耦合难以设计✅ 端到端学习
扰动鲁棒性需要显式设计✅ 隐式学习
多目标平衡需要手动调优✅ 奖励函数自动平衡
泛化能力局限于设计范围✅ 可泛化到未见场景

Legged Gym 的 Tracking 框架

基于 Isaac Gym 和 PPO (Proximal Policy Optimization) 算法,Legged Gym 提供了一个统一的框架,可以训练 Policy 跟踪:

  • 速度指令 (Velocity Commands)
  • 参考轨迹 (Reference Trajectories)
  • 关节状态 (Joint States)
  • 身体姿态 (Body Pose)
  • 足端位置 (Foot Positions)
  • 运动风格 (Motion Style)
  • 多模态组合 (Multi-modal Combinations)

Tracking 的本质与范式

Tracking 的数学定义

在强化学习中,Tracking 问题可以形式化为:

min⁡πEτ∼π[∑t=0T∥st−stref∥2] \min_{\pi} \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{T} \| s_t - s_t^{ref} \|^2 \right] πminEτπ[t=0Tststref2]

其中:

  • π\piπ: Policy 策略
  • sts_tst: 当前状态
  • strefs_t^{ref}stref: 参考目标
  • τ\tauτ: 轨迹

Tracking 的三种范式

1. 目标跟踪 (Target Tracking)
给定固定/变化的目标 → Policy 持续跟踪
例:速度指令跟踪、姿态保持
2. 轨迹跟踪 (Trajectory Tracking)
给定时间序列轨迹 → Policy 按时间跟随
例:动作捕捉数据重现、预定义步态
3. 分布跟踪 (Distribution Tracking)
给定状态分布 → Policy 采样符合分布的行为
例:风格模仿、多样性生成

Legged Gym 中的 Tracking 实现

在 Legged Gym 中,所有 Tracking 任务都遵循统一的框架:

# 统一的 Tracking 框架
class TrackingPolicy:
    def __init__(self):
        self.target = None  # 跟踪目标
        self.current_state = None  # 当前状态
        
    def compute_tracking_error(self):
        """计算跟踪误差"""
        return self.target - self.current_state
    
    def compute_tracking_reward(self, error):
        """将误差转换为奖励"""
        return torch.exp(-error / sigma)  # 高斯核
    
    def step(self, obs):
        """执行控制"""
        action = self.policy(obs)
        return action

可跟踪的模态与量

1. 运动学量 (Kinematic Quantities)

维度观测空间典型范围
线速度3D身体坐标系[-2, 2] m/s
角速度3D身体坐标系[-2, 2] rad/s
位置3D世界坐标系任意
朝向4D (四元数)世界坐标系SO(3)
关节角度12D关节空间机械限位
关节角速度12D关节空间[-10, 10] rad/s

2. 动力学量 (Dynamic Quantities)

维度观测空间应用场景
关节力矩12D关节空间力控制任务
接触力4×3D足端力感知步态
动量3D身体坐标系跳跃、冲击
能量消耗1D标量效率优化

3. 几何量 (Geometric Quantities)

维度观测空间应用场景
足端位置4×3D身体坐标系精确落脚
质心位置3D身体坐标系平衡控制
支撑多边形2D水平面稳定性
地形高度图187D局部栅格地形适应

4. 时序量 (Temporal Quantities)

维度观测空间应用场景
相位1D[0, 2π]周期步态
步频1D[0.5, 3] Hz运动节奏
占空比1D[0, 1]支撑/摆动
时序轨迹T×D时间窗口动作复现

5. 高级语义量 (Semantic Quantities)

维度观测空间应用场景
步态类型离散{trot, pace, gallop}步态切换
运动风格连续潜在空间风格化运动
任务目标混合任务相关多任务学习
地形类型离散{flat, rough, stairs}地形识别

Tracking 可行性分析

不是所有量都适合直接跟踪,需要考虑:

因素说明示例
可观测性能否实时测量?✅ 关节角度 vs ❌ 肌肉激活
可控性Policy 能否影响?✅ 速度 vs ❌ 外部扰动
时间尺度控制频率匹配?✅ 50Hz 关节控制 vs ❌ 0.1Hz 规划
耦合度是否强耦合?⚠️ 位置+姿态耦合
物理约束是否违反动力学?❌ 突变速度指令

运动跟踪 (Motion Tracking)

基础:速度指令跟踪

这是 Legged Gym 的默认任务,也是最基础的 Tracking 形式。

跟踪目标
commands = [vel_x, vel_y, ang_vel_yaw, heading]  # 4D 向量
# vel_x:      前后线速度 (m/s)
# vel_y:      左右线速度 (m/s)  
# ang_vel_yaw: 偏航角速度 (rad/s)
# heading:    目标朝向 (rad) - 可选
观测空间
obs = [
    base_lin_vel,       # [3] 当前线速度
    base_ang_vel,       # [3] 当前角速度
    projected_gravity,  # [3] 重力投影
    commands[:3],       # [3] 速度指令
    dof_pos,           # [12] 关节位置
    dof_vel,           # [12] 关节速度
    actions_prev,      # [12] 上一动作
    # height_map        # [187] 地形高度(可选)
]  # Total: 48 或 235
跟踪奖励
def _reward_tracking_lin_vel(self):
    """线速度跟踪奖励"""
    error = torch.sum(
        torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), 
        dim=1
    )
    return torch.exp(-error / 0.25)  # 高斯核,sigma=0.25

def _reward_tracking_ang_vel(self):
    """角速度跟踪奖励"""
    error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
    return torch.exp(-error / 0.25)

奖励曲线

误差 (m/s) | 奖励
-----------|------
0.0        | 1.00  ← 完美跟踪
0.1        | 0.85
0.25       | 0.37
0.5        | 0.14
1.0        | 0.02
训练配置
class rewards:
    class scales:
        tracking_lin_vel = 1.0   # 主要优化目标
        tracking_ang_vel = 0.5   # 次要目标
        lin_vel_z = -2.0         # 惩罚跳动
        ang_vel_xy = -0.05       # 惩罚翻滚

进阶:身体姿态跟踪

跟踪目标
target_pose = {
    'position': [x, y, z],           # 目标位置
    'orientation': [qw, qx, qy, qz], # 目标四元数
    'height': h                      # 目标高度
}
姿态跟踪奖励
def _reward_orientation(self):
    """惩罚非水平姿态"""
    # 使用重力投影向量
    # 理想值: [0, 0, -1] (完全水平)
    error = torch.sum(
        torch.square(self.projected_gravity[:, :2]), 
        dim=1
    )
    return torch.exp(-error / 0.25)

def _reward_base_height(self):
    """高度跟踪"""
    target_height = 0.52  # ANYmal-C 标称高度
    error = torch.square(
        self.root_states[:, 2] - target_height
    )
    return torch.exp(-error / 0.1)
应用场景
  1. 平衡任务: 在动态扰动下保持姿态
  2. 俯仰控制: 爬坡时调整身体角度
  3. 滚转避免: 侧向行走时防止翻倒

轨迹跟踪 (Trajectory Tracking)

概述

轨迹跟踪要求 Policy 不仅跟踪目标值,还要按照时间顺序精确重现整条轨迹。

参考轨迹来源

1. Motion Capture 数据
# 从动捕系统获取参考轨迹
mocap_data = {
    'joint_pos': [T, 12],      # 关节角度轨迹
    'joint_vel': [T, 12],      # 关节速度轨迹
    'root_pos': [T, 3],        # 根节点位置
    'root_orient': [T, 4],     # 根节点朝向
    'time': [T]                # 时间戳
}
2. 动画/仿真数据
# 从物理仿真或动画软件导出
animation_data = load_animation("trot_cycle.npy")
# Shape: [100, 48]  # 100 帧,48 维状态
3. 专家策略生成
# 使用传统控制器生成示范
expert_trajectory = classical_controller.rollout(
    duration=5.0, 
    command=[1.0, 0.0, 0.0]
)

实现:时序轨迹跟踪

扩展观测空间
class LeggedRobotCfg:
    class env:
        num_observations = 48 + 12  # 原始观测 + 参考状态
        reference_state_dim = 12    # 参考关节角度
        
obs = torch.cat([
    standard_obs,           # [48] 标准观测
    reference_joint_pos     # [12] 当前时刻的参考关节角度
], dim=-1)
轨迹跟踪奖励
def _reward_joint_tracking(self):
    """关节轨迹跟踪"""
    # 获取当前时间对应的参考状态
    t = self.episode_length_buf * self.dt
    ref_pos = self.get_reference_at_time(t)
    
    # 计算跟踪误差
    error = torch.sum(
        torch.square(self.dof_pos - ref_pos), 
        dim=1
    )
    return torch.exp(-error / 0.1)

def _reward_joint_vel_tracking(self):
    """关节速度跟踪"""
    t = self.episode_length_buf * self.dt
    ref_vel = self.get_reference_vel_at_time(t)
    
    error = torch.sum(
        torch.square(self.dof_vel - ref_vel), 
        dim=1
    )
    return torch.exp(-error / 0.5)
参考轨迹插值
def get_reference_at_time(self, t):
    """根据时间插值参考轨迹"""
    # 循环轨迹
    t_normalized = t % self.traj_duration
    
    # 线性插值
    idx = (t_normalized / self.dt).long()
    alpha = (t_normalized / self.dt) - idx
    
    ref_pos = (1 - alpha) * self.trajectory[idx] + \
              alpha * self.trajectory[idx + 1]
    
    return ref_pos

相位跟踪 (Phase Tracking)

周期步态的相位表示
# 相位作为观测
phase = (t % T) / T * 2 * π  # [0, 2π]
obs = torch.cat([
    standard_obs,
    torch.sin(phase),  # 正弦编码
    torch.cos(phase)   # 余弦编码
], dim=-1)
相位条件奖励
def _reward_phase_based_foot_contact(self):
    """相位同步的接触奖励"""
    phase = self.get_phase()
    
    # 期望接触模式 (trot 步态)
    expected_contact = torch.tensor([
        [1, 0, 0, 1],  # 对角支撑 1
        [0, 1, 1, 0]   # 对角支撑 2
    ])
    
    phase_idx = (phase / (2*π) * 2).long() % 2
    target = expected_contact[phase_idx]
    
    actual_contact = self.contact_forces[:, self.feet_indices, 2] > 1.0
    
    # 匹配度奖励
    match = (actual_contact == target).float().mean(dim=1)
    return match

状态跟踪 (State Tracking)

多关节状态同步跟踪

挑战

同时跟踪 12 个关节的位置和速度,共 24 维状态:

target_state = {
    'dof_pos': [12],  # 关节角度
    'dof_vel': [12]   # 关节速度
}
加权跟踪奖励
def _reward_weighted_joint_tracking(self):
    """加权关节跟踪"""
    # 不同关节不同权重
    weights = torch.tensor([
        1.0, 1.0, 0.8,  # 前左腿 (HAA, HFE, KFE)
        1.0, 1.0, 0.8,  # 前右腿
        1.0, 1.0, 0.8,  # 后左腿
        1.0, 1.0, 0.8   # 后右腿
    ])
    
    error = torch.square(self.dof_pos - self.target_dof_pos)
    weighted_error = (error * weights).sum(dim=1)
    
    return torch.exp(-weighted_error / 0.5)

足端轨迹跟踪

正向运动学计算
def compute_foot_positions(self):
    """计算足端位置(身体坐标系)"""
    # 使用机器人运动学模型
    foot_pos = []
    for leg_id in range(4):
        joint_angles = self.dof_pos[:, leg_id*3:(leg_id+1)*3]
        pos = forward_kinematics(joint_angles, leg_id)
        foot_pos.append(pos)
    
    return torch.stack(foot_pos, dim=1)  # [N, 4, 3]
足端跟踪奖励
def _reward_foot_position_tracking(self):
    """足端位置跟踪"""
    current_foot_pos = self.compute_foot_positions()
    target_foot_pos = self.get_target_foot_positions()
    
    error = torch.sum(
        torch.square(current_foot_pos - target_foot_pos), 
        dim=[1, 2]
    )
    return torch.exp(-error / 0.05)

多模态融合跟踪

概述

实际应用中,往往需要同时跟踪多个目标,这需要在奖励函数中平衡多个 Tracking 目标。

多目标跟踪框架

class MultiModalTracking:
    def compute_reward(self):
        """多模态跟踪总奖励"""
        reward = 0
        
        # 1. 速度跟踪 (主要目标)
        reward += 1.0 * self.tracking_lin_vel()
        reward += 0.5 * self.tracking_ang_vel()
        
        # 2. 姿态跟踪 (次要目标)
        reward += 0.2 * self.orientation_tracking()
        reward += 0.1 * self.height_tracking()
        
        # 3. 步态跟踪 (风格约束)
        reward += 0.3 * self.phase_based_gait()
        
        # 4. 稳定性 (软约束)
        reward -= 2.0 * self.lin_vel_z_penalty()
        reward -= 0.05 * self.ang_vel_xy_penalty()
        
        return reward

典型组合场景

场景 1: 速度 + 姿态跟踪
# 应用:斜坡行走
class SlopeWalkingTracking:
    rewards = {
        'tracking_lin_vel': 1.0,      # 保持速度
        'tracking_ang_vel': 0.5,      # 转向能力
        'orientation': 0.5,           # 保持水平(抵抗斜坡倾斜)
        'base_height': 0.2,           # 高度稳定
        'torques': -0.0001,           # 能量效率
    }
场景 2: 轨迹 + 相位跟踪
# 应用:动作复现
class MotionImitationTracking:
    rewards = {
        'joint_pos_tracking': 2.0,    # 关节角度匹配
        'joint_vel_tracking': 1.0,    # 关节速度匹配
        'phase_tracking': 0.5,        # 时序对齐
        'foot_contact': 0.3,          # 接触时机
        'root_pose': 0.5,             # 身体位姿
    }

Policy 网络结构

Actor-Critic 架构

系统使用 Actor-Critic 双网络结构:

class ActorCritic(nn.Module):
    def __init__(self):
        # Actor: 策略网络,用于控制
        self.actor = MLP(
            input_dim=num_observations,      # 观测维度
            output_dim=num_actions,          # 动作维度 (12个关节)
            hidden_dims=[512, 256, 128],     # 隐藏层
            activation='elu'
        )
        
        # Critic: 价值网络,仅用于训练
        self.critic = MLP(
            input_dim=num_observations,      # 或 num_privileged_obs
            output_dim=1,                    # 输出状态价值
            hidden_dims=[512, 256, 128],
            activation='elu'
        )

网络配置示例

配置 1: 轻量级速度跟踪
class policy:
    actor_hidden_dims = [128, 64, 32]
    critic_hidden_dims = [128, 64, 32]
    activation = 'elu'
    
# 参数量:~50K
# 推理速度:<0.5ms
# 适用:简单平地行走
配置 2: 标准复杂地形跟踪
class policy:
    actor_hidden_dims = [512, 256, 128]
    critic_hidden_dims = [512, 256, 128]
    activation = 'elu'
    
# 参数量:~400K
# 推理速度:~1ms
# 适用:复杂地形、多目标跟踪
配置 3: 轨迹跟踪 (LSTM)
class policy:
    actor_hidden_dims = [256, 128]
    critic_hidden_dims = [256, 128]
    rnn_type = 'lstm'
    rnn_hidden_size = 256
    rnn_num_layers = 2
    
# 参数量:~600K
# 推理速度:~2ms
# 适用:动作模仿、相位跟踪

指令跟踪机制

指令的生成与更新

def _resample_commands(self, env_ids):
    """随机采样速度指令"""
    # 线速度 x (前后)
    self.commands[env_ids, 0] = torch_rand_float(
        self.command_ranges["lin_vel_x"][0],  # -1.0 m/s
        self.command_ranges["lin_vel_x"][1],  # +1.0 m/s
        (len(env_ids), 1), 
        device=self.device
    ).squeeze(1)
    
    # 线速度 y (左右)
    self.commands[env_ids, 1] = torch_rand_float(
        self.command_ranges["lin_vel_y"][0],  # -1.0 m/s
        self.command_ranges["lin_vel_y"][1],  # +1.0 m/s
        (len(env_ids), 1),
        device=self.device
    ).squeeze(1)
    
    # 角速度 yaw (旋转) 或 heading (朝向)
    if self.cfg.commands.heading_command:
        # Heading 模式:指定目标朝向
        self.commands[env_ids, 3] = torch_rand_float(
            self.command_ranges["heading"][0],   # -π
            self.command_ranges["heading"][1],   # +π
            (len(env_ids), 1),
            device=self.device
        ).squeeze(1)
    else:
        # 直接角速度模式
        self.commands[env_ids, 2] = torch_rand_float(
            self.command_ranges["ang_vel_yaw"][0],  # -1 rad/s
            self.command_ranges["ang_vel_yaw"][1],  # +1 rad/s
            (len(env_ids), 1),
            device=self.device
        ).squeeze(1)
    
    # 过滤小速度指令(避免不稳定)
    self.commands[env_ids, :2] *= (
        torch.norm(self.commands[env_ids, :2], dim=1) > 0.2
    ).unsqueeze(1)

指令重采样时机

def _post_physics_step_callback(self):
    """每个物理步后的回调"""
    # 定期重采样指令 (默认 10 秒)
    env_ids = (
        self.episode_length_buf % int(self.cfg.commands.resampling_time / self.dt) == 0
    ).nonzero(as_tuple=False).flatten()
    self._resample_commands(env_ids)
    
    # Heading 模式:计算角速度指令
    if self.cfg.commands.heading_command:
        forward = quat_apply(self.base_quat, self.forward_vec)
        heading = torch.atan2(forward[:, 1], forward[:, 0])
        # 将 heading 误差转换为角速度指令
        self.commands[:, 2] = torch.clip(
            0.5 * wrap_to_pi(self.commands[:, 3] - heading), 
            -1.0, 
            1.0
        )

指令课程学习 (Curriculum)

def update_command_curriculum(self, env_ids):
    """根据表现逐步增加指令难度"""
    # 如果线速度跟踪奖励 > 80% 最大值
    if torch.mean(self.episode_sums["tracking_lin_vel"][env_ids]) / \
       self.max_episode_length > 0.8 * self.reward_scales["tracking_lin_vel"]:
        
        # 扩大速度指令范围
        self.command_ranges["lin_vel_x"][0] = np.clip(
            self.command_ranges["lin_vel_x"][0] - 0.5,  # 减小下限
            -self.cfg.commands.max_curriculum,           # 最低 -max_curriculum
            0.
        )
        self.command_ranges["lin_vel_x"][1] = np.clip(
            self.command_ranges["lin_vel_x"][1] + 0.5,  # 增大上限
            0.,
            self.cfg.commands.max_curriculum            # 最高 +max_curriculum
        )

训练后的 Policy 使用方式

Policy 的本质

训练完成后,Policy 是一个映射函数

f: Observations → Actions

关键理解:

  1. ✅ Policy 需要外部提供速度指令 (commands)
  2. ✅ Policy 根据指令和当前状态输出关节动作
  3. ❌ Policy 不会自己决定往哪里走
  4. ❌ Policy 没有全局路径规划能力

推理/部署流程

# 来自 play.py
def play(args):
    # 1. 加载环境和配置
    env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
    
    # 2. 创建环境
    env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
    obs = env.get_observations()
    
    # 3. 加载训练好的 Policy
    train_cfg.runner.resume = True
    ppo_runner, _ = task_registry.make_alg_runner(
        env=env, 
        name=args.task, 
        args=args, 
        train_cfg=train_cfg
    )
    policy = ppo_runner.get_inference_policy(device=env.device)
    
    # 4. 推理循环
    for i in range(10 * int(env.max_episode_length)):
        # 获取动作
        actions = policy(obs.detach())
        
        # 执行动作
        obs, _, rews, dones, infos = env.step(actions.detach())

Policy 调用示例

# 伪代码:实际机器人部署
class RobotController:
    def __init__(self):
        self.policy = load_trained_policy("model.pt")
        self.commands = [0.0, 0.0, 0.0, 0.0]  # [vx, vy, yaw, heading]
    
    def set_velocity_command(self, vx, vy, yaw):
        """外部设置速度指令"""
        self.commands = [vx, vy, yaw, 0.0]
    
    def control_step(self):
        """单步控制"""
        # 1. 读取传感器获取观测
        obs = self.get_observations()  # [48] 或 [235]
        # obs 中已包含 self.commands
        
        # 2. Policy 推理
        actions = self.policy(obs)  # [12] 关节目标
        
        # 3. 发送到低层控制器
        self.send_joint_targets(actions)
    
    def move_forward(self):
        """示例:向前走"""
        self.set_velocity_command(vx=0.5, vy=0.0, yaw=0.0)
        for _ in range(100):
            self.control_step()
    
    def turn_left(self):
        """示例:左转"""
        self.set_velocity_command(vx=0.0, vy=0.0, yaw=0.5)
        for _ in range(50):
            self.control_step()
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值