python基础-进程池、submit同异步调用、shutdown参数、ProcessPoolExecutor进程池、进程池ftp

引入进程池

在学习线程池之前,我们先看一个例子

from multiprocessing import  Process
import  time
def task(name):
    print("name",name)
    time.sleep(1)

if __name__ == "__main__":
    start = time.time()
    p1 = Process(target=task,args=("safly1",))
    p2 = Process(target=task, args=("safly2",))
    p3 = Process(target=task, args=("safly3",))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print("main")

    end = time.time()
    print(end- start)

输出如下:

name safly1
name safly2
name safly3
main
1.2071197032928467

以上的方式是一个个创建进程,这样的耗费时间才1秒多,虽然高效,但是有什么弊端呢?
如果并发很大的话,会给服务器带来很大的压力,所以引入了进程池的概念

使用ProcessPoolExecutor进程池

Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。

通过ProcessPoolExecutor 来做示例。
我们来看一个最简单的进程池

from concurrent.futures import ProcessPoolExecutor
import  time
def task(name):
    print("name",name)
    time.sleep(1)

if __name__ == "__main__":
    start = time.time()
    ex = ProcessPoolExecutor(2)

    for i in range(5):
        ex.submit(task,"safly%d"%i)
    ex.shutdown(wait=True)

    print("main")
    end = time.time()
    print(end - start)

输出如下:

E:\python\python_sdk\python.exe "E:/python/py_pro/4 进程池.py"
name safly0
name safly1
name safly2
name safly3
name safly4
main
3.212218999862671

简单解释下:
ProcessPoolExecutor(2)创建一个进程池,容量为2,循环submit出5个进程,然后就在线程池队列里面,执行多个进程,ex.shutdown(wait=True)意思是进程都执行完毕,在执行主进程的内容

使用shutdown

ex.shutdown(wait=True)是进程池内部的进程都执行完毕,才会关闭,然后执行后续代码
如果改成false呢?看如下代码

from concurrent.futures import ProcessPoolExecutor
import  time
def task(name):
    print("name",name)
    time.sleep(1)

if __name__ == "__main__":
    start = time.time()
    ex = ProcessPoolExecutor(2)

    for i in range(5):
        ex.submit(task,"safly%d"%i)
    ex.shutdown(wait=False)

    print("main")
    end = time.time()
    print(end - start)

输出如下:

main
0.01500844955444336
name safly0
name safly1
name safly2
name safly3
name safly4

使用submit同步调用

同步调用:提交/调用一个任务,然后就在原地等着,等到该任务执行完毕拿到结果,再执行下一行代码

from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    start = time.time()
    for i in range(5):
        res=p.submit(piao,'safly %s' %i,i).result() #同步调用
        print(res)

    p.shutdown(wait=True)
    print('主', os.getpid())

    stop = time.time()
    print(stop - start)

输出如下:

E:\python\python_sdk\python.exe "E:/python/py_pro/4 进程池.py"
safly 0 is piaoing 12996
0
safly 1 is piaoing 14044
1
safly 2 is piaoing 12996
4
safly 3 is piaoing 14044
9
safly 4 is piaoing 12996
1612932
5.202786684036255

Process finished with exit code 0

使用submit异步调用

异步调用: 提交/调用一个任务,不在原地等着,直接执行下一行代码


# from multiprocessing import Process,Pool
from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    objs = []
    start = time.time()
    for i in range(5):
        obj = p.submit(piao, 'safly %s' % i, i)  # 异步调用
        objs.append(obj)

    p.shutdown(wait=True)
    print('主', os.getpid())
    for obj in objs:
        print(obj.result())

    stop = time.time()
    print(stop - start)

输出如下:

E:\python\python_sdk\python.exe "E:/python/py_pro/4 进程池.py"
safly 0 is piaoing 1548
safly 1 is piaoing 7872

safly 2 is piaoing 1548
safly 3 is piaoing 7872


safly 4 is piaoing 15487808
0
1
4
9
16
3.202626943588257

输出信息的换行是我标识有输出停顿的
简单说下执行流程:
由于进程池容量是容纳2个进程,所以会2+2+1 三次进入线程池执行,花费3秒

如果我们改下上面的代码,修改的代码如下:

from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    objs = []
    start = time.time()
    for i in range(5):
        obj = p.submit(piao, 'safly %s' % i, i)  # 异步调用
        objs.append(obj)

    for obj in objs:
        print(obj.result())


    p.shutdown(wait=True)
    print('主', os.getpid())


    stop = time.time()
    print(stop - start)

输出如下:(同样我用换行,标识出输出的时间段了)

E:\python\python_sdk\python.exe "E:/python/py_pro/4 进程池.py"
safly 0 is piaoing 7852
safly 1 is piaoing 8484


safly 2 is piaoing 7852
0
safly 3 is piaoing 8484
1



safly 4 is piaoing 7852
4
9


166816
3.178352117538452

进程池实现ftp

服务端:

from socket import *
from concurrent.futures import ProcessPoolExecutor
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('进程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=ProcessPoolExecutor(5)
    while True:
        conn,client_addr=server.accept()
        p.submit(talk,conn,client_addr)

客户端:

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
<think>嗯,用户问的是Python多进程可以用哪些方式进行进程池管理。首先,我需要回忆一下Python中多进程模块的相关知识。Python的标准库中有multiprocessing模块,这个模块提供了Process和Pool等类来实现多进程编程。进程池管理主要是用Pool类,但可能还有其他方式或者第三方库吗? 用户可能想知道有哪些具体的方法或工具可以用来管理进程池,比如内置的模块、第三方库或者某些高级用法。我需要列举几种常见的方式,并简要说明它们的用法和特点。比如multiprocessing.Pool是最基本的,然后是concurrent.futures中的ProcessPoolExecutor,可能还有其他的比如使用队列(Queue)来手动管理进程,或者第三方库如joblib、celery等。 接下来,我需要考虑用户的使用场景。他们可能是在处理需要并行执行的任务,比如数据处理、批量计算等,想要提高效率。用户可能对Python的多进程有一定了解,但想更系统地了解不进程池管理方法,或者在选择合适的工具时需要指导。因此,回答需要结构清晰,分点说明每种方法,并比较它们的优缺点,帮助用户根据具体需求选择。 还要注意用户可能关心易用性、性能、功能扩展性等方面。例如,multiprocessing.Pool提供了比较基础的功能,而concurrent.futures可能接口更现代化,joblib则可能在科学计算中更常用,尤其是与numpy等库结合时。此外,可能需要提到异步处理或者分布式任务队列如Celery,但需要明确这些是否属于进程池管理的范畴,或者是否更适合分布式环境。 另外,要确保回答的准确性和实用性。比如,multiprocessing.Pool的apply、apply_async、map、map_async等方法的具体使用场景,以及如何控制进程数量、处理异常等。而ProcessPoolExecutor作为更高层次的抽象,可能更简洁,适合与future对象配合使用。还要提到一些注意事项,比如在Windows系统下多进程可能需要if __name__ == '__main__'的保护,避免无限递归创建子进程。 有没有可能用户的需求不仅仅是进程池管理,还包括进程间通信、数据共享等问题?但根据问题,用户明确询问的是进程池管理方式,所以可能不需要深入这部分,但可以简要提及相关机制,如队列、管道等,但重点还是放在池的管理上。 总结下来,回答的结构应该是先介绍主要的进程池管理方法,包括标准库中的工具和第三方库,然后分别详细说明每种方法的用法、示例代码和适用场景,最后进行比较和总结,帮助用户选择合适的方法。时,需要确保语言通俗易懂,避免过于技术化的术语,但又要准确无误。</think>--- ### **Python多进程的进程池管理方式** 在Python中,多进程的进程池管理主要通过以下方式实现,每种方法适用于不场景,具体选择需结合需求(如任务类型、性能要求、代码简洁性等): --- #### **一、`multiprocessing.Pool`(标准库)** 最基础进程池管理工具,提供灵活的任务分配方式。 ##### **1. 核心方法** - **`map(func, iterable)`**:并行处理可迭代对象,阻塞直到所有任务完成。 - **`map_async(func, iterable)`**:异步版本,返回`AsyncResult`对象,可通过`get()`获取结果。 - **`apply(func, args)`**:提交单个任务,阻塞等待结果。 - **`apply_async(func, args)`**:异步提交单个任务,返回`AsyncResult`。 - **`close()` + `join()`**:关闭进程池并等待所有任务完成。 ##### **2. 示例代码** ```python from multiprocessing import Pool def square(x): return x ** 2 if __name__ == '__main__': with Pool(processes=4) as pool: # 创建4个进程的池 results = pool.map(square, [1, 2, 3, 4]) # 步映射 print(results) # 输出 [1, 4, 9, 16] async_result = pool.apply_async(square, (5,)) # 异步提交单个任务 print(async_result.get()) # 输出 25 ``` ##### **3. 适用场景** - 批量处理数据(如并行计算、文件读写)。 - 需要精细控制任务提交和结果获取的场合。 --- #### **二、`concurrent.futures.ProcessPoolExecutor`(标准库)** 基于`Future`对象的高层接口,代码更简洁,适合现代异步编程模式。 ##### **1. 核心方法** - **`submit(func, *args)`**:提交单个任务,返回`Future`对象。 - **`map(func, iterable)`**:直接映射任务并获取结果迭代器。 - **`shutdown()`**:自动管理资源释放(建议使用`with`语句)。 ##### **2. 示例代码** ```python from concurrent.futures import ProcessPoolExecutor def cube(x): return x ** 3 if __name__ == '__main__': with ProcessPoolExecutor(max_workers=3) as executor: # 最大3个进程 # 提交单个任务 future = executor.submit(cube, 3) print(future.result()) # 输出 27 # 批量映射任务 results = executor.map(cube, [1, 2, 3]) print(list(results)) # 输出 [1, 8, 27] ``` ##### **3. 适用场景** - 需要与线程池(`ThreadPoolExecutor`)统一编程接口。 - 结合`asyncio`实现异步IO与多进程混合任务。 --- #### **三、第三方库:`joblib`** 简化科学计算中的并行任务,尤其适合与`numpy`、`pandas`结合使用。 ##### **1. 核心功能** - **`Parallel(n_jobs=N)`**:创建并行上下文。 - **`delayed(func)(args)`**:延迟函数执行,便于批量提交。 ##### **2. 示例代码** ```python from joblib import Parallel, delayed def multiply(a, b): return a * b if __name__ == '__main__': # 使用4个进程并行计算 results = Parallel(n_jobs=4)(delayed(multiply)(i, i+1) for i in range(5)) print(results) # 输出 [0, 2, 6, 12, 20] ``` ##### **3. 适用场景** - 机器学习模型训练、超参数搜索(如`scikit-learn`中的`GridSearchCV`)。 - 需要快速实现并行化的数值计算任务。 --- #### **四、手动管理进程与队列** 通过`multiprocessing.Queue`或`Manager`实现更灵活的控制,适合复杂任务分发。 ##### **1. 核心组件** - **`Process`**:创建单个进程。 - **`Queue`**:进程间安全传递数据。 - **`Manager`**:支持跨进程共享数据结构(如列表、字典)。 ##### **2. 示例代码** ```python from multiprocessing import Process, Queue def worker(q): while True: task = q.get() if task is None: # 终止信号 break print(f"Processing: {task}") if __name__ == '__main__': tasks = [1, 2, 3, None, None] # None表示结束信号 q = Queue() processes = [Process(target=worker, args=(q,)) for _ in range(2)] for p in processes: p.start() for task in tasks: q.put(task) for p in processes: p.join() ``` ##### **3. 适用场景** - 需要动态分配任务或复杂通信逻辑(如生产者-消费者模型)。 - 对进程生命周期有特殊要求的场景。 --- #### **五、分布式进程池(如`Celery`、`Dask`)** 适用于跨多机分布式计算,突破单机资源限制。 ##### **1. `Celery`** - **特点**:基于消息队列(如RabbitMQ、Redis),支持定时任务、重试机制。 - **场景**:Web后台任务、分布式作业调度。 ##### **2. `Dask`** - **特点**:模仿`PyData`栈(如并行化的`numpy`、`pandas`),支持动态任务图。 - **场景**:大规模数据分析、集群环境下的并行计算。 --- #### **六、对比与选型建议** | **方式** | **优势** | **劣势** | **适用场景** | |---------------------------|-----------------------------------|-------------------------------|---------------------------| | `multiprocessing.Pool` | 灵活控制任务提交与结果获取 | 代码相对冗长 | 精细控制的小规模任务 | | `ProcessPoolExecutor` | 接口简洁,与线程池统一 | 功能较基础 | 快速实现异步任务 | | `joblib` | 科学计算友好,易集成 | 依赖第三方库 | 机器学习、数值计算 | | 手动管理进程与队列 | 完全自定义任务分发逻辑 | 复杂度高,易出错 | 复杂通信需求的场景 | | 分布式框架(如`Dask`) | 支持多机扩展,处理海量数据 | 部署和维护成本高 | 大规模分布式计算 | --- #### **七、注意事项** 1. **全局变量与序列化**: - 跨进程传递的函数参数需可序列化(如避免传递lambda函数)。 2. **资源释放**: - 使用`with`语句或显式调用`close()` + `join()`防止僵尸进程。 3. **Windows系统限制**: - 多进程代码需放在`if __name__ == '__main__':`中,避免递归创建子进程。 4. **性能权衡**: - 进程创建和通信有开销,任务粒度太小时可能不如单进程快。 --- 通过上述方法,可根据具体需求选择最合适的进程池管理策略,兼顾开发效率与性能优化。
评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值