Executor类 -- _invoke方法

   Taskflow 执行器的核心方法 _invoke,负责执行单个任务节点并处理后续调度逻辑,

  1. 执行给定任务节点的工作

  2. 处理任务的前置和后置条件(如信号量获取/释放)

  3. 管理任务依赖关系

  4. 调度后续任务

// Procedure: _invoke
inline void Executor::_invoke(Worker& worker, Node* node) {

  #define TF_INVOKE_CONTINUATION()  \
  if (cache) {                      \
    node = cache;                   \
    goto begin_invoke;              \
  }

  begin_invoke:

  Node* cache {nullptr};
  
  // if this is the second invoke due to preemption, directly jump to invoke task
  if(node->_nstate & NSTATE::PREEMPTED) {
    goto invoke_task;
  }

  // if the work has been cancelled, there is no need to continue
  if(node->_is_cancelled()) {
    _tear_down_invoke(worker, node, cache);
    TF_INVOKE_CONTINUATION();
    return;
  }

  // if acquiring semaphore(s) exists, acquire them first
  if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
    SmallVector<Node*> waiters;
    if(!node->_acquire_all(waiters)) {
      _schedule(worker, waiters.begin(), waiters.end());
      return;
    }
  }
  
  invoke_task:
  
  SmallVector<int> conds;

  // switch is faster than nested if-else due to jump table
  switch(node->_handle.index()) {
    // static task
    case Node::STATIC:{
      _invoke_static_task(worker, node);
    }
    break;
    
    // runtime task
    case Node::RUNTIME:{
      if(_invoke_runtime_task(worker, node)) {
        return;
      }
    }
    break;

    // subflow task
    case Node::SUBFLOW: {
      if(_invoke_subflow_task(worker, node)) {
        return;
      }
    }
    break;

    // condition task
    case Node::CONDITION: {
      _invoke_condition_task(worker, node, conds);
    }
    break;

    // multi-condition task
    case Node::MULTI_CONDITION: {
      _invoke_multi_condition_task(worker, node, conds);
    }
    break;

    // module task
    case Node::MODULE: {
      if(_invoke_module_task(worker, node)) {
        return;
      }
    }
    break;

    // async task
    case Node::ASYNC: {
      if(_invoke_async_task(worker, node)) {
        return;
      }
      _tear_down_async(worker, node, cache);
      TF_INVOKE_CONTINUATION();
      return;
    }
    break;

    // dependent async task
    case Node::DEPENDENT_ASYNC: {
      if(_invoke_dependent_async_task(worker, node)) {
        return;
      }
      _tear_down_dependent_async(worker, node, cache);
      TF_INVOKE_CONTINUATION();
      return;
    }
    break;

    // monostate (placeholder)
    default:
    break;
  }

  // if releasing semaphores exist, release them
  if(node->_semaphores && !node->_semaphores->to_release.empty()) {
    SmallVector<Node*> waiters;
    node->_release_all(waiters);
    _schedule(worker, waiters.begin(), waiters.end());
  }

  // Reset the join counter with strong dependencies to support cycles.
  // + We must do this before scheduling the successors to avoid race
  //   condition on _predecessors.
  // + We must use fetch_add instead of direct assigning
  //   because the user-space call on "invoke" may explicitly schedule 
  //   this task again (e.g., pipeline) which can access the join_counter.
  node->_join_counter.fetch_add(
    node->num_predecessors() - (node->_nstate & ~NSTATE::MASK), std::memory_order_relaxed
  );

  // acquire the parent flow counter
  auto& join_counter = (node->_parent) ? node->_parent->_join_counter :
                       node->_topology->_join_counter;

  // Invoke the task based on the corresponding type
  switch(node->_handle.index()) {

    // condition and multi-condition tasks
    case Node::CONDITION:
    case Node::MULTI_CONDITION: {
      for(auto cond : conds) {
        if(cond >= 0 && static_cast<size_t>(cond) < node->_num_successors) {
          auto s = node->_edges[cond]; 
          // zeroing the join counter for invariant
          s->_join_counter.store(0, std::memory_order_relaxed);
          join_counter.fetch_add(1, std::memory_order_relaxed);
          _update_cache(worker, cache, s);
        }
      }
    }
    break;

    // non-condition task
    default: {
      for(size_t i=0; i<node->_num_successors; ++i) {
        if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
          join_counter.fetch_add(1, std::memory_order_relaxed);
          _update_cache(worker, cache, s);
        }
      }
    }
    break;
  }
  
  // clean up the node after execution
  _tear_down_invoke(worker, node, cache);
  TF_INVOKE_CONTINUATION();
} 

主要组成部分

1. 宏定义和初始化

#define TF_INVOKE_CONTINUATION()  \
  if (cache) {                    \
    node = cache;                 \
    goto begin_invoke;            \
  }

begin_invoke:
Node* cache {nullptr};
  • TF_INVOKE_CONTINUATION 宏用于实现任务执行的连续性

  • cache 变量用于存储下一个要执行的任务节点

2. 预处理检查

if(node->_nstate & NSTATE::PREEMPTED) {
  goto invoke_task;
}

if(node->_is_cancelled()) {
  _tear_down_invoke(worker, node, cache);
  TF_INVOKE_CONTINUATION();
  return;
}
  • 检查任务是否被抢占(PREEMPTED),如果是则直接执行

  • 检查任务是否被取消,如果是则清理并返回

3. 信号量获取

if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
  SmallVector<Node*> waiters;
  if(!node->_acquire_all(waiters)) {
    _schedule(worker, waiters.begin(), waiters.end());
    return;
  }
}
  • 如果任务需要获取信号量,尝试获取

  • 如果获取失败,调度等待的任务并返回

4. 任务执行(核心部分)

invoke_task:
switch(node->_handle.index()) {
  case Node::STATIC: { ... }
  case Node::RUNTIME: { ... }
  case Node::SUBFLOW: { ... }
  case Node::CONDITION: { ... }
  case Node::MULTI_CONDITION: { ... }
  case Node::MODULE: { ... }
  case Node::ASYNC: { ... }
  case Node::DEPENDENT_ASYNC: { ... }
  default: break;
}
  • 根据任务类型调用对应的执行方法

  • 处理不同类型的任务(静态、运行时、子流、条件等)

5. 信号量释放

if(node->_semaphores && !node->_semaphores->to_release.empty()) {
  SmallVector<Node*> waiters;
  node->_release_all(waiters);
  _schedule(worker, waiters.begin(), waiters.end());
}
  • 如果任务持有信号量,释放它们

  • 唤醒等待这些信号量的任务

6. 依赖关系管理

node->_join_counter.fetch_add(
  node->num_predecessors() - (node->_nstate & ~NSTATE::MASK), std::memory_order_relaxed
);
  • 重置任务的 join counter,支持循环依赖

  • 使用原子操作保证线程安全

7. 后继任务调度

switch(node->_handle.index()) {
  case Node::CONDITION:
  case Node::MULTI_CONDITION: {
    // 条件任务特殊处理
  }
  break;
  
  default: {
    // 普通任务处理
    for(size_t i=0; i<node->_num_successors; ++i) {
      if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        join_counter.fetch_add(1, std::memory_order_relaxed);
        _update_cache(worker, cache, s);
      }
    }
  }
  break;
}
  • 对于条件任务,根据条件结果调度特定后继

  • 对于普通任务,减少所有后继的 join counter,当 counter 归零时调度

8. 清理和继续

_tear_down_invoke(worker, node, cache);
TF_INVOKE_CONTINUATION();
  • 清理当前任务

  • 如果有缓存的任务,继续执行

关键设计特点

  1. 状态管理

    • 使用 _nstate 标志位管理任务状态(如 PREEMPTED)

    • 原子计数器管理任务依赖

  2. 性能优化

    • 使用 switch 代替 if-else 实现跳转表

    • 使用 goto 减少函数调用开销

    • 内存顺序标记优化原子操作

  3. 任务类型支持

    • 支持多种任务类型(静态、动态、条件、异步等)

    • 每种类型有专门的处理逻辑

  4. 信号量支持

    • 任务可以声明需要获取/释放的信号量

    • 自动管理信号量等待队列

  5. 依赖关系处理

    • 支持普通依赖和条件依赖

    • 支持循环依赖图

执行流程总结

  1. 检查任务状态(抢占、取消)

  2. 获取所需信号量

  3. 根据任务类型执行任务

  4. 释放持有的信号量

  5. 更新依赖计数器

  6. 调度符合条件的后继任务

  7. 清理当前任务

  8. 如果有缓存任务,继续执行

这个方法是 Taskflow 高效任务调度的核心,通过精细的状态管理和优化实现了高性能的任务执行。

 

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值