关注公众号【算法码上来】,每日算法干货马上就来!
你有没有遇到过这样的问题:在网络上找到很多部你迫不及待想看的小电影,却发现下载速度慢得令人抓狂?那么,你可能需要知道一些关于CPU多线程和多进程的知识,让我们一起揭开CPU的神秘面纱,看看它是如何让你的小电影下载飞快的吧!
基本概念
首先回顾两个操作系统课学过的基本概念,“并行”和“并发”。并行指的是同一时刻有多个任务在同时执行;并发指的是在一段时间内有多个任务在执行,但是在同一时刻只能有一个任务在执行。所以并行是真的在同时干活,而并发实际上是在多个任务之间频繁切换,只是看起来像而已。举个通俗的例子,你一边听歌一边写代码,那你就是在并行干了两件事;但你一边打游戏写代码,那你就是在并发,只能在游戏死亡的时候切屏写两行代码,但看起来你同时干了两件事。
多线程和多进程都是用来同时处理任务的,但还是有很大不同,比如在python中:
- 多线程是并发的(由于python的全局解释器锁GIL的存在),所以同一时刻只会有一个线程在干活(计算),其他线程只能摸鱼。当一个线程干活累了,开始等待补给送到(比如下载数据),操作系统就会切换到其他线程运行,不过同时也可能调用其它CPU核心。因此多线程更适合I/O密集型任务,比如同时下载很多部小电影,这样所有线程都可以等着,无需CPU核心干活。而如果用到计算密集型任务上,就会出现一个线程一直在干活,其他线程只能等它干完活才能继续干,这样就等同于串行了。此外多个线程共享同一块内存空间,因此数据是共享的。
- 多进程是并行的,每个进程运行在独立的CPU核心上,因此可以同一时刻同时干活。此外多个进程的内存空间完全独立,互不影响,创建多进程需要复制出很多块相同的内存空间,因此耗时耗资源。多进程适合计算密集型任务,比如并行给众多小电影去除马赛克,因为利用多个CPU核心同时干活,效率更高。但如果用多进程来进行I/O密集型任务,那么大部分CPU核心都没活干,都在等待下载数据,太浪费资源了。
使用方法和速度测试
python提供了不少的多线程和多进程API,非常好用。
基本思想就是进程池和线程池这两个概念,你可以把它理解成一个队列,队列长度为进程数或者线程数。然后不断往队列里提交任务,当队列满了后,其他任务就等待,直到队列中有任务执行结束,就随机再入队一个任务。
具体的用法和作用我都写在测试代码对应位置的注释里了,执行时间也加上了,很详细!代码也放下面链接了:
https://github.com/godweiyang/cpu_parallel
不想看的我直接总结一下,大多数时候:
- 计算密集型情况下,用多进程的
map_async
一次性传入所有任务列表。 - I/O密集型情况下,CPU够多的话多进程
map_async
和多线程map
都行,CPU很少的话用多线程map
,都是一次性传入所有任务列表。
注意多进程不要定义很占内存的变量,因为每个子进程都会复制所有内存空间,可能会OOM,并且很耗时。
计算密集型任务
测试的方法是计算1到n求和,一共同时计算5个不同的n,n比较大。显然这是一个计算密集型任务,非常吃CPU性能。
import time
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
'''计算1+2+...+n
'''
def func(n):
s = 0
for i in range(n+1):
s += i
print(f"sum({n})={s}")
return n, s
'''统计函数运行时间
'''
def get_duration(func_name, *args, **kwargs):
print(f"{func_name} >>>>>>>>>>>>>>>>>>>>>")
print(f"args: {args}")
print(f"kwargs: {kwargs}")
st = time.time()
globals()[func_name](*args, **kwargs)
print(f"TIME: {time.time() - st}\n")
'''单进程
运行结果:
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 19.151068449020386
可以看出计算sum和最后获取到的result都是按照输入顺序串行的。
'''
def single_process(nums):
results = []
for num in nums:
result = func(num)
results.append(result)
for r in results:
print(f"results({r[0]})={r[1]}")
'''多进程apply方法
因为5个任务要按顺序提交,apply方法是阻塞的,
每个任务都得等上一个任务结束后才能继续提交并执行,
所以同一时刻只有一个进程在执行任务,等同于单进程串行。
运行结果:
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 18.436408758163452
可以看出跟单进程串行执行几乎没有任何区别。
'''
def multi_process_apply(parallel, nums):
results = []
with mp.Pool(processes=parallel) as pool:
for num in nums:
result = pool.apply(func, (num,)) # 同步,阻塞,直接返回计算结果
results.append(result)
for r in results:
print(f"results({r[0]})={r[1]}")
'''多进程apply_async方法
5个任务按顺序提交,但是会立即返回ApplyResult对象,是非阻塞的,
这样一下子就提交了5个任务,进程池的队列大小是2,所以同时有2个任务在执行,
最后通过ApplyResult.get()方法来获取对应任务的结果,这个方法会阻塞,
直到获取到结果才会继续获取下一个的结果。
运行结果(block=False):
sum(100000001)=5000000050000000
sum(100000000)=4999999950000000
results(100000000)=4999999950000000
results(100000001)=5000000050000000
sum(100000002)=5000000150000001
results(100000002)=5000000150000001
sum(100000003)=5000000250000003
results(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000004)=5000000350000006
TIME: 11.417532920837402
可以看出,如果提交完5个任务之后没有进行block阻塞,
那么一部分进程执行完sum计算之后,ApplyResult.get()会立刻获取到结果,然后输出result。
并且sum的计算顺序也是无序的,因为5个任务是非阻塞的,一股脑全提交到进程池里了,没有执行的先后顺序。
但是result的顺序还是跟输入顺序一样的,因为是用阻塞方法ApplyResult.get()一个个获取的,
这样即使第一行sum(100000001)已经计算完了,也没有立刻输出result,
而是等sum(100000000)计算完,才按顺序输出两者的result。
时间上缩短了一半左右。
运行结果(block=True):
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 11.41776990890503
如果提交完5个任务之后进行了block阻塞,那么就会等这5个任务都执行完再输出result,
这样的话计算sum的顺序依然是无序的,只是上面恰好按顺序执行了,
输出result的话依然是有序的,并且不需要等待了,因为全都已经执行完了。
时间上缩短了一半左右。
'''
def multi_process_apply_async(parallel, nums, block=False):
apply_results = []
with mp.Pool(processes=parallel) as pool:
for num in nums:
apply_result = pool.apply_async(func, (num,)) # 异步,不阻塞,返回ApplyResult对象
apply_results.append(apply_result)
if block: # 阻塞,等待所有进程结束
pool.close()
pool.join()
for apply_result in apply_results:
r = apply_result.get() # 阻塞,按输入顺序返回计算结果
print(f"results({r[0]})={r[1]}")
'''多进程map方法
map方法可以一次性提交一个可迭代的对象(例如这里的列表),
然后方法内部会迭代这个对象,将元素依次送到func中执行,注意是无序执行的。
map方法是阻塞的,也就是说所有任务全部执行完才会返回results,返回顺序也是有序的。
如果一次性提交了所有任务构成的列表,那么map和apply_async几乎没有任何区别。
如果手动对任务进行了分块,一块一块提交到map里,那么可能会更慢,
因为同一块中的任务执行时间会有差异,会出现进程等待。
运行结果:
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 11.016913414001465
这里sum计算也是碰巧有序了,实际上是无序随机执行的,results一定是有序的。
时间上缩短了一半左右。
'''
def multi_process_map(parallel, nums):
with mp.Pool(processes=parallel) as pool:
results = pool.map(func, nums) # 同步,阻塞,按输入顺序返回所有进程的计算结果
for r in results:
print(f"results({r[0]})={r[1]}")
'''多进程map_async方法
和map的区别就是非阻塞的,传入一个可迭代对象之后会立即返回MapResult对象,
之后需要用MapResult.get()方法来阻塞获取计算结果,
直到这个可迭代对象的任务全部执行完,返回结果列表,才会继续获取下一个可迭代对象的结果。
这个过程其实和apply_async基本一样,只是把单个元素换成了可迭代对象。
运行结果(block=False):
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 11.517432928085327
如果不开block阻塞的话,所有的可迭代对象中的任务会一股脑全部新建起来,然后无序执行,
最后按照输入顺序获取results,只不过下面例子中只传入了一个列表,因此和上面的map没有任何区别。
时间上缩短了一半左右。
运行结果(block=True):
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 11.51731562614441
因为只传入了一个列表,所以MapResult.get()和block没有区别了。
时间上缩短了一半左右。
'''
def multi_process_map_async(parallel, nums, block=False):
with mp.Pool(processes=parallel) as pool:
map_results = pool.map_async(func, nums) # 异步,不阻塞,返回MapResult对象
if block: # 阻塞,等待所有进程结束
pool.close()
pool.join()
map_results = map_results.get() # 阻塞,按输入顺序返回计算结果
for r in map_results:
print(f"results({r[0]})={r[1]}")
'''多进程imap方法
和map的区别就是,imap是非阻塞的,并且返回的是一个迭代器,
不需要通过get方法一次性获取所有的计算结果,可以通过迭代器不断拿到已经执行完的进程结果,
但是注意迭代器是按照输入顺序迭代的,因此靠前的任务还没执行完的话,是不会先输出后面的任务结果的。
运行结果(block=False):
sum(100000001)=5000000050000000
sum(100000000)=4999999950000000
results(100000000)=4999999950000000
results(100000001)=5000000050000000
sum(100000002)=5000000150000001
results(100000002)=5000000150000001
sum(100000003)=5000000250000003
results(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000004)=5000000350000006
TIME: 11.41724157333374
可以看出输出结果和apply_async的结果很像,都是执行完一部分就输出一部分,并且输出是有序的。
时间上缩短了一半左右。
运行结果(block=True):
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 11.418148040771484
如果开启了block阻塞的话,那所有进程执行完才会有序输出结果。
时间上缩短了一半左右。
'''
def multi_process_imap(parallel, nums, block=False):
with mp.Pool(processes=parallel) as pool:
result_iter = pool.imap(func, nums) # 异步,不阻塞,返回迭代器
if block: # 阻塞,等待所有进程结束
pool.close()
pool.join()
for r in result_iter: # 阻塞,按输入顺序返回计算结果
print(f"results({r[0]})={r[1]}")
'''多进程imap_unordered方法
和imap区别就是迭代器输出结果的时候,先执行完的进程先输出结果,不一定要有序。
运行结果(block=False):
sum(100000000)=4999999950000000
results(100000000)=4999999950000000
sum(100000001)=5000000050000000
results(100000001)=5000000050000000
sum(100000002)=5000000150000001
results(100000002)=5000000150000001
sum(100000003)=5000000250000003
results(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000004)=5000000350000006
TIME: 11.419070482254028
跑的结果比较巧,正好有序了,其实sum和results可能是无序的。
时间上缩短了一半左右。
运行结果(block=True):
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000000)=4999999950000000
results(100000001)=5000000050000000
results(100000002)=5000000150000001
results(100000003)=5000000250000003
results(100000004)=5000000350000006
TIME: 11.718666076660156
跑的结果比较巧,正好有序了,其实sum和results可能是无序的。
时间上缩短了一半左右。
'''
def multi_process_imap_unordered(parallel, nums, block=False):
with mp.Pool(processes=parallel) as pool:
result_iter = pool.imap_unordered(func, nums) # 异步,不阻塞,返回迭代器
if block: # 阻塞,等待所有进程结束
pool.close()
pool.join()
for r in result_iter: # 阻塞,按进程结束顺序返回计算结果
print(f"results({r[0]})={r[1]}")
'''多线程submit方法
submit方法是非阻塞的,会立刻返回Future对象,然后通过Future.result()获取计算结果,
和多进程的apply_async很像,但是因为GIL的存在,同一时刻只会计算一个sum,所以等同于串行计算。
运行结果:
sum(100000001)=5000000050000000
sum(100000000)=4999999950000000
results(100000000)=4999999950000000
results(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
results(100000002)=5000000150000001
results(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000004)=5000000350000006
TIME: 19.167519569396973
可以看出sum计算是无序的,但是results是有序的,并且时间几乎等同于串行。
'''
def multi_thread_submit(parallel, nums):
futures = []
with ThreadPoolExecutor(max_workers=parallel) as pool:
for num in nums:
future = pool.submit(func, num) # 异步,不阻塞,返回Future对象
futures.append(future)
for future in futures:
r = future.result() # 阻塞,按输入顺序返回计算结果
print(f"results({r[0]})={r[1]}")
'''多线程map方法
和多进程的imap很像,也是输入一个可迭代对象,立刻返回一个迭代器,
并且迭代器的输出也是有序的,阻塞的,如果执行顺序是乱的,那么输出就要等待了。
运行结果:
sum(100000000)=4999999950000000
sum(100000001)=5000000050000000
results(100000000)=4999999950000000
results(100000001)=5000000050000000
sum(100000002)=5000000150000001
sum(100000003)=5000000250000003
results(100000002)=5000000150000001
results(100000003)=5000000250000003
sum(100000004)=5000000350000006
results(100000004)=5000000350000006
TIME: 19.716532707214355
这里也碰巧sum计算是有序的,实际上是无序的,但是results是有序的,并且时间几乎等同于串行。
'''
def multi_thread_map(parallel, nums):
with ThreadPoolExecutor(max_workers=parallel) as pool:
result_iter = pool.map(func, nums) # 异步,不阻塞,返回迭代器
for r in result_iter: # 阻塞,按输入顺序返回计算结果
print(f"results({r[0]})={r[1]}")
if __name__ == "__main__":
start, length = 10**8, 5
nums = range(start, start+length)
parallel = 2
get_duration("single_process", nums)
get_duration("multi_process_apply", parallel, nums)
get_duration("multi_process_apply_async", parallel, nums, block=False)
get_duration("multi_process_apply_async", parallel, nums, block=True)
get_duration("multi_process_map", parallel, nums)
get_duration("multi_process_map_async", parallel, nums, block=False)
get_duration("multi_process_map_async", parallel, nums, block=True)
get_duration("multi_process_imap", parallel, nums, block=False)
get_duration("multi_process_imap", parallel, nums, block=True)
get_duration("multi_process_imap_unordered", parallel, nums, block=False)
get_duration("multi_process_imap_unordered", parallel, nums, block=True)
get_duration("multi_thread_submit", parallel, nums)
get_duration("multi_thread_map", parallel, nums)
I/O密集型任务
要想模拟I/O密集型任务很简单,不一定非要去下你们喜欢看的小电影,修改一下func
函数,用time.sleep(3)
替代sum
计算,阻塞3秒就行了。这期间CPU会没事干,就跟下载小电影一样的效果。
因为是5个任务,每个阻塞3秒,所以串行情况下理论执行时间是15秒,那么我们直接看一下汇总结果吧。
可以看出单进程15秒,多进程除了apply
都是9秒,这都和计算密集型任务结论完全一致。
唯一不同的是,多线程两个方法全都变的和多进程一样快了!这就验证了多线程在I/O密集型任务上和多进程没啥区别。
single_process >>>>>>>>>>>>>>>>>>>>>
args: (range(100000000, 100000005),)
kwargs: {}
TIME: 15.00304365158081
multi_process_apply >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {}
TIME: 15.082982540130615
multi_process_apply_async >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': False}
TIME: 9.077685832977295
multi_process_apply_async >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': True}
TIME: 9.075870275497437
multi_process_map >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {}
TIME: 9.07173490524292
multi_process_map_async >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': False}
TIME: 9.067619562149048
multi_process_map_async >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': True}
TIME: 9.076589584350586
multi_process_imap >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': False}
TIME: 9.06967830657959
multi_process_imap >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': True}
TIME: 9.074320793151855
multi_process_imap_unordered >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': False}
TIME: 9.069270849227905
multi_process_imap_unordered >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {'block': True}
TIME: 9.076633930206299
multi_thread_submit >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {}
TIME: 9.002540111541748
multi_thread_map >>>>>>>>>>>>>>>>>>>>>
args: (2, range(100000000, 100000005))
kwargs: {}
TIME: 9.002438068389893