深入理解 tornado 之底层 ioloop 实现

最近打算学习 tornado 的源码,所以就建立一个系列主题 “深入理解 tornado”。 在此记录学习经历及个人见解与大家分享。文中一定会出现理解不到位或理解错误的地方,还请大家多多指教

进入正题:

tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 wsgi 写的简单服务器,并没有自己实现底层结构。 关于 wsgi 详见之前的文章: 自己写一个 wsgi 服务器运行 Django 、Tornado 应用)。 那么 tornado.ioloop 就是 tornado web server 最底层的实现。

看 ioloop 之前,我们需要了解一些预备知识,有助于我们理解 ioloop。

epoll

ioloop 的实现基于 epoll ,那么什么是 epoll? epoll 是Linux内核为处理大批量文件描述符而作了改进的 poll 。
那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:

  1. 一直在这里等着直到收发数据结束;
  2. 每隔一定时间来看看这里有没有数据;

第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。

对于 epoll 的操作,其实也很简单,只要 4 个 API 就可以完全操作它。

epoll_create

用来创建一个 epoll 描述符( 就是创建了一个 epoll )

epoll_ctl

操作 epoll 中的 event;可用参数有:

参数 含义
EPOLL_CTL_ADD 添加一个新的epoll事件
EPOLL_CTL_DEL 删除一个epoll事件
EPOLL_CTL_MOD 改变一个事件的监听方式

而事件的监听方式有七种,而我们只需要关心其中的三种:

宏定义 含义
EPOLLIN 缓冲区满,有数据可读
EPOLLOUT 缓冲区空,可写数据
EPOLLERR 发生错误

epoll_wait

就是让 epoll 开始工作,里面有个参数 timeout,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。

在监听时有数据活跃的连接时其返回活跃的文件句柄列表(此处为 socket 文件句柄)。

close

关闭 epoll

现在了解了 epoll 后,我们就可以来看 ioloop 了 (如果对 epoll 还有疑问可以看这两篇资料: epoll 的原理是什么、百度百科:epoll)

tornado.ioloop

很多初学者一定好奇 tornado 运行服务器最后那一句 tornado.ioloop.IOLoop.current().start() 到底是干什么的。 我们先不解释作用,来看看这一句代码背后到底都在干什么。

先贴 ioloop 代码:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 from __future__ import absolute_import, division, print_function, with_statement import datetimeimport errnoimport functoolsimport heapq       # 最小堆import itertoolsimport loggingimport numbersimport osimport selectimport sysimport threadingimport timeimport tracebackimport math from tornado.concurrent import TracebackFuture, is_futurefrom tornado.log import app_log, gen_logfrom tornado.platform.auto import set_close_exec, Wakerfrom tornado import stack_contextfrom tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds try:    import signalexcept ImportError:    signal = None  if PY3:    import _thread as threadelse:    import thread  _POLL_TIMEOUT = 3600.0  class TimeoutError(Exception):    pass  class IOLoop(Configurable):    _EPOLLIN = 0x001    _EPOLLPRI = 0x002    _EPOLLOUT = 0x004    _EPOLLERR = 0x008    _EPOLLHUP = 0x010    _EPOLLRDHUP = 0x2000    _EPOLLONESHOT = (1 << 30)    _EPOLLET = (1 << 31)     # Our events map exactly to the epoll events    NONE = 0    READ = _EPOLLIN    WRITE = _EPOLLOUT    ERROR = _EPOLLERR | _EPOLLHUP     # Global lock for creating global IOLoop instance    _instance_lock = threading.Lock()     _current = threading.local()     @staticmethod    def instance():        if not hasattr(IOLoop, \”_instance\”):            with IOLoop._instance_lock:                if not hasattr(IOLoop, \”_instance\”):                    # New instance after double check                    IOLoop._instance = IOLoop()        return IOLoop._instance     @staticmethod    def initialized():        \”\”\”Returns true if the singleton instance has been created.\”\”\”        return hasattr(IOLoop, \”_instance\”)     def install(self):        assert not IOLoop.initialized()        IOLoop._instance = self     @staticmethod    def clear_instance():        \”\”\”Clear the global `IOLoop` instance.        .. versionadded:: 4.0        \”\”\”        if hasattr(IOLoop, \”_instance\”):            del IOLoop._instance     @staticmethod    def current(instance=True):        current = getattr(IOLoop._current, \”instance\”, None)        if current is None and instance:            return IOLoop.instance()        return current     def make_current(self):        IOLoop._current.instance = self     @staticmethod    def clear_current():        IOLoop._current.instance = None     @classmethod    def configurable_base(cls):        return IOLoop     @classmethod    def configurable_default(cls):        if hasattr(select, \”epoll\”):            from tornado.platform.epoll import EPollIOLoop            return EPollIOLoop        if hasattr(select, \”kqueue\”):            # Python 2.6+ on BSD or Mac            from tornado.platform.kqueue import KQueueIOLoop            return KQueueIOLoop        from tornado.platform.select import SelectIOLoop        return SelectIOLoop     def initialize(self, make_current=None):        if make_current is None:            if IOLoop.current(instance=False) is None:                self.make_current()        elif make_current:            if IOLoop.current(instance=False) is not None:                raise RuntimeError(\”current IOLoop already exists\”)            self.make_current()     def close(self, all_fds=False):        raise NotImplementedError()     def add_handler(self, fd, handler, events):        raise NotImplementedError()     def update_handler(self, fd, events):        raise NotImplementedError()     def remove_handler(self, fd):        raise NotImplementedError()     def