python开发过程中免不了会用到并发处理数据的场景,通常在i/o密集型的情况下我们可以采用python多线程进行处理,来达到批量快速处理的效果,但是对于计算密集型的场景来说,python的多线程显得无能为力。
背上上面问题锅的主要原因是由于python自身的,GIL即全局解释器锁,使其同一时间只能有一个线程在执行字节码。

处理并发/计算密集型处理可采用多进程进行来充分利用系统的cpu。
必要时可采用分布式进行处理。

以下代码对多进程的几种实现方式进行速度的对比。
· multiprocess模块并行不切换process带锁实现
· multiprocess模块并行不切换process不带锁实现
· multiprocess模块进程池实现
· 普通串行实现
· pp模块并行代码实现

pp模块安装
如果没有记错的话默认直接使用pip install pp安装的是python2的版本。
python3的安装请前往pp模块的官网
找到For Python3后手动下载,然后解压。
最后再使用python3 setup.py install进行安装即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import pp, time, math, multiprocessing, random
# https://blog.csdn.net/qq_19175749/article/details/51611643 测试素数计算所用代码参考

def isprime(n):
"""Returns True if n is prime and False otherwise"""
if not isinstance(n, int):
raise TypeError("argument passed to is_prime is not of 'int' type")
if n < 2:
return False
if n == 2:
return True
max = int(math.ceil(math.sqrt(n)))
i = 2
while i <= max:
if n % i == 0:
return False
i += 1
return True


def sum_primes(n):
"""Calculates sum of all primes below given integer n"""
return sum([x for x in range(2, n) if isprime(x)])


def mains(reslut_list, inputs, lock):
while True:
lock.acquire()
if inputs.qsize() != 0:
input = inputs.get()
lock.release()
reslut_list.put((input, sum_primes(input)))
else:
lock.release()
break


def mains2(reslut_list, inputs):
while inputs.qsize() != 0:
try:
input = inputs.get(block=False)
reslut_list.put((input, sum_primes(input)))
except:
pass


print("CPU Core: ", multiprocessing.cpu_count())
inputs = tuple([random.randint(10000, 10090) for i in range(2000)]) # 生成100个测试数据
# mul并行不切换process带锁
print("{beg}mul并行不切换process带锁{beg}".format(beg='-' * 16))
startTime = time.time()
inputs = inputs
result_list = multiprocessing.Queue()
plist = []
re_queue = multiprocessing.Queue()
for i in inputs:
re_queue.put(i)
lock = multiprocessing.Lock()
for i in range(multiprocessing.cpu_count()):
p = multiprocessing.Process(target=mains, args=(result_list, re_queue, lock))
p.start()
plist.append(p)
for i in plist:
p.join()
for i in range(result_list.qsize()):
input, result = result_list.get()
# print("Sum of primes below %s is %s" % (input, result))
# print('+', end='')
print("\n用时:%.3fs" % (time.time() - startTime))



# mul并行不切换process不带锁
print("{beg}mul并行不切换process不带锁{beg}".format(beg='-' * 16))
startTime = time.time()
inputs = inputs
result_list = multiprocessing.Queue()
plist = []
re_queue = multiprocessing.Queue()
for i in inputs:
re_queue.put(i)
for i in range(multiprocessing.cpu_count()):
p = multiprocessing.Process(target=mains2, args=(result_list, re_queue))
p.start()
plist.append(p)
for i in plist:
p.join()
for i in range(result_list.qsize()):
input, result = result_list.get()
# print("Sum of primes below %s is %s" % (input, result))
# print('+', end='')
print("\n用时:%.3fs" % (time.time() - startTime))



# mul进程池
print("{beg}mul进程池{beg}".format(beg='-' * 16))
startTime = time.time()
inputs = inputs
m_pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
results = [(input, m_pool.apply_async(sum_primes, (input, ))) for input in inputs]
m_pool.close()
m_pool.join()
for input, result in results:
input, result = input, result.get()
# print("Sum of primes below %s is %s" % (input, result))
# print('+', end='')
print("\n用时:%.3fs" % (time.time() - startTime))



# 串行代码
print("{beg}串行程序{beg}".format(beg='-' * 16))
startTime = time.time()
inputs = inputs
results = [(input, sum_primes(input)) for input in inputs]
for input, result in results:
input, result = input, result
# print("Sum of primes below %s is %s" % (input, result))
# print('+', end='')
print("\n用时:%.3fs" % (time.time() - startTime))



# pp并行代码
print("{beg}pp并行程序{beg}".format(beg='-' * 16))
startTime = time.time()
job_server = pp.Server(ncpus="autodetect")
inputs = inputs
jobs = [(input, job_server.submit(sum_primes, (input,), (isprime,), ("math",))) for input in inputs]
for input, job in jobs:
input, result = input, job()
# print("Sum of primes below %s is %s" % (input, result))
# print('+', end='')
print("\n用时:%.3fs" % (time.time() - startTime))

实现效果,速度对比:
2000数据:

1000数据:

500数据:

100数据:

10数据:

multiprocess模块并行不切换process带锁实现
多进程带锁放置queue.get()为空时阻塞的情况。带锁判断是否队列为空,为空跳出循环。
理论上来说加锁由于某些时间锁住了变量,另一个进程无法操控,会浪费一定的时间,整体时间相对不加锁的情况下变长。进程全程创建一次销毁一次。
multiprocess模块并行不切换process不带锁实现
不带锁利用try…except去捕获队列为空时get()方法的异常来跳出循环。所有进程无需等待其他进程处理情况,同时读取数据不等待。进程全程创建一次,销毁一次。
multiprocess模块进程池实现
使用multiprocessing.Pool()创建进程池,apply_async()去进行执行func,进程随当前传入数据次数进行创建和销毁,会有一定时间开销。
普通串行实现
串行执行,理论整体消耗时间最长。
pp模块并行代码实现
使用pp模块进行并行实现,pp模块的并行相对于multiprocessing模块会稍慢一点点。pp模块支持分布式处理,在写法上相对更容易。

multiprocessing也支持分布式处理,个人感觉在func传递上相对于pp来说不是太好处理,pp已经封装好了,同时pp有已经写好的server端,可直接用。