Python3入门到精通——异步 I/O 与协程

作者: Daniel Meng

GitHub: LibertyDream

博客:明月轩

本系列教程采用知识共享署名-非商业性使用-相同方式共享 2.5 中国大陆许可协议

I/O 编程中有一些容易混淆的概念

  • 并发:在一段时间内,有多个程序在同一个 cpu 上运行,但任意时刻只有一个程序在 cpu 上运行
  • 并行:在某个时间点,同时有多个程序运行在多个 cpu 上

  • 同步:调用 I/O 操作时,必须等待 I/O 操作完成才返回

  • 异步:调用 I/O 操作时,不必等待 I/O 操作完成即可返回

  • 阻塞:调用函数时当前线程被挂起

  • 非阻塞:调用函数时当前线程不会被挂起,而是立即返回

随着计算机网络的发展,特别是 C10K 问题的出现,即 1 GHz 的 cpu,2 GB 内存,1 Gbps 网络下单台服务器怎么同时向 1 万台客户机提供 FTP 服务。有限计算资源服务尽可能多的用户成了刚需,而网络开销最大的是 I/O 操作,由此 Unix 逐步演化出了五种 I/O 模型

  • 阻塞式 I/O

阻塞式 I/O 是最原始的 I/O 处理方式,发送请求后必须等待操作处理完成才能执行后续代码,浪费了大量时间在等候 I/O 完成上。

  • 非阻塞式 I/O

为减少等候时间人们在想能否提交后就返回,但这样的问题是即便返回后为了确认 I/O 是否完成还是需要不停的询问服务器,往返通信开销依然很大。例外情况是 I/O 后续操作并不依赖 I/O 状态,比如跟的是一些其他计算任务,或是发起其他连接请求,此时非阻塞就比阻塞优势明显了

  • I/O 复用

既然有很多程序都要请求 I/O 操作,每一个还都需要通信确认操作完成情况,于是自然而然的想法是将这些请求统一管理起来。于是有了 select,它会监控所有 socket 变动,当某一个请求传来完成信号,客户机随即开始将数据从内核空间转移到用户空间。

所有程序运行都依托内存,操作系统也一样,为了保障系统运行稳定,不会被用户程序意外破坏,内存空间被分成了属于系统的低地址内核空间和属于用户的高地址空间。而网络访问请求最终都是调用系统接口完成的,所以传来的数据最开始是存放在系统那里的,但内核空间用户是没权限访问的,必须拷贝出来才能处理。

所以 I/O 操作过程仍有一部分时间浪费在等候复制上。但 select 模式已经很高效了,I/O 复用也是当下主流异步通信方式

  • 信号驱动式 I/O

这种模式下请求 I/O 操作的客户端和服务端通过信号程序确立了一个“契约”——处理完成后通知我。但没有 select 高效,没掀起大波澜

  • 异步 I/O(POSIX 的 aio_ 系列函数)

上面说到只是服务端处理完成还不够,还有工作没做完,那就都完成了再通知客户机呗,这就是异步 I/O 的由来,也是真正的异步通信。但为达到该效果付出成本也是巨大的,综合来看并没比 I/O 复用强多少,没能成为主流

I/O 复用

selectpollepoll 都是 I/O 多路复用机制。一个进程可以监视多个描述符,一旦某个描述符就绪(读就绪/写就绪),便通知程序进行相应的读写操作。但 selectpollepoll 本质上都是同步 I/O,因为都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 会把数据从内核拷贝到用户空间,程序可以直接取用。

select

select 函数监视的文件描述符分3类,分别是 writefdsreadfds、和 exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有异常),或者超时(timeout 指定等待时间,如果立即返回设为 null 即可),函数返回。当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。

select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select 的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024

poll

不同于 select 使用三个位图来表示三个 fdset 的方式,poll 使用一个 pollfd 的指针实现。

pollfd 结构包含了要监视的 event 和发生的 event,不再使用 select “参数-值”传递的方式。同时,pollfd 并没有最大数量限制(但是数量过大后性能也是会下降)。和select函数一样,poll返回后,需要轮询 pollfd 来获取就绪的描述符。

从上面看,select 和 poll 都需要在返回后,通过遍历文件描述符来获取已经就绪的 socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降

epoll

epoll 是 Linux 下 select 和 poll 的增强版本,基于红黑树实现。相对于 select 和 poll 来说,epoll更加灵活,没有描述符限制。epoll 使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间只需复制一次。

  • 高并发,连接活跃度低时 epoll 强于 select;并发低,连接活跃度高,select 强于 epoll

多数框架实现 I/O 复用的方式可以概括为 select + 回调 + 事件循环获取 模式,为了便于解释先来看一个例子

Python 实现 select 可以使用 select 模块,但这个模块提供的方法、功能不如 selectors ,后者是对前者进一步的封装强化

In [6]:
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
In [7]:
selector = DefaultSelector()
urls = ["http://www.baidu.com"]
stop = False
In [8]:
class Fetcher:

    def connected(self, key):
        '''写请求回调函数'''
        selector.unregister(key.fd)  # 注销写请求的监控事件
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode('utf8'))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        '''读请求回调函数'''
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split('\r\n\r\n')[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True
                
    
    def get_url(self, url):
        self.spider_url = url
        p_url = urlparse(url)
        self.host = p_url.netloc
        self.path = p_url.path
        self.data = b""
        if self.path == "":
            self.path = '/'
        
        # 建立 socket 连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)  # 设置为非阻塞模式

        try:
            self.client.connect((self.host, 80))
        except BlockingIOError as e:
            pass


        # 注册到 selector 进行托管
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
In [3]:
def event_loop():
    '''事件循环'''
    while not stop:
        res_lst = selector.select()
        for key, mask in res_lst:
            call_back = key.data
            call_back(key)

这里模仿爬虫使用 I/O 复用模式抓取 html 内容。get_url 为入口,建立 IPv4,TCP 套接字,设置通信方式为非阻塞式,这样不用等待 connect 返回即可执行后续的注册程序。

基础库 select 本身不支持 register 模式,selector 对此进行了封装,通过 register(fileobj, events, data=None) 方法注册 I/O 请求进行托管。fileobj 是文本对象或文件描述符,Python 内部用变量 fd 表示,可以通过通信连接对象的 fileno() 获取,events 是托管事件,一般是读请求 EVENT_READ 和写请求 EVENT_WRITEdata 用来接收回调方法或其他要上传的数据。

注意,使用 select 时,请求生效后的操作要自己指定,即要自行实现回调方法,样例代码中的 connectedreadable 就是两个回调方法。同时别忘了请求生效以后适时注销在 select 中的托管。

整个 I/O 复用的驱动程序就是事件循环,不停的请求 socket 的状态并调用回调函数。selector.select() 是对托管的描述符列表进行检索,当某一 socket 注册的请求事件被满足时会以元组 (key, mask) 形式添加到列表中并返回列表,key 是 socket 描述数据,是调用 register 方法时产生的 SelectorKey 对象,mask 是验证用掩码

In [5]:
fetcher = Fetcher()
for url in urls:
    fetcher.get_url(url)
event_loop()
<!DOCTYPE html><!--STATUS OK-->
<html>
<head>
	<meta http-equiv="content-type" content="text/html;charset=utf-8">
	<meta http-equiv="X-UA-Compatible" content="IE=Edge">
	<link rel="dns-prefetch" href="//s1.bdstatic.com"/>
	<link rel="dns-prefetch" href="//t1.baidu.com"/>
	<link rel="dns-prefetch" href="//t2.baidu.com"/>
	<link rel="dns-prefetch" href="//t3.baidu.com"/>
	<link rel="dns-prefetch" href="//t10.baidu.com"/>
	<link rel="dns-prefetch" href="//t11.baidu.com"/>
	<link rel="dns-prefetch" href="//t12.baidu.com"/>
	<link rel="dns-prefetch" href="//b1.bdstatic.com"/>
	<title>百度一下,你就知道</title>
	<link href="http://s1.bdstatic.com/r/www/cache/static/home/css/index.css" rel="stylesheet" type="text/css" />
	<!--[if lte IE 8]><style index="index" >#content{height:480px\9}#m{top:260px\9}</style><![endif]-->
	<!--[if IE 8]><style index="index" >#u1 a.mnav,#u1 a.mnav:visited{font-family:simsun}</style><![endif]-->
	<script>var hashMatch = document.location.href.match(/#+(.*wd=[^&].+)/);if (hashMatch && hashMatch[0] && hashMatch[1]) {document.location.replace("http://"+location.host+"/s?"+hashMatch[1]);}var ns_c = function(){};</script>
	<script>function h(obj){obj.style.behavior='url(#default#homepage)';var a = obj.setHomePage('//www.baidu.com/');}</script>
	<noscript><meta http-equiv="refresh" content="0; url=/baidu.html?from=noscript"/></noscript>
	<script>window._ASYNC_START=new Date().getTime();</script>
</head>
<body link="#0000cc"><div id="wrapper" style="display:none;"><div id="u"><a href="//www.baidu.com/gaoji/preferences.html"  onmousedown="return user_c({'fm':'set','tab':'setting','login':'0'})">搜索设置</a>|<a id="btop" href="/"  onmousedown="return user_c({'fm':'set','tab':'index','login':'0'})">百度首页</a>|<a id="lb" href="https://passport.baidu.com/v2/?login&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2F" onclick="return false;"  onmousedown="return user_c({'fm':'set','tab':'login'})">登录</a><a href="https://passport.baidu.com/v2/?reg&regType=1&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2F"  onmousedown="return user_c({'fm':'set','tab':'reg'})" target="_blank" class="reg">注册</a></div><div id="head"><div class="s_nav"><a href="/" class="s_logo" onmousedown="return c({'fm':'tab','tab':'logo'})"><img src="//www.baidu.com/img/baidu_jgylogo3.gif" width="117" height="38" border="0" alt="到百度首页" title="到百度首页"></a><div class="s_tab" id="s_tab"><a href="http://news.baidu.com/ns?cl=2&rn=20&tn=news&word=" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'news'})">新闻</a>&#12288;<b>网页</b>&#12288;<a href="http://tieba.baidu.com/f?kw=&fr=wwwt" wdfield="kw"  onmousedown="return c({'fm':'tab','tab':'tieba'})">贴吧</a>&#12288;<a href="http://zhidao.baidu.com/q?ct=17&pn=0&tn=ikaslist&rn=10&word=&fr=wwwt" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'zhidao'})">知道</a>&#12288;<a href="http://music.baidu.com/search?fr=ps&key=" wdfield="key"  onmousedown="return c({'fm':'tab','tab':'music'})">音乐</a>&#12288;<a href="http://image.baidu.com/i?tn=baiduimage&ps=1&ct=201326592&lm=-1&cl=2&nc=1&word=" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'pic'})">图片</a>&#12288;<a href="http://v.baidu.com/v?ct=301989888&rn=20&pn=0&db=0&s=25&word=" wdfield="word"   onmousedown="return c({'fm':'tab','tab':'video'})">视频</a>&#12288;<a href="http://map.baidu.com/m?word=&fr=ps01000" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'map'})">地图</a>&#12288;<a href="http://wenku.baidu.com/search?word=&lm=0&od=0" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'wenku'})">文库</a>&#12288;<a href="//www.baidu.com/more/"  onmousedown="return c({'fm':'tab','tab':'more'})">更多»</a></div></div><form id="form" name="f" action="/s" class="fm" ><input type="hidden" name="ie" value="utf-8"><input type="hidden" name="f" value="8"><input type="hidden" name="rsv_bp" value="1"><span class="bg s_ipt_wr"><input name="wd" id="kw" class="s_ipt" value="" maxlength="100"></span><span class="bg s_btn_wr"><input type="submit" id="su" value="百度一下" class="bg s_btn" onmousedown="this.className='bg s_btn s_btn_h'" onmouseout="this.className='bg s_btn'"></span><span class="tools"><span id="mHolder"><div id="mCon"><span>输入法</span></div><ul id="mMenu"><li><a href="javascript:;" name="ime_hw">手写</a></li><li><a href="javascript:;" name="ime_py">拼音</a></li><li class="ln"></li><li><a href="javascript:;" name="ime_cl">关闭</a></li></ul></span><span class="shouji"><strong>推荐&nbsp;:&nbsp;</strong><a href="http://w.x.baidu.com/go/mini/8/10000020" onmousedown="return ns_c({'fm':'behs','tab':'bdbrowser'})">百度浏览器,打开网页快2秒!</a></span></span></form></div><div id="content"><div id="u1"><a href="http://news.baidu.com" name="tj_trnews" class="mnav">新闻</a><a href="http://www.hao123.com" name="tj_trhao123" class="mnav">hao123</a><a href="http://map.baidu.com" name="tj_trmap" class="mnav">地图</a><a href="http://v.baidu.com" name="tj_trvideo" class="mnav">视频</a><a href="http://tieba.baidu.com" name="tj_trtieba" class="mnav">贴吧</a><a href="https://passport.baidu.com/v2/?login&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2F" name="tj_login" id="lb" onclick="return false;">登录</a><a href="//www.baidu.com/gaoji/preferences.html" name="tj_settingicon" id="pf">设置</a><a href="//www.baidu.com/more/" name="tj_briicon" id="bri">更多产品</a></div><div id="m"><p id="lg"><img src="//www.baidu.com/img/bd_logo.png" width="270" height="129"></p><p id="nv"><a href="http://news.baidu.com">新&nbsp;闻</a> <b>网&nbsp;页</b> <a href="http://tieba.baidu.com">贴&nbsp;吧</a> <a href="http://zhidao.baidu.com">知&nbsp;道</a> <a href="http://music.baidu.com">音&nbsp;乐</a> <a href="http://image.baidu.com">图&nbsp;片</a> <a href="http://v.baidu.com">视&nbsp;频</a> <a href="http://map.baidu.com">地&nbsp;图</a></p><div id="fm"><form id="form1" name="f1" action="/s" class="fm"><span class="bg s_ipt_wr"><input type="text" name="wd" id="kw1" maxlength="100" class="s_ipt"></span><input type="hidden" name="rsv_bp" value="0"><input type=hidden name=ch value=""><input type=hidden name=tn value="baidu"><input type=hidden name=bar value=""><input type="hidden" name="rsv_spt" value="3"><input type="hidden" name="ie" value="utf-8"><span class="bg s_btn_wr"><input type="submit" value="百度一下" id="su1" class="bg s_btn" onmousedown="this.className='bg s_btn s_btn_h'" onmouseout="this.className='bg s_btn'"></span></form><span class="tools"><span id="mHolder1"><div id="mCon1"><span>输入法</span></div></span></span><ul id="mMenu1"><div class="mMenu1-tip-arrow"><em></em><ins></ins></div><li><a href="javascript:;" name="ime_hw">手写</a></li><li><a href="javascript:;" name="ime_py">拼音</a></li><li class="ln"></li><li><a href="javascript:;" name="ime_cl">关闭</a></li></ul></div><p id="lk"><a href="http://baike.baidu.com">百科</a> <a href="http://wenku.baidu.com">文库</a> <a href="http://www.hao123.com">hao123</a><span>&nbsp;|&nbsp;<a href="//www.baidu.com/more/">更多&gt;&gt;</a></span></p><p id="lm"></p></div></div><div id="ftCon"><div id="ftConw"><p id="lh"><a id="seth" onClick="h(this)" href="/" onmousedown="return ns_c({'fm':'behs','tab':'homepage','pos':0})">把百度设为主页</a><a id="setf" href="//www.baidu.com/cache/sethelp/index.html" onmousedown="return ns_c({'fm':'behs','tab':'favorites','pos':0})" target="_blank">把百度设为主页</a><a onmousedown="return ns_c({'fm':'behs','tab':'tj_about'})" href="http://home.baidu.com">关于百度</a><a onmousedown="return ns_c({'fm':'behs','tab':'tj_about_en'})" href="http://ir.baidu.com">About Baidu</a></p><p id="cp">&copy;2018&nbsp;Baidu&nbsp;<a href="/duty/" name="tj_duty">使用百度前必读</a>&nbsp;京ICP证030173号&nbsp;<img src="http://s1.bdstatic.com/r/www/cache/static/global/img/gs_237f015b.gif"></p></div></div><div id="wrapper_wrapper"></div></div><div class="c-tips-container" id="c-tips-container"></div>
<script>window.__async_strategy=2;</script>
<script>var bds={se:{},su:{urdata:[],urSendClick:function(){}},util:{},use:{},comm : {domain:"http://www.baidu.com",ubsurl : "http://sclick.baidu.com/w.gif",tn:"baidu",queryEnc:"",queryId:"",inter:"",templateName:"baidu",sugHost : "http://suggestion.baidu.com/su",query : "",qid : "",cid : "",sid : "",indexSid : "",stoken : "",serverTime : "",user : "",username : "",loginAction : [],useFavo : "",pinyin : "",favoOn : "",curResultNum:"",rightResultExist:false,protectNum:0,zxlNum:0,pageNum:1,pageSize:10,newindex:0,async:1,maxPreloadThread:5,maxPreloadTimes:10,preloadMouseMoveDistance:5,switchAddMask:false,isDebug:false,ishome : 1},_base64:{domain : "http://b1.bdstatic.com/",b64Exp : -1,pdc : 0}};var name,navigate,al_arr=[];var selfOpen = window.open;eval("var open = selfOpen;");var isIE=navigator.userAgent.indexOf("MSIE")!=-1&&!window.opera;var E = bds.ecom= {};bds.se.mon = {'loadedItems':[],'load':function(){},'srvt':-1};try {bds.se.mon.srvt = parseInt(document.cookie.match(new RegExp("(^| )BDSVRTM=([^;]*)(;|$)"))[2]);document.cookie="BDSVRTM=;expires=Sat, 01 Jan 2000 00:00:00 GMT"; }catch(e){}</script>
<script>if(!location.hash.match(/[^a-zA-Z0-9]wd=/)){document.getElementById("ftCon").style.display='block';document.getElementById("u1").style.display='block';document.getElementById("content").style.display='block';document.getElementById("wrapper").style.display='block';setTimeout(function(){try{document.getElementById("kw1").focus();document.getElementById("kw1").parentNode.className += ' iptfocus';}catch(e){}},0);}</script>
<script type="text/javascript" src="http://s1.bdstatic.com/r/www/cache/static/jquery/jquery-1.10.2.min_f2fb5194.js"></script>
<script>(function(){var index_content = $('#content');var index_foot= $('#ftCon');var index_css= $('head [index]');var index_u= $('#u1');var result_u= $('#u');var wrapper=$("#wrapper");window.index_on=function(){index_css.insertAfter("meta:eq(0)");result_common_css.remove();result_aladdin_css.remove();result_sug_css.remove();index_content.show();index_foot.show();index_u.show();result_u.hide();wrapper.show();if(bds.su&&bds.su.U&&bds.su.U.homeInit){bds.su.U.homeInit();}setTimeout(function(){try{$('#kw1').get(0).focus();window.sugIndex.start();}catch(e){}},0);if(typeof initIndex=='function'){initIndex();}};window.index_off=function(){index_css.remove();index_content.hide();index_foot.hide();index_u.hide();result_u.show();result_aladdin_css.insertAfter("meta:eq(0)");result_common_css.insertAfter("meta:eq(0)");result_sug_css.insertAfter("meta:eq(0)");wrapper.show();};})();</script>
<script>window.__switch_add_mask=1;</script>
<script type="text/javascript" src="http://s1.bdstatic.com/r/www/cache/static/global/js/instant_search_newi_redirect1_20bf4036.js"></script>
<script>initPreload();$("#u,#u1").delegate("#lb",'click',function(){try{bds.se.login.open();}catch(e){}});if(navigator.cookieEnabled){document.cookie="NOJS=;expires=Sat, 01 Jan 2000 00:00:00 GMT";}</script>
<script>$(function(){for(i=0;i<3;i++){u($($('.s_ipt_wr')[i]),$($('.s_ipt')[i]),$($('.s_btn_wr')[i]),$($('.s_btn')[i]));}function u(iptwr,ipt,btnwr,btn){if(iptwr && ipt){iptwr.on('mouseover',function(){iptwr.addClass('ipthover');}).on('mouseout',function(){iptwr.removeClass('ipthover');}).on('click',function(){ipt.focus();});ipt.on('focus',function(){iptwr.addClass('iptfocus');}).on('blur',function(){iptwr.removeClass('iptfocus');}).on('render',function(e){var $s = iptwr.parent().find('.bdsug');var l = $s.find('li').length;if(l>=5){$s.addClass('bdsugbg');}else{$s.removeClass('bdsugbg');}});}if(btnwr && btn){btnwr.on('mouseover',function(){btn.addClass('btnhover');}).on('mouseout',function(){btn.removeClass('btnhover');});}}});</script>
<script type="text/javascript" src="http://s1.bdstatic.com/r/www/cache/static/home/js/bri_7f1fa703.js"></script>
<script>(function(){var _init=false;window.initIndex=function(){if(_init){return;}_init=true;var w=window,d=document,n=navigator,k=d.f1.wd,a=d.getElementById("nv").getElementsByTagName("a"),isIE=n.userAgent.indexOf("MSIE")!=-1&&!window.opera;(function(){if(/q=([^&]+)/.test(location.search)){k.value=decodeURIComponent(RegExp["\x241"])}})();(function(){var u = G("u1").getElementsByTagName("a"), nv = G("nv").getElementsByTagName("a"), lk = G("lk").getElementsByTagName("a"), un = "";var tj_nv = ["news","tieba","zhidao","mp3","img","video","map"];var tj_lk = ["baike","wenku","hao123","more"];un = bds.comm.user == "" ? "" : bds.comm.user;function _addTJ(obj){addEV(obj, "mousedown", function(e){var e = e || window.event;var target = e.target || e.srcElement;if(target.name){ns_c({'fm':'behs','tab':target.name,'un':encodeURIComponent(un)});}});}for(var i = 0; i < u.length; i++){_addTJ(u[i]);}for(var i = 0; i < nv.length; i++){nv[i].name = 'tj_' + tj_nv[i];}for(var i = 0; i < lk.length; i++){lk[i].name = 'tj_' + tj_lk[i];}})();(function() {var links = {'tj_news': ['word', 'http://news.baidu.com/ns?tn=news&cl=2&rn=20&ct=1&ie=utf-8'],'tj_tieba': ['kw', 'http://tieba.baidu.com/f?ie=utf-8'],'tj_zhidao': ['word', 'http://zhidao.baidu.com/search?pn=0&rn=10&lm=0'],'tj_mp3': ['key', 'http://music.baidu.com/search?fr=ps&ie=utf-8'],'tj_img': ['word', 'http://image.baidu.com/i?ct=201326592&cl=2&nc=1&lm=-1&st=-1&tn=baiduimage&istype=2&fm=&pv=&z=0&ie=utf-8'],'tj_video': ['word', 'http://video.baidu.com/v?ct=301989888&s=25&ie=utf-8'],'tj_map': ['wd', 'http://map.baidu.com/?newmap=1&ie=utf-8&s=s'],'tj_baike': ['word', 'http://baike.baidu.com/search/word?pic=1&sug=1&enc=utf8'],'tj_wenku': ['word', 'http://wenku.baidu.com/search?ie=utf-8']};var domArr = [G('nv'), G('lk'),G('cp')],kw = G('kw1');for (var i = 0, l = domArr.length; i < l; i++) {domArr[i].onmousedown = function(e) {e = e || window.event;var target = e.target || e.srcElement,name = target.getAttribute('name'),items = links[name],reg = new RegExp('^\\s+|\\s+\x24'),key = kw.value.replace(reg, '');if (items) {if (key.length > 0) {var wd = items[0], url = items[1],url = url + ( name === 'tj_map' ? encodeURIComponent('&' + wd + '=' + key) : ( ( url.indexOf('?') > 0 ? '&' : '?' ) + wd + '=' + encodeURIComponent(key) ) );target.href = url;} else {target.href = target.href.match(new RegExp('^http:\/\/.+\.baidu\.com'))[0];}}name && ns_c({'fm': 'behs','tab': name,'query': encodeURIComponent(key),'un': encodeURIComponent(bds.comm.user || '') });};}})();};if(window.pageState==0){initIndex();}})();document.cookie = 'IS_STATIC=1;expires=' + new Date(new Date().getTime() + 10*60*1000).toGMTString();</script>
</body></html>

Windows 下 selector 底层调用的是 select.select 方法,在 Linux 环境中调用的是 epoll

select/poll/epoll + 回调 + 事件循环看似将一件简单的通信变得复杂了,但实际上这带来了高并发性。事件循环驱动中,只有 select() 一个单线程,调用时会陷入阻塞,收到请求回复则转去处理请求,实现了单线程多通信的效果,免去线程切换成本,极大提高了效率。

说了优点,那不得不说一说这种模式的缺点。首先很明显的一点是原来通畅的逻辑被割裂了,连接、发送、读取都被分配到了不同方法中执行,可读性变差。其次,回调容易出现多层嵌套,这时如果发生异常,既难定位又难处理。最后,回调期间用到的对象通常都是一样的,多层嵌套的情况下怎么管理共享参数就是个问题。

协程

到目前为止我们掌握了三类线程编码方法,它们各有优缺点。同步编程逻辑简单可读性好,但性能差;回调函数并发性强但编码复杂;多线程执行任务功能强大,但线程间通信以及锁的开销、维护成本高。那么能不能寻找这样一种方法:

  1. 以同步编程的方式写异步的代码
  2. 单线程内能自由切换函数

这种理想的挑战在于,一来线程一般是由操作系统调度的,这种要求下需要自己来调度线程了。二者函数间进行切换性能自然是比加锁或切换线程的方法高很多,但问题是异步执行意味着跳出当前函数体,函数对应栈会退出,也就回不来了。

所以实现上述功能的函数必须是可以暂停,并能在适当时候(传入参数至暂停处)恢复执行的函数,这就是协程。

生成器的 send,throw,close 方法

之前讲到生成器使用时,只讲了 yeildnext 方法

In [10]:
def gen_demo():
    yield 1
    yield 2
    return 'Finished'
In [13]:
gen = gen_demo()

print(next(gen))
print(next(gen))
print(next(gen))
1
2
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-13-35c67995e47a> in <module>
      3 print(next(gen))
      4 print(next(gen))
----> 5 print(next(gen))

StopIteration: Finished

也讲到底层的 FrameObj ,只要获取了它就能在任何地方调用生成器,这是用生成器构建协程的基础。

但不可能在编程时总去访问底层结构,而且恢复现场必然要恢复上下文内的对象,这些对象可能需要外部传入。这就要用到生成器的另一个方法 send 了,调用 send 会重启生成器,接收传入值,并前往下一个 yeild 位置。

In [57]:
def gen_url():
    html = yield 'http://www.google.com/'
    print('get html:{}'.format(html))
    yield 2
    yield 3
    return 'Finished'
In [58]:
gen = gen_url()
print(gen.send(None))
print(gen.send('www.baidu.com'))  #  给 html 赋值,返回下一个 yield 值
print(next(gen))
print(next(gen))
http://www.google.com/
get html:www.baidu.com
2
3
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-58-d503f7166de4> in <module>
      3 print(gen.send('www.baidu.com'))  #  给 html 赋值,返回下一个 yield 值
      4 print(next(gen))
----> 5 print(next(gen))

StopIteration: Finished

注意,send 的机制是回到上次生成器执行的位置,赋值,然后再前往下一个 yeild 并返回其值。这意味着调用 send 前生成器起码运行过,否则没有上次位置可返回了,如果第一次使用生成器时就调用 send,那么传入参数必须是 None

使用 close 可以关闭生成器

In [61]:
def gen_demo():
    yield 'hello'
    yield 'world'
    return 'Finished'

gen = gen_demo()
print(next(gen))
gen.close()
next(gen)  # 因生成器关闭会报异常
hello
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-61-400f33bbd5fa> in <module>
      7 print(next(gen))
      8 gen.close()
----> 9 next(gen)  # 因生成器关闭会报异常

StopIteration: 

调用 close 后会在当前 yeild 处唤起 GeneratorExit 异常,该异常继承自 BaseException 而不是传统的 Exception。拦截忽略该异常,但后续还有 yeild 的话会报错,所以不建议捕获该异常

In [67]:
def gen_demo():
    try:
        yield 'hello'
    except GeneratorExit as e:
        pass
    yield 'world'
    return 'Finished'

gen = gen_demo()
print(next(gen))
gen.close()
next(gen)
hello
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-67-fe3fae2f131f> in <module>
      9 gen = gen_demo()
     10 print(next(gen))
---> 11 gen.close()
     12 next(gen)

RuntimeError: generator ignored GeneratorExit

close 的异常相反,另一个生成器方法 throw 会向当前 yeild 处抛入必须处理的异常,否则出错

In [69]:
def gen_demo():
    yield 'Hi'
    yield 'Hello'
    return

gen = gen_demo()
print(next(gen))
gen.throw(Exception, 'Must catch')
next(gen)
Hi
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-69-2a4af42b230d> in <module>
      6 gen = gen_demo()
      7 print(next(gen))
----> 8 gen.throw(Exception, 'Must catch')
      9 next(gen)

<ipython-input-69-2a4af42b230d> in gen_demo()
      1 def gen_demo():
----> 2     yield 'Hi'
      3     yield 'Hello'
      4     return
      5 

Exception: Must catch
In [76]:
def gen_demo():
    try:
        yield 'Hi'
    except Exception:
        print('a')
    yield 'Hello'  # 被 pass 掉了,无效
    return 'Finished'

gen = gen_demo()
print(next(gen))
gen.throw(Exception, 'Must catch')
print(next(gen))
Hi
a
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-76-50b606c5415d> in <module>
     10 print(next(gen))
     11 gen.throw(Exception, 'Must catch')
---> 12 print(next(gen))

StopIteration: Finished

yield from

yield from 是生成器变协程最后一个拼图。它解决了“跨越式”沟通问题,比如程序调用链 A->B->C,yield from iterable_obj 能在 A 与 C 间建立连接通道。iterable_obj 是可迭代对象,yield from 会迭代返回其中元素,同时别忘了 yield 必须在函数体内

In [6]:
lst = ['Tom', 'Marry']
dic = {'A':'abandon','B':'bad'}

def iter_chain(*args):
    for iterable in args:
        yield from iterable

for val in iter_chain(lst, dic, range(5)):
    print(val, end=' ')
Tom Marry A B 0 1 2 3 4

yield from 单纯做迭代显得多此一举,下面这种情况才是它的主场

In [8]:
final_result = {}

def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        x = yield
        if not x:
            break
        print(pro_name+"销量: ", x)        
        total += x
        nums.append(x)
    return total, nums

def middle(key):
    while True:
        final_result[key] = yield from sales_sum(key)
        print(key+"销量统计完成!!.")

def main():
    data_sets = {
        "面膜": [1200, 1500, 3000],
        "手机": [28,55,98,108 ],
        "大衣": [280,560,778,70],
    }
    for key, data_set in data_sets.items():
        print("start key:", key)
        m = middle(key)
        m.send(None) # 预激活 middle 生成器
        for value in data_set:
            m.send(value)   # 传递各品类下的销售数据
        m.send(None)
    print("final_result:", final_result)

main()
start key: 面膜
面膜销量:  1200
面膜销量:  1500
面膜销量:  3000
面膜销量统计完成!!.
start key: 手机
手机销量:  28
手机销量:  55
手机销量:  98
手机销量:  108
手机销量统计完成!!.
start key: 大衣
大衣销量:  280
大衣销量:  560
大衣销量:  778
大衣销量:  70
大衣销量统计完成!!.
final_result: {'面膜': (5700, [1200, 1500, 3000]), '手机': (289, [28, 55, 98, 108]), '大衣': (1688, [280, 560, 778, 70])}

这里展示了一个协程 demo,调用方 main 通过委托生成器 middle 与子生成器 sales_sum 建立了双向连通通道,预激活 middle 后通过 send 方法传入值交给子生成器处理。每组数据处理完后传入 None 结束子生成器的循环,子生成器将统计销售数据返回给接收字典 final_result

这里直观看上去求统计数字只需要一层生成器就好了,为什么又多写一个子生成器呢?因为上面的 demo 逻辑简单,不涉及异常处理,但生成器的使用中会有许多异常情况,必须针对性处理。为避免异常处理代码过于冗余,3.3 版本后 Python 加入了 yield from 语法,RESULT = yield from EXPR 其真实运行逻辑为

In [ ]:
"""
    PEP380 对 RESULT = yield from EXPR 的解释

符号约定:
-------------
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from 表达式最终的值
_s:调用方通过send()发送的值
_e:异常对象

"""
_i = iter(EXPR)  # EXPR是一个可迭代对象,因此 _i 可能是子生成器;
try:
    _y = next(_i)  # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
    _r = _e.value  # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
    while 1:  # 尝试执行这个循环,委托生成器会阻塞;
        try:
            _s = yield _y  # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
        except GeneratorExit as _e:  # 迭代对象是生成器时的关闭处理,比如按了 CTRL + C
            try:
                _m = _i.close # 如果是生成器,调用其 close 方法,先关闭子生成器再关闭委托生成器
            except AttributeError:  
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:  # 一般异常处理
            _x = sys.exc_info()
            try:
                _m = _i.throw  # 如果是生成器,使用 throw 方法传递异常
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)  # 传入异常时生成器可能已经关闭
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:  # 接收值正常
            try:
                if _s is None:  # 激活或是传入 None 值
                    _y = next(_i)
                else:
                    _y = _i.send(_s)  # 非空就向下转发_s
            except StopIteration as _e:
                _r = _e.value  # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
                break
RESULT = _r  # _r就是整个yield from表达式返回的值。

总结:

  1. 子生成器生产的值,都是直接传给调用方的;调用方通过 send() 发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的 __next__() 方法,如果不是 None,会调用子生成器的 send() 方法;
  2. 子生成器退出的时候,最后的 return EXPR,会触发一个 StopIteration(EXPR) 异常;
  3. yield from 表达式的返回值,是子生成器终止时,传递给 StopIteration 异常的第一个参数;
  4. 如果调用的时候出现 StopIteration 异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";
  5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的throw()方法;如果调用throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";
  6. 如果在委托生成器上调用 close() 或传入 GeneratorExit 异常,会调用子生成器的 close() 方法,没有的话就不调用。如果在调用 close() 的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出 GeneratorExit 异常。

到此为止,达成我们期望目标的条件就都具备了。我们希望同步编码的方式写异步程序,只需要在耗费 I/O 操作的地方调用生成器,将异步任务转出去,让生成器将结果返回,既不妨碍编码逻辑的顺序性,又不需要考虑变量共享的问题。我们希望单线程做到 I/O 复用,实现并发。生成器 + 事件循环整个流程都是函数间转换,没有线程切换,同时循环阻塞发生在子生成器内,不妨碍调用者的执行。

至于多层套用时的异常处理难题,通过上述 yield from 机制不难发现,生成器能将错误一直上抛到调用者,调用者也能向下传入异常中断。

原生协程

终于用生成器解决了高效率编码单线程高并发的需求,但带来的问题就是可读性变差,这种模式下对 yield 关键字必须辨别是要当作生成器处理还是协程处理。为了保证语义明确,3.5 版本后 Python 加入了 asyncawait 关键词,专门指明协程。async 标识函数,await 可以理解为 yield from,后面接一个 Awaitable 对象(内部实现了 __await__() 方法)

注意,使用原生协程后函数体内不能再出现 yield 关键词,激活只能使用 send(None) 形式

In [14]:
name_lst = []

async def get_name_string(name):
    return 'I am ' + name

async def name_cor(name):
    name_str = await get_name_string(name)
    return name_lst.append(name_str)

def main():
    gen = name_cor('Daniel Meng')
    try:
        gen.send(None)
    except StopIteration:
        pass
    print(name_lst)


main()
['I am Daniel Meng']