import time import multiprocessing as mp from concurrent.futures import ThreadPoolExecutor
'''计算1+2+...+n ''' def func(n): s = 0 for i in range(n+1): s += i print(f"sum({n})={s}") return n, s
'''统计函数运行时间 ''' def get_duration(func_name, *args, **kwargs): print(f"{func_name} >>>>>>>>>>>>>>>>>>>>>") print(f"args: {args}") print(f"kwargs: {kwargs}") st = time.time() globals()[func_name](*args, **kwargs) print(f"TIME: {time.time() - st}\n")
'''单进程 运行结果: sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 19.151068449020386 可以看出计算sum和最后获取到的result都是按照输入顺序串行的。 ''' def single_process(nums): results = [] for num in nums: result = func(num) results.append(result) for r in results: print(f"results({r[0]})={r[1]}")
'''多进程apply方法 因为5个任务要按顺序提交,apply方法是阻塞的, 每个任务都得等上一个任务结束后才能继续提交并执行, 所以同一时刻只有一个进程在执行任务,等同于单进程串行。
运行结果: sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 18.436408758163452 可以看出跟单进程串行执行几乎没有任何区别。 ''' def multi_process_apply(parallel, nums): results = [] with mp.Pool(processes=parallel) as pool: for num in nums: result = pool.apply(func, (num,)) results.append(result) for r in results: print(f"results({r[0]})={r[1]}")
'''多进程apply_async方法 5个任务按顺序提交,但是会立即返回ApplyResult对象,是非阻塞的, 这样一下子就提交了5个任务,进程池的队列大小是2,所以同时有2个任务在执行, 最后通过ApplyResult.get()方法来获取对应任务的结果,这个方法会阻塞, 直到获取到结果才会继续获取下一个的结果。
运行结果(block=False): sum(100000001)=5000000050000000 sum(100000000)=4999999950000000 results(100000000)=4999999950000000 results(100000001)=5000000050000000 sum(100000002)=5000000150000001 results(100000002)=5000000150000001 sum(100000003)=5000000250000003 results(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000004)=5000000350000006 TIME: 11.417532920837402 可以看出,如果提交完5个任务之后没有进行block阻塞, 那么一部分进程执行完sum计算之后,ApplyResult.get()会立刻获取到结果,然后输出result。 并且sum的计算顺序也是无序的,因为5个任务是非阻塞的,一股脑全提交到进程池里了,没有执行的先后顺序。 但是result的顺序还是跟输入顺序一样的,因为是用阻塞方法ApplyResult.get()一个个获取的, 这样即使第一行sum(100000001)已经计算完了,也没有立刻输出result, 而是等sum(100000000)计算完,才按顺序输出两者的result。 时间上缩短了一半左右。
运行结果(block=True): sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 11.41776990890503 如果提交完5个任务之后进行了block阻塞,那么就会等这5个任务都执行完再输出result, 这样的话计算sum的顺序依然是无序的,只是上面恰好按顺序执行了, 输出result的话依然是有序的,并且不需要等待了,因为全都已经执行完了。 时间上缩短了一半左右。 ''' def multi_process_apply_async(parallel, nums, block=False): apply_results = [] with mp.Pool(processes=parallel) as pool: for num in nums: apply_result = pool.apply_async(func, (num,)) apply_results.append(apply_result) if block: pool.close() pool.join() for apply_result in apply_results: r = apply_result.get() print(f"results({r[0]})={r[1]}")
'''多进程map方法 map方法可以一次性提交一个可迭代的对象(例如这里的列表), 然后方法内部会迭代这个对象,将元素依次送到func中执行,注意是无序执行的。 map方法是阻塞的,也就是说所有任务全部执行完才会返回results,返回顺序也是有序的。 如果一次性提交了所有任务构成的列表,那么map和apply_async几乎没有任何区别。 如果手动对任务进行了分块,一块一块提交到map里,那么可能会更慢, 因为同一块中的任务执行时间会有差异,会出现进程等待。
运行结果: sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 11.016913414001465 这里sum计算也是碰巧有序了,实际上是无序随机执行的,results一定是有序的。 时间上缩短了一半左右。 ''' def multi_process_map(parallel, nums): with mp.Pool(processes=parallel) as pool: results = pool.map(func, nums) for r in results: print(f"results({r[0]})={r[1]}")
'''多进程map_async方法 和map的区别就是非阻塞的,传入一个可迭代对象之后会立即返回MapResult对象, 之后需要用MapResult.get()方法来阻塞获取计算结果, 直到这个可迭代对象的任务全部执行完,返回结果列表,才会继续获取下一个可迭代对象的结果。 这个过程其实和apply_async基本一样,只是把单个元素换成了可迭代对象。
运行结果(block=False): sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 11.517432928085327 如果不开block阻塞的话,所有的可迭代对象中的任务会一股脑全部新建起来,然后无序执行, 最后按照输入顺序获取results,只不过下面例子中只传入了一个列表,因此和上面的map没有任何区别。 时间上缩短了一半左右。
运行结果(block=True): sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 11.51731562614441 因为只传入了一个列表,所以MapResult.get()和block没有区别了。 时间上缩短了一半左右。 ''' def multi_process_map_async(parallel, nums, block=False): with mp.Pool(processes=parallel) as pool: map_results = pool.map_async(func, nums) if block: pool.close() pool.join() map_results = map_results.get() for r in map_results: print(f"results({r[0]})={r[1]}")
'''多进程imap方法 和map的区别就是,imap是非阻塞的,并且返回的是一个迭代器, 不需要通过get方法一次性获取所有的计算结果,可以通过迭代器不断拿到已经执行完的进程结果, 但是注意迭代器是按照输入顺序迭代的,因此靠前的任务还没执行完的话,是不会先输出后面的任务结果的。
运行结果(block=False): sum(100000001)=5000000050000000 sum(100000000)=4999999950000000 results(100000000)=4999999950000000 results(100000001)=5000000050000000 sum(100000002)=5000000150000001 results(100000002)=5000000150000001 sum(100000003)=5000000250000003 results(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000004)=5000000350000006 TIME: 11.41724157333374 可以看出输出结果和apply_async的结果很像,都是执行完一部分就输出一部分,并且输出是有序的。 时间上缩短了一半左右。
运行结果(block=True): sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 11.418148040771484 如果开启了block阻塞的话,那所有进程执行完才会有序输出结果。 时间上缩短了一半左右。 ''' def multi_process_imap(parallel, nums, block=False): with mp.Pool(processes=parallel) as pool: result_iter = pool.imap(func, nums) if block: pool.close() pool.join() for r in result_iter: print(f"results({r[0]})={r[1]}")
'''多进程imap_unordered方法 和imap区别就是迭代器输出结果的时候,先执行完的进程先输出结果,不一定要有序。
运行结果(block=False): sum(100000000)=4999999950000000 results(100000000)=4999999950000000 sum(100000001)=5000000050000000 results(100000001)=5000000050000000 sum(100000002)=5000000150000001 results(100000002)=5000000150000001 sum(100000003)=5000000250000003 results(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000004)=5000000350000006 TIME: 11.419070482254028 跑的结果比较巧,正好有序了,其实sum和results可能是无序的。 时间上缩短了一半左右。
运行结果(block=True): sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000000)=4999999950000000 results(100000001)=5000000050000000 results(100000002)=5000000150000001 results(100000003)=5000000250000003 results(100000004)=5000000350000006 TIME: 11.718666076660156 跑的结果比较巧,正好有序了,其实sum和results可能是无序的。 时间上缩短了一半左右。 ''' def multi_process_imap_unordered(parallel, nums, block=False): with mp.Pool(processes=parallel) as pool: result_iter = pool.imap_unordered(func, nums) if block: pool.close() pool.join() for r in result_iter: print(f"results({r[0]})={r[1]}")
'''多线程submit方法 submit方法是非阻塞的,会立刻返回Future对象,然后通过Future.result()获取计算结果, 和多进程的apply_async很像,但是因为GIL的存在,同一时刻只会计算一个sum,所以等同于串行计算。
运行结果: sum(100000001)=5000000050000000 sum(100000000)=4999999950000000 results(100000000)=4999999950000000 results(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 results(100000002)=5000000150000001 results(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000004)=5000000350000006 TIME: 19.167519569396973 可以看出sum计算是无序的,但是results是有序的,并且时间几乎等同于串行。 ''' def multi_thread_submit(parallel, nums): futures = [] with ThreadPoolExecutor(max_workers=parallel) as pool: for num in nums: future = pool.submit(func, num) futures.append(future) for future in futures: r = future.result() print(f"results({r[0]})={r[1]}")
'''多线程map方法 和多进程的imap很像,也是输入一个可迭代对象,立刻返回一个迭代器, 并且迭代器的输出也是有序的,阻塞的,如果执行顺序是乱的,那么输出就要等待了。
运行结果: sum(100000000)=4999999950000000 sum(100000001)=5000000050000000 results(100000000)=4999999950000000 results(100000001)=5000000050000000 sum(100000002)=5000000150000001 sum(100000003)=5000000250000003 results(100000002)=5000000150000001 results(100000003)=5000000250000003 sum(100000004)=5000000350000006 results(100000004)=5000000350000006 TIME: 19.716532707214355 这里也碰巧sum计算是有序的,实际上是无序的,但是results是有序的,并且时间几乎等同于串行。 ''' def multi_thread_map(parallel, nums): with ThreadPoolExecutor(max_workers=parallel) as pool: result_iter = pool.map(func, nums) for r in result_iter: print(f"results({r[0]})={r[1]}")
if __name__ == "__main__": start, length = 10**8, 5 nums = range(start, start+length) parallel = 2
get_duration("single_process", nums)
get_duration("multi_process_apply", parallel, nums) get_duration("multi_process_apply_async", parallel, nums, block=False) get_duration("multi_process_apply_async", parallel, nums, block=True) get_duration("multi_process_map", parallel, nums) get_duration("multi_process_map_async", parallel, nums, block=False) get_duration("multi_process_map_async", parallel, nums, block=True) get_duration("multi_process_imap", parallel, nums, block=False) get_duration("multi_process_imap", parallel, nums, block=True) get_duration("multi_process_imap_unordered", parallel, nums, block=False) get_duration("multi_process_imap_unordered", parallel, nums, block=True)
get_duration("multi_thread_submit", parallel, nums) get_duration("multi_thread_map", parallel, nums)
|