最近打算学习 tornado 的源码,所以就建立一个系列主题 “深入理解 tornado”。 在此记录学习经历及个人见解与大家分享。文中一定会出现理解不到位或理解错误的地方,还请大家多多指教
进入正题:
tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 wsgi 写的简单服务器,并没有自己实现底层结构。 关于 wsgi 详见之前的文章: 自己写一个 wsgi 服务器运行 Django 、Tornado 应用)。 那么 tornado.ioloop
就是 tornado web server 最底层的实现。
看 ioloop 之前,我们需要了解一些预备知识,有助于我们理解 ioloop。
epoll
ioloop 的实现基于 epoll ,那么什么是 epoll? epoll 是Linux内核为处理大批量文件描述符而作了改进的 poll 。
那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:
- 一直在这里等着直到收发数据结束;
- 每隔一定时间来看看这里有没有数据;
第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。
对于 epoll 的操作,其实也很简单,只要 4 个 API 就可以完全操作它。
epoll_create
用来创建一个 epoll 描述符( 就是创建了一个 epoll )
epoll_ctl
操作 epoll 中的 event;可用参数有:
参数 | 含义 |
---|---|
EPOLL_CTL_ADD | 添加一个新的epoll事件 |
EPOLL_CTL_DEL | 删除一个epoll事件 |
EPOLL_CTL_MOD | 改变一个事件的监听方式 |
而事件的监听方式有七种,而我们只需要关心其中的三种:
宏定义 | 含义 |
---|---|
EPOLLIN | 缓冲区满,有数据可读 |
EPOLLOUT | 缓冲区空,可写数据 |
EPOLLERR | 发生错误 |
epoll_wait
就是让 epoll 开始工作,里面有个参数 timeout,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。
在监听时有数据活跃的连接时其返回活跃的文件句柄列表(此处为 socket 文件句柄)。
close
关闭 epoll
现在了解了 epoll 后,我们就可以来看 ioloop 了 (如果对 epoll 还有疑问可以看这两篇资料: epoll 的原理是什么、百度百科:epoll)
tornado.ioloop
很多初学者一定好奇 tornado 运行服务器最后那一句 tornado.ioloop.IOLoop.current().start()
到底是干什么的。 我们先不解释作用,来看看这一句代码背后到底都在干什么。
先贴 ioloop 代码:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 | from __future__ import absolute_import, division, print_function, with_statement import datetimeimport errnoimport functoolsimport heapq # 最小堆import itertoolsimport loggingimport numbersimport osimport selectimport sysimport threadingimport timeimport tracebackimport math from tornado.concurrent import TracebackFuture, is_futurefrom tornado.log import app_log, gen_logfrom tornado.platform.auto import set_close_exec, Wakerfrom tornado import stack_contextfrom tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds try: import signalexcept ImportError: signal = None if PY3: import _thread as threadelse: import thread _POLL_TIMEOUT = 3600.0 class TimeoutError(Exception): pass class IOLoop(Configurable): _EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) # Our events map exactly to the epoll events NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP # Global lock for creating global IOLoop instance _instance_lock = threading.Lock() _current = threading.local() @staticmethod def instance(): if not hasattr(IOLoop, \”_instance\”): with IOLoop._instance_lock: if not hasattr(IOLoop, \”_instance\”): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance @staticmethod def initialized(): \”\”\”Returns true if the singleton instance has been created.\”\”\” return hasattr(IOLoop, \”_instance\”) def install(self): assert not IOLoop.initialized() IOLoop._instance = self @staticmethod def clear_instance(): \”\”\”Clear the global `IOLoop` instance. .. versionadded:: 4.0 \”\”\” if hasattr(IOLoop, \”_instance\”): del IOLoop._instance @staticmethod def current(instance=True): current = getattr(IOLoop._current, \”instance\”, None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self @staticmethod def clear_current(): IOLoop._current.instance = None @classmethod def configurable_base(cls): return IOLoop @classmethod def configurable_default(cls): if hasattr(select, \”epoll\”): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, \”kqueue\”): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError(\”current IOLoop already exists\”) self.make_current() def close(self, all_fds=False): raise NotImplementedError() def add_handler(self, fd, handler, events): raise NotImplementedError() def update_handler(self, fd, events): raise NotImplementedError() def remove_handler(self, fd): raise NotImplementedError() def |