I/O 编程中有一些容易混淆的概念
并行:在某个时间点,同时有多个程序运行在多个 cpu 上
同步:调用 I/O 操作时,必须等待 I/O 操作完成才返回
异步:调用 I/O 操作时,不必等待 I/O 操作完成即可返回
阻塞:调用函数时当前线程被挂起
随着计算机网络的发展,特别是 C10K 问题的出现,即 1 GHz 的 cpu,2 GB 内存,1 Gbps 网络下单台服务器怎么同时向 1 万台客户机提供 FTP 服务。有限计算资源服务尽可能多的用户成了刚需,而网络开销最大的是 I/O 操作,由此 Unix 逐步演化出了五种 I/O 模型
阻塞式 I/O 是最原始的 I/O 处理方式,发送请求后必须等待操作处理完成才能执行后续代码,浪费了大量时间在等候 I/O 完成上。
为减少等候时间人们在想能否提交后就返回,但这样的问题是即便返回后为了确认 I/O 是否完成还是需要不停的询问服务器,往返通信开销依然很大。例外情况是 I/O 后续操作并不依赖 I/O 状态,比如跟的是一些其他计算任务,或是发起其他连接请求,此时非阻塞就比阻塞优势明显了
既然有很多程序都要请求 I/O 操作,每一个还都需要通信确认操作完成情况,于是自然而然的想法是将这些请求统一管理起来。于是有了 select
,它会监控所有 socket 变动,当某一个请求传来完成信号,客户机随即开始将数据从内核空间转移到用户空间。
所有程序运行都依托内存,操作系统也一样,为了保障系统运行稳定,不会被用户程序意外破坏,内存空间被分成了属于系统的低地址内核空间和属于用户的高地址空间。而网络访问请求最终都是调用系统接口完成的,所以传来的数据最开始是存放在系统那里的,但内核空间用户是没权限访问的,必须拷贝出来才能处理。
所以 I/O 操作过程仍有一部分时间浪费在等候复制上。但 select
模式已经很高效了,I/O 复用也是当下主流异步通信方式
这种模式下请求 I/O 操作的客户端和服务端通过信号程序确立了一个“契约”——处理完成后通知我。但没有 select
高效,没掀起大波澜
上面说到只是服务端处理完成还不够,还有工作没做完,那就都完成了再通知客户机呗,这就是异步 I/O 的由来,也是真正的异步通信。但为达到该效果付出成本也是巨大的,综合来看并没比 I/O 复用强多少,没能成为主流
select
,poll
,epoll
都是 I/O 多路复用机制。一个进程可以监视多个描述符,一旦某个描述符就绪(读就绪/写就绪),便通知程序进行相应的读写操作。但 select
,poll
,epoll
本质上都是同步 I/O,因为都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 会把数据从内核拷贝到用户空间,程序可以直接取用。
select
函数监视的文件描述符分3类,分别是 writefds
、readfds
、和 exceptfds
。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有异常),或者超时(timeout
指定等待时间,如果立即返回设为 null 即可),函数返回。当 select
函数返回后,可以通过遍历 fdset
,来找到就绪的描述符。
select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select 的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024
不同于 select 使用三个位图来表示三个 fdset 的方式,poll
使用一个 pollfd
的指针实现。
pollfd 结构包含了要监视的 event 和发生的 event,不再使用 select “参数-值”传递的方式。同时,pollfd 并没有最大数量限制(但是数量过大后性能也是会下降)。和select函数一样,poll返回后,需要轮询 pollfd 来获取就绪的描述符。
从上面看,select 和 poll 都需要在返回后,通过遍历文件描述符来获取已经就绪的 socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降
epoll
是 Linux 下 select 和 poll 的增强版本,基于红黑树实现。相对于 select 和 poll 来说,epoll更加灵活,没有描述符限制。epoll 使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间只需复制一次。
多数框架实现 I/O 复用的方式可以概括为 select + 回调 + 事件循环获取 模式,为了便于解释先来看一个例子
Python 实现 select 可以使用 select
模块,但这个模块提供的方法、功能不如 selectors
,后者是对前者进一步的封装强化
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
urls = ["http://www.baidu.com"]
stop = False
class Fetcher:
def connected(self, key):
'''写请求回调函数'''
selector.unregister(key.fd) # 注销写请求的监控事件
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode('utf8'))
selector.register(self.client.fileno(), EVENT_READ, self.readable)
def readable(self, key):
'''读请求回调函数'''
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
html_data = data.split('\r\n\r\n')[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True
def get_url(self, url):
self.spider_url = url
p_url = urlparse(url)
self.host = p_url.netloc
self.path = p_url.path
self.data = b""
if self.path == "":
self.path = '/'
# 建立 socket 连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False) # 设置为非阻塞模式
try:
self.client.connect((self.host, 80))
except BlockingIOError as e:
pass
# 注册到 selector 进行托管
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def event_loop():
'''事件循环'''
while not stop:
res_lst = selector.select()
for key, mask in res_lst:
call_back = key.data
call_back(key)
这里模仿爬虫使用 I/O 复用模式抓取 html 内容。get_url
为入口,建立 IPv4,TCP 套接字,设置通信方式为非阻塞式,这样不用等待 connect
返回即可执行后续的注册程序。
基础库 select
本身不支持 register 模式,selector 对此进行了封装,通过 register(fileobj, events, data=None)
方法注册 I/O 请求进行托管。fileobj
是文本对象或文件描述符,Python 内部用变量 fd
表示,可以通过通信连接对象的 fileno()
获取,events
是托管事件,一般是读请求 EVENT_READ
和写请求 EVENT_WRITE
,data
用来接收回调方法或其他要上传的数据。
注意,使用 select 时,请求生效后的操作要自己指定,即要自行实现回调方法,样例代码中的 connected
,readable
就是两个回调方法。同时别忘了请求生效以后适时注销在 select 中的托管。
整个 I/O 复用的驱动程序就是事件循环,不停的请求 socket 的状态并调用回调函数。selector.select()
是对托管的描述符列表进行检索,当某一 socket 注册的请求事件被满足时会以元组 (key, mask)
形式添加到列表中并返回列表,key
是 socket 描述数据,是调用 register
方法时产生的 SelectorKey
对象,mask
是验证用掩码
fetcher = Fetcher()
for url in urls:
fetcher.get_url(url)
event_loop()
Windows 下 selector
底层调用的是 select.select
方法,在 Linux 环境中调用的是 epoll
。
select/poll/epoll + 回调 + 事件循环
看似将一件简单的通信变得复杂了,但实际上这带来了高并发性。事件循环驱动中,只有 select()
一个单线程,调用时会陷入阻塞,收到请求回复则转去处理请求,实现了单线程多通信的效果,免去线程切换成本,极大提高了效率。
说了优点,那不得不说一说这种模式的缺点。首先很明显的一点是原来通畅的逻辑被割裂了,连接、发送、读取都被分配到了不同方法中执行,可读性变差。其次,回调容易出现多层嵌套,这时如果发生异常,既难定位又难处理。最后,回调期间用到的对象通常都是一样的,多层嵌套的情况下怎么管理共享参数就是个问题。
到目前为止我们掌握了三类线程编码方法,它们各有优缺点。同步编程逻辑简单可读性好,但性能差;回调函数并发性强但编码复杂;多线程执行任务功能强大,但线程间通信以及锁的开销、维护成本高。那么能不能寻找这样一种方法:
这种理想的挑战在于,一来线程一般是由操作系统调度的,这种要求下需要自己来调度线程了。二者函数间进行切换性能自然是比加锁或切换线程的方法高很多,但问题是异步执行意味着跳出当前函数体,函数对应栈会退出,也就回不来了。
所以实现上述功能的函数必须是可以暂停,并能在适当时候(传入参数至暂停处)恢复执行的函数,这就是协程。
之前讲到生成器使用时,只讲了 yeild
和 next
方法
def gen_demo():
yield 1
yield 2
return 'Finished'
gen = gen_demo()
print(next(gen))
print(next(gen))
print(next(gen))
也讲到底层的 FrameObj
,只要获取了它就能在任何地方调用生成器,这是用生成器构建协程的基础。
但不可能在编程时总去访问底层结构,而且恢复现场必然要恢复上下文内的对象,这些对象可能需要外部传入。这就要用到生成器的另一个方法 send
了,调用 send
会重启生成器,接收传入值,并前往下一个 yeild
位置。
def gen_url():
html = yield 'http://www.google.com/'
print('get html:{}'.format(html))
yield 2
yield 3
return 'Finished'
gen = gen_url()
print(gen.send(None))
print(gen.send('www.baidu.com')) # 给 html 赋值,返回下一个 yield 值
print(next(gen))
print(next(gen))
注意,send
的机制是回到上次生成器执行的位置,赋值,然后再前往下一个 yeild
并返回其值。这意味着调用 send
前生成器起码运行过,否则没有上次位置可返回了,如果第一次使用生成器时就调用 send
,那么传入参数必须是 None
使用 close
可以关闭生成器
def gen_demo():
yield 'hello'
yield 'world'
return 'Finished'
gen = gen_demo()
print(next(gen))
gen.close()
next(gen) # 因生成器关闭会报异常
调用 close
后会在当前 yeild
处唤起 GeneratorExit
异常,该异常继承自 BaseException
而不是传统的 Exception
。拦截忽略该异常,但后续还有 yeild
的话会报错,所以不建议捕获该异常
def gen_demo():
try:
yield 'hello'
except GeneratorExit as e:
pass
yield 'world'
return 'Finished'
gen = gen_demo()
print(next(gen))
gen.close()
next(gen)
与 close
的异常相反,另一个生成器方法 throw
会向当前 yeild
处抛入必须处理的异常,否则出错
def gen_demo():
yield 'Hi'
yield 'Hello'
return
gen = gen_demo()
print(next(gen))
gen.throw(Exception, 'Must catch')
next(gen)
def gen_demo():
try:
yield 'Hi'
except Exception:
print('a')
yield 'Hello' # 被 pass 掉了,无效
return 'Finished'
gen = gen_demo()
print(next(gen))
gen.throw(Exception, 'Must catch')
print(next(gen))
yield from
是生成器变协程最后一个拼图。它解决了“跨越式”沟通问题,比如程序调用链 A->B->C,yield from iterable_obj
能在 A 与 C 间建立连接通道。iterable_obj
是可迭代对象,yield from
会迭代返回其中元素,同时别忘了 yield
必须在函数体内
lst = ['Tom', 'Marry']
dic = {'A':'abandon','B':'bad'}
def iter_chain(*args):
for iterable in args:
yield from iterable
for val in iter_chain(lst, dic, range(5)):
print(val, end=' ')
用 yield from
单纯做迭代显得多此一举,下面这种情况才是它的主场
final_result = {}
def sales_sum(pro_name):
total = 0
nums = []
while True:
x = yield
if not x:
break
print(pro_name+"销量: ", x)
total += x
nums.append(x)
return total, nums
def middle(key):
while True:
final_result[key] = yield from sales_sum(key)
print(key+"销量统计完成!!.")
def main():
data_sets = {
"面膜": [1200, 1500, 3000],
"手机": [28,55,98,108 ],
"大衣": [280,560,778,70],
}
for key, data_set in data_sets.items():
print("start key:", key)
m = middle(key)
m.send(None) # 预激活 middle 生成器
for value in data_set:
m.send(value) # 传递各品类下的销售数据
m.send(None)
print("final_result:", final_result)
main()
这里展示了一个协程 demo,调用方 main
通过委托生成器 middle
与子生成器 sales_sum
建立了双向连通通道,预激活 middle
后通过 send
方法传入值交给子生成器处理。每组数据处理完后传入 None
结束子生成器的循环,子生成器将统计销售数据返回给接收字典 final_result
这里直观看上去求统计数字只需要一层生成器就好了,为什么又多写一个子生成器呢?因为上面的 demo 逻辑简单,不涉及异常处理,但生成器的使用中会有许多异常情况,必须针对性处理。为避免异常处理代码过于冗余,3.3 版本后 Python 加入了 yield from
语法,RESULT = yield from EXPR
其真实运行逻辑为
"""
PEP380 对 RESULT = yield from EXPR 的解释
符号约定:
-------------
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from 表达式最终的值
_s:调用方通过send()发送的值
_e:异常对象
"""
_i = iter(EXPR) # EXPR是一个可迭代对象,因此 _i 可能是子生成器;
try:
_y = next(_i) # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
_r = _e.value # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
while 1: # 尝试执行这个循环,委托生成器会阻塞;
try:
_s = yield _y # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
except GeneratorExit as _e: # 迭代对象是生成器时的关闭处理,比如按了 CTRL + C
try:
_m = _i.close # 如果是生成器,调用其 close 方法,先关闭子生成器再关闭委托生成器
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e: # 一般异常处理
_x = sys.exc_info()
try:
_m = _i.throw # 如果是生成器,使用 throw 方法传递异常
except AttributeError:
raise _e
else:
try:
_y = _m(*_x) # 传入异常时生成器可能已经关闭
except StopIteration as _e:
_r = _e.value
break
else: # 接收值正常
try:
if _s is None: # 激活或是传入 None 值
_y = next(_i)
else:
_y = _i.send(_s) # 非空就向下转发_s
except StopIteration as _e:
_r = _e.value # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
break
RESULT = _r # _r就是整个yield from表达式返回的值。
总结:
send()
发送的值都是直接传递给子生成器的;如果发送的是 None
,会调用子生成器的 __next__()
方法,如果不是 None
,会调用子生成器的 send()
方法;return EXPR
,会触发一个 StopIteration(EXPR)
异常;yield from
表达式的返回值,是子生成器终止时,传递给 StopIteration
异常的第一个参数;StopIteration
异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";GeneratorExit
之外,其他的所有异常全部传递给子生成器的throw()
方法;如果调用throw()
的时候出现了StopIteration
异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";close()
或传入 GeneratorExit
异常,会调用子生成器的 close()
方法,没有的话就不调用。如果在调用 close()
的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出 GeneratorExit
异常。到此为止,达成我们期望目标的条件就都具备了。我们希望同步编码的方式写异步程序,只需要在耗费 I/O 操作的地方调用生成器,将异步任务转出去,让生成器将结果返回,既不妨碍编码逻辑的顺序性,又不需要考虑变量共享的问题。我们希望单线程做到 I/O 复用,实现并发。生成器 + 事件循环整个流程都是函数间转换,没有线程切换,同时循环阻塞发生在子生成器内,不妨碍调用者的执行。
至于多层套用时的异常处理难题,通过上述 yield from
机制不难发现,生成器能将错误一直上抛到调用者,调用者也能向下传入异常中断。
终于用生成器解决了高效率编码单线程高并发的需求,但带来的问题就是可读性变差,这种模式下对 yield
关键字必须辨别是要当作生成器处理还是协程处理。为了保证语义明确,3.5 版本后 Python 加入了 async
和 await
关键词,专门指明协程。async
标识函数,await
可以理解为 yield from
,后面接一个 Awaitable
对象(内部实现了 __await__()
方法)
注意,使用原生协程后函数体内不能再出现 yield
关键词,激活只能使用 send(None)
形式
name_lst = []
async def get_name_string(name):
return 'I am ' + name
async def name_cor(name):
name_str = await get_name_string(name)
return name_lst.append(name_str)
def main():
gen = name_cor('Daniel Meng')
try:
gen.send(None)
except StopIteration:
pass
print(name_lst)
main()