引言
目前很多公司选择将python项目使用golang重构,很大一方面原因是因为golang的并发能力,golang自带的语法糖支持使并发编程变的相对简单,也更能充分的使用多核CPU的计算资源。
相应的,python长期受制于GIL,无法在多线程时使用多核CPU,所以一直以来在谈及python的缺陷时,性能总是无法回避的一个问题。当然,一些python著名的第三方组织也一直通过各种手段来改善python的并发性能,如twisted的异步模型使用事件驱动机制来提升python性能,著名的爬虫框架scrapy便是以twisted作为底层网络库来开发的,还有gevent,它使用greenlet在用户态完成栈和上下文切换来减少切换带来的性能损耗,同样还有著名的web协程框架tornado,他使用生成器来保存协程上下文及状态,使用原生的python语法实现了协程。但从python3.4开始python引入asyncio标准库,随后又在3.5引入async/await关键字,从根本上规范了python异步编程标准,使python异步编程逐渐流行起来。
关于什么是python协程,相信网上已经有了不少资料,但是只描述抽象的上层建筑难免会让人接受困难,本文希望可以通过从最简单的代码和逻辑,使用最基础的数据结构,从实现出发,带领大家理解什么是python协程。
首先需要补充一些基础知识
什么是生成器
我们都应该听说过迭代器,这在很多语言中都有类似的概念,简单的说,迭代器就是可以被迭代的对象,对其使用next操作可以返回一个元素,通常多次迭代后迭代器会中止,此时迭代器无法再使用。比如python中可以通过iter方法来将一个列表转换成迭代器:
In [1]: lst = [1, 2, 3] Python学习交流群:1004391443 In [2]: iterator = iter(lst) In [3]: next(iterator) Out[3]: 1 In [4]: next(iterator) Out[4]: 2 In [5]: next(iterator) Out[5]: 3 In [6]: next(iterator) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-7-4ce711c44abc> in <module>() ----> 1 next(iterator) StopIteration:
生成器可以看作是迭代器的子类,同时提供了比迭代器更强大的功能,python中,可以使用yield关键字使函数返回生成器对象。
In [8]: def fun(): ...: yield 1 ...: yield 2 ...: yield 3 ...: In [9]: iterator = fun() In [10]: next(iterator) Out[10]: 1 In [11]: next(iterator) Out[11]: 2 In [12]: next(iterator) Out[12]: 3 In [13]: next(iterator) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-13-4ce711c44abc> in <module>() ----> 1 next(iterator) StopIteration:
每次next调用, fun函数只执行四分之一,如果我们拥有多个生成器对象, 按照一定规则 可控的对他分别调用next,生成器每次的暂停都保存了执行进度和内部状态。如果将这三个生成器理解成协程,那不正是我们熟悉的协程间的切换?
事件循环
所以,我们可以想象,现在有一个循环和一个生成器列表,每次循环,我们都将所有的生成器进行一次调用,所有生成器交替执行。如下:
In [16]: gen_list = [fun(), fun(), fun()] In [17]: while True: ...: for gen in gen_list: ...: print(next(gen)) ...: 1 1 1 2 2 2 3 3 3 --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-17-f2c1d557da29> in <module>() 1 while True: 2 for gen in gen_list: ----> 3 print(next(gen)) 4 StopIteration:
当然,我们还可以换一种写法,将生成器的每一步都当成是一次调用,把生成器包装成一个Handle对象,每次调用handle对象的call来完成生成器的调用,同时,我们还可以在调用完成后做一些准备来控制下一次调用的时间,将Handle对应放到一个scheduled_list里面:
def fun(): print("step1") yield print("step2") yield print("step3") yield scheduled_list = [] class Handle(object): def __init__(self, gen): self.gen = gen def call(self): next(self.gen) scheduled_list.append(self) def loop(*coroutines): scheduled_list.extend(Handle(c) for c in coroutines) while True: while scheduled_list: handle = scheduled_list.pop(0) handle.call() if __name__ == "__main__": loop(fun(), fun(), fun())
协程中的阻塞
在有了以上的基础后,我们来分析上面提到的切换规则,什么时候应该切换协程(生成器)?显而易见,当遇到阻塞时,我们才需要切换协程,以避免CPU的浪费。我将阻塞分为了以下三种:
- IO调用,如socket,file,pipe等。
- 人为制造的阻塞,如sleep。
- 异步调用。
假设,在我们的生成器内有一次socket调用,我们不知道它多久会ready,我们希望不等待它的返回,切换到其它协程运行,等其准备好之后再切换回来,该怎么办?
有同学可能会想到了,将socket注册到epoll上。如下:
import time import socket from functools import partial from select import epoll poll = epoll() handlers = dict() scheduled_list = [] def fun(): print("step1") sock = socket.socket() future = Future() def handler(): future.set_done(sock.recv(1024)) add_handler(sock.fileno(), handler, READ) yield future print("step2") yield print("step3") yield def add_handler(fd, handler, events): handlers[fd] = handler poll.register(fd, events) class Future(object): def __init__(self): self.callbacks = [] def add_callback(self, callback): self.callbacks.append(callback) def set_done(self, value): self.value = value for callback in self.callbacks: callback() def get_result(self): return self.value class Handle(object): def __init__(self, gen): self.gen = gen def call(self): yielded = next(self.gen) if isinstance(yielded, Future): yielded.add_callback(partial(scheduled_list.append, self)) else: scheduled_list.append(self) def loop(*coroutines): scheduled_list.extend(Handle(c) for c in coroutines) while True: default_timeout = 10000 while scheduled_list: handle = scheduled_list.pop(0) handle.call() # 等待描述符可操作 events = poll.poll(default_timeout) while events: fd, event = events.popitem() handlers[fd]() poll.unregister(fd) del handlers[fd] if __name__ == "__main__": loop(fun(), fun(), fun())
这一步引入一个新的对象Future,他用来代指未来即将发生的调用,通过epoll上注册的事件,触发了它的完成,完成之后执行了将handle对象放回scheduled_list, 可从而切回了协程。
那么,人为制造的阻塞我们怎么切换协程呢?这里,我们又引入了一个新的对象Timeout
import time import socket from functools import partial from select import epoll poll = epoll() handlers = dict() scheduled_list = [] # 创建一个timeout_list timeout_list = [] def fun(): print("step1") sock = socket.socket() future = Future() def handler(): future.set_done() add_handler(sock.fileno(), handler, READ) yield future print("step2") yield sleep(3) print("step3") yield def add_handler(fd, handler, events): handlers[fd] = handler poll.register(fd, events) def sleep(sec): future = Future() timeout = Timeout(sec, future.set_done) timeout_list.append(timeout) return future class Timeout(object): def __init__(self, timeout, callback): self.deadline = time.time() + timeout self.callback = callback def call(self): self.callback(None) class Future(object): def __init__(self): self.callbacks = [] self.value = None def add_callback(self, callback): self.callbacks.append(callback) def set_done(self, value): self.value = value for callback in self.callbacks: callback() def get_result(self): return self.value class Handle(object): def __init__(self, gen): self.gen = gen def call(self): yielded = next(self.gen) if isinstance(yielded, Future): yielded.add_callback(partial(scheduled_list.append, self)) else: scheduled_list.append(self) def loop(*coroutines): scheduled_list.extend(Handle(c) for c in coroutines) while True: default_timeout = 10000 deadline = time.time() for timeout in timeout_list[:]: if timeout.deadline <= deadline: timeout.call() timeout_list.remove(timeout) while scheduled_list: handle = scheduled_list.pop(0) handle.call() for timeout in timeout_list: wait_time = timeout.deadline - deadline if wait_time <= 0: wait_time = 0 default_timeout = min(default_timeout, wait_time) if not scheduled_list and not timeout_list and not handlers: break # 等待描述符可操作 events = poll.poll(default_timeout) while events: fd, event = events.popitem() handlers[fd]() poll.unregister(fd) del handlers[fd] if __name__ == "__main__": loop(fun(), fun(), fun())
通过创建一个Timeout对象,我们在deadline时触发了其回调,使Future完成,从而完成了协程的切换。
由以上两点,我们可以大致观察出一个规律,创建Future对象,切出协程,在合适的时机(如socket ready或到达deadline/timeout)让他完成,切入协程,这才是协程切换的关键所在,由此,我们可以使用Future来管理各种异步调用。
如,我们在python编码时遇到了一个计算密集型的函数,由于python单进程无法利用多核,我们可以创建一个子进程来处理计算,同时关联到一个Future中:
def fun(): print("step1") sock = socket.socket() future = Future() def handler(): future.set_done() add_handler(sock.fileno(), handler, READ) yield future print("step2") yield sleep(3) print("step3") future = Future() from multiprocessing import Process Process(target=long_time_call, args=(future, )).start() yield future def long_time_call(future): #... future.set_done()
当协程执行到第三步时,遇到了长时间运行的函数调用,我们创建了一个Future,关联到一个子进程中,并在子进程完成时设置future完成,在子进程完成之前,父进程已完成协程的切出,将执行权交给其它协程执行。
这个地方遗漏了一个细节,当没有其它协程可以执行时,epoll会被设置成超时时间=10000,因而陷入到长时间的睡眠中,而子进程完成后需要切入协程,但父进程已经被epoll阻塞掉,如何唤醒主进程继续执行该协程呢?业界通用的做法是,创建一个管道,在切出协程时让epoll监听读fd,子进程完成后,往管道中写入一个字符,epoll监听的读fd 马上变成ready,因此epoll解除阻塞,事件循环得以继续执行。
当然,异步调用不仅仅可以使用子进程,子线程、远程计算框架都可以通过这种方式执行。
讲到这里,大家应该基本明白了一个协程函数是如何工作的了。下表可帮助我们从线程的角度理解协程
上面的表格表述线程和协程的一一对应关系,最后一栏可能还需要举例解释一下:
我们知道一个线程执行过程中,会嵌套调用多个函数,如:
def foo(): print("in foo") def bar(): print("in bar") def fun(): bar() foo() if __name__ == "__main__": fun()
那么生成器如何嵌套调用呢?python3.4之前,嵌套的生成器只能这么使用:
def foo(): print("in foo") yield def bar(): print("in bar") yield def fun(): for i in bar(): yield i for i in foo(): yield i if __name__ == "__main__": for i in fun(): pass
python3.4之后引入了新的语法糖yield from,简化了调用方式:
def foo(): print("in foo") yield def bar(): print("in bar") yield def fun(): yield from bar() yield from foo() if __name__ == "__main__": for i in fun(): pass
yield from可以驱动子生成器,来逐一返回子生成器中的值,将嵌套的生成器打平。值得一提的是,yield from才是await的真实身份。
让我们用最初的例子来编写一个嵌套了的子生成器函数的协程demo。我们将fun生成器抽离成2种阻塞操作,并封装好:
def read(sock): future = Future() def handler(): buf = sock.recv(1024) future.set_done(buf) add_handler(sock.fileno(), handler, 0x001) yield future return future.get_result() def sleep(sec): future = Future() timeout = Timeout(sec, future.set_done) timeout_list.append(timeout) yield future
有了这两个基础函数之后,我们就可以自由的编写我们协程了
def coroutine(num): client = socket.socket() client.connect(("", 1234)) print(f"coroutine_{num} start") buffer = yield from read(client) print(f"coroutine_{num} recv: ", buffer) yield from sleep(3) print(f"coroutine_{num} wake up ") client.close() if __name__ == "__main__": loop(coroutine(1), coroutine(2))
我们创建了两个协程,其中调用了一次socket读和一个睡眠,让我们看一下执行效果:
coroutine_1 start coroutine_2 start coroutine_2 recv: b'test' coroutine_1 recv: b'test' coroutine_2 wake up coroutine_1 wake up
两个协程异步交替执行。
asyncio的使用
相信看完上面的例子之后,大家应该对python协程的实现有了初步的认识,那标准的python协程如何使用呢?
import socket import asyncio async def read(sock): loop = asyncio.get_event_loop() future = loop.create_future() def handler(): buf = sock.recv(1024) future.set_result(buf) loop.remove_reader(sock.fileno()) loop.add_reader(sock.fileno(), handler) await future return future.result() async def coroutine(num): client = socket.socket() client.connect(("", 1234)) print(f"coroutine_{num} start") buffer = await read(client) print(f"coroutine_{num} recv: ", buffer) await asyncio.sleep(3) print(f"coroutine_{num} wake up ") client.close() if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(coroutine(1), coroutine(2)))
几乎和我们的实现一模一样,其中await取代了yield from, 协程显式使用async来声明。
python协程的应用
python协程优势
通过上述例子我们可以很容易看出,python协程具有以下特点:
- 超级轻量,不需要维护协程栈,所有的上下文执行状态都被维护在了生成器中。
- 切换自由,通过yield from(python3.5以后是await)随意切换协程,协程切换完全可控以至于几乎不用加锁。
- 并发能力强,并发上限理论上取决于IO多路复用可注册的文件描述符的极限。
缺点
- 还是只能用到单核。
- 由于协程切换是非抢占式的,所以如果业务是CPU密集型的,可能其它协程长时间得不到执行。
综上所述,在使用python的高并发场景下,python多进程+协程是最优的解决方案。