python进程池:multiprocessing.pool

摘要:
当要操作的对象数量不大时,可以直接在多处理中使用Process来动态生成多个进程。十几个过程是可以的,但如果有数百或数千个目标,手动限制过程的数量太麻烦了。此时,您可以扮演流程池的角色。回到上面的示例1:使用进程池#coding:utf-8importmultiprocessingimportTimedeffunc:print“msg:”,msgtime。sleepprint“end”if__name__==“__main__”:池=多处理。池foriinxrange:msg=“hello%d”%池。apply_ Async#为执行而维护的进程总数为个进程。进程完成后,将添加一个新进程以打印“标记~标记~标记~~~~~~”池。close()poolJoin()#在调用Join之前,请先调用close函数,否则将发生错误。join()主进程阻塞并等待子进程的退出。连接方法应在关闭或终止后使用。
阅读目录

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

例1:使用进程池

复制代码
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."
复制代码

一次执行结果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

例2:使用进程池(阻塞)

复制代码
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."
复制代码

一次执行的结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done. 

例3:使用进程池,并关注结果

复制代码
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."
复制代码

一次执行结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 :get()函数得出每个返回结果的值

例4:使用多个进程池

复制代码
#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "
Run task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "
Run task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "
Run task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "
Run task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)
        
if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'
复制代码

一次执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

 

multiprocessing pool map

复制代码
#coding: utf-8
import multiprocessing 

def m1(x): 
    print x * x 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    i_list = range(8)
    pool.map(m1, i_list)
复制代码

一次执行结果

1
2
3
4
5
6
7
8
0
1
4
9
16
25
36
49

 参考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx 

问题:http://bbs.chinaunix.net/thread-4111379-1-1.html

复制代码
#coding: utf-8
import multiprocessing
import logging

def create_logger(i):
    print i

class CreateLogger(object):
    def __init__(self, func):
        self.func = func

if __name__ == '__main__':
    ilist = range(10)

    cl = CreateLogger(create_logger)
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(cl.func, ilist)

    print "hello------------>"
复制代码

一次执行结果

1
2
3
4
5
6
7
8
9
10
11
0
1
2
3
4
5
6
7
8
9
hello------------>

  

 
原文链接:http://www.cnblogs.com/kaituorensheng/p/4465768.html#_label3

免责声明:文章转载自《python进程池:multiprocessing.pool》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇sqlserver的存储过程RabbitMQ OS X下安装及常用命令-1下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Python的路径引用

1、以HOME目录为准,进行跳转 sys.path.append(os.path.dirname(__file__) + os.sep + '../') from config import swordfishconf from utils import log from utils.mysql_base import MySQLBase 将程序的HO...

Python解析HEX文件

解析Intel的HEX文件,学习Python文件输入输出的时候,练习了一下。 import sys import os HexTable = {'0':0,'1':1,'2':2,'3':3,'4':4,'5':5,'6':6,'7':7,'8':8,'9':9,'A':10,'B':11,'C':12,'D':13,'E':14,'F':15} fi...

高斯分布(Gaussian Distribution)的概率密度函数(probability density function)

高斯分布(Gaussian Distribution)的概率密度函数(probability density function) 对应于numpy中: numpy.random.normal(loc=0.0, scale=1.0, size=None) 参数的意义为: loc:float 此概率分布的均值(对应着整个分布的中心centre) scale...

部署一个基于python语言的web发布环境

---恢复内容开始--- 1) 一门面向对象的语言 2)拥有丰富的库 3)可移植性 4)免费、开源 5)简单易易学 可做软件开发、人工智能、web开发等等 部署流程: Cnetos7.5+Nginx+python+Django+uwsgi+mysql 实验部署流程 1)安装Nginx 2)安装python 3)安装mysql 4)部署发布平台 5)测试...

python GUI界面编程 口算题生成系统

问题描述 口算题生成系统 功能: (1)口算:题目显示在界面上(除法必须是整除),逐个显示题目,用户通过输入框输入计算结果。系统能实时统计正确率,将错误题目打印到文件里。 (2)生成题目:用户选择生成的题目数量,打印时的列数,运算符的数量,将题目生成到docx文件里。 设计说明 (1)拟设计的功能及实现思路、需要用到的知识功能(1)的实现思路: 1.初始化...

基于django2.2的网页构建

安装django pip install django==2.2 建一个在线商城的项目 django-admin startproject pyshop 启动项目 python manage.py runserver 页面访问效果 http://127.0.0.1:8000 建议一个项目的app 产品 products djan...