考虑用concurrent.futures来实现平行计算或并发处理
导读
编写Python程序时,我们可以利用CPU的多核心通过平行计算来提升计算任务的速度。很遗憾,Python的全局解释器(GIL
)的存在使得我们没有办法用线程
实现真正的平行计算。
为了实现平行计算,我们可以考虑用C语言扩展或者使用诸如Cython
和Numba
等开源工具迁移到C语言。但是这样做大幅增加了测试量和风险。于是我们思考一下:有没有一种更好的方式,只需使用少量的Python代码,即可有效提升执行效率,并迅速解决复杂的计算问题。
我们可以试着通过内置的concurrent.futures
模块来利用内置的multiprocessing
模块实现这种需求。这样的做法会以子进程的形式,平行运行多个解释器,从而利用多核心CPU来提升执行速度(子进程与主解释器相分离,所以它们的全局解释器锁也是相互独立的)。
我们可以通过下面的例子来看一下效果。
计算两数最大公约数
现在给出一个列表,列表里每个元素是一对数,求出每对数的最大公约数
1 | numbers = [(1963309, 2265973), (2030677, 3814172), |
没有做平行计算的版本
1 |
|
我们用map来试运行一下:
1 | import time |
下面我们用conccurrent.futures来模拟多线程和多进程
使用concurretn.futures
的ThreadPoolExecutor
1 | from concurrent.futures import ThreadPoolExecutor |
两个线程用了和上面差不多的时间,而且比上面还慢一些,说明多线程并不能平行计算,而且开线程也有耗费。
使用concurrent.futures
的ProcessPoollExecutor
1 | from concurrent.futures import ProcessPoolExecutor |
在双核电脑上运行上面程序发现比之前两个版本运行快很多。这是因为ProcessPoolExecutor
会利用multiprocessing
模块所提供的的底层机制来逐步完成下列操作:
- 把numbers列表中的每一项输入数据都传给map
- 用pickle模块对数据进行序列化,将其变成二进制形式。
- 通过本地套接字socket将序列化后的数据从主解释器所在的进程发送到子解释器所在的进程。
- 接下来在子进程中,用pickle对二进制数据进行反序列化操作,将其还原为Python对象
- 引入包含gcd函数的那个Python模块
- 各条子进程平行地针对各自的输入数据,来运行gcd函数
- 对运行结果进行序列化操作,将其变为字节
- 将这些字节通过socket复制到主进程中
- 主进程对这些字节执行反序列化操作,将其还原为Python对象。
- 最后,把每条子进程所求出的计算结果合并到一份列表中,返回给调用者
编后语
为了实现平行计算,multiprocessing
模块和ProcessPoolExecutor
类在幕后做了大量的工作。如果改用其他的语言来写,那么开发者只需一把同步锁或一项原子操作,就可以把线程之间的通信过程协调好。而在Python中,我们却必须使用开销较高的multiprocessing
模块,其开销之所以大,原因就在于主进程与子进程之间,必须进行序列化和反序列化操作,这些是导致大量开销的来源。
对于某些较为孤立,且数据利用率高的任务来说,上述方案非常适合。如果执行的运算不符合上述特征,那么multiprocessing
所产生的的开销可能并不能使程序加速。在这种情况下,可以求助multiprocessing所提供的的一些高级机制,如内存共享(shared memory
)、跨进程锁定(cross-process lock
)、队列(queue
)和代理(proxy
)等。
下载进度条显示
用concurrent.futures
的ThreadPoolExecutor
类处理对于大量I/O操作的并发任务的示例。非常值得参考的实现。
flags_common.py是一些默认参数和函数接口以及argparse。
flags_sequential.py是单线程依序下载以及进度条显示实现。
flags_threadpool.py是利用concurrent.futures的多线程操作实现。
- flags_common.py
1 | """Utilities for second set of flag examples. |
- falgs_sequential.py
1 | """Download flags of countries (with error handling). |
- flags_threadpool.py
1 | """Download flags of countries (with error handling). |