可重复利用的线程
队列计数器
队列自己内部有一个计数器,当计数为0时,join不会阻塞,否则就会阻塞
利用生产者与消费者模型,实现线程的重复利用。
主进程可以看成生产者,主要产生任务
线程作为消费者主要处理任务
import queue
import threading
import time
class MyThred(threading.Thread):
def __init__(self):
super().__init__()
self.daemon = True # 守护模式
self.queue = queue.Queue(4)
self.start() # 在实例化是直接开启线程
# 任务的处理
def run(self):
while True:
func, args, kwargs = self.queue.get()
func(*args, **kwargs) # 执行任务
self.queue.task_done() # 让计数器减一
# 任务的添加
def submit_tasks(self, func, args=(), kwargs={}):
self.queue.put((func, args, kwargs)) # 把任务提交到队列中
# 等待任务执行完毕就结束
def join(self):
self.queue.join()
def f1():
time.sleep(2)
print('任务一完成')
def f2(*args, **kwargs):
time.sleep(2)
print('任务二完成', args, kwargs)
t = MyThred()
t.submit_tasks(f1)
t.submit_tasks(f2, args=(1, 2, 3), kwargs={'name': 'tlk'})
t.submit_tasks(f1)
t.submit_tasks(f2, args=(1, 2, 3), kwargs={'name': 'tlk'})
t.join()
线程池
通过线程池来实现,多线程同时解决问题
import time
import queue
import threading
class MyPool(object):
def __init__(self, n):
self.queue = queue.Queue(20)
# 初始化时,这只需要生成多少条线程
for i in range(n):
threading.Thread(target=self.func, daemon=True).start()
def func(self):
while True:
func, args, kwargs = self.queue.get()
func(*args, **kwargs) # 执行任务
self.queue.task_done() # 让计数器减一
# 任务的添加
def submit_tasks(self, func, args=(), kwargs={}):
self.queue.put((func, args, kwargs))
# 等待任务执行完毕就结束
def join(self):
self.queue.join()
def f1():
time.sleep(2)
print('任务一完成')
def f2(*args, **kwargs):
time.sleep(2)
print('任务二完成', args, kwargs)
t = MyPool(6) # 设置同时进行任务处理的线程数量
t.submit_tasks(f1)
t.submit_tasks(f2, args=(1, 2, 3), kwargs={'name': 'tlk'})
t.submit_tasks(f1)
t.submit_tasks(f2, args=(1, 2, 3), kwargs={'name': 'tlk'})
t.submit_tasks(f1)
t.submit_tasks(f2, args=(1, 2, 3), kwargs={'name': 'tlk'})
t.join()
内置线程池
Python已经内置了线程池,提供了更多验证,更加安全
import time
from multiprocessing.pool import ThreadPool #导入线程池
def f1():
time.sleep(2)
print('任务一完成')
def f2(*args, **kwargs):
time.sleep(2)
print('任务二完成', args, kwargs)
tpool = ThreadPool(6)
tpool.apply_async(f1)
tpool.apply_async(f2,args=(1,2,3),kwds={'name':'tlk'})
tpool.apply_async(f1)
tpool.apply_async(f2,args=(1,2,3),kwds={'name':'tlk'})
tpool.apply_async(f1)
tpool.apply_async(f2,args=(1,2,3),kwds={'name':'tlk'})
tpool.close() #在join之前,禁止再往队列中提交任务
tpool.join()
内置检进程池
import time
from multiprocessing import Pool #导入进程池
def f1():
time.sleep(2)
print('任务一完成')
def f2(*args, **kwargs):
time.sleep(2)
print('任务二完成', args, kwargs)
pool = Pool(6)
pool.apply_async(f1)
pool.apply_async(f2, args=(1, 2, 3), kwds={'name': 'tlk'})
pool.apply_async(f1)
pool.apply_async(f2, args=(1, 2, 3), kwds={'name': 'tlk'})
pool.apply_async(f1)
pool.apply_async(f2, args=(1, 2, 3), kwds={'name': 'tlk'})
pool.close() #在join之前,禁止再往队列中提交任务
pool.join()
线程池和进程池实现并发服务器
开启多进程,多进程里又开启多线程
进程数量由cpu数量决定
import socket
from multiprocessing import Pool,cpu_count
from multiprocessing.pool import ThreadPool
server = socket.socket()
server.bind(('127.0.0.1',5959))
server.listen(50)
# 处理数据
def socket_recv(conn):
while True:
recv_data = conn.recv(1024)
if recv_data:
print(recv_data)
conn.send(recv_data)
else:
conn.close()
break
# 建立连接
def accept_process(server):
tpool = ThreadPool(cpu_count()*2) # 一般线程的数量是cpu的2倍
while True:
conn,addr = server.accept()
tpool.apply_async(socket_recv,args=(conn,))
n = cpu_count()
mpool = Pool(1)
for i in range(n):
mpool.apply_async(accept_process,args=(server,))
mpool.close()
mpool.join()