Python 中的一个线程对应 C 里的一个线程,线程间协作免不了锁机制。而 Python 被人说的最多的锁莫过于全局解释器锁(GIL, Global Interpreter Lock) ,它导致同一时刻只有一个线程在 CPU 内核上执行字节码,无法将多线程映射到多核上执行
但在某些情况下,GIL 会主动释放
import threading
test_res = []
total = 0
def add():
global total
for i in range(1000000):
total += 1
def desc():
global total
for i in range(1000000):
total -= 1
thread_one = threading.Thread(target=add)
thread_two = threading.Thread(target=desc)
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()
test_res.append(total)
执行四次上述代码
test_res
发现结果不定。这是因为 GIL 会根据字节码长度和时间片释放 GIL,此外遇到 IO 操作时也会 GIL 也会被主动释放掉
进程是计算机资源的分配单位,线程构成进程并共享进程分得的资源,线程是操作系统任务调度的最小单位。对于 I/O 操作来讲,多进程还是多线程差别不大。
Python 中线程相关的模块是 threading
,多线程编程常见两种方式,其一是直接使用 threading.Thread
实例化线程
import time
from threading import Thread
def get_html(url):
print('Start getting html')
time.sleep(2)
print('End getting html')
def get_detail(url):
print('Start getting url')
time.sleep(4)
print('End getting url')
html_thread = Thread(target=get_html, args=('',))
detail_thread = Thread(target=get_detail, args=('',))
这里模拟爬虫定义两个方法,分别爬取 html 和页面详情信息。Thread
实例化时通过 target
指定线程要执行的任务,注意传的是方法名。运行参数以元组方式传给 args
start_time = time.time()
html_thread.setDaemon(True)
detail_thread.setDaemon(True)
html_thread.start()
detail_thread.start()
print('Cost time:{}'.format(time.time() - start_time))
通过 setDaemon
将线程设为守护线程,当主线程结束时随之结束。同时可以看到任务开始后,因为线程并行,输出内容有一部分发生了重合,想要保证次序可以使用 thread.join()
方法进行阻塞。保证线程执行完毕后主线程才继续执行
start_time = time.time()
html_join_thread = Thread(target=get_html, args=('',))
detail_join_thread = Thread(target=get_detail, args=('',))
html_join_thread.start()
detail_join_thread.start()
html_join_thread.join()
detail_join_thread.join()
print('Cost time:{}'.format(time.time() - start_time))
直接实例化 Thread
适合简单任务或临时调试用,大型项目里的多线程都是继承 Thread
封装成类
class GetHtml(Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print('Start getting html')
time.sleep(2)
print('End getting html')
class GetDetail(Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print('Start getting url')
time.sleep(4)
print('End getting url')
start_time = time.time()
html_thread = GetHtml('get_html')
detail_thread = GetDetail('get_detail')
html_thread.start()
detail_thread.start()
html_thread.join()
detail_thread.join()
print('Cost time:{}'.format(time.time() - start_time))
多线程间常常要协调资源,保证一致性,所以通信机制就很重要。直观的实现方法就是共享变量了,可以利用 Python 对象可变性选一个对象为各线程当联络员,可以写出如下伪代码
share_lst = []
def get_html(share_lst):
'''
解析 html 获取 <html></html> 间的内容
'''
while True:
for i in range(20):
share_lst.append('http:{id}'.format(id=i))
def get_detail(share_lst):
'''
从 <html></html> 内抽取详情信息并格式化
'''
while True:
if share_lst:
item = share_lst.pop()
print("Processing {}".format(item))
html_thread = Thread(target=get_html, args=(share_lst,))
html_thread.start()
for i in range(10):
detail_thread = Thread(target=get_detail, args=(share_lst,))
detail_thread.start()
这里依旧是获取 html 内容和其中细节的两个方法,考虑到速度差异只开启一个抓取线程,10 个分析线程,线程间通过共享列表的形式通信。
但 list.pop
方法并不是线程安全的,很容易出事故。更好的做法是用线程安全的数据结构代替,比如消息队列 queue.Queue
from queue import Queue
msg_queue = Queue(maxsize=100) # 容量为 100 的消息队列
def get_html(queue):
'''
解析 html 获取 <html></html> 间的内容
'''
while True:
for i in range(20):
queue.put('http:{id}'.format(id=i))
def get_detail(queue):
'''
从 <html></html> 内抽取详情信息并格式化
'''
while True:
if share_lst:
item = queue.get()
print("Processing {}".format(item))
Queue
是基于 deque
封装出来的,deque
本身是字节码级线程安全的。Queue
提供了很多线程安全方法,比如 get
,put
,也能选择二者是否会等待阻塞。其次,Queue.join()
阻塞方法执行后,必须在某个位置调用 Queue.task_done()
告知,然后才能解除阻塞
help(Queue)
线程协作一大问题就是同步,比如 ATM 转账,购物时加减库存,使用多线程时我们当然希望账户金额和库存余量都是正确而稳定的。但通常不会如意。
a = 0
def inc():
global a
a += 1
def dec():
global a
a -= 1
import dis
print(dis.dis(inc))
print('------------------------------------------------')
print(dis.dis(dec))
这里构造了两个简单函数,分别实现对同一变量的加一减一操作。通过字节码可以看到,两个方法在运行时指令相似度很高,区别只是 INPLACE_ADD
INPLACE_SUBTRACT
。问题就在这里,之前说过 GIL 执行时可能因为时间片耗尽或字节码执行数耗尽而主动释放。那就存在这样一种情景,inc
指令执行完第二条时间片耗尽,转向dec
,此时刚好轮到 INPLACE_SUBTRACT
执行,但没来得及将新值 -1 赋给 a
dec
时间片就耗尽转回 inc
,这时再执行INPLACE_ADD
,就有了另一个值 1 赋给 a
。
于是在下一个时间片来临时,因为执行总有先后,读取 a
的值就会出现反复震荡。
对于同步,经典解决方案是锁,在 Python 中 threading
模块提供了很多实现
from threading import Lock
a = 0
lock = Lock()
def inc():
global a
lock.acquire()
a += 1
lock.release()
def dec():
global a
lock.acquire()
a -= 1
lock.release()
每一个 Lock
对象就是一把锁,通过 acquire
方法获取锁后会将之后的字节码锁定,遇到 release
后解锁,其余线程才能再对锁定的字节码恢复访问。
但对于锁有两个长久困扰:一是锁会降低性能,上锁解锁时间开销并不小,二是死锁,即多个线程在执行过程中,因争夺资源而造成的一种互相等待的现象,如无外界干扰,它们都将无法推进下去。
对于其中一类因嵌套调用,导致重复尝试获取同一把锁而导致的死锁问题,Python 提供了 threading.Rlock
(可重入锁),同一线程内可以多次申请加锁,但要注意加锁和解锁次数必须一致。
def func_with_lock(*args):
......
Rlock.acquire()
other_func_with_lock()
Rlock.release()
......
def other_func_with_lock(*args):
......
lock.acquire()
do something
lock.release()
......
有了锁可以一定程度保证一致性和数据鲁棒,但并不能覆盖所有场景。比如有时需要若干线程按照某种顺序交替执行,因为 GIL 会因为时间片或字节码行数问题主动释放的,如果恰巧一个时间片内线程“超额”完成了任务,就打破了交替顺序。
这种情况下就需要用到稍高级点的锁机制——Condition
了。threading.Condition
是基于 threading.lock
或 threading.Rlock
封装的条件型锁,通过 wait()
,notify()
方法保证阻塞的顺序性,除非收到 notify()
传来的信号,wait()
会持续阻塞。
但也正因 Condition
工作的特点,决定了使用 Condition
的时候线程启动顺序很关键。
import threading
class YinShi(threading.Thread):
def __init__(self, name, cond):
super().__init__(name=name)
self.cond = cond
def run(self):
with self.cond:
print('{} : 窗前明月光'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{} : 举头望明月'.format(self.name))
self.cond.notify()
self.cond.wait()
class ZuoDui(threading.Thread):
def __init__(self, name, cond):
super().__init__(name=name)
self.cond = cond
def run(self):
with self.cond:
self.cond.wait()
print('{} : 疑是地上霜'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{} : 低头思故乡'.format(self.name))
self.cond.notify()
cond = threading.Condition()
yin_shi = YinShi('Yin Shi', cond)
zuo_dui = ZuoDui('Zuo Dui', cond)
zuo_dui.start() # 注意启动顺序,后说话的要先 wait 等候通知唤醒
yin_shi.start()
这里我们构造了一个念诗场景,两个对象依次执行。要注意的是,Condition
封装后同样是锁,所以需要先获取底层的 cond.acquire
加锁,但 Python 多实现了 __enter__
和 __exit__
方法使 Condition
成为上下文管理器,底层锁的获取可以使用 with
来实现,这之后才能使用 wait
和 notify
。当初次 wait
调用发生时底层锁解除锁定。
但还有高一层的锁,Condition
内部维护了一个双端队列,每次调用 wait()
都会创建一个锁加入队列,每次 notify()
执行时则弹出一个锁解除锁定
有一类生产场景是需要限制线程并发量的,比如数据库读写,往往是一个写线程和多个读线程,考虑到线程开销问题需要对读线程数加以限制。threading.Semaphore
就是一种用于控制访问数量的锁。
import threading
import time
class Process(threading.Thread):
def __init__(self, name, sem):
super().__init__(name=name)
self.sem = sem
def run(self):
time.sleep(2)
print('{} is processed.'.format(self.name))
self.sem.release()
class Scrapy(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(15):
self.sem.acquire()
prc = Process("http://www.google.com?id={}".format(i), self.sem)
prc.start()
%run ./code/semaphore.py
time.sleep(15)
# sem = threading.Semaphore(3)
# scrapy = Scrapy(sem) # 每次 3 个线程
# scrapy.start()
这里模仿了爬虫抓取与处理过程,每次限值 3 个线程同时处理爬取信息。
Semaphore
初始化时要指定并发线程数,每次调用 Semaphore.acquire
时计数器减一,为 0 时陷入阻塞,Semaphore.release
调用时,计数器加一
每次分配任务都要自己维护线程不免效率太低,同时人们希望能获取线程执行状态信息,或是取得线程执行结果,当线程执行完毕在主线程中可以收到反馈。于是有了线程池的概念,开发者只需要获取线程池,取用其中线程执行任务,同时在主线程中可以获取线程结果,控制线程状态。
Python 中 concurrent.futures
提供了线程池服务,
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(times):
time.sleep(times)
print('get html {}'.format(times))
return times
pool = ThreadPoolExecutor(max_workers=3)
task1 = pool.submit(get_html, (2))
task2 = pool.submit(get_html, (3))
print(task1.done())
print(task2.done())
初始化线程池对象是可以通过 max_workers
参数指定最多同时有几个线程工作,通过 submit
方法将任务提交给线程池运行,同时 submit
会返回一个 future 对象,其中包含了很多线程状态相关的方法。比如 future.done()
可以获知任务是否执行成功
要注意 submit
方法是立即返回值的,不会阻塞主线程,所以上述代码结果是 False,等待一会就能收到成功反馈
task1 = pool.submit(get_html, (2))
task2 = pool.submit(get_html, (3))
print(task1.done())
print(task2.done())
time.sleep(4)
print(task1.done())
print(task2.done())
future.result()
方法可以获取任务返回结果,future.cancel()
可以在任务执行前取消任务,对运行中、执行完成的任务无效
pool = ThreadPoolExecutor(max_workers=1) # 只允许线程串行执行
task1 = pool.submit(get_html, (2))
task2 = pool.submit(get_html, (3))
print(task1.done())
print(task2.cancel()) # 取消任务 2
time.sleep(3)
print(task1.result())
当然,生产环境的任务多是批量执行的,我们通常在意的也是执行成功的任务。concurrent.futures.as_completed
就是这样一个生成器,可以动态监测执行成功的任务,并返回相应的 future 对象
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def get_html(times):
time.sleep(times)
print('get html {}'.format(times))
return times
pool = ThreadPoolExecutor(max_workers=3)
tasks = [2,1,3]
tasks_rst = [pool.submit(get_html, (i)) for i in tasks]
for future in as_completed(tasks_rst):
print('task {} complete.'.format(future.result()))
time.sleep(3)
如果只是想要任务返回结果,通过线程池的 map(func_name, tasks)
方法也能做到,其会将 tasks
中的每一个元素提取出来传入 func_name
执行
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(times):
time.sleep(times)
print('get html {}'.format(times))
return times
pool = ThreadPoolExecutor(max_workers=3)
tasks = [1,2,3]
for result in pool.map(get_html, tasks):
print('task {} complete.'.format(result))
time.sleep(3)
concurrent.futures.wait()
方法可以使程序陷入阻塞,当某些任务完成后才恢复,还可以传入参数控制等候规则,比如 FIRST_COMPLETED
指定任意任务完成后即解除阻塞
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time
def get_html(times):
time.sleep(times)
print('get html {}'.format(times))
return times
pool = ThreadPoolExecutor(max_workers=3)
tasks = [2,1,3]
tasks_rst = [pool.submit(get_html, (i)) for i in tasks]
wait(tasks_rst, return_when=FIRST_COMPLETED)
print('Cancel wait')
time.sleep(3)
Python 多进程和多线程接口近乎一致,比如同是 threading.futures
模块下,有进程池类 threading.futures.ProcessPoolExcutor
。
因为 GIL 的存在,Python 在涉及数学、图形等耗 CPU 运算任务时多进程性能更优,而 I/O 运算多的任务则适于使用多线程
%run ./code/process_thread_compare.py
除去 threading
模块,还有一个多进程模块 multiprocessing
,其中的 multiprocessing.Process
类比于 Thread
,是 Python 提供的进程类,可以通过相似的实例化、继承的方式创建自己的进程
import multiprocessing
import time
class MyProcess(multiprocessing.Process):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print('{} running'.format(self.name))
time.sleep(2)
print('{} finished'.format(self.name))
def delay(times):
time.sleep(times)
print('sleeping done')
return times
process_obj = multiprocessing.Process(target=delay, args=(2,))
my_process = MyProcess('inherit')
process_obj.start()
my_process.start()
process_obj.join()
my_process.join()
multiprocessing
下面也有进程池 Pool
,进程是资源分配的单位,所以一个计算核分配一个进程最好,可以通过 multiprocessing.cpu_count()
来获取
pool = multiprocessing.Pool(multiprocessing.cpu_count())
res = pool.apply_async(delay, (3,))
pool.close()
pool.join()
apply_async
方法类似于 submit
会收到一个 ApplyResult
返回对象,通过 get
方法获取返回值。Pool.join()
会调用进程池内所有任务的 join()
方法,等待所有任务完成恢复阻塞。但要注意前提是先调用 Pool.close()
停止接收任务提交。
和线程池一样,单就任务结果的获取进程池通过 map 也能做到,具体方法是 imap
,imap_unordered
。前者严格遵照参数传递顺序获取结果,后者则不,谁先返回结果先处理谁
for res in pool.imap(delay, [3,1,5]):
print('{}s sleep task done.'.format(res))
线程要沟通很简单,因为都是共享进程的资源。但每个进程资源都是独立的,所以进程间通信需要额外一些处理。
先说两两进程通信的情况,多线程中我们提到可以使用消息队列 deque.Queue
进行讯息沟通,进程间通信也有消息队列,只不过是 multiprocessing.Queue
from multiprocessing import Process, Queue
import time
def producer(queue):
queue.put('food')
time.sleep(2)
def consumer(queue):
time.sleep(2)
print(queue.get())
queue = Queue(10)
produce_thread = Process(target=producer, args=(queue,))
consume_thread = Process(target=consumer, args=(queue,))
produce_thread.start()
consume_thread.start()
produce_thread.join()
consume_thread.join()
进程池内的进程间通信,如果仍是使用消息队列,需要借助 multiprocessing.Manager
对象下的 Queue
from multiprocessing import Manager, Pool
import time
def producer(queue):
queue.put('food')
time.sleep(2)
def consumer(queue):
time.sleep(2)
print(queue.get())
queue = Manager().Queue(10) # 注意队列来源
pool = Pool(2)
pool.apply_async(producer, args=(queue,))
pool.apply_async(consumer, args=(queue,))
pool.close()
pool.join()
除了消息队列,两两进程间通信还可以通过管道 Pipe
收发讯息,性能强于消息队列
from multiprocessing import Manager, Pipe
import time
def producer(send_port):
send_port.send('food')
def consumer(recv_port):
print(recv_port.recv())
send_port, recv_port = Pipe()
produce_thread = Process(target=producer, args=(send_port,))
consume_thread = Process(target=consumer, args=(recv_port,))
produce_thread.start()
consume_thread.start()
produce_thread.join()
consume_thread.join()
如果一定要使用共享变量的方式可不可以呢? Manager
对象下可以创建各种用于进程间通信的常见数据结构
from multiprocessing import Manager
import time
def producer(arg_dict):
arg_dict[Producer] = 'create food'
def consumer(arg_dict):
arg_dict[Consumer] = 'eat food'
arg_dict = Manager().dict()
produce_thread = Process(target=producer, args=(arg_dict,))
consume_thread = Process(target=consumer, args=(arg_dict,))
produce_thread.start()
consume_thread.start()
produce_thread.join()
consume_thread.join()
print(arg_dict)