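The concurrent package
A module introduced in Python 3.2 (concurrent.futures).
A module for asynchronous, parallel task programming that provides a high-level, convenient interface for executing callables asynchronously. It provides two pool executors:
ThreadPoolExecutor: an Executor that makes asynchronous calls using a pool of threads
ProcessPoolExecutor: an Executor that makes asynchronous calls using a pool of processes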
ThreadPoolExecutor objects
First, create a pool executor object, that is, an instance of an Executor subclass.
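Method | Meaning
ThreadPoolExecutor(max_workers=None) | Creates a pool of at most max_workers threads that execute calls asynchronously; returns an Executor instance. If max_workers is None, a default is chosen (min(32, os.cpu_count() + 4) since Python 3.8)
submit(fn, *args, **kwargs) | Submits the callable fn with its arguments for execution; returns a Future instance
shutdown(wait=True) | Shuts down the pool and releases its resources; with wait=True it blocks until all pending futures have finished

The Future class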
Method | Meaning
done() | Returns True if the call was successfully cancelled or has finished running
cancelled() | Returns True if the call was successfully cancelled
running() | Returns True if the call is currently running and cannot be cancelled
cancel() | Attempts to cancel the call; returns False if the call is currently running or has finished and cannot be cancelled, otherwise cancels it and returns True
result(timeout=None) | Returns the value returned by the call (re-raising the call's exception if it raised one); with timeout=None it waits indefinitely, otherwise it raises concurrent.futures.TimeoutError if the result is not ready within timeout seconds
exception(timeout=None) | Returns the exception raised by the call (None if it completed without raising); with timeout=None it waits indefinitely, otherwise it raises concurrent.futures.TimeoutError when the timeout expires

# ThreadPoolExecutor example
import threading
import logging
import time
from concurrent import futures

# Log output format
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished {}".format(n))

# Create a thread pool with capacity 3
exect = futures.ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(3):
    future = exect.submit(worker, i)
    fs.append(future)

for i in range(3, 6):
    future = exect.submit(worker, i)  # these futures are not added to fs

while True:
    time.sleep(2)
    logging.info(threading.enumerate())
    flag = True
    for _ in fs:  # check whether any tracked task is still unfinished
        logging.info(_.done())
        flag = flag and _.done()
    print("=" * 30)
    if flag:
        exect.shutdown()  # shut down the pool: blocks until pending tasks finish, then the worker threads exit
        logging.info(threading.enumerate())
        break
# Once a thread pool is created, there is no need to shut it down and recreate it frequently

Output:
2021-06-27 16:06:37,177 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] begin to work0
2021-06-27 16:06:37,177 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] begin to work1
2021-06-27 16:06:37,177 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] begin to work2
2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>, <Thread(ThreadPoolExecutor-0_0, started daemon 9812)>, <Thread(ThreadPoolExecutor-0_1, started daemon 2816)>, <Thread(ThreadPoolExecutor-0_2, started daemon 13268)>]
2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] False
2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] False
2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] False
==============================
==============================
2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>, <Thread(ThreadPoolExecutor-0_0, started daemon 9812)>, <Thread(ThreadPoolExecutor-0_1, started daemon 2816)>, <Thread(ThreadPoolExecutor-0_2, started daemon 13268)>]
2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] False
2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] False
2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] False
2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] finished 2
2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] begin to work3
2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] finished 1
2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] begin to work4
2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] finished 0
2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] begin to work5
2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>, <Thread(ThreadPoolExecutor-0_0, started daemon 9812)>, <Thread(ThreadPoolExecutor-0_1, started daemon 2816)>, <Thread(ThreadPoolExecutor-0_2, started daemon 13268)>]
2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] True
2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] True
2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] True
==============================
2021-06-27 16:06:47,180 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] finished 5
2021-06-27 16:06:47,180 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] finished 3
2021-06-27 16:06:47,180 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] finished 4
2021-06-27 16:06:47,180 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>]

The list fs only holds the futures of tasks 0 to 2. The loop therefore ends at 16:06:43 while tasks 3 to 5 are still running. shutdown() (wait=True) then blocks until those tasks finish at 16:06:47, after which only the main thread remains. The separator lines do not line up exactly with the log lines, most likely because print() writes to stdout while logging writes to stderr by default.
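The Future methods in the table above can be tried out on their own. The following is a minimal sketch; slow_task and failing_task are hypothetical helper functions, not part of the original example. It uses a single-worker pool so that the third submitted task is still queued (pending) and can therefore be cancelled.

```python
import time
from concurrent import futures

def slow_task(seconds):
    # hypothetical task: sleep, then return the duration
    time.sleep(seconds)
    return seconds

def failing_task():
    # hypothetical task that always raises
    raise ValueError("boom")

def demo():
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(slow_task, 0.5)  # occupies the only worker
        failing = executor.submit(failing_task)    # queued behind slow_task
        pending = executor.submit(slow_task, 0.1)  # queued behind failing_task

        # result() with a short timeout raises TimeoutError while the task is unfinished
        try:
            running.result(timeout=0.05)
            timed_out = False
        except futures.TimeoutError:
            timed_out = True

        # this task has not started yet, so cancel() succeeds
        cancelled_ok = pending.cancel()

        # result() without a timeout blocks until the value is ready
        value = running.result()
        # exception() returns the raised exception instead of re-raising it
        err = failing.exception()

    return {
        "timed_out": timed_out,
        "cancelled_ok": cancelled_ok,
        "pending_cancelled": pending.cancelled(),
        "pending_done": pending.done(),  # a cancelled future also counts as done
        "value": value,
        "error": repr(err),
    }

if __name__ == "__main__":
    print(demo())
```

Calling failing.result() instead of failing.exception() would re-raise the ValueError in the caller.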
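ProcessPoolExecutor objects
The methods are the same. The only difference is that tasks run in multiple processes instead of threads. Worker processes may be started with the spawn method, which is the default on Windows and, since Python 3.8, on macOS. For that reason, the pool must be created under if __name__ == '__main__':, and the submitted function must be defined at module level so it can be pickled.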
# ProcessPoolExecutor example
import threading
import logging
import time
from concurrent import futures

# Log output format
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished {}".format(n))

if __name__ == '__main__':
    # Create a process pool with capacity 3
    exect = futures.ProcessPoolExecutor(max_workers=3)
    fs = []
    for i in range(3):
        future = exect.submit(worker, i)
        fs.append(future)

    for i in range(3, 6):
        future = exect.submit(worker, i)  # these futures are not added to fs

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())  # with multiple processes, the main process's thread list is less informative
        flag = True
        for _ in fs:  # check whether any tracked task is still unfinished
            logging.info(_.done())
            flag = flag and _.done()
        print("=" * 30)
        if flag:
            exect.shutdown()  # wait for pending tasks, then release the worker processes
            logging.info(threading.enumerate())
            break
# Once a pool is created, there is no need to shut it down and recreate it frequently

Output:
2021-06-27 16:11:18,415 [SpawnProcess-1:MainThread,25872: 25960] begin to work0
2021-06-27 16:11:18,421 [SpawnProcess-2:MainThread,22072: 29224] begin to work1
2021-06-27 16:11:18,428 [SpawnProcess-3:MainThread,18108: 20136] begin to work2
==============================
2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>, <Thread(QueueManagerThread, started daemon 6588)>, <Thread(QueueFeederThread, started daemon 12680)>]
2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] False
2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] False
2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] False
2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>, <Thread(QueueManagerThread, started daemon 6588)>, <Thread(QueueFeederThread, started daemon 12680)>]
2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] False
2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] False
2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] False
==============================
2021-06-27 16:11:23,417 [SpawnProcess-1:MainThread,25872: 25960] finished 0
2021-06-27 16:11:23,417 [SpawnProcess-1:MainThread,25872: 25960] begin to work3
2021-06-27 16:11:23,422 [SpawnProcess-2:MainThread,22072: 29224] finished 1
2021-06-27 16:11:23,422 [SpawnProcess-2:MainThread,22072: 29224] begin to work4
2021-06-27 16:11:23,430 [SpawnProcess-3:MainThread,18108: 20136] finished 2
2021-06-27 16:11:23,430 [SpawnProcess-3:MainThread,18108: 20136] begin to work5
2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>, <Thread(QueueManagerThread, started daemon 6588)>, <Thread(QueueFeederThread, started daemon 12680)>]
2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] True
2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] True
2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] True
==============================
2021-06-27 16:11:28,419 [SpawnProcess-1:MainThread,25872: 25960] finished 3
2021-06-27 16:11:28,423 [SpawnProcess-2:MainThread,22072: 29224] finished 4
2021-06-27 16:11:28,431 [SpawnProcess-3:MainThread,18108: 20136] finished 5
2021-06-27 16:11:28,441 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>]

The work now runs in three separate processes (SpawnProcess-1 to 3, each with its own PID). The main process only shows the pool's helper threads (QueueManagerThread and QueueFeederThread here; the exact names depend on the Python version).
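Context manager support
concurrent.futures.ProcessPoolExecutor inherits from concurrent.futures._base.Executor (also available as concurrent.futures.Executor). This parent class defines __enter__ and __exit__, so the executor supports the context management protocol and can be used in a with statement. The same holds for ThreadPoolExecutor.
__exit__ essentially just calls shutdown(wait=True). Leaving the with block therefore blocks until all pending tasks have finished.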
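Usage
from concurrent import futures

def work(a, b):
    # hypothetical task function; the original snippet uses work without defining it
    return a * b

if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=3) as executor:
        future = executor.submit(work, 232, 1235)
        print(future.result())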
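Because __exit__ calls shutdown(wait=True), every submitted task has finished by the time control leaves the with block. The sketch below checks this with a ThreadPoolExecutor, which inherits the same context manager behaviour from the Executor base class. The task function square_after_delay is a hypothetical stand-in.

```python
import time
from concurrent import futures

def square_after_delay(n):
    # hypothetical task: wait briefly, then return n squared
    time.sleep(0.1)
    return n * n

def run_batch(count):
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(square_after_delay, i) for i in range(count)]
        # still inside the block: some tasks are normally still running here
    # __exit__ has called shutdown(wait=True), so every future is finished
    all_done = all(f.done() for f in fs)
    return all_done, [f.result() for f in fs]

if __name__ == "__main__":
    print(run_batch(6))  # (True, [0, 1, 4, 9, 16, 25])
```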
Rewriting the earlier ProcessPoolExecutor example with a context manager, and having worker return a computed result:
# ProcessPoolExecutor example using a context manager
import threading
import logging
import time
from concurrent import futures

# Log output format
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished {}".format(n))
    return n + 100

if __name__ == '__main__':
    # Create a process pool with capacity 3
    exect = futures.ProcessPoolExecutor(max_workers=3)
    with exect:  # context manager: shutdown(wait=True) is called on exit
        fs = []
        for i in range(3):
            future = exect.submit(worker, i)
            fs.append(future)

        for i in range(3, 6):
            future = exect.submit(worker, i)  # these futures are not added to fs

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())  # with multiple processes, the main process's thread list is less informative
            flag = True
            for _ in fs:  # check whether any tracked task is still unfinished
                logging.info(_.done())
                flag = flag and _.done()
                if _.done():  # finished, so look at the result
                    logging.info("result = {}".format(_.result()))
            print("=" * 30)
            if flag:
                break
        # no explicit exect.shutdown() needed: leaving the with block calls it
    logging.info(threading.enumerate())  # with multiple processes, the main process's thread list is less informative
# Once a pool is created, there is no need to shut it down and recreate it frequently

Output:
2021-06-27 16:28:04,265 [SpawnProcess-1:MainThread,28816: 14260] begin to work0
2021-06-27 16:28:04,271 [SpawnProcess-2:MainThread,28308: 9024] begin to work1
2021-06-27 16:28:04,277 [SpawnProcess-3:MainThread,20960: 6168] begin to work2
2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>, <Thread(QueueManagerThread, started daemon 26972)>, <Thread(QueueFeederThread, started daemon 16680)>]
2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] False
2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] False
2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] False
==============================
2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>, <Thread(QueueManagerThread, started daemon 26972)>, <Thread(QueueFeederThread, started daemon 16680)>]
2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] False
2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] False
2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] False
==============================
2021-06-27 16:28:09,266 [SpawnProcess-1:MainThread,28816: 14260] finished 0
2021-06-27 16:28:09,266 [SpawnProcess-1:MainThread,28816: 14260] begin to work3
2021-06-27 16:28:09,272 [SpawnProcess-2:MainThread,28308: 9024] finished 1
2021-06-27 16:28:09,272 [SpawnProcess-2:MainThread,28308: 9024] begin to work4
2021-06-27 16:28:09,278 [SpawnProcess-3:MainThread,20960: 6168] finished 2
2021-06-27 16:28:09,278 [SpawnProcess-3:MainThread,20960: 6168] begin to work5
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>, <Thread(QueueManagerThread, started daemon 26972)>, <Thread(QueueFeederThread, started daemon 16680)>]
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] True
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] result = 100
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] True
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] result = 101
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] True
2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] result = 102
==============================
2021-06-27 16:28:14,267 [SpawnProcess-1:MainThread,28816: 14260] finished 3
2021-06-27 16:28:14,273 [SpawnProcess-2:MainThread,28308: 9024] finished 4
2021-06-27 16:28:14,279 [SpawnProcess-3:MainThread,20960: 6168] finished 5
2021-06-27 16:28:14,288 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>]

The last line is logged only after tasks 3 to 5 have finished, because leaving the with block calls shutdown(wait=True).
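The polling loop above (sleep, check done(), log) is useful for watching the pool at work. When only the return values matter, a shorter alternative is to keep every future and call result(), which blocks until that task is finished. This is a sketch written for this note, not the original example. It uses ThreadPoolExecutor so it runs anywhere; ProcessPoolExecutor can be swapped in the same way, provided the call stays under if __name__ == '__main__':.

```python
import time
from concurrent import futures

def worker(n):
    # same idea as the example above, with a shorter sleep
    time.sleep(0.2)
    return n + 100

def collect_results(count, max_workers=3):
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        fs = [executor.submit(worker, i) for i in range(count)]
        # result() blocks until each future is done; results come back in submission order
        return [f.result() for f in fs]

if __name__ == "__main__":
    print(collect_results(6))  # [100, 101, 102, 103, 104, 105]
```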
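Summary
This library unifies how thread pools and process pools are used, which simplifies programming and reflects Python's philosophy of simplicity. The only drawback is minor: individual worker threads cannot be given their own names. Since Python 3.6, ThreadPoolExecutor accepts only a thread_name_prefix argument; the default prefix produces names such as ThreadPoolExecutor-0_0, as seen in the output above.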
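To illustrate the unified interface, here is a minimal sketch in which the executor class is just a parameter. The helper run_all and the task cube are hypothetical names chosen for this example. Only the thread pool is actually run below. Passing futures.ProcessPoolExecutor instead should return the same list, as long as the task is a module-level function (it is pickled and sent to the workers) and the call sits under if __name__ == '__main__':.

```python
from concurrent import futures

def cube(n):
    # defined at module level so a process pool could pickle it as well
    return n ** 3

def run_all(executor_cls, fn, items, max_workers=2):
    # hypothetical helper: the same code path for any Executor subclass
    with executor_cls(max_workers=max_workers) as executor:
        fs = [executor.submit(fn, x) for x in items]
        return [f.result() for f in fs]

if __name__ == "__main__":
    # both pool classes share the Executor base class and its interface
    print(issubclass(futures.ThreadPoolExecutor, futures.Executor))   # True
    print(issubclass(futures.ProcessPoolExecutor, futures.Executor))  # True
    print(run_all(futures.ThreadPoolExecutor, cube, range(5)))        # [0, 1, 8, 27, 64]
    # run_all(futures.ProcessPoolExecutor, cube, range(5)) is expected to return the same list
```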