Python 3 from Beginner to Master: Multithreading and Multiprocessing

Author: Daniel Meng

GitHub: LibertyDream

Blog: 明月轩

This tutorial series is released under the Creative Commons Attribution-NonCommercial-ShareAlike 2.5 China Mainland License

GIL

A thread in Python corresponds to a C-level thread, and cooperation between threads inevitably involves locking. Python's most talked-about lock is the Global Interpreter Lock (GIL), which lets only one thread execute bytecode on a CPU core at any moment, so multithreading cannot be mapped onto multiple cores for parallel execution

In certain situations, though, the GIL is released voluntarily

In [1]:
import threading
test_res = []
In [6]:
total = 0

def add():
    global total
    for i in range(1000000):
        total += 1

def desc():
    global total
    for i in range(1000000):
        total -= 1

thread_one = threading.Thread(target=add)
thread_two = threading.Thread(target=desc)
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()

test_res.append(total)

Run the code above four times

In [7]:
test_res
Out[7]:
[-559567, -278363, -624265, -332884]

The results vary from run to run. This is because the interpreter releases the GIL based on how many bytecode instructions have executed and on time slices; the GIL is also released voluntarily whenever a thread hits an I/O operation
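A quick look at the interval behind those time slices: sys.getswitchinterval is the real API, and 0.005 seconds is CPython's default; the rest is just a sketch.

In [ ]:
import sys

print(sys.getswitchinterval())  # 0.005 by default: how often CPython asks
                                # the running thread to give up the GIL
sys.setswitchinterval(0.01)     # tunable, but this only changes the odds,
                                # it does not fix the race shown above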

Multithreaded Programming

A process is the unit of resource allocation on a computer; threads live inside a process and share the resources allocated to it, and a thread is the smallest unit the operating system schedules. For I/O-bound work, multiprocessing and multithreading perform about the same.

Python's thread support lives in the threading module. There are two common ways to write multithreaded code; the first is to instantiate threading.Thread directly

In [8]:
import time
from threading import Thread
In [9]:
def get_html(url):
    print('Start getting html')
    time.sleep(2)
    print('End  getting html')

def get_detail(url):
    print('Start getting url')
    time.sleep(4)
    print('End  getting url')

html_thread = Thread(target=get_html, args=('',))
detail_thread = Thread(target=get_detail, args=('',))

Here two functions simulate a crawler, fetching the html and the page details respectively. When instantiating Thread, target names the callable the thread will run; note that you pass the function itself, not a call. Runtime arguments go to args as a tuple

In [10]:
start_time = time.time()

html_thread.daemon = True
detail_thread.daemon = True
html_thread.start()
detail_thread.start()

print('Cost time:{}'.format(time.time() - start_time))
Start getting html
Start getting urlCost time:0.022986173629760742

Assigning daemon = True (the modern replacement for the deprecated setDaemon()) makes a thread a daemon thread, which dies when the main thread exits. Notice too that once the tasks start, part of the output overlaps because the threads run concurrently; to guarantee ordering, block with thread.join(), which keeps the main thread from continuing until that thread finishes

In [11]:
start_time = time.time()

html_join_thread = Thread(target=get_html, args=('',))
detail_join_thread = Thread(target=get_detail, args=('',))
html_join_thread.start()
detail_join_thread.start()

html_join_thread.join()
detail_join_thread.join()

print('Cost time:{}'.format(time.time() - start_time))
Start getting html
Start getting url
End  getting html
End  getting url
Cost time:4.019838809967041

Instantiating Thread directly suits simple tasks and quick debugging; in larger projects, threads are usually written as classes that subclass Thread

In [12]:
class GetHtml(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print('Start getting html')
        time.sleep(2)
        print('End  getting html')

class GetDetail(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print('Start getting url')
        time.sleep(4)
        print('End  getting url')
In [13]:
start_time = time.time()

html_thread = GetHtml('get_html')
detail_thread = GetDetail('get_detail')

html_thread.start()
detail_thread.start()

html_thread.join()
detail_thread.join()

print('Cost time:{}'.format(time.time() - start_time))
Start getting htmlStart getting url

End  getting html
End  getting url
Cost time:4.017937183380127

Inter-Thread Communication

Threads frequently have to coordinate resources and keep them consistent, so a communication mechanism matters. The most intuitive implementation is a shared variable: exploiting the mutability of Python objects, pick one object to act as the go-between for all threads. In pseudocode:

In [ ]:
share_lst = []

def get_html(share_lst):
    '''
        Parse html and extract the content between <html></html>
    '''
    while True:
        for i in range(20):
            share_lst.append('http:{id}'.format(id=i))

def get_detail(share_lst):
    '''
        Extract the detail info from within <html></html> and format it
    '''
    while True:
        if share_lst:
            item = share_lst.pop()
            print("Processing {}".format(item))

html_thread = Thread(target=get_html, args=(share_lst,))
html_thread.start()

for i in range(10):
    detail_thread = Thread(target=get_detail, args=(share_lst,))
    detail_thread.start()

Again we have the two functions, one fetching html and one extracting its details. Given the speed difference, only one fetch thread is started against 10 analysis threads, and the threads communicate through the shared list.

The check-then-pop pattern above is not thread-safe: between `if share_lst` and `share_lst.pop()` another thread may have emptied the list, so accidents come easily. A better approach is a thread-safe data structure built for this, such as the message queue queue.Queue

In [14]:
from queue import Queue

msg_queue = Queue(maxsize=100)  # a message queue with capacity 100

def get_html(queue):
    '''
        Parse html and extract the content between <html></html>
    '''
    while True:
        for i in range(20):
            queue.put('http:{id}'.format(id=i))

def get_detail(queue):
    '''
        Extract the detail info from within <html></html> and format it
    '''
    while True:
        item = queue.get()  # blocks until an item is available
        print("Processing {}".format(item))

Queue is built on top of collections.deque, which is itself thread-safe at the bytecode level. Queue offers many thread-safe methods, such as get and put, and both let you choose whether to block while waiting. Also, once Queue.join() is blocking, Queue.task_done() must be called somewhere to report progress before the block can be released
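As a minimal sketch of that task_done()/join() protocol (the worker function here is hypothetical, not part of the crawler example):

In [ ]:
from queue import Queue
from threading import Thread

q = Queue()
for i in range(3):
    q.put('http:{id}'.format(id=i))

def worker():
    while True:
        item = q.get()             # blocks until an item is available
        print('Processing {}'.format(item))
        q.task_done()              # exactly one task_done() per get()

Thread(target=worker, daemon=True).start()
q.join()  # unblocks only after task_done() was called for every item put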

In [15]:
help(Queue)
Help on class Queue in module queue:

class Queue(builtins.object)
 |  Queue(maxsize=0)
 |  
 |  Create a queue object with a given maximum size.
 |  
 |  If maxsize is <= 0, the queue size is infinite.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, maxsize=0)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  empty(self)
 |      Return True if the queue is empty, False otherwise (not reliable!).
 |      
 |      This method is likely to be removed at some point.  Use qsize() == 0
 |      as a direct substitute, but be aware that either approach risks a race
 |      condition where a queue can grow before the result of empty() or
 |      qsize() can be used.
 |      
 |      To create code that needs to wait for all queued tasks to be
 |      completed, the preferred technique is to use the join() method.
 |  
 |  full(self)
 |      Return True if the queue is full, False otherwise (not reliable!).
 |      
 |      This method is likely to be removed at some point.  Use qsize() >= n
 |      as a direct substitute, but be aware that either approach risks a race
 |      condition where a queue can shrink before the result of full() or
 |      qsize() can be used.
 |  
 |  get(self, block=True, timeout=None)
 |      Remove and return an item from the queue.
 |      
 |      If optional args 'block' is true and 'timeout' is None (the default),
 |      block if necessary until an item is available. If 'timeout' is
 |      a non-negative number, it blocks at most 'timeout' seconds and raises
 |      the Empty exception if no item was available within that time.
 |      Otherwise ('block' is false), return an item if one is immediately
 |      available, else raise the Empty exception ('timeout' is ignored
 |      in that case).
 |  
 |  get_nowait(self)
 |      Remove and return an item from the queue without blocking.
 |      
 |      Only get an item if one is immediately available. Otherwise
 |      raise the Empty exception.
 |  
 |  join(self)
 |      Blocks until all items in the Queue have been gotten and processed.
 |      
 |      The count of unfinished tasks goes up whenever an item is added to the
 |      queue. The count goes down whenever a consumer thread calls task_done()
 |      to indicate the item was retrieved and all work on it is complete.
 |      
 |      When the count of unfinished tasks drops to zero, join() unblocks.
 |  
 |  put(self, item, block=True, timeout=None)
 |      Put an item into the queue.
 |      
 |      If optional args 'block' is true and 'timeout' is None (the default),
 |      block if necessary until a free slot is available. If 'timeout' is
 |      a non-negative number, it blocks at most 'timeout' seconds and raises
 |      the Full exception if no free slot was available within that time.
 |      Otherwise ('block' is false), put an item on the queue if a free slot
 |      is immediately available, else raise the Full exception ('timeout'
 |      is ignored in that case).
 |  
 |  put_nowait(self, item)
 |      Put an item into the queue without blocking.
 |      
 |      Only enqueue the item if a free slot is immediately available.
 |      Otherwise raise the Full exception.
 |  
 |  qsize(self)
 |      Return the approximate size of the queue (not reliable!).
 |  
 |  task_done(self)
 |      Indicate that a formerly enqueued task is complete.
 |      
 |      Used by Queue consumer threads.  For each get() used to fetch a task,
 |      a subsequent call to task_done() tells the queue that the processing
 |      on the task is complete.
 |      
 |      If a join() is currently blocking, it will resume when all items
 |      have been processed (meaning that a task_done() call was received
 |      for every item that had been put() into the queue).
 |      
 |      Raises a ValueError if called more times than there were items
 |      placed in the queue.
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)

Thread Synchronization

A big problem in thread cooperation is synchronization. Think of an ATM transfer, or adjusting inventory during a purchase: with multiple threads we naturally want the account balance and the stock count to be correct and stable. Usually they will not be.

In [16]:
a = 0

def inc():
    global a
    a += 1

def dec():
    global a
    a -= 1
In [17]:
import dis

print(dis.dis(inc))
print('------------------------------------------------')
print(dis.dis(dec))
5           0 LOAD_GLOBAL              0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_GLOBAL             0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
------------------------------------------------
  9           0 LOAD_GLOBAL              0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_SUBTRACT
              6 STORE_GLOBAL             0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None

Here we build two trivial functions that increment and decrement the same variable. The bytecode shows that at runtime the two are almost identical, differing only in INPLACE_ADD versus INPLACE_SUBTRACT. That is exactly the problem: as noted earlier, the GIL may be released when a time slice runs out or when the bytecode-instruction budget is spent. So this scenario exists: inc exhausts its time slice right after its second instruction and control switches to dec; dec gets as far as INPLACE_SUBTRACT, but its slice expires before the new value -1 is stored into a, and control returns to inc, which now runs INPLACE_ADD and stores a different value, 1, into a

So as the next time slices arrive, and because the competing stores always land in some order, the value read from a keeps oscillating back and forth.

The classic remedy for synchronization is the lock, and Python's threading module offers several implementations

In [18]:
from threading import Lock

a = 0
lock = Lock()
def inc():
    global a
    lock.acquire()
    a += 1
    lock.release()

def dec():
    global a
    lock.acquire()
    a -= 1
    lock.release()

Each Lock object is one lock. After acquire, the bytecode that follows is locked until release is reached; only then can other threads regain access to the locked section.

Locks carry two long-standing troubles. First, they hurt performance: the time spent acquiring and releasing is not trivial. Second, deadlock: threads contending for resources end up waiting on one another, and without outside intervention none of them can move forward.
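To make the second trouble concrete, here is a minimal deadlock sketch (lock_a, lock_b and the two tasks are made up for illustration; starting both threads would hang forever):

In [ ]:
import threading
import time

lock_a, lock_b = threading.Lock(), threading.Lock()

def task_one():
    with lock_a:
        time.sleep(0.1)  # give task_two time to take lock_b
        with lock_b:     # waits forever: task_two holds lock_b
            pass

def task_two():
    with lock_b:
        time.sleep(0.1)  # give task_one time to take lock_a
        with lock_a:     # waits forever: task_one holds lock_a
            pass

# threading.Thread(target=task_one).start()
# threading.Thread(target=task_two).start()  # uncomment both to deadlock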

For one class of deadlock, where nested calls repeatedly try to take the same lock, Python provides threading.RLock, a reentrant lock: the same thread may acquire it multiple times, as long as the acquire and release counts match.

In [ ]:
rlock = threading.RLock()

def func_with_lock(*args):
    # ...
    rlock.acquire()

    other_func_with_lock()

    rlock.release()
    # ...

def other_func_with_lock(*args):
    # ...
    rlock.acquire()  # the same thread re-acquiring the same RLock is fine

    # do something

    rlock.release()
    # ...
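To see the reentrancy actually run, a self-contained sketch:

In [ ]:
import threading

rlock = threading.RLock()

def outer():
    with rlock:             # first acquisition
        inner()

def inner():
    with rlock:             # the same thread re-acquires: fine for RLock,
        print('inner ran')  # a plain Lock would deadlock right here

t = threading.Thread(target=outer)
t.start()
t.join()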

Condition

Locks buy a degree of consistency and data robustness, but they do not cover every scenario. Sometimes several threads must take turns in a fixed order, and since the GIL is released on time slices or bytecode counts, a thread that happens to finish "more than its share" within one slice breaks the alternation.

That calls for a slightly more advanced locking mechanism: Condition. threading.Condition is a condition lock built on threading.Lock / threading.RLock; its wait() and notify() methods impose ordered blocking, and wait() keeps blocking until it receives the signal sent by notify().

Precisely because of how Condition works, the order in which the threads are started is critical.

In [19]:
import threading

class YinShi(threading.Thread):
    def __init__(self, name, cond):
        super().__init__(name=name)
        self.cond = cond
    
    def run(self):
        with self.cond:
            print('{} : 窗前明月光'.format(self.name))
            self.cond.notify()
            self.cond.wait()

            print('{} : 举头望明月'.format(self.name))
            self.cond.notify()
            self.cond.wait()


class ZuoDui(threading.Thread):
    def __init__(self, name, cond):
        super().__init__(name=name)
        self.cond = cond
    
    def run(self):
        with self.cond:
            self.cond.wait()
            
            print('{} : 疑是地上霜'.format(self.name))
            self.cond.notify()
            self.cond.wait()

            print('{} : 低头思故乡'.format(self.name))
            self.cond.notify()
In [20]:
cond = threading.Condition()
yin_shi = YinShi('Yin Shi', cond)
zuo_dui = ZuoDui('Zuo Dui', cond)

zuo_dui.start()  # note the start order: the responder speaks second, so it must wait() first for a notify
yin_shi.start()
Yin Shi : 窗前明月光
Zuo Dui : 疑是地上霜
Yin Shi : 举头望明月
Zuo Dui : 低头思故乡

Here we stage a poem-reciting scene (Li Bai's "Quiet Night Thoughts") with the two threads speaking in turn. Note that a Condition is still a lock underneath, so the underlying lock must be taken first with cond.acquire(); but Python also implements the __enter__ and __exit__ methods, making Condition a context manager, so the underlying lock can be taken with a with block. Only after that may wait and notify be used. When the first wait() call happens, the underlying lock is released.

There is also a lock one layer up: Condition maintains an internal double-ended queue; each call to wait() creates a lock and appends it to the queue, and each notify() pops one and releases it

Semaphore

One class of production scenarios needs a cap on thread concurrency. Database access is typical: usually one writer thread and many reader threads, where the reader count must be limited to keep thread overhead in check. threading.Semaphore is a lock for controlling exactly this kind of access count.

In [21]:
import threading
import time

class Process(threading.Thread):
    def __init__(self, name, sem):
        super().__init__(name=name)
        self.sem = sem

    def run(self):
        time.sleep(2)
        print('{} is processed.'.format(self.name))
        self.sem.release()

class Scrapy(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(15):
            self.sem.acquire()
            prc = Process("http://www.google.com?id={}".format(i), self.sem)
            prc.start()
In [24]:
%run ./code/semaphore.py
time.sleep(15)
# sem = threading.Semaphore(3)
# scrapy = Scrapy(sem)  # 3 threads at a time
# scrapy.start()
http://www.google.com?id=0 is processed.http://www.google.com?id=2 is processed.
http://www.google.com?id=1 is processed.

http://www.google.com?id=4 is processed.http://www.google.com?id=5 is processed.
http://www.google.com?id=3 is processed.

http://www.google.com?id=6 is processed.http://www.google.com?id=7 is processed.
http://www.google.com?id=8 is processed.

http://www.google.com?id=9 is processed.http://www.google.com?id=10 is processed.
http://www.google.com?id=11 is processed.

http://www.google.com?id=12 is processed.http://www.google.com?id=13 is processed.

http://www.google.com?id=14 is processed.

This mimics a crawler's fetch-and-process flow, limiting processing to 3 concurrent threads at a time.

A Semaphore is initialized with the number of concurrent threads allowed. Every call to Semaphore.acquire decrements the counter, blocking once it reaches 0; every Semaphore.release increments it
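A tiny illustration of those counter semantics:

In [ ]:
import threading

sem = threading.Semaphore(3)        # counter starts at 3
sem.acquire()                       # counter: 2
sem.acquire()                       # counter: 1
sem.acquire()                       # counter: 0
print(sem.acquire(blocking=False))  # False: counter is 0, would block
sem.release()                       # counter back to 1
print(sem.acquire(blocking=False))  # True: a slot is free again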

Thread Pools

Maintaining threads by hand for every task is inefficient, and we also want to inspect a thread's execution state, fetch its result, and receive feedback in the main thread when it finishes. Hence the thread pool: the developer just takes a pool, draws threads from it to run tasks, and can collect results and control task state from the main thread.

In Python, concurrent.futures provides the thread pool.

In [25]:
from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print('get html {}'.format(times))
    return times

pool = ThreadPoolExecutor(max_workers=3)
task1 = pool.submit(get_html, (2))
task2 = pool.submit(get_html, (3))

print(task1.done())
print(task2.done())
False
False

When instantiating the pool object, the max_workers parameter caps how many threads work at once. The submit method hands a task to the pool and returns a Future object packed with state-related methods; future.done(), for example, reports whether the task has finished

Note that submit returns immediately without blocking the main thread, which is why the code above prints False; wait a moment and both tasks report done

In [28]:
task1 = pool.submit(get_html, (2))
task2 = pool.submit(get_html, (3))

print(task1.done())
print(task2.done())
time.sleep(4)
print(task1.done())
print(task2.done())
False
False
get html 2
get html 3
True
True

future.result() fetches the task's return value; future.cancel() cancels a task, but only before it starts running, having no effect on running or finished tasks

In [29]:
pool = ThreadPoolExecutor(max_workers=1)  # a single worker: tasks run serially
task1 = pool.submit(get_html, (2))
task2 = pool.submit(get_html, (3))

print(task1.done())
print(task2.cancel())  # cancel task 2
time.sleep(3)
print(task1.result())
False
True
get html 2
2

Of course, production tasks are mostly submitted in batches, and what we usually care about are the ones that complete. concurrent.futures.as_completed is a generator for exactly that: it dynamically watches for finished tasks and yields the corresponding Future objects

In [30]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def get_html(times):
    time.sleep(times)
    print('get html {}'.format(times))
    return times

pool = ThreadPoolExecutor(max_workers=3)
tasks = [2,1,3]
tasks_rst = [pool.submit(get_html, (i)) for i in tasks]

for future in as_completed(tasks_rst):
    print('task {} complete.'.format(future.result()))

time.sleep(3)
get html 1
task 1 complete.
get html 2
task 2 complete.
get html 3
task 3 complete.

If all you need is the return values, the pool's map(func_name, tasks) method works too: it takes each element of tasks and passes it into func_name for execution

In [31]:
from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print('get html {}'.format(times))
    return times

pool = ThreadPoolExecutor(max_workers=3)
tasks = [1,2,3]

for result in pool.map(get_html, tasks):
    print('task {} complete.'.format(result))
    
time.sleep(3)
get html 1
task 1 complete.
get html 2
task 2 complete.
get html 3
task 3 complete.

concurrent.futures.wait() blocks the program until certain tasks finish, and accepts a parameter to control the waiting rule; FIRST_COMPLETED, for example, unblocks as soon as any task completes

In [32]:
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    print('get html {}'.format(times))
    return times

pool = ThreadPoolExecutor(max_workers=3)
tasks = [2,1,3]
tasks_rst = [pool.submit(get_html, (i)) for i in tasks]

wait(tasks_rst, return_when=FIRST_COMPLETED)
print('Cancel wait')

time.sleep(3)
get html 1
Cancel wait
get html 2
get html 3

Multiprocessing

Python's multiprocessing interface mirrors the multithreading one almost exactly. For instance, the same concurrent.futures module provides the process pool class concurrent.futures.ProcessPoolExecutor

Because of the GIL, multiprocessing performs better for CPU-hungry work such as math and graphics, while tasks dominated by I/O are better suited to multithreading
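The source of ./code/process_thread_compare.py is not shown in this notebook; a minimal sketch consistent with the Fibonacci half of the output below might look like this (the random-sleep half is analogous):

In [ ]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import time

def fib(n):
    # deliberately CPU-bound: naive recursive Fibonacci
    return n if n < 2 else fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    for executor_cls in (ProcessPoolExecutor, ThreadPoolExecutor):
        print('{} begin compute Fibonacci'.format(executor_cls.__name__))
        start = time.time()
        with executor_cls(max_workers=3) as pool:
            futures = [pool.submit(fib, i) for i in range(20, 35)]
            print([f.result() for f in as_completed(futures)])
        print('{} time cost:{}'.format(executor_cls.__name__, time.time() - start))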

In [33]:
%run ./code/process_thread_compare.py
ProcessPoolExecutor begin compute Fibonacci
[6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887]
ProcessPoolExecutor time cost:6.444280624389648
ThreadPoolExecutor begin compute Fibonacci
[17711, 10946, 6765, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887]
ThreadPoolExecutor time cost:8.758058309555054
--------------------
ProcessPoolExecutor begin random sleep test
ProcessPoolExecutor time cost:8.164332628250122
ThreadPoolExecutor begin random sleep test
ThreadPoolExecutor time cost:8.002155065536499

Besides the executor interface, there is a dedicated multiprocessing module. Its multiprocessing.Process is the analogue of Thread, Python's process class, and you create your own processes in the same two ways: direct instantiation or subclassing

In [35]:
import multiprocessing
import time
class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__(name=name)
    
    def run(self):
        print('{} running'.format(self.name))
        time.sleep(2)
        print('{} finished'.format(self.name))

def delay(times):
    time.sleep(times)
    print('sleeping done')
    return times

process_obj = multiprocessing.Process(target=delay, args=(2,))
my_process = MyProcess('inherit')

process_obj.start()
my_process.start()

process_obj.join()
my_process.join()

multiprocessing also provides a process pool, Pool. Since the process is the unit of resource allocation, one process per CPU core works best, and multiprocessing.cpu_count() tells you how many cores there are

In [36]:
pool = multiprocessing.Pool(multiprocessing.cpu_count())
res = pool.apply_async(delay, (3,))

pool.close()
pool.join()

apply_async is analogous to submit: it returns an ApplyResult object whose get method yields the return value. Pool.join() waits for every task in the pool to finish before unblocking, but note the precondition: Pool.close() must be called first to stop accepting new submissions.
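For example, extending the snippet above to actually read the result:

In [ ]:
pool = multiprocessing.Pool(multiprocessing.cpu_count())
res = pool.apply_async(delay, (3,))  # delay() was defined earlier

pool.close()      # stop accepting new tasks first
pool.join()       # then wait for everything in the pool to finish
print(res.get())  # 3: the return value of delay(3)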

As with thread pools, when only the task results matter the process pool can map as well; the specific methods are imap and imap_unordered. The former yields results strictly in the order the arguments were passed; the latter does not, handing you whichever result comes back first

In [ ]:
for res in pool.imap(delay, [3,1,5]):
    print('{}s sleep task done.'.format(res))
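And the unordered variant, on a fresh pool since a closed pool takes no new work:

In [ ]:
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for res in pool.imap_unordered(delay, [3, 1, 5]):
    # whichever task finishes first is yielded first
    print('{}s sleep task done.'.format(res))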

Inter-Process Communication

Thread communication is easy because threads all share their process's resources. Each process's resources are independent, though, so inter-process communication needs some extra machinery.

Start with communication between two processes. For threads we used the message queue queue.Queue; processes have their own message queue as well, just multiprocessing.Queue instead

In [2]:
from multiprocessing import Process, Queue
import time

def producer(queue):
    queue.put('food')
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    print(queue.get())

queue = Queue(10)
produce_process = Process(target=producer, args=(queue,))
consume_process = Process(target=consumer, args=(queue,))

produce_process.start()
consume_process.start()

produce_process.join()
consume_process.join()

For processes inside a process pool, if you still want message-queue communication you must use the Queue obtained from a multiprocessing.Manager object

In [ ]:
from multiprocessing import Manager, Pool
import time

def producer(queue):
    queue.put('food')
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    print(queue.get())

queue = Manager().Queue(10)  # note where the queue comes from: the Manager
pool = Pool(2)

pool.apply_async(producer, args=(queue,))
pool.apply_async(consumer, args=(queue,))

pool.close()
pool.join()

Besides message queues, two processes can also talk through a pipe, Pipe, sending and receiving messages with better performance than a message queue

In [ ]:
from multiprocessing import Process, Pipe

def producer(send_port):
    send_port.send('food')

def consumer(recv_port):
    print(recv_port.recv())

send_port, recv_port = Pipe()
produce_process = Process(target=producer, args=(send_port,))
consume_process = Process(target=consumer, args=(recv_port,))

produce_process.start()
consume_process.start()

produce_process.join()
consume_process.join()

What if you insist on the shared-variable approach? A Manager object can create all the common data structures for inter-process communication

In [ ]:
from multiprocessing import Manager, Process

def producer(arg_dict):
    arg_dict['Producer'] = 'create food'

def consumer(arg_dict):
    arg_dict['Consumer'] = 'eat food'

arg_dict = Manager().dict()
produce_process = Process(target=producer, args=(arg_dict,))
consume_process = Process(target=consumer, args=(arg_dict,))

produce_process.start()
consume_process.start()

produce_process.join()
consume_process.join()
print(arg_dict)