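_invoke is the core method of the Taskflow executor. It runs a single task node and handles the scheduling that follows:
- runs the work of the given task node
- handles the task's pre- and post-conditions (such as acquiring and releasing semaphores)
- manages task dependencies
- schedules successor tasks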
// Procedure: _invoke
inline void Executor::_invoke(Worker& worker, Node* node) {
  #define TF_INVOKE_CONTINUATION() \
  if (cache) { \
    node = cache; \
    goto begin_invoke; \
  }
  begin_invoke:
  Node* cache {nullptr};
  // if this is the second invoke due to preemption, directly jump to invoke task
  if(node->_nstate & NSTATE::PREEMPTED) {
    goto invoke_task;
  }
  // if the work has been cancelled, there is no need to continue
  if(node->_is_cancelled()) {
    _tear_down_invoke(worker, node, cache);
    TF_INVOKE_CONTINUATION();
    return;
  }
  // if acquiring semaphore(s) exists, acquire them first
  if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
    SmallVector<Node*> waiters;
    if(!node->_acquire_all(waiters)) {
      _schedule(worker, waiters.begin(), waiters.end());
      return;
    }
  }
  invoke_task:
  SmallVector<int> conds;
  // switch is faster than nested if-else due to jump table
  switch(node->_handle.index()) {
    // static task
    case Node::STATIC:{
      _invoke_static_task(worker, node);
    }
    break;
    // runtime task
    case Node::RUNTIME:{
      if(_invoke_runtime_task(worker, node)) {
        return;
      }
    }
    break;
    // subflow task
    case Node::SUBFLOW: {
      if(_invoke_subflow_task(worker, node)) {
        return;
      }
    }
    break;
    // condition task
    case Node::CONDITION: {
      _invoke_condition_task(worker, node, conds);
    }
    break;
    // multi-condition task
    case Node::MULTI_CONDITION: {
      _invoke_multi_condition_task(worker, node, conds);
    }
    break;
    // module task
    case Node::MODULE: {
      if(_invoke_module_task(worker, node)) {
        return;
      }
    }
    break;
    // async task
    case Node::ASYNC: {
      if(_invoke_async_task(worker, node)) {
        return;
      }
      _tear_down_async(worker, node, cache);
      TF_INVOKE_CONTINUATION();
      return;
    }
    break;
    // dependent async task
    case Node::DEPENDENT_ASYNC: {
      if(_invoke_dependent_async_task(worker, node)) {
        return;
      }
      _tear_down_dependent_async(worker, node, cache);
      TF_INVOKE_CONTINUATION();
      return;
    }
    break;
    // monostate (placeholder)
    default:
    break;
  }
  // if releasing semaphores exist, release them
  if(node->_semaphores && !node->_semaphores->to_release.empty()) {
    SmallVector<Node*> waiters;
    node->_release_all(waiters);
    _schedule(worker, waiters.begin(), waiters.end());
  }
  // Reset the join counter with strong dependencies to support cycles.
  // + We must do this before scheduling the successors to avoid race
  //   condition on _predecessors.
  // + We must use fetch_add instead of direct assigning
  //   because the user-space call on "invoke" may explicitly schedule
  //   this task again (e.g., pipeline) which can access the join_counter.
  node->_join_counter.fetch_add(
    node->num_predecessors() - (node->_nstate & ~NSTATE::MASK), std::memory_order_relaxed
  );
  // acquire the parent flow counter
  auto& join_counter = (node->_parent) ? node->_parent->_join_counter :
                                         node->_topology->_join_counter;
  // Invoke the task based on the corresponding type
  switch(node->_handle.index()) {
    // condition and multi-condition tasks
    case Node::CONDITION:
    case Node::MULTI_CONDITION: {
      for(auto cond : conds) {
        if(cond >= 0 && static_cast<size_t>(cond) < node->_num_successors) {
          auto s = node->_edges[cond];
          // zeroing the join counter for invariant
          s->_join_counter.store(0, std::memory_order_relaxed);
          join_counter.fetch_add(1, std::memory_order_relaxed);
          _update_cache(worker, cache, s);
        }
      }
    }
    break;
    // non-condition task
    default: {
      for(size_t i=0; i<node->_num_successors; ++i) {
        if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
          join_counter.fetch_add(1, std::memory_order_relaxed);
          _update_cache(worker, cache, s);
        }
      }
    }
    break;
  }
  // clean up the node after execution
  _tear_down_invoke(worker, node, cache);
  TF_INVOKE_CONTINUATION();
}
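Main components
1. Macro definition and initialization
#define TF_INVOKE_CONTINUATION() \
if (cache) { \
  node = cache; \
  goto begin_invoke; \
}
begin_invoke:
Node* cache {nullptr};
- The TF_INVOKE_CONTINUATION macro implements continuation: if a node has been cached, execution jumps back to begin_invoke and runs that node within the same call.
- The cache variable holds the next task node to execute.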
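To make the continuation pattern concrete, here is a minimal single-threaded sketch. ToyNode and run_chain are hypothetical names invented for this illustration and are not part of Taskflow; the sketch only mirrors the label-plus-goto shape of the macro, under the assumption that each node has at most one ready successor.

```cpp
#include <cassert>
#include <vector>

// Hypothetical toy node: an id plus at most one successor.
struct ToyNode {
  int id;
  ToyNode* next;
};

// Runs `node`, then keeps running the cached successor in the same call,
// mirroring the `goto begin_invoke` continuation.
// Returns the ids in execution order.
inline std::vector<int> run_chain(ToyNode* node) {
  std::vector<int> order;
begin_invoke:
  ToyNode* cache {nullptr};     // re-initialized on every pass
  order.push_back(node->id);    // stands in for executing the task
  if (node->next) {
    cache = node->next;         // keep the ready successor for this caller
  }
  if (cache) {                  // same shape as TF_INVOKE_CONTINUATION()
    node = cache;
    goto begin_invoke;
  }
  return order;
}
```

Passing the head of a three-node chain to run_chain visits all three nodes in order without any nested call, which is the effect the cache variable is meant to achieve.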
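2. Pre-execution checks
if(node->_nstate & NSTATE::PREEMPTED) {
  goto invoke_task;
}
if(node->_is_cancelled()) {
  _tear_down_invoke(worker, node, cache);
  TF_INVOKE_CONTINUATION();
  return;
}
- If the node is marked PREEMPTED, this is its second invocation, so execution jumps straight to invoke_task and skips the cancellation check and the semaphore acquisition.
- If the task has been cancelled, the node is torn down and the function returns, unless teardown left a node in cache to continue with.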
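3. Semaphore acquisition
if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
  SmallVector<Node*> waiters;
  if(!node->_acquire_all(waiters)) {
    _schedule(worker, waiters.begin(), waiters.end());
    return;
  }
}
- If the task has semaphores to acquire, try to acquire all of them.
- If acquisition fails, schedule the nodes collected in waiters and return without running the task.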
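4. Task execution (the core part)
invoke_task:
switch(node->_handle.index()) {
  case Node::STATIC: { ... }
  case Node::RUNTIME: { ... }
  case Node::SUBFLOW: { ... }
  case Node::CONDITION: { ... }
  case Node::MULTI_CONDITION: { ... }
  case Node::MODULE: { ... }
  case Node::ASYNC: { ... }
  case Node::DEPENDENT_ASYNC: { ... }
  default: break;
}
- The switch calls the execution routine that matches the task type (static, runtime, subflow, condition, multi-condition, module, async, dependent async); the default case covers an empty placeholder.
- The runtime, subflow, and module handlers can return true, in which case _invoke returns immediately. The async and dependent-async cases always finish inside the switch with their own teardown.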
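The call to _handle.index() suggests that the handle is a std::variant and that the case labels are the positions of its alternatives. The sketch below illustrates that dispatch style with a much smaller, hypothetical variant; StaticWork, ConditionWork, Handle, and dispatch are made-up names for illustration and do not reproduce Taskflow's actual types.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <variant>

// Hypothetical handle types; only their order in the variant matters here.
struct StaticWork    { std::function<void()> work; };
struct ConditionWork { std::function<int()>  work; };

using Handle = std::variant<std::monostate, StaticWork, ConditionWork>;

// Positions of the alternatives, in declaration order.
constexpr std::size_t STATIC    = 1;
constexpr std::size_t CONDITION = 2;

// Runs the work stored in `handle` and reports which branch was taken.
// For condition work, the returned branch index is written to `branch`.
inline std::string dispatch(const Handle& handle, int& branch) {
  switch (handle.index()) {
    case STATIC:
      std::get_if<StaticWork>(&handle)->work();
      return "static";
    case CONDITION:
      branch = std::get_if<ConditionWork>(&handle)->work();
      return "condition";
    default:  // std::monostate: an empty placeholder, nothing to run
      return "placeholder";
  }
}
```

Switching on the index keeps the dispatch to one integer comparison per call, and a default-constructed Handle falls through to the placeholder case.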
5. Semaphore release
if(node->_semaphores && !node->_semaphores->to_release.empty()) {
  SmallVector<Node*> waiters;
  node->_release_all(waiters);
  _schedule(worker, waiters.begin(), waiters.end());
}
- If the task has semaphores to release, release them.
- Schedule the tasks that were waiting on those semaphores.
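The acquire and release steps can be pictured with a toy semaphore that parks tasks instead of blocking threads. This is a single-threaded sketch under stated assumptions: ToySemaphore and ToyTask are hypothetical names, there is no locking, and the real _acquire_all and _release_all may behave differently in detail.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct ToyTask { int id; };

// Hypothetical counting semaphore that records waiting tasks.
class ToySemaphore {
 public:
  explicit ToySemaphore(std::size_t count) : _count(count) {}

  // Takes one unit if available; otherwise records the task as a waiter.
  bool try_acquire_or_wait(ToyTask* task) {
    if (_count > 0) {
      --_count;
      return true;
    }
    _waiters.push_back(task);
    return false;
  }

  // Returns one unit and hands every parked task back to the caller,
  // which is expected to reschedule them.
  void release(std::vector<ToyTask*>& to_schedule) {
    ++_count;
    to_schedule.insert(to_schedule.end(), _waiters.begin(), _waiters.end());
    _waiters.clear();
  }

  std::size_t count() const { return _count; }

 private:
  std::size_t _count;
  std::vector<ToyTask*> _waiters;
};
```

A task that fails to acquire is not run; it becomes runnable again only when a later release returns it in to_schedule, which corresponds to the _schedule call on waiters in the snippet above.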
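6. Dependency management
node->_join_counter.fetch_add(
  node->num_predecessors() - (node->_nstate & ~NSTATE::MASK), std::memory_order_relaxed
);
- Re-arm the task's join counter with its number of strong dependencies, so the node can run again in a cyclic graph. This must happen before the successors are scheduled.
- Use fetch_add rather than a plain assignment, because user code may already have scheduled this task again (for example, in a pipeline) and touched the counter.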
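The arithmetic in that expression is easier to follow on a small example. The sketch below assumes a layout in which the top four bits of the state word are flags and the remaining bits count weak (condition-driven) predecessors; that layout, the STATE_MASK value, and the ToyNode type are assumptions made for illustration, not facts taken from the excerpt above.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Assumed layout: top 4 bits are state flags, the remaining bits count
// weak predecessors.
constexpr unsigned STATE_MASK = 0xF0000000u;

struct ToyNode {
  std::atomic<std::size_t> join_counter {0};
  std::size_t num_predecessors {0};
  unsigned nstate {0};
};

// Strong predecessors = all predecessors minus the weak ones.
inline std::size_t strong_predecessors(const ToyNode& n) {
  return n.num_predecessors - (n.nstate & ~STATE_MASK);
}

// Re-arms the counter by adding, so any adjustment made concurrently
// by another party is preserved rather than overwritten.
inline void rearm(ToyNode& n) {
  n.join_counter.fetch_add(strong_predecessors(n), std::memory_order_relaxed);
}
```

With three predecessors, one of them weak, rearm adds 2 to the counter. If the counter was not zero at that moment, the existing value survives, which a plain store would have discarded.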
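7. Successor scheduling
switch(node->_handle.index()) {
  case Node::CONDITION:
  case Node::MULTI_CONDITION: {
    // special handling for condition tasks
  }
  break;
  default: {
    // regular tasks
    for(size_t i=0; i<node->_num_successors; ++i) {
      if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        join_counter.fetch_add(1, std::memory_order_relaxed);
        _update_cache(worker, cache, s);
      }
    }
  }
  break;
}
- For condition tasks, only the successors selected by the returned condition values are released, and their join counters are zeroed first.
- For regular tasks, every successor's join counter is decremented, and a successor is handed to _update_cache once its counter reaches zero.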
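The two release rules can be sketched with plain atomics. ToyNode, release_successors, and release_branch are hypothetical names for this illustration; the sketch returns the ready successors instead of scheduling them and leaves out the parent join counter.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

struct ToyNode {
  int id {0};
  std::atomic<int> join_counter {0};
  std::vector<ToyNode*> successors;
};

// Regular task: decrement every successor and collect those that became ready.
inline std::vector<ToyNode*> release_successors(ToyNode& done) {
  std::vector<ToyNode*> ready;
  for (ToyNode* s : done.successors) {
    // fetch_sub returns the value before the decrement, so 1 means this
    // caller removed the last outstanding dependency.
    if (s->join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      ready.push_back(s);
    }
  }
  return ready;
}

// Condition task: only the selected branch is released, whatever its counter.
inline ToyNode* release_branch(ToyNode& done, int cond) {
  if (cond < 0 || static_cast<std::size_t>(cond) >= done.successors.size()) {
    return nullptr;  // out-of-range result selects nothing
  }
  ToyNode* s = done.successors[static_cast<std::size_t>(cond)];
  s->join_counter.store(0, std::memory_order_relaxed);
  return s;
}
```

When two predecessors share one successor, only the second call to release_successors reports it as ready. Comparing the value returned by fetch_sub with 1 ensures exactly one predecessor wins that role even when both finish concurrently.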
8. Cleanup and continuation
_tear_down_invoke(worker, node, cache);
TF_INVOKE_CONTINUATION();
- Tear down the current task.
- If a task has been cached, continue with it.
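Key design features
- State management:
  - The _nstate flag bits track task state (such as PREEMPTED).
  - Atomic counters track task dependencies.
- Performance optimizations:
  - switch replaces nested if-else so that the compiler can emit a jump table.
  - goto continues with the cached successor in place, avoiding another function call.
  - Explicit memory-order arguments keep the atomic operations as cheap as correctness allows.
- Task type support:
  - Several task types are supported (static, dynamic, condition, async, and others).
  - Each type has its own handling logic.
- Semaphore support:
  - Tasks can declare semaphores to acquire and release.
  - Semaphore wait queues are managed automatically.
- Dependency handling:
  - Both regular and conditional dependencies are supported.
  - Cyclic dependency graphs are supported.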
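Execution flow summary
- Check the task state (preempted, cancelled).
- Acquire the required semaphores.
- Execute the task according to its type.
- Release the semaphores to be released.
- Re-arm the task's join counter.
- Schedule the successors that are ready.
- Tear down the current task.
- If a task has been cached, continue with it.
This method is the core of Taskflow's task scheduling; its efficiency comes from the state tracking and the optimizations listed above.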