python 进程池阻塞和非阻塞_python多进程—multiprocessing

本文详细介绍了Python的多进程模块multiprocessing,包括cpu_count()和active_children()方法,Process的创建与常用方法,以及多进程间的独立性。接着讲解了lock组件的重要性,通过实例展示了无锁和加锁情况下多进程读写文件的区别。然后探讨了共享内存Value和Array,以及如何通过Manager实现更复杂的共享数据结构如字典和列表。最后,对比了Pool的apply_async非阻塞和apply阻塞方式在处理任务队列时的区别,并给出了实际示例。
部署运行你感兴趣的模型镜像

一、进程

python中提供多进程包:multiprocessing,支持子进程,通信,共享内存,执行不同形式的同步,提供了Process、Pipi、Lock等组件

多进程和多线程区别:

多线程使用的是CPU的一个核,适合IO密集型

多进程使用的是CPU的多个核,适合运算密集型

1)multiprocessing的方法

cpu_count():统计cpu总数

active_children():获取所有子进程

例子:

#!/usr/bin/env python

import multiprocessing

p = multiprocessing.cpu_count()

m = multiprocessing.active_children()

print(p)

print(m)

运行结果:

8

[]

2)Process进程

创建一个Process对象:p = multiprocessing.Precess(target=worker,args=(2,))

说明:

target = 函数名字

args = 函数需要的的参数,以tuple形式传入

3)Process常用方法

is_alive():判断进程是否存活

run():启动进程

start():启动进程,会自动调用run方法,常用

join(timeout=):等待进程结束或者直到超时

4)Process常用属性

name:进程名字

pid:进程的pid

例子:

#!/usr/bin/env python

import time

import multiprocessing

def worker(interval):

time.sleep(interval)

print("hello,China")

if __name__ == "__main__":

p = multiprocessing.Process(target=worker,args=(5,))

p.start()

print(p.is_alive())

p.join(timeout=3)  # 只等待3秒,如果进程还没结束,则向下执行print(p.name)

print(p.name)

print(p.pid)

print("This is end")

运行结果:

True

Process-1

121764

This is end

hello,China

实例:多进程

import time

import multiprocessing

def worker(name,interval):

print("{0} start" .format(name))

time.sleep(interval)

print("{0} end" .format(name))

if __name__ == "__main__":

print("main start")

print("The computer has {0} core" .format(multiprocessing.cpu_count()))

p1 = multiprocessing.Process(target=worker,args=("worker",2))

p2 = multiprocessing.Process(target=worker,args=("worker",3))

p3 = multiprocessing.Process(target=worker,args=("worker",4))

p1.start()

p2.start()

p3.start()

for p in  multiprocessing.active_children():

print("The pid of {0} is {1}" .format(p.name,p.pid) )

print("main end")

运行结果:

main start

The computer has 4 core

The pid of Process-1 is 21112

The pid of Process-3 is 20536

The pid of Process-2 is 2116

main end

worker start

worker start

worker start

worker end

worker end

worker end

说明:启动的多个进程之间都是相互独立存在的

二、lock组件

当我们用多进程来读写文件时,如果一个写一个读同时进行时不行的,必须一个写完之后,另一个才可以读。因此需要用到一个锁机制进行控制

实例1:多进程不加锁

import multiprocessing

import time

def add(number,value,lock):

print("init,member = {1}".format(value,number))

for i in xrange(1,6):

number += value

time.sleep(1)

print("add {0},number = {1}".format(value,number))

if __name__ == "__main__":

lock = multiprocessing.Lock()

number = 0

p1 = multiprocessing.Process(target=add,args=(number,1,lock))

p3 = multiprocessing.Process(target=add,args=(number,3,lock))

p1.start()

p3.start()

print("main end")

运行结果:

main end

init,member = 0

init,member = 0

add 1,number = 1

add 3,number = 3

add 1,number = 2

add 3,number = 6

add 1,number = 3

add 3,number = 9

add 1,number = 4

add 3,number = 12

add 1,number = 5

add 3,number = 15

说明:多进程互不干扰,同时进行;进程1: 0、1、2、3、4、5;进程3: 0、3、6、9、15

实例2:多进程加锁

import multiprocessing

import time

def add(number,value,lock):

lock.acquire()      # 获取锁

try:

print("init,member = {1}".format(value,number))

for i in xrange(1,6):

number += value

time.sleep(1)

print("add {0},number = {1}".format(value,number))

except Exception as e:

raise e

finally:

lock.release()       # 释放锁

if __name__ == "__main__":

lock = multiprocessing.Lock()  # 定义锁

number = 0

p1 = multiprocessing.Process(target=add,args=(number,1,lock))

p3 = multiprocessing.Process(target=add,args=(number,3,lock))

p1.start()

p3.start()

print("main end")

运行结果:

main end

init,member = 0

add 1,number = 1

add 1,number = 2

add 1,number = 3

add 1,number = 4

add 1,number = 5

init,member = 0

add 3,number = 3

add 3,number = 6

add 3,number = 9

add 3,number = 12

add 3,number = 15

说明:进程1和进程3,谁先抢到锁,则另一个进程只能等待抢到者执行完之后,才能执行

三、共享内存

两个“同时“读写的文件,其中一个作用的结果对另外一个有影响。multiprocessing提供了Value和Array模块

实例1:多进程内存共享不加锁

import multiprocessing

import time

def add(number,value1):

try:

print("init,member = {1}".format(value1,number.value))   # 值的调用方式 number.value

for i in xrange(1,6):

number.value += value1

time.sleep(1)

print("add {0},number = {1}".format(value1,number.value))

except Exception as e:

raise e

if __name__ == "__main__":

lock = multiprocessing.Lock()

number = multiprocessing.Value("i",0)   # Value共享内存模块

p1 = multiprocessing.Process(target=add,args=(number,1))

p3 = multiprocessing.Process(target=add,args=(number,3))

p1.start()

p3.start()

print("main end")

运行结果:

main end

init,member = 0

init,member = 1

add 1,number = 4

add 3,number = 5

add 1,number = 8

add 3,number = 9

add 1,number = 12

add 3,number = 13

add 1,number = 16

add 3,number = 17

add 1,number = 20

add 3,number = 20

说明:不加锁,进程1和进程3在彼此运算完之后的结果上继续运算,同时进行

实例2: 多进程共享内存加锁

import multiprocessing

import time

def add(number,value1,lock):

lock.acquire()

try:

print("init,member = {1}".format(value1,number.value))

for i in xrange(1,6):

number.value += value1

time.sleep(1)

print("add {0},number = {1}".format(value1,number.value))

except Exception as e:

raise e

finally:

lock.release()

if __name__ == "__main__":

lock = multiprocessing.Lock()

number = multiprocessing.Value("i",0)

p1 = multiprocessing.Process(target=add,args=(number,1,lock))

p3 = multiprocessing.Process(target=add,args=(number,3,lock))

p1.start()

p3.start()

print("main end")

运行结果:

main end

init,member = 0

add 1,number = 1

add 1,number = 2

add 1,number = 3

add 1,number = 4

add 1,number = 5

init,member = 5

add 3,number = 8

add 3,number = 11

add 3,number = 14

add 3,number = 17

add 3,number = 20

说明:加锁,进程3等待进程1执行完毕之后,在前者的结果上,继续执行

四、多进程Manager

一般实现的数据共享的方式只有两种结构Value和Array。Python中提供了强大的Manage专门用来做数据共享的,其支持的类型非常多,包括,Value, Array,list,dict, Queue, Lock等

例:支持字典和列表类型

import multiprocessing

def worker(d,l):

l += range(11,16)   # 返回一个列表序列的特殊写法

for i in xrange(1,6):

key = "key {0}".format(i)

value = "value {0}".format(i)

d[key] = value

if __name__ == "__main__":

manager = multiprocessing.Manager()

l = manager.list()

d = manager.dict()

p = multiprocessing.Process(target=worker,args=(d,l))

p.start()

p.join()

print(d)

print(l)

print("main end")

运行结果:

{'key 1': 'value 1', 'key 2': 'value 2', 'key 3': 'value 3', 'key 4': 'value 4', 'key 5': 'value 5'}

[11, 12, 13, 14, 15]

main end

五、进程池

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程

阻塞和非阻塞

Pool.apply_async: 非阻塞,定义的进程池进程最大数可以同时执行

Pool.apply:阻塞,一个进程结束,释放回进程池,下一个进程才可以开始

例1:非阻塞import multiprocessing

import time

def worker(msg):

print("###### start {0}#######".format(msg))

time.sleep(1)

print("###### end {0}#######".format(msg))

if __name__ == "__main__":

print("main start")

pool = multiprocessing.Pool(processes=3)

for i in xrange(1,10):

msg = "hello {0}".format(i)

pool.apply_async(func=worker,args=(msg,))  # pool.apply_async()非阻塞型

pool.close()

pool.join()     #调用join之前,先调用close函数,否则会出错。执行完close后如果没有新的进程加入到pool,则join函数等待所有子进程结束

print("main end")

运行结果:

main start

###### start hello 1#######

###### start hello 2#######

###### start hello 3#######

###### end hello 1#######

###### start hello 4#######

###### end hello 2#######

###### start hello 5#######

###### end hello 3#######

###### start hello 6#######

###### end hello 4#######

###### start hello 7#######

###### end hello 5#######

###### start hello 8#######

###### end hello 6#######

###### start hello 9#######

###### end hello 7#######

###### end hello 8#######

###### end hello 9#######

main end

说明:一开始启动3个进程,之后先关闭一个进程,再补充另外一个进程进来,始终保持3个,直至结束

例2:阻塞型

import multiprocessing

import time

def worker(msg):

print("###### start {0}#######".format(msg))

time.sleep(1)

print("###### end {0}#######".format(msg))

if __name__ == "__main__":

print("main start")

pool = multiprocessing.Pool(processes=3)

for i in xrange(1,10):

msg = "hello {0}".format(i)

pool.apply(func=worker,args=(msg,))   # pool.apply() 阻塞型

pool.close()

pool.join()

print("main end")

运行结果:

main start

###### start hello 1#######

###### end hello 1#######

###### start hello 2#######

###### end hello 2#######

###### start hello 3#######

###### end hello 3#######

###### start hello 4#######

###### end hello 4#######

###### start hello 5#######

###### end hello 5#######

###### start hello 6#######

###### end hello 6#######

###### start hello 7#######

###### end hello 7#######

###### start hello 8#######

###### end hello 8#######

###### start hello 9#######

###### end hello 9#######

main end

说明:每次只能启动一个进程,启动新进程前,需关闭老进程

您可能感兴趣的与本文相关的镜像

Python3.9

Python3.9

Conda
Python

Python 是一种高级、解释型、通用的编程语言,以其简洁易读的语法而闻名,适用于广泛的应用,包括Web开发、数据分析、人工智能和自动化脚本

评论
成就一亿技术人!
拼手气红包6.0元
还能输入1000个字符  | 博主筛选后可见
 
红包 添加红包
表情包 插入表情
 条评论被折叠 查看
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值