Python 多进程解析:Multiprocessing 高效并行处理的奥秘

CSDN 2024-10-25 10:35:07 阅读 85

Python 多进程解析:Multiprocessing 高效并行处理的奥秘

文章目录

Python 多进程解析:Multiprocessing 高效并行处理的奥秘一 多进程1 导入进程标准模块2 定义调用函数3 创建和启动进程

二 存储进程结果 Queue三 threading & multiprocessing 对比1 创建多进程 multiprocessing2 创建多线程 multithread3 创建普通函数4 创建对比时间函数5 运行结果

四 进程池 Pool1 进程池 Pool() 和 map()2 自定义核数量3 apply_async 单结果返回4 apply_async 多结果返回5 划重点

五 共享内存 shared memory六 进程锁 Lock1 不加进程锁2 加进程锁

七 完整代码示例八 源码地址

在 Python 编程中,多进程(Multiprocessing)是一种提高程序执行效率的重要手段。本文深入解析了多进程的概念与应用,帮助开发者充分利用多核处理器的计算能力。我们从基本的进程创建与启动开始,讲解了如何通过 <code>Queue 实现进程间的数据传递,并通过对比多进程与多线程的性能差异,揭示了多进程在处理 CPU 密集型任务时的显著优势。文章还详细介绍了进程池(Pool)的使用方法,包括 mapapply_async 的不同应用场景。最后,我们探讨了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。

一 多进程

Multiprocessing 是一种编程和执行模式,它允许多个进程同时运行,以此提高应用程序的效率和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效利用多核处理器的能力。

1 导入进程标准模块

import multiprocessing as mp

2 定义调用函数

def job(a, d):

print('你好 世界')

3 创建和启动进程

# 创建进程

p1 = mp.Process(target=job, args=(1, 2))

# 启动进程

p1.start()

# 连接进程

p1.join()

二 存储进程结果 Queue

1 存入输出到 Queue

# 该函数没有返回值!!!

def job02(q):

res = 0

for i in range(1000):

res += i + i ** 2 + i ** 3

q.put(res) #

def my_result_process02():

q = mp.Queue()

p1 = mp.Process(target=job02, args=(q,))

p2 = mp.Process(target=job02, args=(q,))

p1.start()

p2.start()

p1.join()

p2.join()

res1 = q.get()

res2 = q.get()

print(res1)

print(res2)

print(res1 + res2)

三 threading & multiprocessing 对比

1 创建多进程 multiprocessing

def job03(q):

res = 0

for i in range(1000000):

res += i + i ** 2 + i ** 3

# 结果加 queue

q.put(res)

# 多核运算多进程

def multicore03():

q = mp.Queue()

p1 = mp.Process(target=job03, args=(q,))

p2 = mp.Process(target=job03, args=(q,))

p1.start()

p2.start()

p1.join()

p2.join()

res1 = q.get()

res2 = q.get()

print('multicore:', res1 + res2)

2 创建多线程 multithread

# 单核运算多线程

def multithread03():

# thread可放入process同样的queue中

q = mp.Queue()

t1 = td.Thread(target=job03, args=(q,))

t2 = td.Thread(target=job03, args=(q,))

t1.start()

t2.start()

t1.join()

t2.join()

res1 = q.get()

res2 = q.get()

print('multithread:', res1 + res2)

3 创建普通函数

def normal03():

res = 0

for _ in range(2):

for i in range(1000000):

res += i + i ** 2 + i ** 3

print('normal:', res)

4 创建对比时间函数

def time_result03():

st = time.time()

normal03()

st1 = time.time()

print('normal time:', st1 - st)

multithread03()

st2 = time.time()

print('multithread time:', st2 - st1)

multicore03()

print('multicore time:', time.time() - st2)

5 运行结果

normal03: 499999666667166666000000

normal03 time: 0.6855959892272949

multithread03: 499999666667166666000000

multithread03 time: 0.6804449558258057

multicore03: 499999666667166666000000

multicore03 time: 0.38849496841430664

我运行的是 normal03 > multithread03 > multicore03normal03multithread03 相差不大,multicore03normal03multithread03 快将近一倍。

四 进程池 Pool

使用进程池 Pool ,Python 会自行解决多进程问题。

1 进程池 Pool() 和 map()

map() 返回的是多结果。

def job04(x):

# Pool的函数有返回值

return x * x

def multicore04():

# Pool的函数有返回值

pool = mp.Pool()

# 自分配 CPU 计算

res = pool.map(job04, range(10))

print(res)

2 自定义核数量

Pool 默认大小是 CPU的核数,传入 processes 参数自定义需要的核数量。

def multicore05():

# 定义CPU核数量为3

pool = mp.Pool(processes=3)

res = pool.map(job04, range(10))

print(res)

3 apply_async 单结果返回

apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的, 所以在传入值后需要加逗号, 同时需要用 get() 方法获取返回值。

def multicore06():

pool = mp.Pool()

res = pool.apply_async(job04, (2,))

# 用get获得结果

print(res.get())

4 apply_async 多结果返回

def multicore07():

pool = mp.Pool()

multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]

# 用get获得结果

print([res.get() for res in multi_res])

5 划重点

Pool 默认调用是 CPU 的核数,传入 processes 参数可自定义CPU核数。map() 放入迭代参数,返回多个结果。apply_async() 只能放入一组参数,并返回一个结果,如果想得到 map() 的效果需要通过迭代。

五 共享内存 shared memory

1 定义 Shared Value

value1 = mp.Value('i', 0)

value2 = mp.Value('d', 3.14)

2 定义 Shared Array

它只能是一维数组

array = mp.Array('i', [1, 2, 3, 4])

其中 d 和 i 参数用来设置数据类型的,d 表示一个双精浮点类型,i 表示一个带符号的整型,参考数据类型如下:

Type code C Type Python Type Minimum size in bytes Notes
<code>'b' signed char int 1
<code>'B' unsigned char int 1
<code>'u' wchar_t Unicode character 2 (1)
<code>'h' signed short int 2
<code>'H' unsigned short int 2
<code>'i' signed int int 2
<code>'I' unsigned int int 2
<code>'l' signed long int 4
<code>'L' unsigned long int 4
<code>'q' signed long long int 8
<code>'Q' unsigned long long int 8
<code>'f' float float 4
<code>'d' double float 8

具体链接:Efficient arrays of numeric values

六 进程锁 Lock

1 不加进程锁

争抢共享内存

<code>def job08(v, num):

for _ in range(5):

time.sleep(0.1) # 暂停0.1秒,让输出效果更明显

v.value += num # v.value获取共享变量值

print(v.value, end="\n")code>

def multicore08():

v = mp.Value('i', 0) # 定义共享变量

p1 = mp.Process(target=job08, args=(v, 1))

p2 = mp.Process(target=job08, args=(v, 3)) # 设定不同的number看如何抢夺内存

p1.start()

p2.start()

p1.join()

p2.join()

2 加进程锁

def job09(v, num, l):

l.acquire() # 锁住

for _ in range(5):

# print(v.value, num)

time.sleep(0.1)

v.value = v.value + num # 获取共享内存

print(v.value)

l.release() # 释放

def multicore09():

l = mp.Lock() # 定义一个进程锁

v = mp.Value('i', 0) # 定义共享内存

p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入

p1.start()

p1.join()

p2 = mp.Process(target=job09, args=(v, 3, l))

p2.start()

p2.join()

# def multicore10():

# l = mp.Lock() # 定义一个进程锁

# v = mp.Value('i', 0) # 定义共享内存

# p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入

# p2 = mp.Process(target=job09, args=(v, 3, l))

# p1.start()

# p2.start()

# p1.join()

# p2.join()

在这个示例中,必须先执行 p1 以达到预期效果。分别运行 multicore09multicore10 会发现一些有意思的情况。

七 完整代码示例

:建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。

# This is a sample Python script.

# Press ⌃R to execute it or replace it with your code.

# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.

import multiprocessing as mp

import threading as td

import time as time

def print_hi(name):

# Use a breakpoint in the code line below to debug your script.

print(f'Hi, { name}') # Press ⌘F8 to toggle the breakpoint.

# 创建进程

p1 = mp.Process(target=job, args=(1, 2))

# 启动进程

p1.start()

# Shared Value

value1 = mp.Value('i', 0)

value2 = mp.Value('d', 3.14)

# Shared Array,只能是一维数组

array = mp.Array('i', [1, 2, 3, 4])

def job(a, d):

print('你好 世界')

# 该函数没有返回值!!!

def job02(q):

res = 0

for i in range(1000):

res += i + i ** 2 + i ** 3

q.put(res) #

def my_result_process02():

q = mp.Queue()

p1 = mp.Process(target=job02, args=(q,))

p2 = mp.Process(target=job02, args=(q,))

p1.start()

p2.start()

p1.join()

p2.join()

res1 = q.get()

res2 = q.get()

print(res1)

print(res2)

print(res1 + res2)

def job03(q):

res = 0

for i in range(1000000):

res += i + i ** 2 + i ** 3

# 结果加 queue

q.put(res)

# 多核运算多进程

def multicore03():

q = mp.Queue()

p1 = mp.Process(target=job03, args=(q,))

p2 = mp.Process(target=job03, args=(q,))

p1.start()

p2.start()

p1.join()

p2.join()

res1 = q.get()

res2 = q.get()

print('multicore03:', res1 + res2)

# 单核运算多线程

def multithread03():

# thread可放入process同样的queue中

q = mp.Queue()

t1 = td.Thread(target=job03, args=(q,))

t2 = td.Thread(target=job03, args=(q,))

t1.start()

t2.start()

t1.join()

t2.join()

res1 = q.get()

res2 = q.get()

print('multithread03:', res1 + res2)

def normal03():

res = 0

for _ in range(2):

for i in range(1000000):

res += i + i ** 2 + i ** 3

print('normal03:', res)

def time_result03():

st = time.time()

normal03()

st1 = time.time()

print('normal03 time:', st1 - st)

multithread03()

st2 = time.time()

print('multithread03 time:', st2 - st1)

multicore03()

print('multicore03 time:', time.time() - st2)

def job04(x):

# Pool的函数有返回值

return x * x

def multicore04():

# Pool的函数有返回值

pool = mp.Pool()

# 自分配 CPU 计算

res = pool.map(job04, range(10))

print(res)

def multicore05():

pool = mp.Pool(processes=3) # 定义CPU核数量为3

res = pool.map(job04, range(10))

print(res)

def multicore06():

pool = mp.Pool()

# apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,

# 所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

res = pool.apply_async(job04, (2,))

# 用get获得结果

print(res.get())

def multicore07():

pool = mp.Pool()

multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]

# 用get获得结果

print([res.get() for res in multi_res])

def job08(v, num):

for _ in range(5):

time.sleep(0.1) # 暂停0.1秒,让输出效果更明显

v.value += num # v.value获取共享变量值

print(v.value, end="\n")code>

def multicore08():

v = mp.Value('i', 0) # 定义共享变量

p1 = mp.Process(target=job08, args=(v, 1))

p2 = mp.Process(target=job08, args=(v, 3)) # 设定不同的number看如何抢夺内存

p1.start()

p2.start()

p1.join()

p2.join()

def job09(v, num, l):

l.acquire() # 锁住

for _ in range(5):

# print(v.value, num)

time.sleep(0.1)

v.value = v.value + num # 获取共享内存

print(v.value)

l.release() # 释放

def multicore09():

l = mp.Lock() # 定义一个进程锁

v = mp.Value('i', 0) # 定义共享内存

p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入

p1.start()

p1.join()

p2 = mp.Process(target=job09, args=(v, 3, l))

p2.start()

p2.join()

def multicore10():

l = mp.Lock() # 定义一个进程锁

v = mp.Value('i', 0) # 定义共享内存

p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入

p2 = mp.Process(target=job09, args=(v, 3, l))

p1.start()

p2.start()

p1.join()

p2.join()

# Press the green button in the gutter to run the script.

if __name__ == '__main__':

print_hi('什么是 Multiprocessing')

my_result_process02()

time_result03()

multicore04()

multicore05()

multicore06()

multicore07()

multicore08()

multicore09()

# multicore10()

# See PyCharm help at https://www.jetbrains.com/help/pycharm/

复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。

Hi, 什么是 Multiprocessing

你好 世界

249833583000

249833583000

499667166000

normal03: 499999666667166666000000

normal03 time: 0.7139420509338379

multithread03: 499999666667166666000000

multithread03 time: 0.6696178913116455

multicore03: 499999666667166666000000

multicore03 time: 0.3917398452758789

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

4

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

3

4

7

8

11

12

1515

16

19

1

2

3

4

5

8

11

14

17

20

八 源码地址

代码地址:

国内看 Gitee 之 什么是 Multiprocessing.py

国外看 GitHub 之 什么是 Multiprocessing.py

引用 莫烦 Python



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。