Python多线程----线程池以及线程实现异步任务

摘要:
Python多线程-线程池要求:假设我们现在有一个多线程项目。每次用户连接时,我们的服务器都会创建一个线程。在这种情况下,我们的Python也应该有线程池,这个问题非常常见。必须有现有的库供我们使用。

Python多线程----线程池

需求:假设我们现在有一个多线程项目,每有一个用户连接进来,我们的服务器就会创建一个线程。而我们的服务器最多能够承载100个线程,再多就会崩溃。为了防止恶意用户伪装真实用户构建大量的访问来让我们的服务器崩溃,现在需要对线程数量进行限制,一共只有100个线程,并且当一个用户访问结束以后线程会自动归还,等待下一个用户访问。如果100个线程全部被占用则101个用户进入阻塞时间,直到某一个用户退出,线程得到释放,101个用户才能被通行。

不难看出上面的需求,类似我们MySQL的连接池。既然如此,我们的Python也应该有一个线程池,并且这种问题非常的常见,肯定已经有现成的库以供我们使用。今天我们就来看一下Python标准库中from concurrent.futures下的ThreadPoolExecutor。

第一个例子

# 首先导包
from concurrent.futures import ThreadPoolExecutor

# 创建线程池
executor = ThreadPoolExecutor(10)

# 测试方法
def test_function(num1, num2):
    print(num1, num2)
    return num1 + num2

# 第一个参数为具体的方法,后面为方法的参数
future = executor.submit(test_function, 1, 2)
# future的result()方法可以获取到函数的执行结果
print(future.result())

执行结果:

1 2
3

ThreadPoolExecutor(pool_count): pool_count代表创建线程的数量,会返回一个该线程池的执行者对象,这个对象的submit()方法和map()方法,能够使用线程池中的线程来执行我们指定的方法,并且返回一个Future对象。Future对象的result()方法,可以获取我们方法执行的结果。如果方法一直没有返回或执行完毕,则result()方法会进入阻塞状态,直到我们的方法返回或执行完毕。

使用map()方法批量执行

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(10)

def test_function(num1, num2):
    print(num1, num2)
    return num1 + num2

"""
    executor.map(function, 参数1_list, 参数2_list, 参数n_list)
    参数1_list: 代表方法第一个参数的列表
    参数2_list: 代表方法第二个参数的列表
    如:
        executor.map(test_function, [1, 2], [5, 5])
        代表,执行test_function方法,第一个线程的参数为1和5,第二个线程的参数为2和5。
        线程1:test_function(1, 5) 结果为1 + 5 = 6
    该方法返回的是一个可迭代的对象,里面直接包含了每个方法执行的结果,不需要调用result()方法。
    详情:https://docs.python.org/3/library/concurrent.futures.html
"""
result_iterators = executor.map(test_function, [1, 2], [5, 5])

for result in result_iterators:
    print(result)

执行结果:

1 5
2 5
6
7

尝试一下所有线程都被占用的情况

import time
from concurrent.futures import ThreadPoolExecutor

# 方便测试创建三个线程
executor = ThreadPoolExecutor(3)

def test_function(num1, num2):
    print(num1, num2)
    # 方法休眠十秒
    time.sleep(10)
    return num1 + num2

# 使用三个线程,占用线程池全部线程
# 由于我们的结果是十秒后返回,所以这里也会被阻塞,十秒后才会收到结果
result_iterators = executor.map(test_function, [1, 2, 3], [5, 6, 7])

for result in result_iterators:
    print(result)

# 到这里很显然前面三个线程都在使用中,10秒后才能得到执行
future = executor.submit(test_function, 4, 8)
print(future.result())

执行结果:

1 5
2 6
3 7
6
8
10
4 8
12
[Finished in 20.2s]

https://www.imooc.com/article/16217

python线程实现异步任务
 

了解异步编程

楼主在工作中遇到了以下问题,开发接口爬取数据代码完成之后要写入redis缓存,但是在写入缓存的过程花费2-3s,进行这样就大大影响了接口的性能,于是想到了使用异步存储。

传统的同步编程是一种请求响应模型,调用一个方法,等待其响应返回.
异步编程就是要重新考虑是否需要响应的问题,也就是缩小需要响应的地方。因为越快获得响应,就是越同步化,顺序化,事务化,性能差化。

线程实现异步

思路:通过线程调用的方式,来达到异步非阻塞的效果,也就是说主程序无需等待线程执行完毕,仍然可以继续向下执行。

1.threading模块和thread模块

Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。

threading 模块提供的其他方法:

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:

  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。

同步阻塞:

 
 1 import  threading,time
 2 
 3 def thead(num):
 4     time.sleep(1)
 5     print("阻塞程序%s开始执行"%num)
 6     time.sleep(3)
 7     print("阻塞程序%s执行完毕"%num)
 8 
 9 def main():
10     print("主方法开始执行")
11 
12     for i in range(1,3):
13         thead(i)
14 
15     print("主方法执行完毕")
16     return
17 
18 if __name__ == '__main__':
19     print(time.ctime())
20     num = main()
21     print("返回结果为%s"%num)
22     print(time.ctime())
 
 
Wed Nov 21 09:22:56 2018
主方法开始执行
阻塞程序1开始执行
阻塞程序1执行完毕
阻塞程序2开始执行
阻塞程序2执行完毕
主方法执行完毕
返回结果为None
Wed Nov 21 09:23:04 2018
 

异步,无需等待线程执行

 
import  threading,time

def thead(num):
# time.sleep(1)
print("线程%s开始执行"%num)
time.sleep(3)
print("线程%s执行完毕"%num)

def main():
print("主方法开始执行")

#创建2个线程
poll = []#线程池
for i in range(1,3):
thead_one = threading.Thread(target=thead, args=(i,))
poll.append(thead_one) #线程池添加线程
for n in poll:
n.start() #准备就绪,等待cpu执行

print("主方法执行完毕")
return

if __name__ == '__main__':
print(time.ctime())
num = main()
print("返回结果为%s"%num)
print(time.ctime())



 
 

Wed Nov 21 09:48:00 2018
主方法开始执行
主方法执行完毕
返回结果为None
Wed Nov 21 09:48:00 2018
线程1开始执行
线程2开始执行
线程1执行完毕
线程2执行完毕

 

 2.concurrent.futures模块

concurrent.futures模块实现了对threading(线程)multiprocessing(进程)的更高级的抽象,对编写线程池/进程池提供了直接的支持。 

从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。(暂时只介绍线程池的使用)

concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

Future这个概念你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

  • Future Objects:Future类封装了可调用的异步执行.Future 实例通过 Executor.submit()方法创建。

  • submit(fn, *args, **kwargs):调度可调用的fn,作为fn(args kwargs)执行,并返回一个表示可调用的执行的Future对象。
  • ThreadPoolExecutor:ThreadPoolExecutor是一个Executor的子类,它使用线程池来异步执行调用。

  • concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=''):Executor子类,使用max_workers规格的线程池来执行异步调用。

在Flask应用中使用异步redis:

 
from flask import Flask
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
app = Flask(__name__)


@app.route('/')
def update_redis():
    executor.submit(do_update)
    return 'ok'


def do_update():
    time.sleep(3)
    print('start update cache')
    time.sleep(1)
    print("end")


if __name__ == '__main__':
    app.run(debug=True)
 

Python多线程----线程池以及线程实现异步任务第1张

“ok“在更新缓存前已经返回。

本文到这里就结束了,着重介绍了线程实现异步的方法。当然还有其他的方法,比如yied实现,还有asyncio模块,后续会继续更新异步编程的文章。

温馨提示

  • 本文代码是在python3.5版本测试运行。

python 异步任务

免责声明:文章转载自《Python多线程----线程池以及线程实现异步任务》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇vSpherePython中利用LSTM模型进行时间序列预测分析下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

java框架之mybatis

一、简介 1、基本概念 mybatis 是一个半自动轻量级的一个 orm 框架 将 java 与 sql 分离,解决了 jdbc 的硬编码问题; sql 由开发人员控制,更加方便 sql 的修改调优; 2、入门程序 2.1 原始方法:通过 sqlsession 来操作数据库 建一个全局配置文件(mybatis-config.xml),配置数据源等运行...

安卓多线程的实现

有以下几种方式: 1)Activity.runOnUiThread(Runnable) 2)View.post(Runnable) ;View.postDelay(Runnable , long) 3)Handler 4)AsyncTask Android是单线程模型,这意味着Android UI操作并不是线程安全的并且这些操作必须在UI线程中执行,所以你...

Java调用Python脚本并获取返回值

在Java程序中有时需要调用Python的程序,这时可以使用一般的PyFunction来调用python的函数并获得返回值,但是采用这种方法有可能出现一些莫名其妙的错误,比如ImportError。在这种情况下可以采用另一种方法:使用Java的Runtime,像在命令行直接调用python脚本那样调用python程序。此时可以通过文件作为脚本参数来传递Py...

[go]函数

init执行顺序 对同一个 go 文件的 init( ) 调用顺序是从上到下的 对同一个 package 中的不同文件,将文件名按字符串进行“从小到大”排序,之后顺序调用各文件中的init()函数 对于不同的 package,如果不相互依赖的话,按照 main 包中 import 的顺序调用其包中的 init() 函数 如果 package 存在依赖,调...

Protobuf 语法指南

英文: Proto Buffers Language Guide 本指南描述了怎样使用protocol buffer 语法来构造你的protocol buffer数据,包括.proto文件语法以及怎样生成.proto文件的数据访问类。(本文只针对proto2的语法) 本文是一个参考指南——如果要查看如何使用本文中描述的多个特性的循序渐进的例子,请在http...

c#之Redis实践list,hashtable

写在前面 最近公司搞了一个活动,用到了redis的队列,就研究了下redis的相关内容。也顺手做了个demo。 C#之使用Redis 可以通过Nuget安装Reidis的相关程序集。安装之后发现会引入以下几个dll 一些list,队列和hashtable的操作。 using System; using System.Collections.Generi...