时间如白驹过隙。19年asyncio
作为 Python 标准库中的异步编程模块,仍处于探索阶段,许多项目尚在不稳定的分支上徘徊。现如今,Python 的异步生态系统经历了翻天覆地的变化,从 Python 3.5 引入 async
和 await
关键字,到 3.7 引入 asyncio.run()
函数,异步编程逐渐走向成熟。与此同时社区也涌现出如 Trio、AnyIO 等第三方库,进一步丰富了异步编程的选择,协程化异步编程已然变成主流。
目录
摘要
本博客将深入探讨标准库中的 asyncio 模块、第三方库 Trio,以及兼容两者的 AnyIO,帮助开发者构建更高效、可维护的异步应用。
我们将介绍 asyncio 在 Python 3.12 中的新特性,如 eager_task_factory
提供的主动任务执行机制、loop_factory
支持的自定义事件循环工厂,以及 current_task()
的 C 语言实现带来的性能提升。
Trio 作为一个现代异步框架,强调结构化并发(Structured Concurrency),提供更简洁的 API 和更强的错误处理能力。(阿里云开发者社区)
AnyIO 则作为桥梁,允许开发者在 asyncio 和 Trio 之间无缝切换后端。通过统一的 API,AnyIO 实现了在不同异步框架之间的代码复用,降低了迁移成本。(Ravenports)
通过对比 asyncio、Trio 和 AnyIO 的设计理念与使用场景,本文将展示如何在实际项目中选择合适的异步框架,并结合 Python 3.12 的新特性,实现现代化的异步编程。
协程
协程:又称为微线程,在一个线程中执行,执行函数时可以随时中断,由程序(用户)自身控制,执行效率极高,与多线程比较,没有切换线程的开销和多线程锁机制。
Python 的协程机制经历了从生成器(generator)到现代异步编程模型的演进。在早期,协程是基于生成器实现的,使用 yield
和 yield from
关键字来控制协程的执行和暂停。具体来说,yield
用于生成器函数中,允许函数在每次调用时返回一个值,并在下次调用时从上次停止的地方继续执行。而 yield from
是在 Python 3.3 中引入的,用于简化从一个生成器中调用另一个生成器的过程,使得协程之间的调用更加直观和高效。(博客园)
随着 Python 3.5 的发布,引入了 async
和 await
关键字,标志着协程从生成器的语法中独立出来,成为语言层面的特性。使用 async def
定义的函数被称为协程函数,返回一个协程对象;而 await
用于等待一个可等待对象的结果,使得异步代码的编写更加清晰和易于理解。这种新的语法不仅提高了代码的可读性,还增强了对异步操作的支持。
因此,虽然 Python 的协程最初是基于生成器的 yield
和 yield from
实现的,但现代的协程机制已经发展为使用 async
和 await
的形式,提供了更强大和直观的异步编程能力。
这里简单介绍下生成器的一些概念:
生成器是 Python 中用于实现惰性迭代的强大工具。它们允许函数在每次调用时返回一个值,并在下次调用时从上次停止的地方继续执行。以下是对 yield
、yield from
、next()
和 send()
的详细介绍:
🔹 yield
:生成器的核心
yield
关键字用于将一个函数定义为生成器函数。当函数执行到 yield
语句时,会暂停其执行状态,并返回 yield
后的值。下一次调用生成器时,会从上次暂停的地方继续执行。
示例:
def count_up_to(n):
count = 0
while count < n:
yield count
count += 1
counter = count_up_to(3)
print(next(counter)) # 输出: 0
print(next(counter)) # 输出: 1
🔹 next()
:获取下一个值
next()
函数用于从生成器中获取下一个值。每次调用 next()
,生成器会从上次暂停的地方继续执行,直到遇到下一个 yield
。如果生成器没有更多的值可供返回,调用 next()
会引发 StopIteration
异常。(codeleading.com)
🔹 send()
:向生成器发送值
send()
方法不仅可以恢复生成器的执行,还可以向生成器内部发送一个值。这个值会被赋给生成器内部 yield
表达式的左侧变量。需要注意的是,在生成器开始执行之前,必须先调用一次 next()
或 send(None)
来启动生成器。(codeleading.com)
在首次启动生成器时,必须使用 next() 或 send(None),因为生成器尚未执行到第一个 yield,无法接收外部发送的数据。
示例:
def echo():
received = yield
while True:
received = yield received
gen = echo()
next(gen) # 启动生成器
print(gen.send('Hello')) # 输出: Hello
print(gen.send('World')) # 输出: World
🔹 yield from
:委托子生成器
yield from
是在 Python 3.3 中引入的语法,用于简化从子生成器中获取值的过程。它可以将子生成器的所有值直接传递给调用者,避免了显式的循环。(simeonvisser.com)
示例:
def generator1():
yield from range(3)
for value in generator1():
print(value)
# 输出:
# 0
# 1
# 2
使用 yield from
可以使代码更加简洁,尤其是在需要从多个生成器中获取值时。
通过理解和合理使用 yield
、yield from
、next()
和 send()
,可以更高效地处理迭代和异步编程任务。这些工具在构建高性能、可维护的 Python 应用中发挥着重要作用。
这里引用廖雪峰Python教程中的例子:
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None) # 唤醒消费者
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
🔄 执行流程
-
创建生成器对象:
c = consumer()
此时,
consumer()
函数并未执行,返回的是一个生成器对象c
。 -
启动生成器:
c.send(None)
- 启动生成器,执行到
n = yield r
处暂停。 - 此时,
r
的初始值为''
,因此yield r
返回''
给调用方。 - 由于是首次启动,
n
被赋值为None
。
- 启动生成器,执行到
-
生产者开始生产数据:
n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r)
- 每次循环,生产者生成一个新的
n
,并通过c.send(n)
将其发送给消费者。 - 消费者接收到
n
,打印消费信息,并将r
设置为'200 OK'
。 - 下一次
yield r
返回'200 OK'
给生产者,生产者打印返回值。
- 每次循环,生产者生成一个新的
-
关闭生成器:
c.close()
- 关闭生成器,释放资源。
🔍 yield
和 send()
的双重角色
在消费者函数中:
n = yield r
yield r
:将当前的r
值发送给调用方,并暂停执行。n = yield r
:当生成器被重新激活时,send()
发送的值被赋给n
。
这种机制使得生成器可以与外部进行双向通信,既可以发送数据,也可以接收数据。
asyncio
asyncio
是用来编写并发代码的库,使用 async/await
语法。官方文档:https://docs.python.org/zh-cn/3/library/asyncio.html。
asyncio
被用作多个提供高性能 Python 异步框架的基础,包括:
- 网络和网站服务、
- 数据库连接库、
- 分布式任务队列等等。
asyncio
往往是构建 IO 密集型 和 高层级结构化网络代码 的最佳选择。
asyncio 提供的一组高层级 API 用于:
- 并发地运行 Python 协程,并对其执行过程实现完全控制;
- 执行网络 IO 和 IPC;
- 控制子进程;
- 通过队列实现分布式任务;
- 同步并发代码。
此外,还有一些低层级 API 支持库和框架开发者实现:
- 创建和管理事件循环,它提供用于连接网络、运行子进程、处理 OS 信号等功能的异步 API;
- 使用 transports 实现高效率协议;
- 通过
async/await
语法桥接基于回调的库和代码。
高级API
高级API索引:https://docs.python.org/zh-cn/3/library/asyncio-api-index.html
Runner运行器
Runner构建于事件循环 之上,其目标是简化针对常见通用场景的异步代码的用法。
run方法
asyncio.run(coro, *, debug=None, loop_factory=None)
执行协程 coro
并返回其结果。
这个函数会运行传入的协程,负责:
- 管理
asyncio
事件循环, - 清理异步生成器,
- 关闭线程池执行器(executor)。
⚠️ 注意:当当前线程中已有另一个 asyncio
事件循环正在运行时,不能调用此函数。
参数说明:
debug=True
:事件循环将以调试模式运行。debug=False
:显式禁用调试模式。debug=None
(默认):遵循全局调试模式配置。loop_factory
:如果提供,将用于创建新的事件循环;否则使用asyncio.new_event_loop()
。事件循环会在结束时被关闭。
使用建议:
- 该函数应作为 asyncio 程序的主要入口;
- 推荐只调用一次;
- 推荐使用
loop_factory
来配置事件循环,而不是使用全局策略(policy)机制; - 如果传入
asyncio.EventLoop
作为loop_factory
,可以在不依赖全局策略的情况下运行asyncio
。
executor注意事项:
- 当你调用
asyncio.run(main())
背后使用的事件循环(ProactorEventLoop 或 DefaultEventLoop)会默认配备一个线程池(通常是 ThreadPoolExecutor),用于执行:- 阻塞函数(通过 loop.run_in_executor());
- 一些 asyncio 内部操作(如 DNS 查询等)。
- asyncio.run() 调用结束时,会尝试优雅关闭这个自动创建的线程池执行器。
- executor的关闭过程最多等待 5 分钟;
- 若在该时间内未关闭完成,将输出警告并强制关闭。
例如如下代码会在5分钟后警告:
import asyncio
import time
def long_blocking_task():
time.sleep(600) # 睡10分钟
async def main():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, long_blocking_task)
asyncio.run(main())
# RuntimeWarning: Executor shutdown timed out after 5 minutes. Executor will be closed.
Runner
上下文管理器
class asyncio.Runner(*, debug=None, loop_factory=None)
这是一个上下文管理器,用于简化在相同上下文中多次调用异步函数的操作。
有时候你需要在同一个事件循环和同一个 contextvars.Context
下运行多个顶层 async
函数,这时候 Runner
就非常适合。
参数说明:
-
debug=True
:事件循环将以调试模式运行。 -
debug=False
:明确关闭调试模式。 -
debug=None
(默认):遵循全局调试模式配置。 -
loop_factory
:可选的事件循环工厂,用来自定义事件循环的创建逻辑。- 如果指定了
loop_factory
,则由你负责将创建的事件循环设为当前循环; - 若未指定,则默认使用
asyncio.new_event_loop()
,并通过asyncio.set_event_loop()
设置为当前事件循环。
- 如果指定了
你可以将 asyncio.run()
重写为使用 Runner
:
async def main():
await asyncio.sleep(1)
print('hello')
with asyncio.Runner() as runner:
runner.run(main())
✅
Runner
是从 Python 3.11 开始新增的。
方法说明
run(coro, *, context=None)
- 在 Runner 内部的事件循环中运行协程
coro
。 - 返回协程运行的结果,或抛出运行时异常。
- 可选的关键字参数
context
:可以显式传入一个自定义的contextvars.Context
,用于隔离上下文变量。如果为None
,则使用 Runner 默认上下文。
⚠️ 注意:如果当前线程已有一个事件循环在运行,则不能调用此方法。
close()
关闭 Runner,执行一系列资源清理操作:
- 清理所有异步生成器;
- 关闭默认线程池执行器;
- 关闭事件循环;
- 释放嵌套的
contextvars.Context
。
get_loop()
返回 Runner 实例关联的事件循环对象。
其他说明
-
Runner
采用懒初始化策略:- 构造函数本身不会立即创建底层结构;
- 事件循环和上下文会在
with
块进入时,或首次调用run()
/get_loop()
时初始化。
处理键盘中断(Keyboard Interruption)
新增于 Python 3.11
当用户通过 Ctrl-C 触发 signal.SIGINT
信号时,默认情况下主线程会抛出 KeyboardInterrupt
异常。
但是,这种默认行为在 asyncio
中不起作用,因为它可能会中断 asyncio
内部机制,导致程序无法正常退出。
为了解决这个问题,asyncio
对 signal.SIGINT
的处理方式如下:
-
asyncio.Runner.run()
会在执行任何用户代码之前,安装一个自定义的signal.SIGINT
信号处理器,并在函数退出时移除该处理器。 -
Runner
会为传入的协程创建一个主任务(main task)来执行。 -
当通过 Ctrl-C 触发
signal.SIGINT
时,自定义的信号处理器会调用asyncio.Task.cancel()
取消主任务。- 这会导致主任务内部抛出
asyncio.CancelledError
异常, - Python 调用栈开始展开,
- 你可以在
try/except
或try/finally
代码块中捕获此异常以完成资源清理。
- 这会导致主任务内部抛出
-
主任务被取消后,
asyncio.Runner.run()
会抛出KeyboardInterrupt
异常。
如果用户写了一个无法被 asyncio.Task.cancel()
中断的紧密循环(比如没有让出控制权的死循环),那么第二次按下 Ctrl-C 时,会立即抛出 KeyboardInterrupt
,而不会先取消主任务。
比如:
import asyncio
import signal
async def main_task():
try:
print("任务开始,按 Ctrl-C 取消...")
# 模拟一个长时间运行的异步任务
while True:
await asyncio.sleep(1)
print("任务正在运行...")
except asyncio.CancelledError:
print("任务被取消,进行清理工作...")
# 这里可以放资源释放或其他清理代码
await asyncio.sleep(1)
print("清理完成。")
# 抛出 CancelledError 后,Runner 会抛出 KeyboardInterrupt
raise
if __name__ == "__main__":
try:
with asyncio.Runner() as runner:
runner.run(main_task())
except KeyboardInterrupt:
print("捕获到 KeyboardInterrupt,程序安全退出。")
- 程序启动后,会每秒打印“任务正在运行…”
- 当你按下 Ctrl-C,
signal.SIGINT
会被捕获,自定义信号处理器调用main_task
的取消操作。 - 任务捕获
asyncio.CancelledError
异常后,执行清理代码。 - 清理完成后,异常继续往外传,
asyncio.Runner.run()
抛出KeyboardInterrupt
。 - 最外层
try/except KeyboardInterrupt
捕获这个异常,输出提示,程序正常结束。
协程与任务
Awaitables可等待对象
如果一个对象可以用于 await
表达式中,那么它就是一个 awaitable(可等待)对象。许多 asyncio
的 API 都设计成接受 awaitable 对象。
awaitable 对象主要有三种类型:协程(coroutine)、任务(Task) 和 Future。
协程(Coroutines)
Python 的协程本身是 awaitable 的,因此可以在其他协程中使用 await
来等待它们的完成:
import asyncio
async def nested():
return 42
async def main():
# 直接调用 nested() 什么也不会发生,
# 它只会创建一个协程对象但未被 await,
# 因此 *不会执行*。
nested() # 会触发 "RuntimeWarning" 警告。
# 现在用 await 来等待它的结果:
print(await nested()) # 会打印 "42"。
asyncio.run(main())
重要说明
文档中“协程(coroutine)”一词有两个密切相关的含义:
- 协程函数:用
async def
定义的函数;- 协程对象:调用协程函数时返回的对象。
任务(Tasks)
任务用来并发调度协程。
当协程被包裹进任务中,比如用 asyncio.create_task()
创建时,协程会自动被安排尽快运行:
import asyncio
async def nested():
return 42
async def main():
# 调度 nested() 协程尽快运行,与 main() 并发执行
task = asyncio.create_task(nested())
# 通过 task 可以取消 nested(),或者
# 使用 await 等待它完成:
await task
asyncio.run(main())
Futures
Future 是一种特殊的低层 awaitable 对象,表示一个异步操作最终会得到的结果。
当等待一个 Future 对象时,意味着当前协程会暂停,直到该 Future 在别处被“完成”。
asyncio
中的 Future 主要用于使基于回调的代码能与 async/await
机制兼容。
一般情况下,应用层代码不需要主动创建 Future 对象。
某些库或 asyncio
的部分 API 会返回 Future 对象,这些对象可以被 await
:
async def main():
await function_that_returns_a_future_object()
# 下面这种写法也有效:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一个典型的返回 Future 对象的底层函数例子是:loop.run_in_executor()
。
创建任务
asyncio.create_task(coro, *, name=None, context=None)
将协程 coro
包装成一个 Task,并安排它的执行。返回该 Task 对象。
-
如果参数
name
不为 None,会通过Task.set_name()
设置任务名称。 -
可选的关键字参数
context
允许指定一个自定义的contextvars.Context
,用于协程的执行。如果未提供,则会使用当前上下文的副本。 -
任务会在当前线程中由
get_running_loop()
返回的事件循环中执行;如果当前线程没有运行中的事件循环,会抛出RuntimeError
。
备注:
-
asyncio.TaskGroup.create_task()
是一个新的替代方案,利用结构化并发,可以安全地等待一组相关任务完成。 -
重要提示:
需要保存对此函数返回结果(Task对象)的引用,避免任务在执行过程中被垃圾回收。
事件循环只持有任务的弱引用,未被其他地方引用的任务可能随时被回收,甚至在执行完成前。
为了可靠地实现“触发即忘”(fire-and-forget)类型的后台任务,应将它们收集在一个容器中,例如:background_tasks = set() for i in range(10): task = asyncio.create_task(some_coro(param=i)) # 将任务添加到集合,创建强引用 background_tasks.add(task) # 为避免对已完成任务的引用无限增长, # 在任务完成后移除自身引用: task.add_done_callback(background_tasks.discard)
任务取消(Task Cancellation)
任务可以很方便且安全地被取消。当任务被取消时,asyncio.CancelledError
会在任务的下一次机会被抛出。
建议协程使用 try/finally
结构来稳健地执行清理逻辑。如果显式捕获了 asyncio.CancelledError
,通常应在清理完成后重新抛出它。asyncio.CancelledError
直接继承自 BaseException
,因此大多数代码不需要特别处理它。
支持结构化并发的 asyncio 组件,比如 asyncio.TaskGroup
和 asyncio.timeout()
,都是基于取消机制实现的,如果协程“吞掉”了 asyncio.CancelledError
,可能会导致异常行为。同样,用户代码一般不应调用 uncancel
。不过,如果确实需要抑制 asyncio.CancelledError
,则必须调用 uncancel()
以完全清除取消状态。
-
任务取消的本质
- 取消任务 = 在任务执行点抛出一个特殊的异常 当你调用
task.cancel()
时,Python 的 asyncio 会在该任务协程内部下一次能“停下来”的地方抛出一个asyncio.CancelledError
异常。这就像强制告诉任务“你得马上终止”。
- 取消任务 = 在任务执行点抛出一个特殊的异常 当你调用
-
协程中要做好“优雅退出”
- 由于取消是通过异常抛出实现的,协程应该用
try/finally
包裹关键代码,确保即使被取消,也能执行清理工作,比如释放资源、关闭连接等。 - 如果你捕获了
CancelledError
,一般要记得重新抛出(raise
),这样让 asyncio 知道任务确实被取消了,否则 asyncio 结构化并发机制(TaskGroup
等)就会乱掉。
- 由于取消是通过异常抛出实现的,协程应该用
-
CancelledError
不同于普通异常- 它继承自
BaseException
(而不是Exception
),这意味着大部分普通的异常捕获代码不会捕获到它,防止你无意“吃掉”取消信号。
- 它继承自
-
结构化并发和取消的关系
asyncio.TaskGroup
、asyncio.timeout()
这些高级结构都依赖取消机制来协调多个任务的生命周期。- 如果协程无视或者“吞掉”取消异常,会导致这些机制失效,可能造成任务没被正确取消,程序挂起等问题。
-
uncancel()
的特殊场景uncancel()
是一个可以“撤销取消状态”的方法,但很少用,通常也不推荐用户随意调用。- 只有在你非常确定要“抑制取消”,不让任务终止时,才会用到。这个操作要谨慎,否则会破坏 asyncio 的取消语义。
比如:
import asyncio
async def worker():
print('Worker: 启动')
try:
# 模拟任务做点事
while True:
print('Worker: 工作中...')
await asyncio.sleep(1)
except asyncio.CancelledError:
print('Worker: 收到取消信号,开始清理...')
# 做一些清理工作,比如关闭文件、释放资源等
await asyncio.sleep(0.5) # 模拟清理耗时
print('Worker: 清理完成,重新抛出取消异常')
raise # 重新抛出取消异常,通知上层任务确实被取消了
finally:
print('Worker: finally 代码块执行(不管有没有异常)')
async def main():
task = asyncio.create_task(worker())
await asyncio.sleep(3) # 让worker跑一会儿
print('Main: 取消worker任务')
task.cancel() # 发送取消请求
try:
await task
except asyncio.CancelledError:
print('Main: 捕获到worker任务取消异常')
asyncio.run(main())
Task Groups(任务组)
任务组结合了任务创建 API 和一种方便且可靠的等待所有组内任务完成的方法。
class asyncio.TaskGroup
这是一个异步上下文管理器,用于管理一组任务。可以通过 create_task()
方法将任务添加到任务组中。退出上下文管理器时,所有任务都会被等待完成。
create_task(coro, *, name=None, context=None)
在任务组中创建一个任务。签名与 asyncio.create_task()
相同。如果任务组处于非激活状态(例如尚未进入、已经完成或正在关闭过程中),会关闭传入的协程。
示例:
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro(...))
task2 = tg.create_task(another_coro(...))
print(f"两个任务已完成: {task1.result()}, {task2.result()}")
async with
语句会等待组内所有任务完成。- 等待期间可以继续添加新任务(例如传递
tg
给协程,由协程调用tg.create_task()
)。 - 当最后一个任务完成且退出
async with
代码块后,不允许再添加新任务。
任务失败与取消
- 当组内任一任务抛出非
asyncio.CancelledError
的异常,组内剩余任务会被取消,且不允许添加新任务。 - 如果此时
async with
语句体仍在执行中,包含该语句的任务也会被取消,抛出asyncio.CancelledError
,但不会让异常穿出async with
。 - 所有任务结束后,若有任务异常(非取消异常),会将这些异常合并成
ExceptionGroup
或BaseExceptionGroup
抛出。
特殊异常处理
- 如果任务失败异常是
KeyboardInterrupt
或SystemExit
,任务组仍取消剩余任务并等待,但会重新抛出这两个异常,而非异常组。 - 如果
async with
体自身因异常退出(即__aexit__()
收到异常),处理逻辑与任务异常相同,且__aexit__()
传入的异常也包含在异常组中。
嵌套任务组与取消管理
- 任务组能区分内部用于唤醒
__aexit__()
的取消与外部任务取消请求。 - 嵌套任务组同时遇到异常时,内层先处理异常,外层再收到取消请求处理自身异常。
- 若任务组外部被取消且需抛出异常组,任务组会调用父任务
cancel()
方法,确保取消不会丢失。
取消计数保持
- 任务组会保持
asyncio.Task.cancelling()
返回的取消计数,保证取消状态正确。
终止任务组示例:标准库不直接支持主动终止任务组,但可以通过向组内添加一个抛出异常的任务实现“强制终止”,并捕获该异常忽略它:
import asyncio
from asyncio import TaskGroup
class TerminateTaskGroup(Exception):
"""用于终止任务组的异常"""
async def force_terminate_task_group():
raise TerminateTaskGroup()
async def job(task_id, sleep_time):
print(f'Task {task_id}: start')
await asyncio.sleep(sleep_time)
print(f'Task {task_id}: done')
async def main():
try:
async with TaskGroup() as group:
group.create_task(job(1, 0.5))
group.create_task(job(2, 1.5))
await asyncio.sleep(1)
group.create_task(force_terminate_task_group())
except* TerminateTaskGroup:
pass
asyncio.run(main())
预期输出:
Task 1: start
Task 2: start
Task 1: done
gather,create_task,TaskGroup对比
特性 | asyncio.gather() | asyncio.create_task() | asyncio.TaskGroup |
---|---|---|---|
并发调度方式 | 自动调度多个协程,聚合结果 | 手动调度单个协程 | 自动调度多个协程,结构化管理 |
是否自动等待结果 | ✅ 是,所有协程完成后统一返回结果 | ❌ 否,需要手动 await 每个任务 | ✅ 是,在 async with 结束前自动等待 |
错误处理方式 | 默认传播首个异常,其他协程继续运行 | 异常在 await 时才暴露 | 首个异常会取消其它任务,并聚合异常 |
是否推荐用于结构化并发 | ❌ 否(容易遗忘 await 或误处理异常) | ❌ 否(自己管理任务生命周期) | ✅ 是,设计用于结构化并发 |
Python版本 | Python 3.5+ | Python 3.7+ | Python 3.11+(新) |
① asyncio.gather()
:适合“并发执行 + 等待所有结果”
results = await asyncio.gather(task1(), task2(), task3())
- ✅ 自动调度并发,✅ 自动等待,✅ 返回所有结果
- ❌ 首个任务报错就中断返回(默认)
② create_task()
:适合“后台调度,不立即等待”
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
await t1
await t2
- ✅ 控制更灵活:任务可以先创建后等待
- ❌ 容易忘记 await,导致未处理的异常
- ❌ 错误传播靠你自己管
③ TaskGroup
:推荐方式,结构化并发,有明确生命周期和异常管理
async with asyncio.TaskGroup() as tg:
tg.create_task(task1())
tg.create_task(task2())
# 此时所有任务已结束,异常已处理
- ✅ 自动调度 + 等待
- ✅ 异常聚合 + 自动取消其余任务
- ✅ 结构化作用域,防止任务泄漏
✅ 什么时候选哪一个?
你的需求 | 推荐使用 |
---|---|
我有多个任务需要并发执行并等待所有结果 | asyncio.gather |
我只想调度一个任务、异步运行就好 | create_task |
我有多个任务需要安全地并发执行,并确保异常处理与生命周期受控 | TaskGroup (最佳实践) |
🔥 推荐策略(2025 年后标准实践)
优先使用
TaskGroup
管理并发任务,它是现代 asyncio 的“结构化并发”基础,错误处理和任务管理更安全。
gather()
用于快速聚合结果,create_task()
用于 fire-and-forget(不推荐用于大量任务)。
睡眠(Sleeping)
async asyncio.sleep(delay, result=None)
使当前协程阻塞 delay
秒。
- 如果提供了
result
参数,则在协程完成后将其返回给调用者。 sleep()
总是会挂起当前任务,允许其他任务运行。- 将
delay
设置为0
,可以优化运行路径,用于让其他任务有机会执行。这在长时间运行的函数中很有用,可以避免长时间阻塞事件循环。
以下协程每秒显示一次当前时间,持续 5 秒:
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())
并发运行任务(Running Tasks Concurrently)
awaitable asyncio.gather(*aws, return_exceptions=False)
并发运行 aws
序列中的所有 awaitable 对象(可以是 coroutine、Task 或 Future)。
-
如果
aws
中的某个对象是协程(coroutine),gather()
会自动将其封装成Task
并调度执行。 -
所有任务成功完成后,返回值是所有结果的列表,顺序与参数传入的顺序一致。
-
如果
return_exceptions=False
(默认):- 第一个抛出异常的任务,其异常会立即传播,
gather()
会抛出该异常; - 其他任务不会被取消,仍将继续运行。
- 第一个抛出异常的任务,其异常会立即传播,
-
如果
return_exceptions=True
:- 所有异常会被当作普通结果处理,并收集在返回列表中;
- 不会触发异常传播。
-
如果
gather()
本身被取消,所有尚未完成的任务也将被取消。 -
如果
aws
中的某个Task
或Future
被取消,gather()
本身不会被取消,而是将其当作CancelledError
抛出。- 这样可避免一个任务取消导致其他任务也被取消。
注意:
asyncio.TaskGroup
是gather()
的新替代方式,它提供了更安全的结构化并发,能在某个子任务出错时自动取消所有相关任务。TaskGroup
会在任一任务出错时主动取消其他任务,而gather()
不会。
示例:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({number}), currently i={i}...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f
async def main():
# 并发执行三个任务
L = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
print(L)
asyncio.run(main())
输出示例:
Task A: Compute factorial(2), currently i=2...
Task B: Compute factorial(3), currently i=2...
Task C: Compute factorial(4), currently i=2...
Task A: factorial(2) = 2
Task B: Compute factorial(3), currently i=3...
Task C: Compute factorial(4), currently i=3...
Task B: factorial(3) = 6
Task C: Compute factorial(4), currently i=4...
Task C: factorial(4) = 24
[2, 6, 24]
主动执行任务工厂(Eager Task Factory)
asyncio.eager_task_factory(loop, coro, *, name=None, context=None)
这是一个用于“主动执行任务(eager execution)”的任务工厂(task factory)。
启用方式:
loop.set_task_factory(asyncio.eager_task_factory)
启用后,当你创建一个任务时:
- 协程会在任务构造的瞬间同步启动执行;
- 只有当协程在运行中遇到
await
(即挂起)时,才会把任务调度到事件循环; - 如果协程在同步阶段就执行完了(如返回缓存结果),整个任务不会进入事件循环。
优点(性能提升场景)
- 避免调度开销,提高性能;
- 特别适合 缓存、记忆化(memoization) 等场景;
- 如果协程能立刻返回结果,就无需进入事件循环。
注意事项
主动执行(立即运行)是一个语义变化(semantic change),可能会改变应用行为:
- 如果协程同步完成或抛出异常,任务不会被调度进事件循环;
- 可能导致任务执行顺序不同,对某些依赖任务顺序的程序需谨慎;
- 特别注意调试、日志记录等对任务状态的依赖。
自定义任务构造器(custom_task_constructor)版本
asyncio.create_eager_task_factory(custom_task_constructor)
- 与
eager_task_factory
类似,但允许你传入一个自定义的任务构造函数(如子类化的Task
); custom_task_constructor
必须与Task.__init__
签名兼容,并返回一个符合asyncio.Task
接口的对象;- 返回的工厂函数可通过
loop.set_task_factory(factory)
设置到事件循环中。
特性 | 说明 |
---|---|
默认行为 | Task 创建时只是注册任务,实际运行由事件循环控制 |
eager 行为 | Task 创建时立即运行协程(除非挂起) |
性能优势 | 减少调度延迟,适合缓存/无需 I/O 的协程 |
风险 | 可能引起行为差异,任务顺序改变、异常提前抛出等 |
设置方式 | loop.set_task_factory(asyncio.eager_task_factory) |
可定制版本 | asyncio.create_eager_task_factory(custom_task_constructor) |
引入版本 | Python 3.12 |
比如:
import asyncio
async def coro(name):
print(f"{name} - started")
await asyncio.sleep(0)
print(f"{name} - ended")
async def main(use_eager=False):
loop = asyncio.get_running_loop()
if use_eager:
print(">>> Setting eager task factory")
loop.set_task_factory(asyncio.eager_task_factory)
print(">>> Creating tasks")
t1 = asyncio.create_task(coro("Task 1"))
t2 = asyncio.create_task(coro("Task 2"))
print(">>> Tasks created")
await t1
await t2
print("\n=== Default Task Factory ===")
asyncio.run(main())
print("\n=== Eager Task Factory ===")
asyncio.run(main(use_eager=True))
输出:
=== Default Task Factory ===
>>> Creating tasks
>>> Tasks created
Task 1 - started
Task 2 - started
Task 1 - ended
Task 2 - ended
=== Eager Task Factory ===
>>> Setting eager task factory
>>> Creating tasks
Task 1 - started
Task 2 - started
>>> Tasks created
Task 1 - ended
Task 2 - ended
默认模式中,create_task() 只是注册任务,实际执行发生在 await t1 之后。
eager_task_factory 下,任务在创建(create_task)时就同步执行协程体的开始部分,只要没有立即 await,就会执行很多逻辑(甚至完成整个任务)。
屏蔽取消(Shielding From Cancellation)
awaitable asyncio.shield(aw)
用于保护一个 awaitable 对象不被取消。
如果 aw
是一个协程(coroutine),它会自动被调度为一个 Task。
如下语句:
task = asyncio.create_task(something())
res = await shield(task)
等价于:
res = await something()
区别在于:
- 如果包含该
await
的协程被取消,something()
中的任务 不会被取消。 - 对
something()
来说,取消操作看似未发生,它会继续执行。 - 不过,
await shield(...)
这一行仍会抛出CancelledError
,因为调用者被取消了。
示例:完全忽略取消(不推荐)
如果你希望彻底忽略取消请求(即任务继续运行,且不传播取消错误),可以结合 try/except
使用:
task = asyncio.create_task(something())
try:
res = await shield(task)
except CancelledError:
res = None
⚠️ 不推荐这样做。这样会让你的任务继续运行,即使整个应用或用户尝试取消它,也无能为力。
注意事项
-
必须保存你传给
shield()
的任务引用。- 因为事件循环只对任务持有弱引用。
- 如果没有其它强引用,任务可能会在未完成前被垃圾回收!
超时处理(Timeouts)
asyncio.timeout(delay)
返回一个 异步上下文管理器,可用于限制某段代码的执行时间。
delay
可以是None
或一个表示等待秒数的float/int
。- 如果
delay
为None
,表示不设置时间限制,这在超时时间无法预先确定时非常有用。
你还可以使用 Timeout.reschedule()
在上下文创建后重新安排超时时间。
async def main():
async with asyncio.timeout(10):
await long_running_task()
如果 long_running_task()
超过 10 秒未完成:
- 当前任务会被取消。
- 取消操作会被上下文管理器内部捕获并转换为
TimeoutError
。 - 你可以在上下文外部捕获
TimeoutError
并处理。
注意事项:
上下文管理器 asyncio.timeout()
负责将 asyncio.CancelledError
转换为 TimeoutError
,因此 TimeoutError
只能在上下文之外捕获。
捕获 TimeoutError
示例:
async def main():
try:
async with asyncio.timeout(10):
await long_running_task()
except TimeoutError:
print("长任务超时,但我们已处理。")
print("无论是否超时,这句都会执行。")
class asyncio.Timeout(when)
这是一个更底层的 异步上下文管理器,用于取消过期的协程。
-
when
是绝对时间(基于事件循环时间loop.time()
):None
表示永不触发超时。- 如果
when < loop.time()
,则下一轮事件循环立即触发超时。
方法:
when()
→ 返回当前截止时间。reschedule(when)
→ 重新设置截止时间。expired()
→ 判断当前是否已超时。
示例:
async def main():
try:
async with asyncio.timeout(None) as cm:
new_deadline = get_running_loop().time() + 10
cm.reschedule(new_deadline)
await long_running_task()
except TimeoutError:
pass
if cm.expired():
print("看来任务没有在截止时间内完成。")
✅
asyncio.timeout()
管理器支持安全地嵌套使用。
asyncio.timeout_at(when)
与 asyncio.timeout()
类似,但 when
是 绝对时间,不是相对的秒数。
示例:
async def main():
loop = get_running_loop()
deadline = loop.time() + 20
try:
async with asyncio.timeout_at(deadline):
await long_running_task()
except TimeoutError:
print("长任务超时,但我们已处理。")
print("无论是否超时,这句都会执行。")
asyncio.wait_for(aw, timeout)
等待 aw
这个 awaitable 对象在指定时间内完成。
-
如果
aw
是协程,会被自动包装为Task
。 -
timeout
可以是None
或一个秒数。- 如果为
None
,表示无限期等待。
- 如果为
特性:
- 如果超时,
aw
会被取消,然后抛出TimeoutError
。 - 若想避免
aw
被取消,可以使用asyncio.shield()
包裹。 wait_for
会等待取消操作完成,因此总耗时可能略大于timeout
。- 如果
wait_for
本身被取消,aw
也会被取消。
示例:
async def eternity():
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except TimeoutError:
print('timeout!')
asyncio.run(main())
📌 输出结果:
timeout!
等待原语(Waiting Primitives)
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
并发运行 aws
可迭代对象中的多个 Future
或 Task
,并阻塞直到满足 return_when
指定的条件。
aws
必须是一个 非空的可迭代对象。- 返回值是一个二元组:
(done, pending)
,表示已完成和未完成的任务集合。
示例用法:
done, pending = await asyncio.wait(aws)
参数说明:
timeout
(可选):表示等待的最长秒数(float
或int
),超时不会引发异常,只是把未完成的任务放入pending
集合。return_when
:指定满足什么条件时返回,支持以下常量:
常量 | 描述 |
---|---|
asyncio.FIRST_COMPLETED | 任意一个任务完成或被取消就返回 |
asyncio.FIRST_EXCEPTION | 任意一个任务以异常结束就返回;如果没有异常,则行为等同于 ALL_COMPLETED |
asyncio.ALL_COMPLETED | 所有任务完成或被取消后返回(默认值) |
⚠️ 与 wait_for()
不同:wait()
不会在超时时取消任务。
asyncio.as_completed(aws, *, timeout=None)
并发运行 aws
中的所有 awaitable 对象,并按完成顺序逐个获取结果。
返回一个可迭代对象,可以异步或同步地遍历获取结果:
- 异步迭代(推荐):
async for
方式,返回原始Task/Future
对象,可以用于判断来源。 - 普通迭代:
for
循环获取协程,使用await
获得结果或异常(适用于旧版本兼容场景)。
异步迭代示例:
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
async for earliest_connect in as_completed(tasks):
reader, writer = await earliest_connect
if earliest_connect is ipv6_connect:
print("IPv6 连接成功")
else:
print("IPv4 连接成功")
earliest_connect
是最先完成的那个Task
。
普通迭代示例(向后兼容 3.13 之前版本):
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
for next_connect in as_completed(tasks):
reader, writer = await next_connect
- 此时
next_connect
并不是原始任务对象,而是一个新的 awaitable,执行后返回结果或抛出异常。
超时处理:
-
如果在超时前所有任务未完成,会抛出
TimeoutError
。- 对于异步迭代:在
async for
内触发。 - 对于普通迭代:在
await
某个next_connect
时触发。
- 对于异步迭代:在
在线程中运行
async asyncio.to_thread(func, /, *args, **kwargs)
异步地在独立线程中运行函数 func
。
- 传入的
*args
和**kwargs
会直接传递给func
。 - 当前的
contextvars.Context
会被传播,使事件循环线程中的上下文变量在新线程中可用。 - 返回一个 可
await
的协程对象,最终会返回func
的执行结果。
这个协程函数主要用于执行 IO 密集型(IO-bound)函数/方法,如果这些函数在主线程中执行会阻塞事件循环,则应使用 to_thread()
将其转移到子线程中。
示例:
import asyncio
import time
def blocking_io():
print(f"开始 blocking_io:{time.strftime('%X')}")
# 这里可以是任意阻塞型 IO 操作,如文件读写、数据库访问等
time.sleep(1)
print(f"结束 blocking_io:{time.strftime('%X')}")
async def main():
print(f"开始 main:{time.strftime('%X')}")
await asyncio.gather(
asyncio.to_thread(blocking_io),
asyncio.sleep(1)
)
print(f"结束 main:{time.strftime('%X')}")
asyncio.run(main())
预期输出:
开始 main:19:50:53
开始 blocking_io:19:50:53
结束 blocking_io:19:50:54
结束 main:19:50:54
如果直接在协程中调用 blocking_io()
,会阻塞整个事件循环,程序会额外多耗时 1 秒。而使用 asyncio.to_thread()
,该操作被移至另一个线程中执行,主事件循环不被阻塞,从而实现并发执行。
注意事项:
- 由于 GIL(全局解释器锁)的存在,
asyncio.to_thread()
通常只适用于 IO 密集型操作。 - 对于 释放 GIL 的扩展模块(例如某些 C 扩展)或 不受 GIL 限制的 Python 实现,也可用于 CPU 密集型任务。
从其他线程调度
asyncio.run_coroutine_threadsafe(coro, loop)
将协程提交到指定的事件循环中。线程安全。
- 返回一个
concurrent.futures.Future
,可以用来从另一个操作系统线程等待结果。 - 该函数设计用于在与事件循环运行的线程不同的线程中调用。
示例:
# 创建一个协程
coro = asyncio.sleep(1, result=3)
# 将协程提交到指定的事件循环
future = asyncio.run_coroutine_threadsafe(coro, loop)
# 使用可选的超时参数等待结果
assert future.result(timeout) == 3
异常处理和取消:
如果协程中抛出异常,返回的 Future
会收到通知。它也可以用来取消事件循环中的任务:
try:
result = future.result(timeout)
except TimeoutError:
print('协程执行超时,正在取消任务...')
future.cancel()
except Exception as exc:
print(f'协程抛出异常: {exc!r}')
else:
print(f'协程返回结果: {result!r}')
说明:
- 请参考官方文档中关于并发和多线程的章节以获取更多细节。
- 与其他 asyncio 函数不同,这个函数必须显式传入事件循环参数
loop
。
Introspection
asyncio.current_task(loop=None)
返回当前正在运行的 Task 实例,如果没有任务在运行则返回 None
。
- 如果
loop
为None
,则使用get_running_loop()
获取当前事件循环。
asyncio.all_tasks(loop=None)
返回事件循环中所有尚未完成的 Task 对象集合。
- 如果
loop
为None
,则使用get_running_loop()
获取当前事件循环。
asyncio.iscoroutine(obj)
:如果 obj
是一个协程对象,则返回 True
。
Task 对象
class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False)
一个类似 Future 的对象,用于运行 Python 协程。非线程安全。
多个线程同时操作同一个 Task 对象时,Task 内部的状态和行为不会自动进行同步保护,可能导致竞态条件、数据不一致或未定义行为。
换句话说:
- 如果你在多个操作系统线程中并发调用 Task 的方法(比如
cancel()
,result()
等),没有额外的同步机制(如锁),就有可能出现问题。 - 你不能放心地从不同线程“同时”操作同一个 Task 实例,必须确保调用它的方法时在同一个线程,或者用线程同步手段保护访问。
具体到 asyncio 运行环境:
- asyncio 设计为单线程事件循环模型,绝大多数操作都在同一个线程(事件循环线程)里执行。
- 如果真的需要跨线程操作 Task,应该使用线程安全的机制,比如
asyncio.run_coroutine_threadsafe()
,而不是直接从其他线程调用 Task 的方法。
特性:
-
Task 用于在事件循环中运行协程。如果协程等待(await)一个 Future,Task 会暂停该协程的执行,直到 Future 完成。Future 完成后,协程继续执行。
-
事件循环采用协作式调度:一次只运行一个 Task。当某个 Task 等待 Future 完成时,事件循环会运行其他 Task、回调或执行 IO 操作。
-
推荐使用高级函数
asyncio.create_task()
创建 Task,或者使用低级的loop.create_task()
和asyncio.ensure_future()
。不建议手动实例化 Task。 -
取消正在运行的 Task 使用
cancel()
方法。调用后会向包装的协程抛出CancelledError
异常。如果协程在取消时正在等待 Future,则该 Future 也会被取消。 -
通过
cancelled()
方法检查 Task 是否被取消。如果协程没有抑制CancelledError
异常且确实被取消,则返回 True。 -
asyncio.Task
继承了Future
的所有接口,除了Future.set_result()
和Future.set_exception()
。 -
可选的关键字参数
context
允许指定自定义的contextvars.Context
供协程运行。如果未指定,Task 会复制当前上下文并在复制的上下文中运行协程。 -
可选关键字参数
eager_start
允许在创建 Task 时立即启动协程执行(如果事件循环正在运行),直到协程第一次阻塞。如果协程立即完成(返回或异常)则任务也立即完成,跳过事件循环调度。
方法与属性:
done()
返回 True 表示 Task 已完成(协程已返回结果、抛出异常或被取消)。
result()
返回 Task 的结果。
-
如果 Task 已完成,返回协程的返回值,或者重新抛出协程中的异常。
-
如果 Task 已取消,抛出
CancelledError
。 -
如果结果尚不可用,抛出
InvalidStateError
。
exception()
返回 Task 中协程抛出的异常。
-
如果协程正常返回,返回 None。
-
如果 Task 已取消,抛出
CancelledError
。 -
如果 Task 未完成,抛出
InvalidStateError
。
add_done_callback(callback, *, context=None)
添加回调函数,当 Task 完成时调用。
-
仅用于底层基于回调的代码。
-
详见
Future.add_done_callback()
。
remove_done_callback(callback)
移除已添加的回调函数。
-
仅用于底层基于回调的代码。
-
详见
Future.remove_done_callback()
。
get_stack(*, limit=None)
返回 Task 当前的调用栈帧列表。
-
如果协程未完成,返回挂起时的栈。
-
如果协程完成或取消,返回空列表。
-
如果协程因异常终止,返回异常的 traceback 栈帧。
-
栈帧从最老到最新排序。
-
仅返回一个挂起协程的栈帧。
-
limit
参数限制返回的最大帧数。
print_stack(*, limit=None, file=None)
打印 Task 的栈或 traceback。
-
输出格式类似 traceback 模块。
-
limit
传递给get_stack()
。 -
file
是输出流,默认写入sys.stdout
。
get_coro()
返回被 Task 包装的协程对象。
-
对于已通过 eager_start 立即完成的 Task,可能返回 None。
-
新增于 3.8,3.12 中因 eager_start 新特性结果可能为 None。
get_context()
返回与 Task 关联的 contextvars.Context
对象。
- 新增于 3.12。
get_name()
返回 Task 名称。
-
如果未显式指定,默认名称由 asyncio 生成。
-
新增于 3.8。
set_name(value)
设置 Task 名称。
-
value
可为任意对象,会被转换成字符串。 -
名称会在
repr()
输出中显示。
cancel(msg=None)
请求取消 Task。
-
如果 Task 已完成或取消,返回 False,否则返回 True。
-
会向协程抛出
CancelledError
异常(包含可选的msg
信息)。 -
协程可以通过捕获并抑制该异常拒绝取消请求(不推荐)。
-
新增于 3.9 的
msg
参数;3.11 中msg
会传递给等待 Task。
示例代码:
async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
task = asyncio.create_task(cancel_me())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
预期输出:
cancel_me(): before sleep
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
cancelled()
返回 True 表示 Task 已被取消(协程已抛出并未抑制 CancelledError)。
uncancel()
减少 Task 的取消请求计数。
-
返回剩余的取消请求次数。
-
一旦取消完成,后续调用无效。
-
仅 asyncio 内部使用,用户代码通常不调用。
-
用于结构化并发和超时场景中隔离取消影响。
-
新增于 3.11,3.13 版本增强取消请求逻辑。
cancelling()
返回当前未处理的取消请求数(调用 cancel()
减去 uncancel()
的次数)。
-
该数字大于 0 时,
cancelled()
仍可能返回 False(协程尚未响应取消)。 -
仅 asyncio 内部使用。
Queues队列
队列应当用于在多个 asyncio 任务之间分配工作,实现连接池,以及发布/订阅模式。
asyncio 队列的设计与标准库中的 queue 模块的类类似。虽然 asyncio 队列不是线程安全的,但它们专门设计用于 async/await 异步代码中。
注意,asyncio 队列的方法不支持 timeout 参数;如果需要带超时的队列操作,请使用 asyncio.wait_for() 函数。
Queue类
class asyncio.Queue(maxsize=0)
一个先进先出(FIFO)队列。
- 如果 maxsize 小于或等于 0,队列大小无限制。
- 如果是大于 0 的整数,当队列达到 maxsize 时,await put() 会阻塞,直到有项目被 get() 移除。
与标准库中的线程队列不同,队列大小始终可知,可以通过调用 qsize() 方法获取。
该类不是线程安全的。
属性&方法:
-
maxsize:队列中允许的最大项目数量。
-
empty():如果队列为空,返回 True,否则返回 False。
-
full():如果队列中有 maxsize 个项目,返回 True。如果队列初始化时 maxsize=0(默认),则 full() 永远返回 False。
-
async get():从队列中移除并返回一个项目。如果队列为空,则等待直到有项目可用。
如果队列已关闭且为空,或立即关闭,则抛出 QueueShutDown 异常。 -
get_nowait():如果有项目立即可用,返回该项目,否则抛出 QueueEmpty 异常。
-
async join():阻塞直到队列中的所有项目都被接收并处理完毕。未完成任务计数在每次放入项目时增加。消费者协程调用 task_done() 表示该项目已取出且处理完成,计数减少。当未完成任务计数降为零时,join() 解除阻塞。
-
async put(item):将一个项目放入队列。如果队列已满,则等待直到有空位后再放入。如果队列已关闭,则抛出 QueueShutDown 异常。
-
put_nowait(item):无阻塞地将项目放入队列。如果没有空位立即可用,则抛出 QueueFull 异常。
-
qsize():返回队列中项目的数量。
-
shutdown(immediate=False):关闭队列,使得 get() 和 put() 调用抛出 QueueShutDown 异常。默认情况下,关闭队列时,get() 只有在队列为空时才抛出异常。将 immediate 设为 True,则 get() 会立即抛出异常。所有阻塞的 put() 和 get() 调用都会被解除阻塞。如果 immediate 为 True,则队列中每个剩余项目都会调用一次 task_done(),可能解除阻塞 join() 调用。
-
task_done():表示先前放入队列的工作项已完成。由队列消费者调用。每次通过 get() 获取一个工作项后,调用 task_done() 告诉队列该工作项已处理完成。如果有 join() 调用阻塞,它将在所有项目都处理完成后恢复(即对每个 put() 进队的项目都有对应的 task_done() 调用)。shutdown(immediate=True) 时,会对队列中剩余项目调用 task_done()。如果调用次数超过队列中放入的项目数,则抛出 ValueError。
优先队列
class asyncio.PriorityQueue
Queue 的一种变体;按优先级顺序检索条目(优先级数值越低,优先级越高,先取出)。
条目通常是形如 (priority_number, data) 的元组。
后进先出队列(LIFO)
class asyncio.LifoQueue
Queue 的一种变体;先检索最近添加的条目(后进先出)。
异常
-
exception asyncio.QueueEmpty
当在空队列上调用 get_nowait() 方法时抛出此异常。 -
exception asyncio.QueueFull
当在达到 maxsize 的队列上调用 put_nowait() 方法时抛出此异常。 -
exception asyncio.QueueShutDown
当在已关闭的队列上调用 put() 或 get() 时抛出此异常。
队列可用于在多个并发任务间分配工作负载:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# 从队列中取出“工作项”
sleep_for = await queue.get()
# 睡眠指定秒数
await asyncio.sleep(sleep_for)
# 通知队列“工作项”已处理完毕
queue.task_done()
print(f'{name} 已经睡了 {sleep_for:.2f} 秒')
async def main():
# 创建用于存储“工作负载”的队列
queue = asyncio.Queue()
# 生成随机时间并放入队列
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# 创建三个并发处理队列的 worker 任务
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# 等待队列中的所有工作项处理完毕
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# 取消所有 worker 任务
for task in tasks:
task.cancel()
# 等待所有 worker 任务取消完成
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 个 worker 并行睡眠了 {total_slept_for:.2f} 秒')
print(f'总预期睡眠时间: {total_sleep_time:.2f} 秒')
asyncio.run(main())
流Stream
流(Streams)是用于处理网络连接的高级异步(async/await)原语。使用流可以发送和接收数据,而无需使用回调函数或底层协议与传输细节。
下面是一个使用 asyncio streams 编写的 TCP echo 客户端示例::
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
以下顶层 asyncio 函数可用于创建和操作流:
-
asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)
建立网络连接,返回一对
(reader, writer)
对象。返回的
reader
和writer
分别是StreamReader
和StreamWriter
类的实例。limit
参数控制StreamReader
的缓冲区大小,默认是 64 KiB。其他参数直接传递给
loop.create_connection()
。注意:
sock
参数会将 socket 所有权转移给StreamWriter
,关闭 socket 应调用它的close()
方法。- 3.7 版本新增
ssl_handshake_timeout
参数 - 3.8 版本新增
happy_eyeballs_delay
和interleave
参数 - 3.10 版本移除
loop
参数 - 3.11 版本新增
ssl_shutdown_timeout
参数
- 3.7 版本新增
-
asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
启动一个 socket 服务器。
每当有客户端连接建立时,调用
client_connected_cb
回调,传入(reader, writer)
对象。client_connected_cb
可以是普通函数或协程函数(如果是协程,会自动调度为 Task)。limit
参数控制StreamReader
的缓冲区大小,默认 64 KiB。其他参数直接传递给
loop.create_server()
。注意:
sock
参数会将 socket 所有权转移给服务器,关闭时调用服务器的close()
方法。- 3.7 版本新增
ssl_handshake_timeout
和start_serving
参数 - 3.10 版本移除
loop
参数 - 3.11 版本新增
ssl_shutdown_timeout
参数 - 3.13 版本新增
keep_alive
参数
- 3.7 版本新增
Unix Socket
-
asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
建立 Unix socket 连接,返回(reader, writer)
对。类似于
open_connection()
,但作用于 Unix socket。见
loop.create_unix_connection()
文档。注意:
sock
参数会将 socket 所有权转移给StreamWriter
,关闭时调用其close()
方法。仅限 Unix 系统。
- 3.7 版本新增
ssl_handshake_timeout
参数,path
支持类路径对象 - 3.10 版本移除
loop
参数 - 3.11 版本新增
ssl_shutdown_timeout
参数
- 3.7 版本新增
-
asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
启动 Unix socket 服务器。类似于
start_server()
,但作用于 Unix socket。见
loop.create_unix_server()
文档。注意:
sock
参数会将 socket 所有权转移给服务器,关闭时调用服务器的close()
方法。仅限 Unix 系统。
- 3.7 版本新增
ssl_handshake_timeout
和start_serving
参数,path
支持类路径对象 - 3.10 版本移除
loop
参数 - 3.11 版本新增
ssl_shutdown_timeout
参数
- 3.7 版本新增
Unix socket 和普通的网络 socket(TCP/UDP socket)主要区别在于它们的通信范围和使用场景:
-
Unix Socket(Unix 域套接字)
- 只在同一台主机内的进程间通信(IPC,Inter-Process Communication)。
- 使用文件系统中的路径(比如
/tmp/mysocket
)作为地址。 - 不经过网络协议栈,直接通过内核提供的机制交换数据,速度更快、开销更小。
-
普通 Socket(网络套接字)
- 支持跨网络通信,能在不同机器之间通过 IP 地址和端口号通信。
- 使用网络协议(TCP、UDP)进行数据传输。
- 需要网络协议栈处理,开销相对较大。
地址格式
- Unix socket 地址是文件系统路径(路径名)。
- 网络 socket 地址是 IP 地址 + 端口号。
性能
- Unix socket 在本机进程间通信时,性能通常比 TCP socket 更好,因为它省去了网络协议栈和物理网络传输的开销。
安全和权限
- Unix socket 依赖于文件系统权限,可以通过文件权限控制访问权限。
- TCP socket 通常使用网络层安全机制(如防火墙、TLS 等)。
使用场景
- Unix socket 适用于同一台机器内高效的进程间通信,例如数据库服务本地访问、进程间通信系统。
- 网络 socket 适用于需要跨主机通信的场景,比如客户端访问远程服务器。
Unix socket = 进程间本地通信的“套接字文件”;网络 socket = 跨网络通信的端点(IP + 端口)。
如果你只需要在同一台机器内的不同进程间通信,Unix socket 是更高效和简单的选择;如果需要跨机器通信,就必须用普通的网络 socket。
StreamReader
class asyncio.StreamReader
表示一个读取器对象,提供从 IO 流读取数据的 API。作为异步可迭代对象,支持 async for
语句。
不建议直接实例化 StreamReader 对象;推荐使用 open_connection()
和 start_server()
来获取。
-
feed_eof()
标记 EOF(文件结尾)已到达。 -
async read(n=-1)
读取最多 n 个字节。- 如果 n 未提供或为 -1,则读取直到 EOF,返回所有读取的字节。
- 如果 EOF 已到且内部缓冲区为空,返回空字节对象。
- 如果 n 为 0,立即返回空字节对象。
- 如果 n 为正数,内部缓冲区至少有 1 字节时,返回最多 n 字节。
- 如果在读取任何字节前接收到 EOF,返回空字节对象。
-
async readline()
读取一行,以\n
结尾的字节序列。- 如果 EOF 到达且未找到
\n
,返回部分读取的数据。 - 如果 EOF 到达且缓冲区为空,返回空字节对象。
- 如果 EOF 到达且未找到
-
async readexactly(n)
精确读取 n 字节。- 如果在读取完 n 字节前遇到 EOF,抛出
IncompleteReadError
,其partial
属性包含部分读取的数据。
- 如果在读取完 n 字节前遇到 EOF,抛出
-
async readuntil(separator=b’\n’)
读取直到找到分隔符separator
。- 成功时,分隔符及之前数据会从缓冲区移除,返回的数据包含分隔符。
- 如果读取数据超过配置的限制,抛出
LimitOverrunError
,数据留在缓冲区可再次读取。 - 如果 EOF 之前未找到完整分隔符,抛出
IncompleteReadError
,并重置缓冲区,partial
可能包含分隔符的一部分。 - 分隔符可以是一个元组,返回值是最短匹配任一分隔符的内容。
-
at_eof()
如果缓冲区为空且调用过feed_eof()
,返回 True。
StreamWriter
class asyncio.StreamWriter
表示一个写入器对象,提供向 IO 流写入数据的 API。
不建议直接实例化 StreamWriter 对象;推荐使用 open_connection()
和 start_server()
来获取。
-
write(data)
尝试立即写入数据到底层套接字。若失败,数据放入内部写缓冲区等待发送。
应与drain()
配合使用:stream.write(data) await stream.drain()
-
writelines(data)
写入字节列表或可迭代对象。若不能立即写入,则放入缓冲区等待。
应与drain()
配合使用:stream.writelines(lines) await stream.drain()
-
close()
关闭流和底层套接字。
建议与wait_closed()
配合使用:stream.close() await stream.wait_closed()
-
can_write_eof()
如果底层传输支持write_eof()
方法,返回 True,否则 False。 -
write_eof()
在已缓冲的写入数据被刷新后关闭流的写入端。 -
transport
返回底层 asyncio 传输对象。 -
get_extra_info(name, default=None)
访问可选的传输信息,详见BaseTransport.get_extra_info()
。 -
async drain()
等待直到可以继续写入流。
这是一个流控方法,当写缓冲区大小达到高水位时(最大上限),drain()
会阻塞,直到缓冲区大小降到低水位。无等待时立即返回。
示例:writer.write(data) await writer.drain()
-
*async start_tls(sslcontext, , server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
将现有基于流的连接升级为 TLS。
参数包括:sslcontext
:配置好的 SSLContext 实例。server_hostname
:用于匹配服务器证书的主机名。ssl_handshake_timeout
:TLS 握手超时时间,默认为 60 秒。ssl_shutdown_timeout
:TLS 关闭超时时间,默认为 30 秒。
-
is_closing()
如果流已关闭或正在关闭,返回 True。 -
async wait_closed()
等待流关闭。
应在调用close()
后调用,确保所有数据刷新完成。
例子
使用流的 TCP 回显客户端
使用 asyncio.open_connection()
函数实现的 TCP 回显客户端:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
使用流的 TCP 回显服务器
TCP 回显服务器使用 asyncio.start_server()
函数:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
获取 HTTP 标头
查询命令行传入 URL 的 HTTP 标头的简单示例:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# 忽略响应体,关闭套接字
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或使用 HTTPS:
python example.py https://example.com/path/page.html
注册一个打开的套接字以等待使用流的数据
使用 open_connection()
函数实现等待直到套接字接收到数据的协程:
import asyncio
import socket
async def wait_for_data():
# 获取当前事件循环引用
# 因为我们想访问低层级 API。
loop = asyncio.get_running_loop()
# 创建一对已连接的套接字
rsock, wsock = socket.socketpair()
# 注册打开的套接字以等待数据
reader, writer = await asyncio.open_connection(sock=rsock)
# 模拟从网络接收数据
loop.call_soon(wsock.send, 'abc'.encode())
# 等待数据
data = await reader.read(100)
# 获得数据后关闭套接字
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# 关闭第二个套接字
wsock.close()
asyncio.run(wait_for_data())
同步原语
asyncio 的同步原语设计上与 threading 模块相似,但有两个重要注意点:
- asyncio 的同步原语不是线程安全的,因此不应当用于操作系统线程间同步(这类场景请使用 threading 模块);
- 这些同步原语的方法不接受超时参数;如果需要超时操作,应使用
asyncio.wait_for()
函数。
asyncio 基本同步原语包括:
- Lock(锁)
- Event(事件)
- Condition(条件变量)
- Semaphore(信号量)
- BoundedSemaphore(有界信号量)
- Barrier(屏障)
Lock
类:asyncio.Lock
实现了 asyncio 任务的互斥锁,不是线程安全的。
asyncio 的锁可用于保证对共享资源的独占访问。
推荐的用法是用 async with
语句:
lock = asyncio.Lock()
# ...稍后
async with lock:
# 访问共享状态
等价于:
lock = asyncio.Lock()
# ...稍后
await lock.acquire()
try:
# 访问共享状态
finally:
lock.release()
3.10 版本变更:移除了
loop
参数。
-
async acquire()
获取锁。该方法会等待直到锁被释放,然后将锁设为已锁定并返回 True。
如果多个协程同时等待锁,只有第一个开始等待的协程会获得锁(公平锁)。 -
release()
释放锁。锁已锁定时,设为未锁定并返回;若锁本身未锁定,抛出RuntimeError
。 -
locked()
返回锁当前是否被锁定(True/False)。
Event
类:asyncio.Event
事件对象,不是线程安全的。
asyncio 的事件可用于通知多个 asyncio 任务某个事件已发生。
事件对象维护一个内部标志,初始为 False。调用 set()
将标志设为 True,clear()
将标志重置为 False。调用 wait()
会阻塞直到标志为 True。
3.10 版本变更:移除了
loop
参数。
示例:
async def waiter(event):
print('waiting for it ...')
await event.wait()
print('... got it!')
async def main():
event = asyncio.Event()
waiter_task = asyncio.create_task(waiter(event))
await asyncio.sleep(1)
event.set()
await waiter_task
asyncio.run(main())
-
async wait()
等待事件被设置。若事件已设置,立即返回 True;否则阻塞直到其他任务调用set()
。 -
set()
设置事件标志为 True,唤醒所有等待该事件的任务。 -
clear()
清除事件标志(设为 False),之后等待的任务会阻塞直到再次调用set()
。 -
is_set()
返回事件标志是否为 True。
Condition
类:asyncio.Condition(lock=None)
条件变量对象,不是线程安全的。
asyncio 条件变量可用于任务等待某事件发生,然后获得对共享资源的独占访问。
本质上,Condition 结合了 Event 和 Lock 的功能。多个 Condition 对象可以共享同一个 Lock,以协调对共享资源的独占访问。
可选参数 lock
必须是 Lock 对象或 None(None 时自动创建新的 Lock)。
3.10 版本变更:移除了
loop
参数。
推荐用法是 async with
:
cond = asyncio.Condition()
# ...稍后
async with cond:
await cond.wait()
等价于:
cond = asyncio.Condition()
# ...稍后
await cond.acquire()
try:
await cond.wait()
finally:
cond.release()
-
async acquire()
获取底层锁。等待锁解锁后设置为锁定并返回 True。 -
notify(n=1)
唤醒等待此条件变量的最多 n 个任务(默认 1 个)。调用前必须先获得锁,调用后需释放锁。否则抛出RuntimeError
。 -
locked()
返回底层锁是否被获取。 -
notify_all()
唤醒所有等待此条件变量的任务。调用前后与notify()
相同要求。 -
release()
释放底层锁。未锁定时调用会抛出RuntimeError
。 -
async wait()
等待通知。调用时任务必须先获取锁,否则抛RuntimeError
。该方法释放底层锁并阻塞,直到被notify()
或notify_all()
唤醒,唤醒后重新获得锁并返回 True。
注意调用可能出现虚假唤醒,因此调用者应重新检查状态,或使用wait_for()
。 -
async wait_for(predicate)
等待直到传入的可调用predicate
返回 True。会重复调用wait()
直到满足条件,最终返回predicate
的值。
Semaphore
类:asyncio.Semaphore(value=1)
信号量对象,不是线程安全的。
信号量管理一个内部计数器,acquire()
会减少计数器,release()
会增加计数器。计数器永远不会小于零;当为零时,acquire()
会阻塞直到其他任务调用 release()
。
value
为计数器初始值(默认为 1),不能小于 0,否则抛 ValueError
。
3.10 版本变更:移除了
loop
参数。
推荐用法是 async with
:
sem = asyncio.Semaphore(10)
# ...稍后
async with sem:
# 使用共享资源
等价于:
sem = asyncio.Semaphore(10)
# ...稍后
await sem.acquire()
try:
# 使用共享资源
finally:
sem.release()
-
async acquire()
获取信号量。如果计数器大于零,减一后立即返回 True;否则等待直到有信号量释放。 -
locked()
若无法立即获取信号量,返回 True。 -
release()
释放信号量,计数器加一。可以唤醒等待的任务。
与 BoundedSemaphore 不同,Semaphore 允许调用release()
多于acquire()
。
BoundedSemaphore
类:asyncio.BoundedSemaphore(value=1)
有界信号量,不是线程安全的。
与 Semaphore 类似,但在调用 release()
时如果计数器超过初始值会抛出 ValueError
。
3.10 版本变更:移除了
loop
参数。
Barrier
类:asyncio.Barrier(parties)
屏障对象,不是线程安全的。
屏障是一种简单的同步原语,允许阻塞直到指定数量(parties)的任务同时等待屏障。调用 wait()
的任务会阻塞直到达到数量,之后所有任务同时继续。
async with
语法也可用于等待屏障。
屏障可重复使用。
示例:
async def example_barrier():
b = asyncio.Barrier(3) # 三方屏障
asyncio.create_task(b.wait())
asyncio.create_task(b.wait())
await asyncio.sleep(0)
print(b)
await b.wait() # 第三个任务通过屏障
print(b)
print("barrier passed")
await asyncio.sleep(0)
print(b)
asyncio.run(example_barrier())
示例输出:
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
新增于 3.11 版本。
async wait()
通过屏障。当所有任务都调用此方法时,它们都会同时解除阻塞。
如果等待中的任务被取消,该任务退出屏障,屏障状态保持不变。若状态为“填充”,等待任务数量减少1。
返回值是一个范围在 0 到 parties-1 之间的整数,用于区分任务,比如选择一个任务做特殊处理。
async with barrier as position:
if position == 0:
print('End of *draining phase*')
该方法可能抛出 BrokenBarrierError
(屏障被破坏或重置时),或者抛出 CancelledError
(任务取消时)。
-
async reset()
将屏障重置为初始空状态,所有等待任务收到BrokenBarrierError
异常。 -
async abort()
将屏障置为破坏状态,导致所有现有或未来的wait()
调用失败并抛出BrokenBarrierError
。用于其中一个任务需要中止时,避免无限等待。 -
属性
parties
通过屏障所需的任务数量。 -
属性
n_waiting
当前等待通过屏障的任务数。 -
属性
broken
屏障是否处于破坏状态。 -
异常
asyncio.BrokenBarrierError
屏障被重置或破坏时抛出,继承自RuntimeError
。
子进程
下面是一个示例,展示如何用 asyncio 运行一个 shell 命令并获取结果:
import asyncio
async def run(cmd):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
print(f'[stdout]\n{stdout.decode()}')
if stderr:
print(f'[stderr]\n{stderr.decode()}')
asyncio.run(run('ls /zzz'))
执行结果会是:
['ls /zzz' exited with 1]
[stderr]
ls: /zzz: No such file or directory
由于所有 asyncio 的子进程函数都是异步的,并且 asyncio 提供了丰富的工具来处理此类函数,因此很容易同时执行并监控多个子进程。比如,修改上述示例来同时运行多个命令也非常简单:
async def main():
await asyncio.gather(
run('ls /zzz'),
run('sleep 1; echo "hello"'))
asyncio.run(main())
创建子进程
asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
创建一个子进程。
limit
参数设置用于 stdout 和 stderr 的StreamReader
缓冲区大小(当 stdout 和 stderr 设置为subprocess.PIPE
时生效)。- 返回一个
Process
实例。
请参阅 loop.subprocess_exec()
的文档了解其他参数。
asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
运行 shell 命令 cmd
。
limit
参数作用同上。- 返回一个
Process
实例。
请参阅 loop.subprocess_shell()
的文档了解其他参数。
重要提示: 应用程序必须确保所有空白符和特殊字符都已正确加引号,以避免 shell 注入漏洞。可以使用 shlex.quote()
函数对构造 shell 命令的字符串进行适当转义。
注意:
-
子进程支持 Windows 平台仅限于使用
ProactorEventLoop
时可用,详细信息见“Subprocess Support on Windows”章节。 -
此外,asyncio 还提供了以下底层子进程相关 API:
loop.subprocess_exec()
,loop.subprocess_shell()
,loop.connect_read_pipe()
,loop.connect_write_pipe()
,以及“Subprocess Transports and Subprocess Protocols”。
create_subprocess_exec
和 create_subprocess_shell
的主要区别在于:
特点 | create_subprocess_exec | create_subprocess_shell |
---|---|---|
执行方式 | 直接执行可执行程序和参数(无需通过 shell) | 通过 shell 执行整个命令字符串(例如 /bin/sh -c "cmd" ) |
参数传递 | 以列表形式传入程序名和参数,如 ('ls', '-l') | 传入完整的 shell 命令字符串,如 'ls -l /tmp' |
需要手动处理 shell 特殊字符 | 不需要(因为直接执行程序,不经过 shell) | 需要注意转义和引号,防止 shell 注入风险 |
跨平台行为 | 直接调用对应的程序,通常更安全、更高效 | 在 Unix/Linux 下通过 /bin/sh 执行;Windows 下使用 cmd.exe |
-
create_subprocess_exec(program, *args, ...)
这个函数是用来直接执行一个可执行文件(程序)并传递参数。它不会经过 shell 解释,参数会原样传递给程序,所以不会有 shell 特殊字符的解释风险,也不支持像管道(|
)、重定向(>
)等 shell 功能。
适合执行已知的命令和参数,且不需要 shell 语法支持的场景。 -
create_subprocess_shell(cmd, ...)
这个函数是通过 shell 运行一个命令字符串,等同于你在命令行输入的完整命令。它支持 shell 的所有功能,如管道、重定向、变量替换等。但这也带来了注入风险,需要你自己保证命令字符串中的特殊字符已正确转义。
适合需要复杂 shell 功能,或者命令本身是字符串时使用。
import asyncio
# 直接执行程序,不通过shell
async def example_exec():
proc = await asyncio.create_subprocess_exec(
'ls', '-l', '/tmp',
stdout=asyncio.subprocess.PIPE)
stdout, _ = await proc.communicate()
print(stdout.decode())
# 通过shell执行命令字符串,支持shell语法
async def example_shell():
proc = await asyncio.create_subprocess_shell(
'ls -l /tmp | grep py',
stdout=asyncio.subprocess.PIPE)
stdout, _ = await proc.communicate()
print(stdout.decode())
如果你想执行复杂的 shell 命令,或者需要使用管道、重定向,就用 create_subprocess_shell
;如果只是执行一个程序和参数,且不需要 shell 特性,用 create_subprocess_exec
会更安全高效。
常量
-
asyncio.subprocess.PIPE
可传递给stdin
、stdout
或stderr
参数。- 如果传给
stdin
,Process.stdin
为StreamWriter
实例。 - 如果传给
stdout
或stderr
,对应的属性为StreamReader
实例。
- 如果传给
-
asyncio.subprocess.STDOUT
特殊值,可用于stderr
参数,表示将标准错误重定向到标准输出。 -
asyncio.subprocess.DEVNULL
特殊值,可用于stdin
、stdout
或stderr
,表示使用特殊文件os.devnull
作为对应流。
与子进程交互
create_subprocess_exec()
和 create_subprocess_shell()
返回 Process
类实例。Process
是一个高级封装,允许与子进程通信并监视其完成情况。
class asyncio.subprocess.Process
该类封装了由 create_subprocess_exec()
和 create_subprocess_shell()
创建的操作系统进程。
设计上与 subprocess.Popen
类似,但有几点显著区别:
- 不支持
poll()
方法; communicate()
和wait()
方法没有timeout
参数,使用asyncio.wait_for()
实现超时;Process.wait()
是异步的,而Popen.wait()
是阻塞的忙等待;- 不支持
universal_newlines
参数; - 线程不安全。
详情见“Subprocess and Threads”章节。
方法和属性
-
async wait()
等待子进程结束,设置并返回returncode
属性。注意: 使用
stdout=PIPE
或stderr=PIPE
时,如果子进程输出过多导致 OS 管道缓冲区阻塞,会造成死锁。建议使用communicate()
方法避免此情况。 -
async communicate(input=None)
与子进程交互:- 向 stdin 发送数据(如果
input
非 None); - 关闭 stdin;
- 读取 stdout 和 stderr,直到 EOF;
- 等待进程结束。
返回
(stdout_data, stderr_data)
二元组。写入 stdin 时如遇
BrokenPipeError
或ConnectionResetError
会忽略异常(通常因进程已退出)。若想发送数据,需创建进程时指定
stdin=PIPE
;若想获取数据,需指定stdout=PIPE
和/或stderr=PIPE
。注意: 该方法会将数据缓存在内存中,不适用于大数据量场景。
3.12 版本起,即使
input=None
也会关闭 stdin。 - 向 stdin 发送数据(如果
-
send_signal(signal)
向子进程发送信号。Windows 平台:
SIGTERM
相当于terminate()
;CTRL_C_EVENT
和CTRL_BREAK_EVENT
可用于带有CREATE_NEW_PROCESS_GROUP
标志的进程。 -
terminate()
停止子进程。- POSIX 发送
SIGTERM
; - Windows 调用 Win32 API
TerminateProcess()
。
- POSIX 发送
-
kill()
杀死子进程。- POSIX 发送
SIGKILL
; - Windows 等同于
terminate()
。
- POSIX 发送
-
stdin
标准输入流(StreamWriter
)或None
。 -
stdout
标准输出流(StreamReader
)或None
。 -
stderr
标准错误流(StreamReader
)或None
。 -
警告: 建议使用
communicate()
,避免直接使用process.stdin.write()
、await process.stdout.read()
或await process.stderr.read()
,以防死锁。 -
pid
子进程的进程 ID。对于create_subprocess_shell()
,是 shell 进程的 PID。 -
returncode
进程退出码。None
表示进程尚未结束;- 负数
-N
表示被信号N
终止(仅限 POSIX)。
子进程与线程
- 标准 asyncio 事件循环支持跨线程运行子进程。
- Windows 上仅
ProactorEventLoop
支持子进程,SelectorEventLoop
不支持。 - UNIX 使用子进程观察器等待进程结束,3.8 版本起支持
ThreadedChildWatcher
,允许多线程创建子进程。 - 如果当前的子进程观察器处于非活动状态,调用会抛出
RuntimeError
。 - 不同事件循环实现可能有各自限制,具体参考对应文档。
详见“Concurrency and multithreading in asyncio”章节。
使用 Process
类控制子进程,并用 StreamReader
读取其标准输出。
使用 create_subprocess_exec()
创建子进程:
import asyncio
import sys
async def get_date():
code = 'import datetime; print(datetime.datetime.now())'
# 创建子进程,将标准输出重定向到管道
proc = await asyncio.create_subprocess_exec(
sys.executable, '-c', code,
stdout=asyncio.subprocess.PIPE)
# 读取一行输出
data = await proc.stdout.readline()
line = data.decode('ascii').rstrip()
# 等待子进程退出
await proc.wait()
return line
date = asyncio.run(get_date())
print(f"Current date: {date}")
同样的示例也可以用低级 API 实现。
低级API
低层级 API 索引:https://docs.python.org/zh-cn/3/library/asyncio-llapi-index.html
事件循环
事件循环是每个 asyncio 应用程序的核心。事件循环负责运行异步任务和回调,执行网络 IO 操作,以及管理子进程。
应用开发者通常应该使用高级的 asyncio 函数,比如 asyncio.run()
,很少需要直接引用事件循环对象或调用其方法。
以下低级函数可用于获取、设置或创建事件循环:
-
asyncio.get_running_loop()
返回当前操作系统线程中正在运行的事件循环。
如果没有运行的事件循环,则抛出
RuntimeError
。该函数只能在协程或回调函数中调用。
从 Python 3.7 版本开始提供。
-
asyncio.get_event_loop()
获取当前事件循环。
当从协程或回调(例如通过
call_soon
等 API 调度)中调用时,始终返回正在运行的事件循环。如果没有运行的事件循环,函数将返回
get_event_loop_policy().get_event_loop()
的结果。由于该函数行为较复杂(尤其是自定义事件循环策略时),在协程和回调中建议优先使用
get_running_loop()
替代。如前所述,建议使用更高级的asyncio.run()
函数,而非手动调用这些低级函数来创建和关闭事件循环。自 3.12 版本弃用: 从 Python 3.12 起,如果没有当前事件循环,调用此函数会发出弃用警告,未来某个版本将直接报错。
-
asyncio.set_event_loop(loop)
设置当前操作系统线程的事件循环为
loop
。 -
asyncio.new_event_loop()
创建并返回一个新的事件循环对象。
注意,get_event_loop()
、set_event_loop()
和 new_event_loop()
的行为可以通过设置自定义事件循环策略来改变。
知道了如何创建事件循环,那么什么是事件循环呢?
事件循环是许多系统中相当常见的设计模式,并且已经存在了相当长的一段时间。如果你曾在浏览器中使用 JavaScript 发送异步 Web 请求,那么你已经在事件循环上创建了一个任务。Windows GUI 应用程序在幕后使用所谓的消息循环作为处理键盘输入等事件的主要机制,同时允许 UI 进行绘制。
最基本的事件循环非常简单,我们创建一个包含事件或消息列表的队列,然后启动循环,在消息进入队列时一次处理一条消息。在 Python 中,一个基本的事件循环可能看起来像下面这样:
from collections import deque
messages = deque()
while True:
if messages:
message = messages.pop()
process_message(message)
在 asyncio 中,事件循环保留任务队列而不是消息。任务是协程的包装器,协程可以在遇到 IO 密集型操作时暂停执行,并让事件循环运行其他不等待 IO 操作完成的任务。
创建一个事件循环时,会创建一个空的任务队列,然后将任务添加到要运行的队列中。事件循环的每次迭代都会检查需要运行的任务,并一次运行一个,直到任务遇到 IO 操作。然后任务将被暂停,指示操作系统监视相应的套接字以完成 IO,并寻找下一个要运行的任务。在事件循环的每次迭代中,会检查是否有 IO 操作已完成的任务,如果有则唤醒,并让它们继续运行。
主线程将任务提交给事件循环,此后事件循环可以运行它们。
为说明这一点,假设我们有三个任务,每个任务都发出一个异步 Web 请求。想象一下,这些任务有一些代码负责完成一些准备工作(是 CPU 密集型的),然后它们发出 Web 请求,然后又是一些 CPU 密集型的后处理代码。现在,同时将这些任务提交给事件循环。在伪代码中,可以这样写。
def make_request():
cpu_bound_setup()
io_bound_web_request()
cpu_bound_postprocess()
task_one = make_request()
task_two = make_request()
task_three = make_request()
三个任务都以 CPU 密集型工作开始,并且使用单线程,因此只有第一个任务开始执行代码,其他两个任务则在等待。一旦任务1 中的 CPU 密集型设置工作完成,则会遇到一个 IO 密集型操作,并会暂停自己:“我正在等待 IO,由于 IO 是操作系统负责的,并且不耗费 CPU,所以我要将执行权交出去,此时其他任何等待运行的任务都可以运行。”
当任务遇见 IO 阻塞的时候,会将执行权交给事件循环,事件循环再去找其他可以运行任务。
一旦发生这种情况,任务2 就可以开始执行了,任务2 同样会先执行其 CPU 密集型代码然后暂停,等待 IO。此时任务1 和任务2 在同时等待它们的网络请求完成,由于任务1 和任务2 都暂停等待 IO,于是开始运行任务3。任务3 同样也会暂停以等待其 IO 完成,此时三个任务都处于 IO 等待状态。
然后某一时刻任务1 的 Web 请求完成了,那么操作系统的事件通知系统会提醒我们此 IO 已完成(IO 操作不耗费 CPU,只要 IO 操作发起,剩下的交给操作系统。这时候线程可以去执行其它任务,当 IO 完成之后操作系统会通知我们),可以在任务2 和任务3 都在等待其IO完成时,继续执行任务1。
如果 CPU 密集代码的耗时忽略不计,IO 密集代码需要 2 秒钟,那么这三个任务执行完毕也只需要 2 秒钟,因为三个 web 请求是同时发出的。如果是同步代码,那么在返回响应之前,CPU 做不了其它事情,必须等到响应返回之后,才能发送下一个请求,那么整个过程就需要 6 秒钟的时间。
当任务一执行 cpu_bound_setup 的时候,任务二和任务三只能处于等待状态。当任务一进入 IO 的时候,任务二也开始执行 cpu_bound_setup,此时任务一和任务三需要处于等待状态。但对于任务一来说,由于它当前正处于 IO,CPU 给它也没法执行,不过也正因为它处于 IO(要 CPU 也没用),才将 CPU 交出去。
当任务二执行完 cpu_bound_setup 进入 IO 的时候,任务三开始执行 cpu_bound_setup,执行完之后进入 IO。
可以看到整个过程 CPU 没有处于空闲状态,在任务阻塞的时候立刻切换到其它任务上执行。然后当任务一、任务二、任务三都处于 IO 阻塞时,由于已经没有准备就绪的任务了,那么此时 CPU 就只能处于空闲了。接下来在某一时刻,任务一的 IO 结束了,操作系统会通知我们,然后执行任务一的 cpu_bound_postprocess。
所以每个任务等待 IO 的重叠是 asyncio 真正节省时间的地方。
事件循环的本质是什么?
事件循环本质是单线程单循环的,它不会自己创建多个线程或循环。默认情况下,asyncio.run() 会在主线程里创建并运行一个事件循环,直到你的主协程结束。
它不断“轮询”任务队列、回调、IO事件,看有没有可执行的任务。它的设计目标是避免阻塞:当等待IO时,它不会阻塞线程,而是挂起当前任务,让出控制权,去运行其他任务。事件循环依赖操作系统提供的异步IO能力(比如epoll、kqueue、IOCP),做到高效非阻塞。
下面用纯 Python(不借助 asyncio)模拟一个简单的“协程事件循环”,
import time
import heapq
from collections import deque
from types import coroutine
# ----------- 基础 awaitable 构造 -----------
@coroutine
def sleep(delay):
now = time.time()
yield ('sleep', now + delay)
# ----------- 事件循环实现 -----------
class EventLoop:
def __init__(self):
self.ready = deque() # 就绪任务队列
self.sleeping = [] # 睡眠协程优先队列(heapq)
def create_task(self, coro):
self.ready.append(coro)
def run_forever(self):
while self.ready or self.sleeping:
# 处理就绪任务
while self.ready:
coro = self.ready.popleft()
try:
op = coro.send(None)
if op[0] == 'sleep':
wakeup = op[1]
heapq.heappush(self.sleeping, (wakeup, coro))
except StopIteration:
pass
# 检查是否有睡眠任务该唤醒了
now = time.time()
while self.sleeping and self.sleeping[0][0] <= now:
_, coro = heapq.heappop(self.sleeping)
self.ready.append(coro)
# 没有任务立即可执行,sleep等待最早的唤醒时间
if self.ready:
continue
elif self.sleeping:
sleep_time = self.sleeping[0][0] - now
time.sleep(max(sleep_time, 0.01)) # 最少 sleep 一点防死循环
else:
break
# ----------- 示例任务 -----------
async def task(name, delay):
print(f"[{time.time():.2f}] Start {name}")
await sleep(delay)
print(f"[{time.time():.2f}] End {name}")
# ----------- 启动事件循环 -----------
loop = EventLoop()
loop.create_task(task("A", 1))
loop.create_task(task("B", 2))
loop.create_task(task("C", 1.5))
loop.run_forever()
事件循环是唯一的“大总管”,所有协程任务(无论是 IO、sleep 还是普通计算)都会被它统一调度。
它会用:
-
deque 管理 ready 队列(立即可运行的);
-
heapq 管理 sleeping(有延迟);
-
epoll 管理 IO 就绪事件(fd 可读写);
事件循环方法
事件循环提供以下底层 API:
- 运行和停止事件循环
- 调度回调函数
- 调度延迟回调
- 创建 Future 和 Task
- 打开网络连接
- 创建网络服务器
- 文件传输
- TLS 升级
- 监听文件描述符
- 直接操作 socket 对象
- DNS 解析
- 操作管道
- 处理 Unix 信号
- 在线程池或进程池中执行代码
- 错误处理接口
- 启用调试模式
- 运行子进程
运行和停止事件循环
loop.run_until_complete(future)
运行事件循环直到 future(Future 的实例)完成。
- 如果参数是协程对象,则会隐式将其调度为 asyncio.Task。
- 返回 Future 的结果或抛出它的异常。
loop.run_forever()
运行事件循环,直到调用 stop()
停止。
- 如果在调用
run_forever()
之前调用了stop()
,事件循环会用零超时轮询一次 I/O 选择器,执行所有因 I/O 事件调度的回调(包括已经调度的),然后退出。 - 如果在
run_forever()
运行期间调用stop()
,事件循环会完成当前批次回调后退出。注意,此时回调中新调度的回调不会执行,会在下次调用run_forever()
或run_until_complete()
时执行。
loop.stop()
停止事件循环。
loop.is_running()
如果事件循环正在运行,返回 True。
loop.is_closed()
如果事件循环已经关闭,返回 True。
loop.close() 关闭事件循环。
- 调用时事件循环不能处于运行状态。
- 会丢弃所有待执行的回调。
- 清空所有队列并关闭执行器(executor),但不等待执行器完成。
- 此方法幂等且不可逆,事件循环关闭后不能再调用其他方法。
async loop.shutdown_asyncgens()
调度关闭所有当前打开的异步生成器对象(通过调用 aclose())。
- 调用后,如果有新的异步生成器被迭代,事件循环会发出警告。
- 用于可靠地完成所有已调度的异步生成器关闭。
- 使用
asyncio.run()
时无需调用此函数。
示例:
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
async loop.shutdown_default_executor(timeout=None)
调度关闭默认执行器,并等待其线程池中的所有线程退出。
- 调用后,使用
loop.run_in_executor()
时若使用默认执行器,会抛出 RuntimeError。 timeout
参数是等待线程退出的最长时间(秒),默认无限制。- 超时后会发出 RuntimeWarning,默认执行器将被强制终止,不等待线程退出。
- 使用
asyncio.run()
时不需调用此函数,因其会自动处理默认执行器关闭。
调度callback
loop.call_soon(callback, *args, context=None)*
调度回调函数 callback
,带参数 args
,在事件循环下一次迭代时执行。
- 返回
asyncio.Handle
实例,可用于取消回调。 - 回调按注册顺序执行,每个只调用一次。
- 可选的
context
参数指定回调运行的自定义contextvars.Context
,未提供时使用当前上下文。 - 该方法不是线程安全的。
loop.call_soon_threadsafe(callback, *args, context=None)
线程安全版本的 call_soon()
,用于从其他线程调度回调。
- 可安全在重入环境或信号处理程序中调用,但返回的
Handle
不适合在这些上下文中使用。 - 如果事件循环已关闭,调用会抛出 RuntimeError(如应用关闭时的次线程调用)。
- 3.7 版本新增
context
参数。
注意:大多数 asyncio 调度函数不支持传递关键字参数,若需传递请用
functools.partial()
,例如:
loop.call_soon(functools.partial(print, "Hello", flush=True))
使用 partial 对象通常比 lambda 更方便,且调试信息更友好。
调度延迟回调
事件循环提供机制调度回调函数在将来某个时间点调用,使用单调时钟计时。
*loop.call_later(delay, callback, args, context=None)
延迟 delay
秒后调用回调函数。
- 返回
asyncio.TimerHandle
实例,可用于取消回调。 - 回调只调用一次,若两个回调时间相同,调用顺序未定义。
- 位置参数
args
会传给回调;若需关键字参数请用functools.partial()
。 - 可选
context
指定回调上下文,未提供时用当前上下文。 - 3.7 版本新增
context
参数。 - 3.8 版本修复了延迟不能超过一天的问题。
*loop.call_at(when, callback, args, context=None)
在绝对时间 when
(同 loop.time()
时间参考)调用回调。
- 其他行为与
call_later()
相同。 - 返回
asyncio.TimerHandle
可用于取消。 - 3.7 版本新增
context
参数。 - 3.8 版本修复了时间差不能超过一天的问题。
loop.time()
返回事件循环内部单调时钟的当前时间(浮点数)。
注意:3.7 及以前版本中,超时时间不能超过一天,3.8 版本已修复。
另见 asyncio.sleep()
函数。
…
anyio
AnyIO 是一个基于 Python 的异步编程库,旨在提供统一且高级的异步 API,它在 asyncio、Trio 和 Curio 这几个不同的异步框架之上进行抽象,允许你用同一套接口编写异步代码,同时在不同的异步运行时环境之间无缝切换。
相比于 asyncio,AnyIO 有几个显著的优势和特点:
-
多后端支持
AnyIO 不仅支持标准的 asyncio,还支持 Trio 和 Curio 作为后端运行时。这意味着你编写的代码可以在不同异步库之间切换,而无需重写业务逻辑。这种兼容性让你的代码更加灵活且可移植。 -
更高级的抽象与结构化并发
AnyIO 引入了结构化并发(structured concurrency)的理念,借鉴了 Trio 的设计思想,使得异步任务的管理更加安全和可控。比如它提供了类似于“任务组”(task groups)的功能,能够优雅地启动和管理一组协程,并保证当某个协程失败时,整个任务组能正确地取消和清理,避免孤儿协程和资源泄露。 -
统一的接口和更友好的 API 设计
AnyIO 的 API 设计更加现代和简洁,提供了统一的同步原语(锁、事件、信号量等)、网络通讯、超时控制、取消机制等工具,这些接口对不同后端行为做了统一,降低了开发者对底层细节的关注和学习成本。 -
更好的错误处理和取消机制
AnyIO 采用了更健壮的错误传播和取消机制,支持任务取消的传播和干净的资源释放,使得编写健壮、可靠的异步程序更加简单。 -
跨平台的兼容性
AnyIO 支持 Windows、Linux、macOS 等多个操作系统,并对底层异步 I/O 的差异做了良好封装,提升了代码的可移植性。 -
社区活跃且文档完善
AnyIO 作为现代异步生态的重要组成部分,社区活跃,文档详尽,配套示例丰富,便于开发者上手和深入理解异步编程。
总结来说,AnyIO 并非单纯替代 asyncio,而是站在更高层,提供一个兼容多个异步框架的抽象层,让你写出的异步代码更加结构化、安全、易维护,且能适配不同异步运行时环境,减少未来迁移和扩展的成本。如果你想提升异步编程的效率和代码质量,学习 AnyIO 是非常值得的选择。
使用 AnyIO API 编写的应用程序和库,可以无需修改直接运行在 asyncio 或 trio 之上。AnyIO 也支持逐步集成到现有的库或应用中——可以一点一点地引入,无需完全重构。它能够与所选后端的原生库无缝融合。
完整文档请访问:https://anyio.readthedocs.io/
AnyIO 提供如下功能:
- 任务组(在 trio 术语中称为“nurseries”)
- 高级网络支持(TCP、UDP 和 UNIX 套接字)
- TCP 连接的 Happy Eyeballs 算法(比 Python 3.8 中 asyncio 的实现更健壮)
- 基于 async/await 风格的 UDP 套接字(区别于 asyncio 仍需使用 Transport 和 Protocol 的方式)
- 多功能的字节流和对象流 API
- 任务间同步与通信原语(锁、条件变量、事件、信号量、对象流)
- 工作线程支持
- 子进程管理
- 异步文件 I/O(通过工作线程实现)
- 信号处理
AnyIO 还自带了 pytest 插件,支持异步 fixture,甚至兼容流行的 Hypothesis 测试库。
快速开始
AnyIO 需要 Python 3.8 或更高版本才能运行。建议在开发或尝试 AnyIO 时使用 virtualenv
来创建虚拟环境。
安装 AnyIO,执行:
pip install anyio
如果你希望使用支持的 Trio 后端,可以通过如下方式额外安装:
pip install anyio[trio]
最简单的 AnyIO 程序如下所示:
from anyio import run
async def main():
print('Hello, world!')
run(main)
这段代码会在默认后端(即 asyncio
)上运行。若想使用其他后端(如 Trio),可以通过 backend
参数指定:
run(main, backend='trio')
不过 AnyIO 的代码不一定非要通过 run()
启动。你也可以直接使用后端库的原生 run()
方法来运行:
import sniffio
import trio
from anyio import sleep
async def main():
print('Hello')
await sleep(1)
print("I'm running on", sniffio.current_async_library())
trio.run(main)
后端专属选项
asyncio:
- 相关选项详见
asyncio.Runner
的文档。 use_uvloop
(布尔型,默认值为False
):若可用,则使用更快的uvloop
事件循环实现(这是loop_factory=uvloop.new_event_loop
的简写;当手动传入loop_factory
时此选项无效)。
Trio:
- Trio 的选项详见其 官方文档。
📌 版本变更:
3.2.0
:use_uvloop
的默认值改为False
4.0.0
:policy
选项被loop_factory
取代
AnyIO 允许你在同一个项目中混合使用 AnyIO 代码和你所选异步框架(如 asyncio 或 trio)编写的代码。但要注意以下规则:
-
只能使用当前后端支持的“原生”库 例如:不能将一个为 Trio 编写的库和为 asyncio 编写的库混用。
-
在非 Trio 后端中由“原生”库创建的任务,不受 AnyIO 强制的取消规则约束。
-
在 AnyIO 外部启动的线程,不能使用
from_thread.run()
来调用异步代码。
创建与管理任务
任务(Task)是异步程序中的执行单元,用于并发执行需要等待的操作。虽然可以创建任意数量的任务,但异步事件循环在任意时刻只能运行其中一个。当任务执行到一个 await
表达式,并需要等待某个事件时,事件循环就会切换去运行其他任务。一旦原任务所等待的事件完成,事件循环就会在适当时机恢复它的执行。
AnyIO 的任务管理大致遵循了 Trio 的模型。任务通过「任务组」(Task Group)创建。任务组是一个异步上下文管理器(async with
),其作用是在退出上下文块时,确保其所有子任务都已完成。如果上下文中的代码或子任务抛出异常,所有子任务将被取消;如果没有异常发生,任务组会等待所有子任务完成后再继续执行。
示例:
from anyio import sleep, create_task_group, run
async def sometask(num: int) -> None:
print('Task', num, 'running')
await sleep(1)
print('Task', num, 'finished')
async def main() -> None:
async with create_task_group() as tg:
for num in range(5):
tg.start_soon(sometask, num)
print('All tasks finished!')
run(main)
有时候,我们希望「等待任务成功初始化之后」再继续后续逻辑。例如,当你启动网络服务时,你可能想等监听器真正启动后,再通知主任务接着做其他事情。如果监听失败(如端口绑定失败),这个异常也会被传递回主任务,使其可以进行处理。
这可以通过 TaskGroup.start()
实现:
from anyio import (
TASK_STATUS_IGNORED,
create_task_group,
connect_tcp,
create_tcp_listener,
run,
)
from anyio.abc import TaskStatus
async def handler(stream):
...
async def start_some_service(
port: int, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED
):
async with await create_tcp_listener(
local_host="127.0.0.1", local_port=port
) as listener:
task_status.started()
await listener.serve(handler)
async def main():
async with create_task_group() as tg:
await tg.start(start_some_service, 5000)
async with await connect_tcp("127.0.0.1", 5000) as stream:
...
run(main)
注意:
- 被
start()
启动的协程必须调用task_status.started()
,否则调用方将一直阻塞,最后触发RuntimeError
。 - 与
start_soon()
不同,start()
是一个需要await
的异步方法。
一个任务组中可能会有多个任务抛出异常。例如,一个任务在响应取消时进入了 except
或 finally
块并抛出了异常。那么,任务组应该传播哪个异常?答案是:全部。
这会导致任务组抛出一个 ExceptionGroup
(或 BaseExceptionGroup
)类型的复合异常,它包含了多个异常对象。
Python 3.11 及以上可以使用 except*
语法捕获多个异常:
from anyio import create_task_group
try:
async with create_task_group() as tg:
tg.start_soon(some_task)
tg.start_soon(another_task)
except* ValueError as excgroup:
for exc in excgroup.exceptions:
... # 处理每个 ValueError
except* KeyError as excgroup:
for exc in excgroup.exceptions:
... # 处理每个 KeyError
兼容旧版本 Python可以使用 exceptiongroup
提供的 catch()
函数:
from anyio import create_task_group
from exceptiongroup import catch
def handle_valueerror(excgroup):
for exc in excgroup.exceptions:
... # 处理每个 ValueError
def handle_keyerror(excgroup):
for exc in excgroup.exceptions:
... # 处理每个 KeyError
with catch({
ValueError: handle_valueerror,
KeyError: handle_keyerror
}):
async with create_task_group() as tg:
tg.start_soon(some_task)
tg.start_soon(another_task)
如果需要在异常处理器中修改外部变量,请使用 nonlocal
关键字声明:
def handle_valueerror(exc):
nonlocal somevariable
somevariable = 'whatever'
每当创建新任务时,「上下文」会被复制到新任务中。这里的“上下文”是指 Python 的 contextvars
(上下文变量)。需要注意的是:复制的是调用 start()
或 start_soon()
的那个任务的上下文,而不是任务组创建者的上下文。
假设你有两个任务:
-
任务 A 创建了一个任务组 tg
-
任务 B 在这个任务组中通过 tg.start_soon(…) 创建了一个新任务 C
-
那么任务 C 的上下文(contextvars)复制自任务 B,而不是任务 A。
也就是说,哪个任务调用了 start_soon(),就复制它的上下文。
import anyio
from contextvars import ContextVar
var = ContextVar("var", default="default")
async def task2():
print("task2 sees var =", var.get()) # 期望看到 "B"
async def task1():
var.set("B")
async with anyio.create_task_group() as tg:
tg.start_soon(task2)
async def main():
var.set("A")
await task1()
anyio.run(main)
输出将是:
task2 sees var = B
与 asyncio.TaskGroup
的区别
Python 3.11 引入的 asyncio.TaskGroup
与 AnyIO 的 TaskGroup
类似,但也存在一些语义差异:
asyncio.TaskGroup
通过直接构造类对象创建,不使用工厂函数。- 它只支持
create_task()
来创建子任务,没有start()
或start_soon()
方法。 create_task()
返回Task
对象,可单独await
或cancel
。- 每个任务必须单独取消,
asyncio.TaskGroup
没有提供统一取消所有任务的机制。 - 如果任务在启动前就被取消,将不会有机会处理取消异常。
- 一旦某个任务抛出异常,触发任务组关闭,就不能再添加新任务。
- 取消语义不同,详见 asyncio 的取消规则说明。
为什么需要 contextvars?
在同步编程里,我们可以使用 threading.local() 实现“每个线程有一份自己的状态”:
import threading
local_data = threading.local()
local_data.user = "Alice"
但异步中,所有协程是运行在同一个线程的,你无法再靠 thread local 区分每个任务的状态 —— 所以 Python 引入了 contextvars。
它能实现:
var = ContextVar("var")
var.set("user-123")
# 当前协程内部使用
print(var.get()) # user-123
# 其他协程无法看到这个值,除非显式设置或继承
取消与超时
异步编程模型最显著的优势之一就是可以取消任务。 相比之下,线程无法被强制终止,关闭线程必须完全依赖其内部代码的主动配合。
在 AnyIO 中,任务的取消机制遵循 Trio 框架的设计:通过“取消域”(cancel scopes)来实现任务的取消。
取消域可以嵌套使用,并通过上下文管理器进行控制。**当一个取消域被取消时,其内部所有嵌套的取消域也会一并被取消。**如果一个任务正在等待某些操作(如 sleep()
),它会立即被取消;如果任务刚开始执行,它会继续运行直到遇到第一个“等待点”。
任务组(task group)自身就包含一个取消域,你可以通过取消该取消域来取消整个任务组。
在 asyncio
中,采用的是“边缘取消(edge cancellation)” 机制:任务被取消时,会抛出 CancelledError
异常,任务本身可以捕获、忽略甚至完全不处理这个异常。
相比之下,AnyIO 使用的是“层级取消(level cancellation)”机制:只要任务处于一个已经被取消的取消域内,每次遇到 await
、async with
、async for
等等待点,都会被重新触发取消异常。
这意味着一些为 asyncio 编写的代码在 AnyIO 中可能行为异常。例如,asyncio.Condition
的设计是忽略取消异常,直到成功重新获取底层锁。这会导致一个忙等循环(busy-wait loop),疯狂消耗 CPU 资源。
超时机制
网络操作通常可能很慢,因此我们希望设置超时机制防止任务无限卡住。AnyIO 提供两种超时方式:
move_on_after(timeout)
:超时后提前退出上下文块,但不会报错。fail_after(timeout)
:超时后抛出TimeoutError
异常。
这两个方法都会创建一个新的取消域(cancel scope)。你可以通过访问 .deadline
查看截止时间。不过外部的取消域也可能有更早的截止时间,因此可以用 current_effective_deadline()
获取实际生效的时间。
示例代码如下:
from anyio import create_task_group, move_on_after, sleep, run
async def main():
async with create_task_group() as tg:
with move_on_after(1) as scope:
print('Starting sleep')
await sleep(2)
print('This should never be printed')
print('Exited cancel scope, cancelled =', scope.cancelled_caught)
run(main)
注意: 不要在 fail_after()
中手动调用取消操作,这可能导致由于退出延迟,错误地抛出 TimeoutError
。
屏蔽取消(Shielding)
有时你需要暂时保护某些代码块不被取消,常见于资源释放、清理阶段。可以使用带 shield=True
的取消域实现:
from anyio import CancelScope, create_task_group, sleep, run
async def external_task():
print('Started sleeping in the external task')
await sleep(1)
print('This line should never be seen')
async def main():
async with create_task_group() as tg:
with CancelScope(shield=True) as scope:
tg.start_soon(external_task)
tg.cancel_scope.cancel()
print('Started sleeping in the host task')
await sleep(1)
print('Finished sleeping in the host task')
run(main)
被 shield=True
包裹的块不会被外部取消影响,除非其本身被直接取消。通常建议配合 move_on_after()
或 fail_after()
使用,它们也支持 shield=True
。
清理逻辑(Finalization)
有时你可能希望在操作失败后进行清理:
async def do_something():
try:
await run_async_stuff()
except BaseException:
# 执行清理操作
raise
如果你只想捕获取消异常,这会有点麻烦,因为不同框架定义的取消异常类不一样。AnyIO 提供了 get_cancelled_exc_class()
用来获取当前框架的取消异常类:
from anyio import get_cancelled_exc_class
async def do_something():
try:
await run_async_stuff()
except get_cancelled_exc_class():
# 执行清理操作
raise
⚠️ 务必重新抛出取消异常,否则你的应用可能会进入不确定状态。
如果你需要在清理逻辑中使用 await
,则必须包裹在一个 shield=True
的取消域中,否则由于已处于取消状态,await 会被立即打断:
async def do_something():
try:
await run_async_stuff()
except get_cancelled_exc_class():
with CancelScope(shield=True):
await some_cleanup_function()
raise
避免取消域栈结构损坏
取消域必须以 LIFO(后进先出的顺序进入与退出(即上下文嵌套必须对称)。否则可能导致取消栈结构损坏,例如以下几种情况容易出错:
- 手动调用
CancelScope.__enter__()
和__exit__()
,且顺序不正确 - 与
AsyncExitStack
混用,但未遵循嵌套语义 - 使用低层协程协议,在不同取消域中运行一个协程函数的部分代码
- 在取消域中使用
async generator
时发生yield
任务组(task group)内部也有取消域,所以上述问题同样适用。
以下代码是不安全的示例:
# ❌ 错误示例
async def some_generator():
async with create_task_group() as tg:
tg.start_soon(foo)
yield
这里的问题是:如果 foo
抛出异常,外部生成器已经返回,异常不会被正确传播,导致不可预期行为。
但在 async 上下文管理器中这样用通常是安全的:
# ✅ 一般来说没问题
@async_context_manager
async def some_context_manager():
async with create_task_group() as tg:
tg.start_soon(foo)
yield
从 AnyIO 3.6 开始,pytest 的异步生成器 fixture 也被修改为始终在同一个任务中运行,因此这种写法在 fixture 中也变得安全。
如果你手动实现异步上下文协议,且还要管理多个上下文,推荐使用 AsyncExitStack
保证入栈与出栈顺序一致:
from contextlib import AsyncExitStack
from anyio import create_task_group
class MyAsyncContextManager:
async def __aenter__(self):
self._exitstack = AsyncExitStack()
await self._exitstack.__aenter__()
self._task_group = await self._exitstack.enter_async_context(
create_task_group()
)
async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self._exitstack.__aexit__(exc_type, exc_val, exc_tb)
以下是原文的中文翻译:
使用同步原语
同步原语是用于任务之间通信与协调的对象。它们在分发工作负载、通知其他任务以及保护对共享资源的访问等方面非常有用。
注意
AnyIO 的同步原语不是线程安全的,因此不应直接从工作线程中使用。若需在线程中使用,请通过run_sync()
调用。
事件(Event)
事件用于通知任务某些它们一直等待的事情已经发生。一个事件对象可以有多个监听者,一旦事件被触发,所有监听者都会收到通知。
示例:
from anyio import Event, create_task_group, run
async def notify(event):
event.set()
async def main():
event = Event()
async with create_task_group() as tg:
tg.start_soon(notify, event)
await event.wait()
print('收到通知!')
run(main)
注意
与标准库的Event
不同,AnyIO 的事件不可重用,应在每次使用后替换。这一做法能防止某类竞争条件,并符合 Trio 框架的语义。
信号量(Semaphore)
信号量用于限制对共享资源的访问。它从一个最大值开始,每次有任务获取信号量时就减 1,释放时加 1。若值降为 0,则后续尝试获取的任务将被阻塞,直到有任务释放它。
示例:
from anyio import Semaphore, create_task_group, sleep, run
async def use_resource(tasknum, semaphore):
async with semaphore:
print('任务', tasknum, '正在使用共享资源')
await sleep(1)
async def main():
semaphore = Semaphore(2)
async with create_task_group() as tg:
for num in range(10):
tg.start_soon(use_resource, num, semaphore)
run(main)
提示
若对信号量性能要求极高,可传入fast_acquire=True
参数。这会跳过cancel_shielded_checkpoint()
的调用(若无竞争时立即成功获取),但在某些循环中使用时可能导致任务永远不让出事件循环控制权。
锁(Lock)
锁用于保护共享资源,确保某一时刻只有一个任务能访问它。其行为类似于最大值为 1 的信号量,但只有持有锁的任务才能释放它。
示例:
from anyio import Lock, create_task_group, sleep, run
async def use_resource(tasknum, lock):
async with lock:
print('任务', tasknum, '正在使用共享资源')
await sleep(1)
async def main():
lock = Lock()
async with create_task_group() as tg:
for num in range(4):
tg.start_soon(use_resource, num, lock)
run(main)
提示
类似于信号量,可传入fast_acquire=True
以优化性能(跳过事件检查点)。
条件变量(Condition)
条件变量基本上是事件与锁的组合体。它会先获取锁,然后等待某个通知事件。一旦收到通知,它会释放锁。发送通知的任务可以选择唤醒一个、多个甚至全部等待者。
条件变量与锁类似,必须由获取它的任务来释放。
示例:
from anyio import Condition, create_task_group, sleep, run
async def listen(tasknum, condition):
async with condition:
await condition.wait()
print('唤醒任务', tasknum)
async def main():
condition = Condition()
async with create_task_group() as tg:
for tasknum in range(6):
tg.start_soon(listen, tasknum, condition)
await sleep(1)
async with condition:
condition.notify(1)
await sleep(1)
async with condition:
condition.notify(2)
await sleep(1)
async with condition:
condition.notify_all()
run(main)
容量限制器(CapacityLimiter)
容量限制器与信号量类似,但每个借用者(默认为当前任务)只能持有一个令牌。它还允许你代表任意可哈希对象借用令牌。
示例:
from anyio import CapacityLimiter, create_task_group, sleep, run
async def use_resource(tasknum, limiter):
async with limiter:
print('任务', tasknum, '正在使用共享资源')
await sleep(1)
async def main():
limiter = CapacityLimiter(2)
async with create_task_group() as tg:
for num in range(10):
tg.start_soon(use_resource, num, limiter)
run(main)
你可以通过设置 total_tokens
属性调整限制器的最大令牌数。
ResourceGuard
某些资源(如 socket)对并发访问非常敏感,甚至不允许尝试同时使用。这种情况可使用 ResourceGuard
保护:
class Resource:
def __init__(self):
self._guard = ResourceGuard()
async def do_something(self) -> None:
with self._guard:
...
如果另一个任务在第一个任务还未结束前尝试访问相同的资源方法,将会抛出 BusyResourceError
。
队列(Queues)
AnyIO 没有提供传统的队列机制,而是提供了更强大的内存对象流(memory object streams)作为替代。
流(Streams)
在 AnyIO 中,“流”是一个用于从一个地方传输信息到另一个地方的简单接口。它既可以用于进程内通信,也可以用于通过网络发送数据。AnyIO 将流分为两类:字节流和对象流。
字节流(在 Trio 中称为 “Streams”)是接收和/或发送字节块的对象。它们的设计基于流套接字的限制,意味着数据边界不会被保留。例如,如果你先调用 .send(b'hello ')
,然后再调用 .send(b'world')
,对方可能会以任意方式接收数据,如 (b'hello', b' world')
、b'hello world'
或 (b'hel', b'lo wo', b'rld')
。
对象流(在 Trio 中称为 “Channels”)处理的是 Python 对象。最常见的实现是内存对象流(memory object stream)。对象流的具体语义因实现方式不同而有较大差异。
许多流的实现是对其他流的封装。其中一些可以封装任何以字节为单位的流,即 ObjectStream[bytes]
和 ByteStream
,从而实现多种有趣的用途。
内存对象流(Memory Object Streams)
内存对象流用于在多个任务间实现生产者-消费者模式。使用 create_memory_object_stream()
会返回一对对象流:一个用于发送,一个用于接收。它们本质上类似队列,但支持关闭和异步迭代。
默认情况下,内存对象流的缓冲区大小为 0,意味着 send()
会阻塞直到有其他任务调用 receive()
。你可以在创建流时设置缓冲区大小,也可以传入 math.inf
实现无限缓冲,但这并不推荐。
可以通过调用 clone()
方法克隆内存对象流。每个克隆可以单独关闭,但只有当所有克隆都关闭后,该端才被视为真正关闭。例如,如果你有两个接收流的克隆,发送端只有在两个接收端都关闭后,才会抛出 BrokenResourceError
。
多个任务可以同时使用同一个流(或其克隆)进行发送和接收,但每个发送的项目只能被一个接收者接收。
接收端可以通过异步迭代协议遍历所有接收的对象。当所有发送端关闭后,循环结束。
示例:
from anyio import create_task_group, create_memory_object_stream, run
from anyio.streams.memory import MemoryObjectReceiveStream
async def process_items(receive_stream: MemoryObjectReceiveStream[str]) -> None:
async with receive_stream:
async for item in receive_stream:
print('received', item)
async def main():
send_stream, receive_stream = create_memory_object_stream[str]()
async with create_task_group() as tg:
tg.start_soon(process_items, receive_stream)
async with send_stream:
for num in range(10):
await send_stream.send(f'number {num}')
run(main)
与 AnyIO 中其他流不同(但与 Trio 中的 Channels 一致),内存对象流可以同步关闭,通过 close()
方法或使用上下文管理器:
from anyio.streams.memory import MemoryObjectSendStream
def synchronous_callback(send_stream: MemoryObjectSendStream[str]) -> None:
with send_stream:
send_stream.send_nowait('hello')
组合流(Stapled Streams)
组合流将兼容的接收流和发送流组合成一个双向流。
分为两种变体:
StapledByteStream
:组合ByteReceiveStream
与ByteSendStream
StapledObjectStream
:组合ObjectReceiveStream
与兼容的ObjectSendStream
缓冲字节流(Buffered Byte Streams)
缓冲字节流对现有的字节接收流进行封装,提供一些需要缓冲的功能,如接收固定字节数,或接收到特定分隔符为止。
示例:
from anyio import run, create_memory_object_stream
from anyio.streams.buffered import BufferedByteReceiveStream
async def main():
send, receive = create_memory_object_stream
buffered = BufferedByteReceiveStream(receive)
for part in b'hel', b'lo, ', b'wo', b'rld!':
await send.send(part)
result = await buffered.receive_exactly(8)
print(repr(result))
result = await buffered.receive_until(b'!', 10)
print(repr(result))
run(main)
输出:
b'hello, w'
b'orld'
文本流(Text Streams)
文本流将已有的发送/接收流封装起来,实现字符串到字节的编码/解码。
示例:
from anyio import run, create_memory_object_stream
from anyio.streams.text import TextReceiveStream, TextSendStream
async def main():
bytes_send, bytes_receive = create_memory_object_stream
text_send = TextSendStream(bytes_send)
await text_send.send('åäö')
result = await bytes_receive.receive()
print(repr(result))
text_receive = TextReceiveStream(bytes_receive)
await bytes_send.send(result)
result = await text_receive.receive()
print(repr(result))
run(main)
输出:
b'\xc3\xa5\xc3\xa4\xc3\xb6'
'åäö'
文件流(File Streams)
文件流用于从文件系统中的文件读取或写入数据,常用于日志记录、调试或模拟数据输入。
示例:
from anyio import run
from anyio.streams.file import FileReadStream, FileWriteStream
async def main():
path = '/tmp/testfile'
async with await FileWriteStream.from_path(path) as stream:
await stream.send(b'Hello, World!')
async with await FileReadStream.from_path(path) as stream:
async for chunk in stream:
print(chunk.decode(), end='')
print()
run(main)
文件流功能自 AnyIO 3.0 添加
TLS 流(TLS Streams)
TLS(传输层安全协议)是 SSL 的继任者,是 AnyIO 支持的 TCP 流认证与加密传输方式。
TLS 通常在连接建立后立即握手,主要步骤包括:
- 服务器发送证书
- 客户端验证对方证书是否为可信 CA 所签发
- 验证主机名是否与证书匹配
获取服务器证书
你可以通过以下三种方式获取 X.509 证书:
- 自签名证书(最简单,但不推荐用于生产环境)
- 使用 certbot 等工具从 Let’s Encrypt 自动获取
- 从证书提供商处购买(如需高级验证或无法自动续期)
示例:使用 openssl 创建自签名证书:
openssl req -x509 -newkey rsa:2048 -subj '/CN=localhost' -keyout key.pem -out cert.pem -nodes -days 365
设置 TLS 服务端
import ssl
from anyio import create_tcp_listener, run
from anyio.streams.tls import TLSListener
async def handle(client):
async with client:
name = await client.receive()
await client.send(b'Hello, %s\n' % name)
async def main():
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile='cert.pem', keyfile='key.pem')
listener = TLSListener(await create_tcp_listener(local_port=1234), context)
await listener.serve(handle)
run(main)
连接 TLS 服务端
import ssl
from anyio import connect_tcp, run
async def main():
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.load_verify_locations(cafile='cert.pem')
async with await connect_tcp('localhost', 1234, ssl_context=context) as client:
await client.send(b'Client\n')
response = await client.receive()
print(response)
run(main)
动态生成自签名证书(用于测试)
推荐使用 trustme 库:
import ssl
import pytest
import trustme
@pytest.fixture(scope='session')
def ca():
return trustme.CA()
@pytest.fixture(scope='session')
def server_context(ca):
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ca.issue_cert('localhost').configure_cert(context)
return context
@pytest.fixture(scope='session')
def client_context(ca):
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ca.configure_trust(context)
return context
处理不完整的 TLS 关闭(ragged EOF)
根据 TLS 标准,连接应以关闭握手结束,以防止截断攻击。但如 HTTP 等协议经常跳过此步骤。
AnyIO 默认遵循标准(与 Python 内建 ssl
模块不同),因此,如果你实现的是非标准关闭的协议,应设置:
standard_compatible=False
用于 wrap()
或 TLSListener
,以避免报错。
使用套接字(sockets)和流(streams)
网络功能可以说是任何异步库中最重要的部分。AnyIO 在各个支持的底层后端提供的低级原语基础上,实现了自己的高级网络功能。
目前 AnyIO 支持以下网络功能:
- TCP 套接字(客户端 + 服务器)
- UNIX 域套接字(客户端 + 服务器)
- UDP 套接字
- UNIX 数据报套接字
目前不支持更特殊的网络形式,如原始套接字(raw sockets)和 SCTP。
注意
与标准 BSD 套接字接口及大多数其他网络库不同,AnyIO(从 2.0 版本起)通过抛出 EndOfStream
异常来标识流结束,而非返回空字节对象。
使用 TCP 套接字
TCP(传输控制协议)是互联网中最常用的协议。它允许你连接到远程主机的端口,并可靠地发送和接收数据。
要连接到某个 TCP 监听端口,可以使用 connect_tcp()
:
from anyio import connect_tcp, run
async def main():
async with await connect_tcp('hostname', 1234) as client:
await client.send(b'Client\n')
response = await client.receive()
print(response)
run(main)
方便起见,你也可以通过传入 tls=True
,或传入非空的 ssl_context
或 tls_hostname
,使用 connect_tcp()
建立 TLS 会话。
要接收传入的 TCP 连接,首先创建 TCP 监听器 create_tcp_listener()
,然后调用它的 serve()
:
from anyio import create_tcp_listener, run
async def handle(client):
async with client:
name = await client.receive(1024)
await client.send(b'Hello, %s\n' % name)
async def main():
listener = await create_tcp_listener(local_port=1234)
await listener.serve(handle)
run(main)
使用 UNIX 套接字
UNIX 域套接字是在类 UNIX 操作系统中用于进程间通信的一种方式。它们不能用于连接远程主机,并且在 Windows 上不可用。
UNIX 域套接字的 API 类似于 TCP 套接字,不同之处在于使用文件系统路径代替主机/端口组合。
下面是将 TCP 客户端示例改写为 UNIX 套接字版本的样子:
from anyio import connect_unix, run
async def main():
async with await connect_unix('/tmp/mysock') as client:
await client.send(b'Client\n')
response = await client.receive(1024)
print(response)
run(main)
监听端示例:
from anyio import create_unix_listener, run
async def handle(client):
async with client:
name = await client.receive(1024)
await client.send(b'Hello, %s\n' % name)
async def main():
listener = await create_unix_listener('/tmp/mysock')
await listener.serve(handle)
run(main)
注意
UNIX 套接字监听器不会自动删除它创建的套接字文件,因此你可能需要手动删除。
发送和接收文件描述符
UNIX 套接字可用于向另一进程传递打开的文件描述符(套接字或文件)。接收方可以使用 os.fdopen()
或 socket.socket
获得可用的文件或套接字对象。
下面示例中,客户端连接到 UNIX 套接字服务器,接收服务器打开文件的描述符,读取文件内容并打印到标准输出。
客户端:
import os
from anyio import connect_unix, run
async def main():
async with await connect_unix('/tmp/mysock') as client:
_, fds = await client.receive_fds(0, 1)
with os.fdopen(fds[0]) as file:
print(file.read())
run(main)
服务器:
from pathlib import Path
from anyio import create_unix_listener, run
async def handle(client):
async with client:
with path.open('r') as file:
await client.send_fds(b'this message is ignored', [file])
async def main():
listener = await create_unix_listener('/tmp/mysock')
await listener.serve(handle)
path = Path('/tmp/examplefile')
path.write_text('Test file')
run(main)
使用 UDP 套接字
UDP(用户数据报协议)是一种网络数据包传输方式,不支持连接、重传或错误纠正等特性。
例如,你想创建一个 UDP “hello” 服务,读取数据包并向发送者回复带有 “Hello, ” 前缀的数据包,可以这样写:
import socket
from anyio import create_udp_socket, run
async def main():
async with await create_udp_socket(
family=socket.AF_INET, local_port=1234
) as udp:
async for packet, (host, port) in udp:
await udp.sendto(b'Hello, ' + packet, host, port)
run(main)
注意
如果你在本机测试或不确定使用哪个套接字族(family),建议将 family=socket.AF_INET
替换为 local_host='localhost'
。
如果需要向同一个目标发送大量数据包,可以“连接”UDP套接字到特定主机和端口,避免每次发送都传递地址和端口:
from anyio import create_connected_udp_socket, run
async def main():
async with await create_connected_udp_socket(
remote_host='hostname', remote_port=1234) as udp:
await udp.send(b'Hi there!\n')
run(main)
使用 UNIX 数据报套接字
UNIX 数据报套接字是 UNIX 域套接字的一个子集,区别在于:
- UNIX 套接字实现的是连续字节流的可靠通信(类似 TCP)
- UNIX 数据报套接字实现的是数据包的通信(类似 UDP)
其 API 仿照 UDP 套接字设计,使用文件系统路径代替主机/端口。
以下是用 UNIX 数据报套接字实现的 UDP “hello” 服务示例:
from anyio import create_unix_datagram_socket, run
async def main():
async with await create_unix_datagram_socket(
local_path='/tmp/mysock'
) as unix_dg:
async for packet, path in unix_dg:
await unix_dg.sendto(b'Hello, ' + packet, path)
run(main)
注意
如果没有设置 local_path
,UNIX 数据报套接字会绑定到一个未命名地址,通常无法接收其他 UNIX 数据报套接字发来的数据包。
类似于 UDP,如果你经常向同一个目标发送数据包,可以“连接”UNIX 数据报套接字到指定路径,避免每次发送时都传递路径:
from anyio import create_connected_unix_datagram_socket, run
async def main():
async with await create_connected_unix_datagram_socket(
remote_path='/dev/log'
) as unix_dg:
await unix_dg.send(b'Hi there!\n')
run(main)
使用线程
实际的异步应用有时需要运行网络、文件或计算密集型的操作。这类操作通常会阻塞异步事件循环,导致性能问题。解决方法是将这些代码放到工作线程(worker threads)中执行。这样事件循环可以继续执行其他任务,而工作线程执行阻塞调用。
在工作线程中运行函数
要在工作线程中运行一个(同步)可调用对象:
import time
from anyio import to_thread, run
async def main():
await to_thread.run_sync(time.sleep, 5)
run(main)
默认情况下,任务在等待工作线程完成时会被保护,不会被取消。如果需要允许任务被取消,可以传入 cancellable=True
参数。但需要注意,线程仍会继续运行,只是结果会被忽略。
从工作线程调用异步代码
如果需要在工作线程中调用协程函数,可以这样做:
from anyio import from_thread, sleep, to_thread, run
def blocking_function():
from_thread.run(sleep, 5)
async def main():
await to_thread.run_sync(blocking_function)
run(main)
注意:工作线程必须是使用 run_sync()
生成的线程。
从工作线程调用事件循环线程中的同步代码
有时你需要从工作线程调用事件循环线程中的同步代码,比如设置异步事件或向内存对象流发送数据。由于这些方法不是线程安全的,必须通过 run_sync()
在事件循环线程中执行:
import time
from anyio import Event, from_thread, to_thread, run
def worker(event):
time.sleep(1)
from_thread.run_sync(event.set)
async def main():
event = Event()
await to_thread.run_sync(worker, event)
await event.wait()
run(main)
从外部线程调用异步代码
如果需要从非事件循环生成的线程中运行异步代码,需要使用阻塞门户(blocking portal),该门户需从事件循环线程中获得。
一种方法是使用 start_blocking_portal
启动一个新的事件循环门户:
from anyio.from_thread import start_blocking_portal
with start_blocking_portal(backend='trio') as portal:
portal.call(...)
如果已经有运行中的事件循环且想让外部线程访问,可以直接创建一个 BlockingPortal
:
from anyio import run
from anyio.from_thread import BlockingPortal
async def main():
async with BlockingPortal() as portal:
# 将 portal 交给外部线程使用
await portal.sleep_until_stopped()
run(main)
从工作线程中启动任务
当你需要后台运行任务时,可以用 start_task_soon()
:
from concurrent.futures import as_completed
from anyio import sleep
from anyio.from_thread import start_blocking_portal
async def long_running_task(index):
await sleep(1)
print(f'Task {index} running...')
await sleep(index)
return f'Task {index} return value'
with start_blocking_portal() as portal:
futures = [portal.start_task_soon(long_running_task, i) for i in range(1, 5)]
for future in as_completed(futures):
print(future.result())
取消此类任务可以通过取消返回的 Future 来完成。
阻塞门户还有类似 TaskGroup.start()
的方法 start_task()
,它会等待任务调用 task_status.started()
表示已启动:
from anyio import sleep, TASK_STATUS_IGNORED
from anyio.from_thread import start_blocking_portal
async def service_task(*, task_status=TASK_STATUS_IGNORED):
task_status.started('STARTED')
await sleep(1)
return 'DONE'
with start_blocking_portal() as portal:
future, start_value = portal.start_task(service_task)
print('Task has started with value', start_value)
return_value = future.result()
print('Task has finished with return value', return_value)
在工作线程中使用异步上下文管理器
可以使用 wrap_async_context_manager()
将异步上下文管理器包装成同步上下文管理器:
from anyio.from_thread import start_blocking_portal
class AsyncContextManager:
async def __aenter__(self):
print('entering')
async def __aexit__(self, exc_type, exc_val, exc_tb):
print('exiting with', exc_type)
async_cm = AsyncContextManager()
with start_blocking_portal() as portal, portal.wrap_async_context_manager(async_cm):
print('inside the context manager block')
注意:不能在事件循环线程中的同步回调里使用被包装的异步上下文管理器。
上下文传播
在工作线程中运行函数时,当前上下文会复制到工作线程中,因此任务中的上下文变量对线程中的代码也有效。但对上下文变量的修改不会回传给调用的异步任务。
从工作线程调用异步代码时,上下文同样会复制到事件循环线程中运行的目标任务。
调整默认最大工作线程数
AnyIO 默认的工作线程限制器为 40,即无显式限制参数时,to_thread.run_sync()
最多会启动 40 个线程。可以这样调整:
from anyio import to_thread
async def foo():
# 将最大工作线程数设置为 60
to_thread.current_default_thread_limiter().total_tokens = 60
注意:AnyIO 默认的线程池限制不影响 asyncio 的默认线程池执行器。
在工作线程中响应取消
Python 无法取消正在运行的线程,但 AnyIO 提供了用户代码主动检查任务取消状态的机制,如果被取消则抛出取消异常,可调用 from_thread.check_cancelled()
实现:
from anyio import to_thread, from_thread
def sync_function():
while True:
from_thread.check_cancelled()
print("Not cancelled yet")
sleep(1)
async def foo():
with move_on_after(3):
await to_thread.run_sync(sync_function)
按需共享阻塞门户
如果你需要构建一个同步 API,按需启动阻塞门户,推荐使用更高效的 BlockingPortalProvider
:
from anyio.from_thread import BlockingPortalProvider
class MyAPI:
def __init__(self, async_obj) -> None:
self._async_obj = async_obj
self._portal_provider = BlockingPortalProvider()
def do_stuff(self) -> None:
with self._portal_provider as portal:
portal.call(self._async_obj.do_async_stuff)
这样无论多少线程同时调用 do_stuff()
,都会复用同一个阻塞门户,效率更高。
使用子进程
AnyIO 允许你在子进程中运行任意可执行程序,可以一次性调用,也可以打开进程句柄以便更灵活地控制子进程。
你可以将命令作为字符串传入,这样命令会被传给默认 shell(等同于 subprocess.run()
中的 shell=True
),或者将命令以字符串序列传入(shell=False
),此时序列第一个元素是可执行文件,后续元素是传给它的参数。
运行一次性命令
使用 run_process()
运行外部命令:
from anyio import run_process, run
async def main():
result = await run_process('ps')
print(result.stdout.decode())
run(main)
上例中 ps
命令是在 shell 中运行的。如果想直接运行(不经过 shell):
from anyio import run_process, run
async def main():
result = await run_process(['ps'])
print(result.stdout.decode())
run(main)
操作进程
当你需要更复杂的子进程交互时,可以使用 open_process()
启动子进程:
from anyio import open_process, run
from anyio.streams.text import TextReceiveStream
async def main():
async with await open_process(['ps']) as process:
async for text in TextReceiveStream(process.stdout):
print(text)
run(main)
在工作进程中运行函数
当你需要运行 CPU 密集型代码时,使用工作进程优于线程。因为除非使用 Python 3.13 及以后版本的实验性自由线程构建,否则当前的 Python 实现无法同时在多个线程中执行 Python 代码。
例外情况包括:
- 阻塞的 I/O 操作
- 显式释放全局解释器锁(GIL)的 C 扩展代码
- 子解释器工作进程(实验性,仅 Python 3.13 及以后版本支持)
如果你的代码不属于上述情况,建议使用工作进程来利用多核 CPU。使用 to_process.run_sync()
实现:
import time
from anyio import run, to_process
def cpu_intensive_function(arg1, arg2):
time.sleep(1)
return arg1 + arg2
async def main():
result = await to_process.run_sync(cpu_intensive_function, 'Hello, ', 'world!')
print(result)
# 使用 to_process.run_sync() 时,这个判断很重要
if __name__ == '__main__':
run(main)
技术细节
- 传入的参数必须可被 pickle(使用最高协议)序列化
- 返回值也必须可被 pickle 序列化
- 目标函数必须是可导入的(lambda 和内部函数不支持)
- 即使调用时
cancellable=False
,调用请求发送给工作进程之前也可能被取消 - 如果在工作进程执行期间调用被取消,工作进程会被杀死
- 工作进程会导入父进程的
__main__
模块,需用if __name__ == '__main__':
防止导入时产生副作用导致无限递归 sys.stdin
、sys.stdout
和sys.stderr
会重定向到/dev/null
,因此print()
和input()
不可用- 工作进程在闲置 5 分钟后或事件循环结束时会终止
- 在 asyncio 中,必须使用
asyncio.run()
或anyio.run()
来确保正确清理 - 当前不支持多进程样式的同步原语
使用子解释器(Subinterpreters)
子解释器提供了介于工作线程和工作进程之间的折中方案。它们允许你利用多个 CPU 核心来运行 Python 代码,同时避免了启动子进程带来的开销和复杂性。
子解释器功能目前仍属实验性质。Python 底层用于管理子解释器的 API 尚未定型,且缺乏足够的实际测试。因此,不建议在重要项目中使用此功能。
在工作子解释器中运行函数
当满足以下情况时,使用工作子解释器运行函数是合理的:
- 你想并行运行的代码是 CPU 密集型的;
- 代码是纯 Python 代码,或扩展模块代码且不释放全局解释器锁(GIL);
如果代码只是做阻塞的网络 I/O 或文件 I/O,使用工作线程更合适。
可以通过 interpreter.run_sync()
来实现:
import time
from anyio import run, to_interpreter
from yourothermodule import cpu_intensive_function
async def main():
result = await to_interpreter.run_sync(
cpu_intensive_function, 'Hello, ', 'world!'
)
print(result)
run(main)
限制
- 仅支持 Python 3.13 及以后版本;
- 不能运行
__main__
模块中的代码(因此 REPL 中定义的函数也无法运行); - 目标函数无法响应取消操作;
- 与线程不同,子解释器中的代码不能与其他解释器或线程共享可变数据,但共享不可变数据是允许的。
异步文件 I/O 支持
AnyIO 提供了对阻塞文件操作的异步封装。这些封装会在线程池中运行阻塞操作,从而实现异步效果。
示例:
from anyio import open_file, run
async def main():
async with await open_file('/some/path/somewhere') as f:
contents = await f.read()
print(contents)
run(main)
这些封装还支持逐行异步迭代文件内容,就像标准文件对象支持同步迭代一样:
from anyio import open_file, run
async def main():
async with await open_file('/some/path/somewhere') as f:
async for line in f:
print(line, end='')
run(main)
如果你已有一个已打开的同步文件对象,可以使用 wrap_file()
把它包装成异步文件对象:
from anyio import wrap_file, run
async def main():
with open('/some/path/somewhere') as f:
async for line in wrap_file(f):
print(line, end='')
run(main)
注意
关闭异步包装器时,也会关闭底层的同步文件对象。
异步路径操作
AnyIO 提供了 pathlib.Path 的异步版本。它与原版有以下不同:
- 进行磁盘 I/O 的操作(如
read_bytes()
)会在线程池中运行,因此需要await
; - 类似
glob()
之类的方法返回异步迭代器,迭代出的也是异步 Path 对象; - 通常返回 pathlib.Path 对象的属性和方法,现在返回 AnyIO 的 Path 对象;
- Python 3.10 的 API 方法和属性在所有版本中均可用;
- 不支持作为上下文管理器使用(因为 pathlib 已弃用该用法)。
例如,创建一个二进制内容的文件:
from anyio import Path, run
async def main():
path = Path('/foo/bar')
await path.write_bytes(b'hello, world')
run(main)
异步遍历目录内容示例:
from anyio import Path, run
async def main():
# 打印目录 /foo/bar 中每个文件的内容(假定为文本)
dir_path = Path('/foo/bar')
async for path in dir_path.iterdir():
if await path.is_file():
print(await path.read_text())
print('---------------------')
run(main)
异步临时文件和临时目录
本模块为使用 tempfile
模块处理临时文件和临时目录提供了异步封装。这些异步方法会在工作线程中执行阻塞操作。
临时文件
TemporaryFile
创建一个临时文件,文件在关闭时会被自动删除。
示例:
from anyio import TemporaryFile, run
async def main():
async with TemporaryFile(mode="w+") as f:
await f.write("Temporary file content")
await f.seek(0)
print(await f.read()) # 输出:Temporary file content
run(main)
命名临时文件
NamedTemporaryFile
的用法类似于 TemporaryFile
,但文件在文件系统中有可见的名字。
示例:
from anyio import NamedTemporaryFile, run
async def main():
async with NamedTemporaryFile(mode="w+", delete=True) as f:
print(f"Temporary file name: {f.name}")
await f.write("Named temp file content")
await f.seek(0)
print(await f.read())
run(main)
内存缓冲临时文件
SpooledTemporaryFile
适合临时数据较小且希望保存在内存中而非写入磁盘的情况。
示例:
from anyio import SpooledTemporaryFile, run
async def main():
async with SpooledTemporaryFile(max_size=1024, mode="w+") as f:
await f.write("Spooled temp file content")
await f.seek(0)
print(await f.read())
run(main)
临时目录
TemporaryDirectory
提供了创建临时目录的异步方法。
示例:
from anyio import TemporaryDirectory, run
async def main():
async with TemporaryDirectory() as temp_dir:
print(f"Temporary directory path: {temp_dir}")
run(main)
低级临时文件和目录创建
为了获得更细粒度的控制,模块还提供了以下低级函数:
mkstemp()
—— 创建临时文件,返回文件描述符和路径元组;mkdtemp()
—— 创建临时目录,返回目录路径;gettempdir()
—— 返回默认临时目录路径;gettempdirb()
—— 返回默认临时目录路径(二进制格式)。
示例:
from anyio import mkstemp, mkdtemp, gettempdir, run
import os
async def main():
fd, path = await mkstemp(suffix=".txt", prefix="mkstemp_", text=True)
print(f"Created temp file: {path}")
temp_dir = await mkdtemp(prefix="mkdtemp_")
print(f"Created temp dir: {temp_dir}")
print(f"Default temp dir: {await gettempdir()}")
os.remove(path)
run(main)
注意
使用这些低级函数时,需要手动清理创建的文件和目录。
接收操作系统信号
有时你可能需要以有意义的方式接收发送给应用程序的信号。例如,当接收到 signal.SIGTERM
信号时,应用程序应当优雅地关闭。同样,SIGHUP
通常用来请求应用程序重新加载配置。
AnyIO 提供了一个简单的机制来接收你感兴趣的信号:
import signal
from anyio import open_signal_receiver, run
async def main():
with open_signal_receiver(signal.SIGTERM, signal.SIGHUP) as signals:
async for signum in signals:
if signum == signal.SIGTERM:
return
elif signum == signal.SIGHUP:
print('Reloading configuration')
run(main)
注意
信号处理器只能安装在主线程中,因此在通过BlockingPortal
运行事件循环时,这些处理器将不起作用。
注意
Windows 原生不支持信号,因此跨平台应用不要依赖此功能。
处理 KeyboardInterrupt 和 SystemExit
默认情况下,不同后端对 Ctrl+C(Windows 下为 Ctrl+Break)以及外部终止信号(分别对应 KeyboardInterrupt
和 SystemExit
)的处理方式不同:Trio 会在应用内部抛出相关异常,而 asyncio 则会关闭所有任务并退出。如果你需要在这些情况下执行自定义清理工作,需要安装信号处理器:
import signal
from anyio import open_signal_receiver, create_task_group, run
from anyio.abc import CancelScope
async def signal_handler(scope: CancelScope):
with open_signal_receiver(signal.SIGINT, signal.SIGTERM) as signals:
async for signum in signals:
if signum == signal.SIGINT:
print('Ctrl+C pressed!')
else:
print('Terminated!')
scope.cancel()
return
async def main():
async with create_task_group() as tg:
tg.start_soon(signal_handler, tg.cancel_scope)
... # 继续启动实际的应用逻辑
run(main)
注意
Windows 不支持SIGTERM
信号,如果你需要在 Windows 上实现优雅关闭,需要采用其他方案。