为了让I/O阻塞的时候,程序还可以去干别的。除了使用线程模型,让操作系统的内核去调度多个线程,Windows提供了IOCP机制。简单来说就是一个操作系统提供的回调机制。分成四个步骤
前面的例子太复杂了,我们把accept后面的操作全部忽略掉。单独看一个服务器接收客户端连接的代码:
import socket
from asyncio import _overlapped
import struct
listen_sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=socket.IPPROTO_IP)
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_sock.bind((\'0.0.0.0\', 9090))
listen_sock.listen(0)
NULL = 0
concurrency=0xffffffff
_iocp = _overlapped.CreateIoCompletionPort(_overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
_overlapped.CreateIoCompletionPort(listen_sock.fileno(), _iocp, 0, 0)
conn_sock = socket.socket(listen_sock.family)
conn_sock.settimeout(0)
ov = _overlapped.Overlapped(NULL)
ov.AcceptEx(listen_sock.fileno(), conn_sock.fileno())
def on_accepted():
buf = struct.pack(\'@P\', listen_sock.fileno())
conn_sock.setsockopt(socket.SOL_SOCKET, _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
conn_sock.settimeout(listen_sock.gettimeout())
print(\'connected from %s:%s\' % conn_sock.getpeername())
return conn_sock, conn_sock.getpeername()
callback_map = {}
if ov.pending:
callback_map[ov.address] = on_accepted
else:
on_accepted()
while True:
# wait maximum 1 second
status = _overlapped.GetQueuedCompletionStatus(_iocp, 1000)
if status is None:
continue # try again
err, transferred, key, address = status
callback = callback_map[address]
callback()
break
这段代码使用了Python 3.4。其中 _overlapped.Overlapped(NULL) 这一步是创建key,ov.AcceptEx(listen_sock.fileno(), conn_sock.fileno()) 是做一个I/O调用,后面的 _overlapped.GetQueuedCompletionStatus(_iocp, 1000) 是轮询,callback_map[address] 这一步是根据返回的key查找对应的回调函数回调。
这种实现方式与前面基于线程的方式显著不同:
这样状态从多个线程的多个栈上,变成了只有一个线程,但是在线程内部有一个callback_map来维护单线程内多个并发流程的状态。某种程度上来说,相对于多线程是把一些操作系统的上下文保存和调度职责从操作系统内核移到了网络程序里。