【asyncio】asyncio.Semaphore :限制同时运行的异步任务数量

在 Python 中,asyncio.Semaphoreasyncio 模块提供的一个异步并发控制工具,用于限制同时运行的异步任务数量。它是信号量(Semaphore)的异步版本,适合在异步编程中管理资源访问或限制并发,例如控制对共享资源的访问(如网络请求、数据库连接)或限制协程的并行执行数量。asyncio.Semaphore 常用于异步爬虫、并发 I/O 操作等场景。

以下是对 asyncio.Semaphore 的详细介绍,包括其定义、用法、示例、应用场景、最佳实践和注意事项。


1. asyncio.Semaphore 的定义和原理

1.1 定义

asyncio.Semaphore 是一个异步信号量类,控制对资源的并发访问。信号量维护一个内部计数器,表示可用资源的数量:

  • 当计数器大于 0 时,协程可以通过信号量访问资源。
  • 当计数器为 0 时,协程必须等待直到资源可用。
1.2 原理
  • 计数器机制:信号量初始化时设置一个计数器值(如 5),表示允许的并发数量。
  • 获取资源:通过 acquire() 或上下文管理器(async with semaphore)减少计数器。
  • 释放资源:通过 release() 增加计数器,允许其他协程获取。
  • 异步等待:如果计数器为 0,acquire() 会异步等待(不阻塞事件循环)。
1.3 构造函数
asyncio.Semaphore(value=1, *, loop=None)
  • value:初始计数器值,默认为 1(类似锁)。
  • loop:事件循环(Python 3.10 起已废弃,自动使用当前事件循环)。

返回值:一个 Semaphore 对象。


2. asyncio.Semaphore 的基本用法

2.1 创建信号量

初始化信号量,设置最大并发数。

import asyncio

semaphore = asyncio.Semaphore(3)  # 允许 3 个协程同时运行
2.2 使用 async with 获取和释放

async with 是最常用的方式,自动管理资源的获取和释放。

示例

import asyncio

async def worker(semaphore, name):
    async with semaphore:
        print(f"Worker {name} acquired semaphore")
        await asyncio.sleep(1)  # 模拟耗时任务
        print(f"Worker {name} releasing semaphore")

async def main():
    semaphore = asyncio.Semaphore(2)  # 最多 2 个并发
    tasks = [worker(semaphore, i) for i in range(5)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

输出

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 releasing semaphore
Worker 2 acquired semaphore
Worker 1 releasing semaphore
Worker 3 acquired semaphore
Worker 2 releasing semaphore
Worker 4 acquired semaphore
Worker 3 releasing semaphore
Worker 4 releasing semaphore
  • 说明:每次最多 2 个 worker 同时运行,其余等待。
2.3 手动获取和释放

使用 acquire()release() 手动控制。

示例

import asyncio

async def worker(semaphore, name):
    await semaphore.acquire()
    try:
        print(f"Worker {name} acquired semaphore")
        await asyncio.sleep(1)
    finally:
        print(f"Worker {name} releasing semaphore")
        semaphore.release()

async def main():
    semaphore = asyncio.Semaphore(2)
    tasks = [worker(semaphore, i) for i in range(3)]
    await asyncio.gather(*tasks)

asyncio.run(main())

3. asyncio.Semaphore 的核心方法

3.1 acquire()
  • 作用:异步获取信号量,减少计数器。
  • 返回True(成功获取)或等待直到计数器大于 0。
  • 用法
    await semaphore.acquire()
    
3.2 release()
  • 作用:释放信号量,增加计数器。
  • 用法
    semaphore.release()
    
3.3 locked()
  • 作用:检查信号量是否被锁定(计数器为 0)。
  • 返回True(锁定)或 False(未锁定)。
  • 用法
    print(semaphore.locked())  # 输出: True 或 False
    
3.4 上下文管理器(async with
  • 作用:自动获取和释放信号量,推荐使用。
  • 用法
    async with semaphore:
        # 访问资源
    

4. 应用场景

4.1 限制并发请求

在异步爬虫中,限制同时发送的 HTTP 请求数量。

示例(结合 aiohttp):

import asyncio
import aiohttp

async def fetch(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

async def main():
    semaphore = asyncio.Semaphore(3)  # 限制 3 个并发请求
    urls = ["https://example.com"] * 5
    tasks = [fetch(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} pages")

asyncio.run(main())
4.2 数据库连接池

限制对数据库的并发访问(如 SQLite 异步操作)。

示例(结合 aiosqlite):

import asyncio
import aiosqlite

async def query_db(semaphore, db_name, query):
    async with semaphore:
        async with aiosqlite.connect(db_name) as db:
            async with db.execute(query) as cursor:
                return await cursor.fetchall()

async def main():
    semaphore = asyncio.Semaphore(2)
    tasks = [query_db(semaphore, "example.db", "SELECT * FROM users") for _ in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
4.3 限流任务

控制并发任务数量,避免过载。

示例

import asyncio

async def task(semaphore, id):
    async with semaphore:
        print(f"Task {id} running")
        await asyncio.sleep(1)

async def main():
    semaphore = asyncio.Semaphore(2)
    tasks = [task(semaphore, i) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

5. 最佳实践

  1. 选择合适的并发限制

    • 根据资源(如 CPU、网络带宽、服务器限制)设置信号量的 value
    • 示例:爬虫中,设置 Semaphore(10) 避免服务器限流。
  2. 优先使用 async with

    • 使用上下文管理器确保资源正确释放。
    • 示例:
      async with semaphore:
          await do_work()
      
  3. 异常处理

    • async withtry-finally 中捕获异常,确保 release() 被调用。
    • 示例:
      async def worker(semaphore):
          try:
              await semaphore.acquire()
              # 工作代码
          except Exception as e:
              print(f"Error: {e}")
          finally:
              semaphore.release()
      
  4. 测试并发行为

    • 使用 pytestpytest-asyncio 测试信号量行为。
    • 示例:
      import asyncio
      import pytest
      
      @pytest.mark.asyncio
      async def test_semaphore():
          semaphore = asyncio.Semaphore(1)
          async def task():
              async with semaphore:
                  await asyncio.sleep(0.1)
                  return True
          tasks = [task() for _ in range(2)]
          results = await asyncio.gather(*tasks)
          assert all(results)
      
  5. 日志记录

    • 添加日志,跟踪信号量的获取和释放。
    • 示例:
      import logging
      logging.basicConfig(level=logging.INFO)
      
      async def worker(semaphore, name):
          logging.info(f"{name} acquiring")
          async with semaphore:
              logging.info(f"{name} acquired")
              await asyncio.sleep(1)
      

6. 注意事项

  1. 版本要求

    • asyncio.Semaphore 需要 Python 3.5+(异步编程支持)。
    • 示例:检查版本:
      import sys
      assert sys.version_info >= (3, 5), "Requires Python 3.5+"
      
  2. 资源泄漏

    • 忘记调用 release() 或未使用 async with 可能导致死锁。
    • 示例:始终使用上下文管理器:
      async with semaphore:
          # 安全代码
      
  3. 计数器溢出

    • 多次调用 release() 可能导致计数器超过初始值,谨慎使用。
    • 示例:检查 locked() 状态:
      if not semaphore.locked():
          semaphore.release()
      
  4. 性能考虑

    • 信号量适合 I/O 密集型任务(如网络请求),不适合 CPU 密集型任务(考虑 multiprocessing)。
    • 示例:CPU 密集型任务使用进程池:
      from multiprocessing import Pool
      
  5. 调试并发

    • 使用 asyncio.all_tasks() 或日志调试并发问题。
    • 示例:
      print(asyncio.all_tasks())
      

7. 总结

asyncio.Semaphore 是 Python 异步编程中的关键工具,用于限制并发任务数量,确保资源安全访问。其核心特点包括:

  • 定义:异步信号量,控制并发访问。
  • 用法:通过 async withacquire/release 管理资源。
  • 应用:异步爬虫、数据库连接池、任务限流。
  • 最佳实践:使用上下文管理器、异常处理、测试并发。

通过合理设置信号量值和上下文管理,asyncio.Semaphore 可以高效管理异步并发。

参考文献

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

彬彬侠

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值