Python “黑魔法” 之 Generator Coroutines

写在前面

  • 本文默认读者对 Python 生成器 有一定的了解,不了解者请移步至生成器 – 廖雪峰的官方网站。
  • 本文基于 Python 3.5.1,文中所有的例子都可在 Github 上获得。

学过 Python 的都知道,Python 里有一个很厉害的概念叫做 生成器(Generators)。一个生成器就像是一个微小的线程,可以随处暂停,也可以随时恢复执行,还可以和代码块外部进行数据交换。恰当使用生成器,可以极大地简化代码逻辑。

也许,你可以熟练地使用生成器完成一些看似不可能的任务,如“无穷斐波那契数列”,并引以为豪,认为所谓的生成器也不过如此——那我可要告诉你:这些都太小儿科了,下面我所要介绍的绝对会让你大开眼界。

生成器 可以实现 协程,你相信吗?

什么是协程

在异步编程盛行的今天,也许你已经对 协程(coroutines) 早有耳闻,但却不一定了解它。我们先来看看 Wikipedia 的定义:

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.

也就是说:协程是一种 允许在特定位置暂停或恢复的子程序——这一点和 生成器 相似。但和 生成器 不同的是,协程 可以控制子程序暂停之后代码的走向,而 生成器 仅能被动地将控制权交还给调用者。

协程 是一种很实用的技术。和 多进程 与 多线程 相比,协程 可以只利用一个线程更加轻便地实现 多任务,将任务切换的开销降至最低。和 回调 等其他异步技术相比,协程 维持了正常的代码流程,在保证代码可读性的同时最大化地利用了 阻塞 IO 的空闲时间。它的高效与简洁赢得了开发者们的拥戴。

Python 中的协程

早先 Python 是没有原生协程支持的,因此在 协程 这个领域出现了百家争鸣的现象。主流的实现由以下两种:

  • 用 C 实现协程调度。这一派以 gevent 为代表,在底层实现了协程调度,并将大部分的 阻塞 IO 重写为异步。
  • 用 生成器模拟。这一派以 Tornado 为代表。Tornado 是一个老牌的异步 Web 框架,涵盖了五花八门的异步编程方式,其中包括 协程。本文部分代码借鉴于 Tornado。

直至 Python 3.4,Python 第一次将异步编程纳入标准库中(参见 PEP 3156),其中包括了用生成器模拟的 协程。而在 Python 3.5 中,Guido 总算在语法层面上实现了 协程(参见 PEP 0492)。比起 yield 关键字,新关键字 asyncawait 具有更好的可读性。在不久的将来,新的实现将会慢慢统一混乱已久的协程领域。

尽管 生成器协程 已成为了过去时,但它曾经的辉煌却不可磨灭。下面,让我们一起来探索其中的魔法。

一个简单的例子

假设有两个子程序 mainprinterprinter 是一个死循环,等待输入、加工并输出结果。main 作为主程序,不时地向 printer 发送数据。

这应该怎么实现呢?

传统方式中,这几乎不可能在一个线程中实现,因为死循环会阻塞。而协程却能很好地解决这个问题:

1234567891011121314 def printer():     counter = 0    while True:        string = (yield)        print(\'[{0}] {1}\’.format(counter, string))        counter += 1 if __name__ == \’__main__\’:    p = printer()    next(p)    p.send(\’Hi\’)    p.send(\’My name is hsfzxjy.\’)    p.send(\’Bye!\’)

输出:

123 [0] Hi[1] My name is hsfzxjy.[2] Bye!

这其实就是最简单的协程。程序由两个分支组成。主程序通过 send 唤起子程序并传入数据,子程序处理完后,用 yield 将自己挂起,并返回主程序,如此交替进行。

协程调度

有时,你的手头上会有多个任务,每个任务耗时很长,而你又不想同步处理,而是希望能像多线程一样交替执行。这时,你就需要一个调度器来协调流程了。

作为例子,我们假设有这么一个任务:

1234 def task(name, times):     for i in range(times):        print(name, i)

如果你直接执行 task,那它会在遍历 times 次之后才会返回。为了实现我们的目的,我们需要将 task 人为地切割成若干块,以便并行处理:

12345 def task(name, times):     for i in range(times):        yield        print(name, i)

这里的 yield 没有逻辑意义,仅是作为暂停的标志点。程序流可以在此暂停,也可以在此恢复。而通过实现一个调度器,我们可以完成多个任务的并行处理:

12345678910111213141516171819 from collections import deque class Runner(object):     def __init__(self, tasks):        self.tasks = deque(tasks)     def next(self):        return self.tasks.pop()     def run(self):        while len(self.tasks):            task = self.next()            try:                next(task)            except StopIteration:                pass            else:                self.tasks.appendleft(task)

这里我们用一个队列(deque)储存任务列表。其中的 run 是一个重要的方法: 它通过轮转队列依次唤起任务,并将已经完成的任务清出队列,简洁地模拟了任务调度的过程。

而现在,我们只需调用: