Threads and Processes: the concurrent.futures Module

https://docs.python.org/3/library/concurrent.futures.html

17.4.1 Executor Objects

class concurrent.futures.Executor

An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.
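Because Executor only defines the interface, a concrete subclass has to supply submit(); in CPython the inherited map() and the with-statement support are built on top of it. The sketch below uses a hypothetical InlineExecutor (not part of the library) that simply runs each call synchronously in the caller's thread. It only illustrates the subclassing relationship; it is not how ThreadPoolExecutor or ProcessPoolExecutor work internally.

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """Hypothetical executor that runs every call immediately in the caller's thread."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the error so future.result() re-raises it, as real executors do
            future.set_exception(exc)
        return future


with InlineExecutor() as executor:
    single = executor.submit(pow, 2, 5).result()
    # map() is inherited from Executor and, in CPython, is implemented with submit()
    mapped = list(executor.map(pow, [1, 2], [3, 4]))

print(single, mapped)  # 32 [1, 16]
```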

Methods:

submit(fn, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)  # computes 323 ** 1235 in a worker thread
    print(future.result())  # blocks until the call has finished
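submit() forwards keyword arguments as well as positional ones. A small sketch, using the built-in int() purely as an example callable:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    # Runs int("ff", base=16) in a worker thread
    future = executor.submit(int, "ff", base=16)
    value = future.result()

print(value)  # 255
```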

map(func, *iterables, timeout=None, chunksize=1)
Similar to map(func, *iterables) except:

the iterables are collected immediately rather than lazily;
func is executed asynchronously and several calls to func may be made concurrently.
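A minimal sketch of both points with ThreadPoolExecutor: the input generator is drained as soon as map() is called, the calls overlap, and the results still come back in input order even though later inputs finish first. The sleep durations are arbitrary values chosen for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Smaller inputs sleep longer, so the tasks finish in reverse input order
    time.sleep(0.05 * (5 - x))
    return x * x


consumed = []


def inputs():
    # A lazy generator that records each item as map() pulls it
    for x in [1, 2, 3, 4]:
        consumed.append(x)
        yield x


with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(slow_square, inputs())
    # map() has already drained the generator before any result is read
    drained_before_reading = list(consumed)
    squares = list(results)

print(drained_before_reading)  # [1, 2, 3, 4]
print(squares)                 # [1, 4, 9, 16], in input order
```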

The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.
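A sketch of the timeout behaviour; the 0.2 s budget and the sleep times are illustrative. The deadline is counted from the map() call itself, not from each __next__() call.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def sleepy(seconds):
    time.sleep(seconds)
    return seconds


timed_out = False
with ThreadPoolExecutor(max_workers=2) as executor:
    # The 0.2 s deadline starts counting when map() is called
    results = executor.map(sleepy, [0.01, 0.5], timeout=0.2)
    first = next(results)  # ready after about 0.01 s
    try:
        next(results)      # this call is still sleeping when the deadline passes
    except TimeoutError:
        timed_out = True

print(first, timed_out)  # 0.01 True
```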

If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
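A sketch of that behaviour: results before the failing call are still delivered, and the exception surfaces only when the loop reaches the third item.

```python
from concurrent.futures import ThreadPoolExecutor


def invert(x):
    return 1 / x  # raises ZeroDivisionError when x == 0


collected = []
error = None
with ThreadPoolExecutor(max_workers=2) as executor:
    try:
        for value in executor.map(invert, [1, 2, 0, 4]):
            collected.append(value)
    except ZeroDivisionError as exc:
        # Raised when the iterator reaches the failing call's result
        error = exc

print(collected, type(error).__name__)  # [1.0, 0.5] ZeroDivisionError
```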

When using ProcessPoolExecutor, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect.
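The chunking step can be pictured with the hypothetical helper make_chunks below, which groups the zipped argument tuples into lists of at most chunksize items. CPython uses a similar internal helper, but this is a sketch, not the library code. With ProcessPoolExecutor each chunk is pickled and shipped as one task, which is why larger chunks reduce per-task overhead.

```python
import itertools


def make_chunks(chunksize, *iterables):
    """Hypothetical helper: group zipped argument tuples into lists of at most chunksize."""
    it = zip(*iterables)
    while True:
        chunk = list(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


chunks = list(make_chunks(2, [1, 2, 3, 4, 5], [3, 4, 5, 6, 7]))
print(chunks)  # [[(1, 3), (2, 4)], [(3, 5), (4, 6)], [(5, 7)]]
print(len(list(make_chunks(1, range(5)))))  # 5: one task per item with chunksize=1
```

In real code you only pass the parameter, for example executor.map(fn, data, chunksize=100), where fn and data are placeholders.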

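shutdown(wait=True)
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError. If wait is True, the method does not return until all pending futures have finished and the executor's resources have been freed; if wait is False, it returns immediately. Using the executor in a with statement calls shutdown(wait=True) automatically.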
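A short sketch: shutdown(wait=True) lets the already-submitted call finish, and any later submit() is rejected with RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(sum, [1, 2, 3])
executor.shutdown(wait=True)  # returns only after the pending call has finished

try:
    executor.submit(sum, [4, 5])  # too late: the executor no longer accepts work
    rejected = False
except RuntimeError:
    rejected = True

print(future.result(), rejected)  # 6 True
```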

17.4.2 ThreadPoolExecutor

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a Future waits on the results of another Future.
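The classic case is a pool with a single worker whose only thread runs a task that waits on a second task queued behind it. In the sketch below a 0.5 s timeout (an arbitrary value) is added so the deadlock is detected instead of hanging the program; without it, the inner result() call would block forever.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

executor = ThreadPoolExecutor(max_workers=1)


def inner():
    return 42


def outer():
    # The only worker thread is busy running outer(), so inner() cannot start
    try:
        return executor.submit(inner).result(timeout=0.5)
    except TimeoutError:
        return "deadlock"


outcome = executor.submit(outer).result()
executor.shutdown(wait=True)  # inner() finally runs once outer() has returned
print(outcome)  # deadlock
```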

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
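A sketch of the max_workers cap and the thread_name_prefix parameter from the signature above: however many calls are submitted, they run on at most two threads, named with the given prefix. When max_workers is None, Python 3.8 and later default to about min(32, CPU count + 4) threads.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def whoami(_):
    # Report which worker thread executed this call
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
    names = set(executor.map(whoami, range(10)))

print(sorted(names))  # e.g. ['worker_0', 'worker_1']
```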

17.4.3 ProcessPoolExecutor

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
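Because callables, arguments and results travel between processes via pickle, a quick way to check whether something can be used with a ProcessPoolExecutor is to try pickling it. The helper can_pickle below is hypothetical. Built-in functions such as pow are pickled as a reference to their name, while lambdas and lock objects cannot be pickled.

```python
import pickle
import threading


def can_pickle(obj):
    """Hypothetical helper: True if obj survives pickle.dumps()."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(can_pickle(pow))               # True: pickled as a reference to builtins.pow
print(can_pickle([1, 2, (3, 4)]))    # True: plain data
print(can_pickle(lambda x: x * x))   # False: a lambda cannot be looked up by name
print(can_pickle(threading.Lock()))  # False: locks cannot be pickled
```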

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock.

class concurrent.futures.ProcessPoolExecutor(max_workers=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it defaults to the number of processors on the machine. If max_workers is less than or equal to 0, a ValueError is raised.
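A quick check of the argument validation: creating the executor with max_workers=0 fails in the constructor itself, before any worker process is started.

```python
from concurrent.futures import ProcessPoolExecutor

try:
    ProcessPoolExecutor(max_workers=0)
    rejected = False
except ValueError as exc:
    rejected = True
    print(exc)  # the exact message text may differ between Python versions
```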

17.4.4 Future Objects

The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit().

class concurrent.futures.Future
Encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit() and should not be created directly except for testing.
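Outside an executor, a Future is normally built by hand only in tests (or when writing an executor). The sketch below plays the executor's role by calling set_result(), a method the documentation reserves for executor implementations and unit tests.

```python
from concurrent.futures import Future

future = Future()           # created directly, as a test would
before = future.done()      # False: no result has been produced yet
future.set_result("hello")  # what an executor does when the call returns
print(before, future.done(), future.result())  # False True hello
```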

cancel()
Attempt to cancel the call. If the call is currently being executed or has finished running and cannot be cancelled, the method returns False; otherwise the call is cancelled and the method returns True.

cancelled()
Return True if the call was successfully cancelled.

running()
Return True if the call is currently being executed and cannot be cancelled.

done()
Return True if the call was successfully cancelled or finished running.
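A sketch of these four methods with a single-worker pool: the first call occupies the only thread, so the second is still queued and can be cancelled, while the running one cannot. The two events only make the timing deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
gate = threading.Event()


def block_until_released():
    started.set()       # signal that the worker is now running this call
    return gate.wait()  # hold the only worker until the main thread releases it


with ThreadPoolExecutor(max_workers=1) as executor:
    busy = executor.submit(block_until_released)
    queued = executor.submit(pow, 2, 10)  # waits in the queue behind busy
    started.wait()

    busy_was_running = busy.running()     # True: already executing
    busy_cancel = busy.cancel()           # False: a running call cannot be cancelled
    queued_cancel = queued.cancel()       # True: it never started
    gate.set()                            # let busy finish

print(busy_was_running, busy_cancel, busy.done())        # True False True
print(queued_cancel, queued.cancelled(), queued.done())  # True True True
```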

result(timeout=None)
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

If the future is cancelled before completing then CancelledError will be raised.

If the call raised, this method will raise the same exception.
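A sketch of the three outcomes described above: a timeout while the call is still running, the worker's exception re-raised by result(), and CancelledError from a cancelled future. The sleep and timeout values are arbitrary.

```python
import time
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor, TimeoutError


def fail():
    raise ValueError("bad input")


timed_out = False
error_message = None
with ThreadPoolExecutor(max_workers=2) as executor:
    slow = executor.submit(time.sleep, 0.5)
    broken = executor.submit(fail)
    try:
        slow.result(timeout=0.1)  # the call needs 0.5 s, so this gives up first
    except TimeoutError:
        timed_out = True
    try:
        broken.result()           # re-raises the worker's ValueError here
    except ValueError as exc:
        error_message = str(exc)

pending = Future()  # a fresh Future is still pending, so it can be cancelled
pending.cancel()
try:
    pending.result()
    got_cancelled = False
except CancelledError:
    got_cancelled = True

print(timed_out, error_message, got_cancelled)  # True bad input True
```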

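Question 1:

1. After submitting a call with submit(), you can read its value with result(). How do you see the results after submitting with map()? map() returns a generator object; wrap it in list() or tuple() (or loop over it) to get the results.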

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 3, 2)
    print(future.result())

from concurrent.futures import ProcessPoolExecutor
if __name__ == '__main__':  # needed where workers are spawned (e.g. Windows); without it an error is raised
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(pow, 3, 2)
        print(future.result())
# 9
# 9
# 9
# The extra 9 comes from the guard: on platforms that spawn workers, the worker
# re-imports this module, re-running the unguarded ThreadPoolExecutor code above
# but skipping the code under if __name__ == '__main__':.

from concurrent.futures import ProcessPoolExecutor
if __name__ == '__main__':  # same guard as above
    with ProcessPoolExecutor(max_workers=1) as executor:
        results = executor.map(pow, [1, 2], [3, 4])  # pow(1, 3) and pow(2, 4)
        print(type(results), list(results))
# <class 'generator'> [1, 16]
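To compare the two ways of reading results without starting processes, the sketch below uses ThreadPoolExecutor (ProcessPoolExecutor has the same methods): consuming the generator from map() gives the same values as calling result() on the futures returned by submit().

```python
from concurrent.futures import ThreadPoolExecutor

bases, exps = [1, 2], [3, 4]

with ThreadPoolExecutor(max_workers=2) as executor:
    # map() yields results lazily; list(), tuple() or a for loop consumes them
    via_map = list(executor.map(pow, bases, exps))
    # The same calls via submit(): one Future per call, then result() on each
    futures = [executor.submit(pow, b, e) for b, e in zip(bases, exps)]
    via_submit = [f.result() for f in futures]

print(via_map, via_submit)  # [1, 16] [1, 16]
```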

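Execution flow:

The ProcessPoolExecutor class uses the low-level machinery provided by the multiprocessing module to carry out the following steps:

1. Each item of input data from the two lists [1,2] and [3,4] is passed to map, which pairs them into the calls pow(1, 3) and pow(2, 4).

2. The data is serialized into binary form with the pickle module.

3. The serialized data is sent from the process running the main interpreter to the process running the child interpreter over a local inter-process channel (a socket or pipe).

4. In the child process, pickle deserializes the binary data back into Python objects.

5. The Python module containing the pow function is imported (for pow, the builtins module).

6. Each child process runs pow on its own input data, in parallel.

7. The results are serialized into bytes.

8. These bytes are copied back to the main process through the same channel.

9. The main process deserializes the bytes back into Python objects.

10. Finally, the results computed by the child processes are handed back to the caller in input order (here list() gathers them into a single list).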
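The serialization steps can be simulated in a single process with the pickle module. This sketch skips the real worker process and the inter-process channel (steps 3 and 8); it only shows what is pickled and unpickled on each side, and that a function like pow is sent as a reference that the receiver resolves by name.

```python
import pickle

# Steps 1-2: pair up the inputs as map() does, then serialize them to bytes
tasks = list(zip([1, 2], [3, 4]))     # [(1, 3), (2, 4)]
payload = pickle.dumps((pow, tasks))  # pow is pickled by name, not by code

# Steps 4-6: the "worker" restores the function and arguments and runs the calls
fn, received = pickle.loads(payload)
worker_results = [fn(a, b) for a, b in received]

# Steps 7-9: serialize the results and restore them on the "parent" side
reply = pickle.dumps(worker_results)
results = pickle.loads(reply)

print(fn is pow, results)  # True [1, 16]
```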

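Question 2: Can concurrent.futures speed up execution?

As the timings from the code below show, using the futures module nearly doubled the speed: the ProcessPoolExecutor version took 1.683 seconds versus 3.197 seconds for the plain serial map, about a 1.9x speedup in the author's run.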

from concurrent import futures
import time

def gcd(pair):
    # Deliberately slow brute-force GCD, so the work is CPU-bound
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [(19622022, 22737382), (2332312, 932326), (19649022, 22736382), (2764312, 9329765)]

# Serial version, for comparison:
# start = time.time()
# results = list(map(gcd, numbers))
# print(results)
# end = time.time()
# print('took {:.3f} seconds'.format(end - start))
# [2, 2, 6, 1]
# took 3.197 seconds

if __name__ == '__main__':
    start = time.time()
    with futures.ProcessPoolExecutor() as pool:
        results = list(pool.map(gcd, numbers))
        end = time.time()
    print(results)
    print('took {:.3f} seconds'.format(end - start))
# [2, 2, 6, 1]
# took 1.683 seconds

