python多进程那点事儿【multiprocessing库】

摘要:
然后,我们需要为多个进程的并发执行编写以下代码:1p_list=[]2forxinrange:3p=multiprocessing.Process4p.start()5p_list.append67forpinp_list:8p。join()觉得用这种方式编写的代码一点都不优雅,而且将来扩展起来很不方便。子流程的数量会随着任务数量的增加而增加,流程不会被重复使用。

      前言:项目中有个需求需要对产品的日志处理,按照产品中日志的某些字段,对日志进行再次划分。比如产品的日志中含有字段id,tag=1,现在需要把tag是基数的放到一个文件中,tag是偶数的放入一个文件中。这就涉及到多个文件的读写操作,一个文件一个文件读取写入那时间太久了,公司配备的单机,跑了半个多小时,光标还是一直在闪闪闪【你懂得】。没办法了,还是用多进程跑吧。这就得对python中的多进程从新回顾一遍了。

Q1:为什么不用多线程呢?

A1:这个就需要了解python多线程的实现原理了,通过在其解释器层面施加一个全局锁来保证同一时刻只有一个线程可以拥有锁,执行相应的python字节码。所以虽然冠名是是多线程,但是实质上还是只有一个线程在运行。有时候多线程可能会让程序得不到提高反而降低,因为线程之间需要竞争资源。所以很多人也说,如果想真正的同一时刻执行多个任务的话,就需要使用多进程。

1.使用multiprocessing.Process

      multiprocessing.Process最常见的使用就是:

p = multiprocessing.Process(target = 多线程执行函数名, args = 函数参数元组形式)
p.start()
p.join()

      注意使用多进程时候一定要使用join对子进程的状态进行收集,否则在程序运行过程中会出现僵尸进程,对系统性能造成影响。

      当然,上面这只有一个进程,你在写的时候可能很顺手就写了

for x in range(10):
p = multiprocessing.Process(target = 多线程执行函数名, args = 函数参数元组形式)
p.start()
p.join()

      然后就发现,这个进程貌似是顺序执行的。。。好像没有并发,原因就出现在join的位置上,仔细查看手册,你会发现在join函数下方有一行说明:

Block the calling thread until the process whose join() method is called terminates or until the optional timeout occurs.

      意思就是主线程会在join的地方一直等子进程结束。。那么我们多个进程并发执行就要这样写了:

1 p_list = []
2 for x in range(10):
3     p = multiprocessing.Process(target = 多线程执行函数名, args = 函数参数元组形式)
4     p.start()
5     p_list.append(p)
6 
7 for p in p_list:
8     p.join()

      感觉这样写的代码一点都不优雅,而且以后拓展也很不方便,子进程的数目会随着任务数目的增加而增加,进程得不到重复的利用。

2.使用multiprocessing.Pool

      进程池就是上述不方便的完美解决,其一般用法如下:

pool = multiprocessing.Pool(processes=进程数目)
for x in xrange(任务数目):
    pool.apply_async(函数名, 函数参数元组形式)
pool.close() # close函数表明不会再往进程池中加入新任务,一定要在join方法调用之前调用。
pool.join()

      上述代码开启了含有一定数目的进程池,只需要往进程池中加入新任务即可,当进程池中已满,其他的任务就等待,直到有任务结束。

      注意:除了pool.apply_async方法,还有一个pool.apply方法,只不过pool.apply方法是阻塞的。

      还可以使用进程池方法来关注进程执行的结果,pool.apply_asyn函数即返回函数的执行结果,使用get()方法即可得到。

3.多进程共享数据

      多个进程之间共享数据也有很多种方法:

      1)共享变量

         只能使用Value和Array方法:

multiprocessing.Value(typecode_or_type, *args[, lock]) 
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True) 

         关于lock的一段说明:

Return a process-safe wrapper object for a ctypes object which uses lock to synchronize access. If lock is None (the default) then a multiprocessing.RLock object is created automatically.

         共享变量只能是一个变量,或者是线性的一组变量,类型也是从typecode_or_type衍生而来的,具体的用法和说明手册上已经讲解的很清楚了。附上手册上的一段代码如下:

python多进程那点事儿【multiprocessing库】第1张python多进程那点事儿【multiprocessing库】第2张
 1 from multiprocessing import Process, Lock
 2 from multiprocessing.sharedctypes import Value, Array
 3 from ctypes import Structure, c_double
 4 
 5 class Point(Structure):
 6     _fields_ = [('x', c_double), ('y', c_double)]
 7 
 8 def modify(n, x, s, A):
 9     n.value **= 2
10     x.value **= 2
11     s.value = s.value.upper()
12     for a in A:
13         a.x **= 2
14         a.y **= 2
15 
16 if __name__ == '__main__':
17     lock = Lock()
18 
19     n = Value('i', 7)
20     x = Value(c_double, 1.0/3.0, lock=False)
21     s = Array('c', 'hello world', lock=lock)
22     A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
23 
24     p = Process(target=modify, args=(n, x, s, A))
25     p.start()
26     p.join()
27 
28     print n.value
29     print x.value
30     print s.value
31     print [(a.x, a.y) for a in A]
View Code

         结果:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

      2)Manager

         使用Manager方法时,共享变量的类型会多一些,例如list,dict,Event,Lock,Array,Value...使用Manager方法时需要注意,在操作共享对象时候,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。例如:

d = Manager().dict()
d[0] = []
d[0].append(0) # append方法作用在代理对象上,并不对原始对象生效
print d

         输出:{0: []}

         而同样意思的一段代码:

d = Manager().dict()
l = []
l.append(0)
d[0] = l # 直接赋值操作,影响原始共享对象
print d

         输出:{0: [0]}

      3)Queue

         队列,顾名思义,就是一组数据,使用put来往队列中存入数据,使用get方法获取数据,当队列满了继续put和队列空了继续get时候会抛出相对应的异常。可以多个进程之间传递数据。

      4)Pipe

         Pipe方法返回(conn1, conn2)代表一个管道的两个端,对应两个进程。还可以通过duplex来设定管道是全双工(duplex=True)还是半双工(duplex=False)工作。send和recv方法分别是发送和接受消息的方法,如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

4.多进程同步互斥

      1)Lock和Semaphore

         这两个就不多讲了,学过操作系统的都知道。Lock限定同一时间仅一个进程访问共享变量,Semaphore则可以限定多个进程同时访问共享变量。

      2)Event

         使用set和is_set判断事件是否已经发生,决定下一步要执行的动作。

免责声明:文章转载自《python多进程那点事儿【multiprocessing库】》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇如何提升微服务的幸福感你需要PCIE×4硬盘下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

[C#] 多线程总结(结合进度条)

(一)使用线程的理由 1、可以使用线程将代码同其他代码隔离,提高应用程序的可靠性。 2、可以使用线程来简化编码。 3、可以使用线程来实现并发执行。 (二)基本知识 1、进程与线程:进程作为操作系统执行程序的基本单位,拥有应用程序的资源,进程包含线程,进程的资源被线程共享,线程不拥有资源。 2、前台线程和后台线程:通过Thread类新建线程默认为前台线程。当...

python常见错误记录

1. x^y x**y 2. range(a,b,c) http://www.runoob.com/python/python-func-range.html 以a为首项(默认从0开始),c为公差(默认为1)且不超过b-1的等差数列 3. lambda匿名函数 https://blog.csdn.net/liang19890820/article/det...

[笔记]--Linux下运行Python时报错解决办法

1、提示:bash: ./mp.py:/usr/bin/python^M:损坏的解释器: 没有该文件或目录 解决办法: $ sed -i 's/ $//' *.py 有时候在windows下编写的python脚本在linux下不能运行,就是因为^M的原因,因为windows下行结束符是/r/n,而liinux只需要/n. 我们可以使用cat –v来显示一...

使用Python Requests上传表单数据和文件

在Python环境下写一个HTTP客户端,发送POST请求,同时上传表单数据和文件,我们可以使用Requests模块来实现。代码如下: data = { 'name': 'nginx' } files = {'file': open("abc.csv", 'rb')} response = requests.post(url, data=data...

python安装matplotlib:python -m pip install matplotlib报错

matplotlib是python中强大的画图模块。 首先确保已经安装python,然后用pip来安装matplotlib模块。 进入到cmd窗口下,建议执行python -m pip install -U pip setuptools进行升级。 接着键入python -m pip install matplotlib进行自动的安装,系统会自动下载安...

Java多线程并发系列之闭锁(Latch)和栅栏(CyclicBarrier)

   JAVA并发包中有三个类用于同步一批线程的行为,分别是闭锁(Latch),信号灯(Semaphore)和栅栏(CyclicBarrier)。本贴主要说明闭锁(Latch)和栅栏(CyclicBarrier)。 1. 闭锁(Latch) 闭锁(Latch)  —— 确保多个线程在完成各自事务后,才会打开继续执行后面的内容,否则一直等待。 计数器闭锁(...