Python 异步编程

摘要:
介绍几种Python异步执行的方式参考:官方文档python实现异步执行Python中协程异步IO通过threading.Thread实现先将需要异步执行的函数用线程的方式包装为一个装饰器,然后拿去装饰需要异步执行的函数即可。协程结束时会抛出StopIteration。下面看代码:deffor_test():foriinrange:yieldidefyield_yield_test():yieldfromrange输出结果:[0,1,2][0,1,2]异步io异步IO的asyncio库使用时间循环驱动的协程实现并发。asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:asyncio.async()loop.create_task()或asyncio.ensure_future()最简单的异步IO示例run_until_complete():阻塞调用,直到协程运行结束才返回。

介绍几种Python异步执行的方式

参考:

通过 threading.Thread 实现

先将需要异步执行的函数用线程的方式包装为一个装饰器,然后拿去装饰需要异步执行的函数即可。

下面构造两个函数

from threading import Thread
from time import sleep
import numpy as np
import pandas as pd

def async_call(func):
    def wrapper(*args, **kwargs):
        thr = Thread(target=func, args=args, kwargs=kwargs)
        thr.start()
    return wrapper

@async_call
def A(x1, x2, i):
    data = pd.DataFrame(np.random.randn(x1, x2))
    sleep(2)
    data.to_csv('data_{}.csv'.format(i))
    print ('data_{} save done.'.format(i))

def B(i):
    print ('B func ', i)

if __name__ == "__main__":
    for i in range(10):
        A(1000, 1000, i)
        B(i)

A 函数用 async_call 进行装饰,根据输入的形状创建数组并保存。

B 函数不进行装饰,打印输入的值

起一个循环,连续顺序执行10次A和B。

可以看到程序首先吊起了10个线程去执行A函数,不必等待A函数执行完毕,先输出B函数的打印信息“B func i”,随后这10个线程的命令分别执行完毕,并打印出相关信息“data_i save done.”。在当前目录保存了10个文件 "data_1.csv", "data_2.csv"... 。

Python 异步编程第1张

通过 yield 实现协程

yield 可以让程序暂停运行,等待主程序发送数据,下次继续再yield处暂停。下面看一个例子通过yield实现协程。

使用yeild实现的协程时需要先用next激活,才能使用send发送数据。

next时会产生yield右边的数据,也就是name。
send时接收值的是yield左边的数据,也就是x。
协程结束时会抛出StopIteration。

def coroutine_example(name):
    print ('start croutine name:{}'.format(name))
    x = yield name
    print ('send value:{}'.format(x))
    
if __name__ == "__main__":
    coro = coroutine_example('hi')
    print (next(crou))
    print (coro.send(1))

输出结果:

start croutine name:hi
hi
send value:1

Exception has occurred: StopIteration
  File "C:UsersAdministratorDesktop	estasync_test.py", line 43, in <module>
    print (crou.send(1))

yield from 说明

yield from 和for 循环类似,yield from x 内部先调用iter(x),然后调用next()获取x中的value。此处x为任意可迭代对象。
下面看代码:

def for_test():
    for i in range(3):
        yield i

def yield_yield_test():
    yield from range(3)

输出结果:

[0, 1, 2]
[0, 1, 2]
异步io (asyncio)

异步IO的asyncio库使用时间循环驱动的协程实现并发。用户可自主控制程序,在认为耗时处添加 yield from。在 asyncio 中,协程使用@asyncio.coroutine 来装饰,使用 yield from 来驱动。在Python 3.5版本做了如下更改:

  • @asyncio.coroutine --> async def
  • yield from --> await

asyncio 中的几个概念:

  1. 事件循环

管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放在队列中,空闲时调用相应的事件处理者来处理这些事件。

  1. Fucture

Future对象表示尚未完成的计算,还未完成的结果

  1. Task

是Future的子类,作用是在运行某个任务的同时可以并发的运行多个任务。

asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:

  • asyncio.async()

  • loop.create_task() 或 asyncio.ensure_future()

最简单的异步IO示例

  • run_until_complete():

阻塞调用,直到协程运行结束才返回。 参数是future,传入协程对象时内部会自动变为future。

  • asyncio.sleep():

模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程。

注意: 若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环。

import asyncio

async def coroutine_example():
    await asyncio.sleep(1)
    print ('Fosen')
    
if __name__ == "__main__":
    coro = coroutine_example()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(coro)
    loop.close()

输出:暂停一秒后,打印“Fosen”。

创建 Task

  • loop.create_task():

接收一个协程,返回一个asyncio.Task的实例,也是asyncio.Future的实例,毕竟Task是Future的子类。返回值可直接传入run_until_complete()

返回的Task对象可以看到协程的运行情况, 可以通过task.result获取task协程的返回值,当协程未完成时,会出InvalidStateError。

import asyncio

async def coroutine_example():
    await asyncio.sleep(1)
    print ('Fosen')
    
if __name__ == "__main__":
    coro = coroutine_example()

    loop = asyncio.get_event_loop()
    task = loop.create_task(coro)
    print ('运行情况:', task)
    try:
        print ('返回值:', task.result())
    except asyncio.InvalidStateError:
        print ('task 状态未完成,捕获 InvalidStateError')

    loop.run_until_complete(task)
    print ('再看下运行情况:', task)
    print ('返回值:', task.result())
    loop.close()

输出:

运行情况: <Task pending coro=<coroutine_example() running at c:UsersAdministratorDesktop	estasync_test.py:57>>
task 状态未完成,捕获 InvalidStateError
Fosen
再看下运行情况: <Task finished coro=<coroutine_example() done, defined at c:UsersAdministratorDesktop	estasync_test.py:57> result=None>
返回值: None

多任务控制并获取返回值

  • asyncio.wait()

asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环。

下面看代码示例:

import asyncio

async def coroutine_example(name):
    print ('正在执行:', name)
    await asyncio.sleep(2)
    print ('执行完毕:', name)
    return '返回值:' + name


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(coroutine_example('Fosen_{}'.format(i))) for i in range(3)]
    wait_coro = asyncio.wait(tasks)

    loop.run_until_complete(wait_coro)
    for task in tasks:
        print (task.result())

    loop.close()

运行输出:

正在执行: Fosen_0
正在执行: Fosen_1
正在执行: Fosen_2
执行完毕: Fosen_0
执行完毕: Fosen_1
执行完毕: Fosen_2
返回值:Fosen_0
返回值:Fosen_1
返回值:Fosen_2

动态添加协程--同步方式

创建一个线程,使事件循环在该线程中永久运行,通过 new_loop.call_soon_threadsafe 来添加协程任务。跟直接以线程的方式封装一个异步装饰器的方法有点类似。 见代码:

import asyncio
from threading import Thread

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def thread_example(name):
    print ('正在执行:', name)
    return '返回结果:' + name
    

if __name__ == "__main__":
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_thread_loop, args=(new_loop,))
    t.start()

    handle = new_loop.call_soon_threadsafe(thread_example, '1')
    handle.cancel()

    new_loop.call_soon_threadsafe(thread_example, '2')
    print ('主线程不阻塞')

    new_loop.call_soon_threadsafe(thread_example, '3')
    print ('继续运行中...')

运行结果:

正在执行: 2
主线程不阻塞
继续运行中...
正在执行: 3

动态添加协程--异步方式

同样创建一个线程来永久运行事件循环。不同的是 thread_example为一个协程函数,通过 asyncio.run_coroutine_threadsafe 来添加协程任务。

t.setDaemon(True) 表示把子线程设为守护线程,防止主线程已经退出了子线程还没退出。

import asyncio
from threading import Thread

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print ('正在执行:', name)
    await asyncio.sleep(1)
    return '返回结果:' + name


if __name__ == "__main__":
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_thread_loop, args=(new_loop,))
    t.setDaemon(True)
    t.start()

    future = asyncio.run_coroutine_threadsafe(thread_example('1'), new_loop)
    print (future.result())

    asyncio.run_coroutine_threadsafe(thread_example('2'), new_loop)
    print ('主线程不阻塞')

    asyncio.run_coroutine_threadsafe(thread_example('3'), new_loop)
    print ('继续运行中...')

运行结果

正在执行: 1
返回结果:1
主线程不阻塞
正在执行: 2
继续运行中...
正在执行: 3

协程中生产-消费模型设计

结合上面的动态异步添加协程的思想,我们设计两个生产-消费模型,分别基于Python内置队列和Redis队列。

基于Python 内置双向队列的生产-消费模型

import asyncio
from threading import Thread
from collections import deque
import random
import time

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def consumer():
    while True:
        if dp:
            msg = dp.pop()
            if msg:
                asyncio.run_coroutine_threadsafe(thread_example('Fosen_{}'.format(msg)), new_loop)

async def thread_example(name):
    print ('正在执行:', name)
    await asyncio.sleep(2)
    return '返回结果:' + name


if __name__ == "__main__":
    dp = deque()

    new_loop = asyncio.new_event_loop()
    loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
    loop_thread.setDaemon(True)
    loop_thread.start()

    consumer_thread = Thread(target=consumer)
    consumer_thread.setDaemon(True)
    consumer_thread.start()

    while True:
        i = random.randint(1, 10)
        dp.appendleft(str(i))
        time.sleep(2)

运行输出:

正在执行: Fosen_6
正在执行: Fosen_2
正在执行: Fosen_8
正在执行: Fosen_2
正在执行: Fosen_1
正在执行: Fosen_3
正在执行: Fosen_1

基于 Redis 队列的生产-消费模型

这种写法与基于python队列的相似,只是操作队列、获取数据的方式不同而已。

import asyncio
from threading import Thread
import redis

# 生产者代码
def producer():
    for i in range(4):
        redis_conn.lpush('Fosen', str(i))

# 消费者代码
def get_redis():
    conn_pool = redis.ConnectionPool(host='127.0.0.1', port=6379)
    return redis.Redis(connection_pool=conn_pool)

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print ('正在执行:', name)
    await asyncio.sleep(2)
    return '返回结果:' + name


if __name__ == "__main__":
    redis_conn = get_redis()
    producer()

    new_loop = asyncio.new_event_loop()
    loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
    loop_thread.setDaemon(True)
    loop_thread.start()

    while True:
        msg = redis_conn.rpop('Fosen')
        if msg:
            asyncio.run_coroutine_threadsafe(thread_example('Fosen_{}'.format(msg)), new_loop)

运行结果:

正在执行: Fosen_b'0'
正在执行: Fosen_b'1'
正在执行: Fosen_b'2'
正在执行: Fosen_b'3'

免责声明:文章转载自《Python 异步编程》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Linux图形界面开发—monodevelop初探用Visual C++创建WPF项目的三种主要方法下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

【测试平台学习2】 Django 的初使用

前言 最近打算使用Django+Vue 打造一个简单的测试平台,本文对django 的使用略做记录 Django的安装与背景 Python 的后端主要有Django 和flask , 我对此只有粗浅的理解和认识, 使用flask 编写过一个接口Mock系统, flask是轻量化的,能快速实现接口的开发工作,但是它没有自带数据库。 Django 相对比较全面...

Python和C++交互

关键字:Python 2.7,VS 2010,swig OS:Win8.1 with update。 1.下载swig:http://www.swig.org/download.html 2.将swig的路径添加到环境变量Path,例如set path=C:swigwin-3.0.2。 3.用VS创建一个win32 console application名...

Python-元组(tuple),文件

基础: 1. 元组由简单对象组构成。 2. 元组与列表类似,不过不能在原处修改(它们是不可变的),并且通常写成圆括号(),而不是方框号[]中的一系列项。 ========================================================================== >>> (1,2) + (3,4)...

静听网+python爬虫+多线程+多进程+构建IP代理池

目标网站:静听网 网站url:http://www.audio699.com/ 目标文件:所有在线听的音频文件 附:我有个喜好就是听有声书,然而很多软件都是付费才能听,免费在线网站虽然能听,但是禁ip很严重,就拿静听网来说,你听一个在线音频,不能一个没听完就点击下一集,甚至不能快进太快,否则直接禁你5分钟才能再听,真的是太太讨厌了... 于是我就想用爬虫给...

IO文件

一:常见的函数输入输出函数 1.打印到屏幕   print:   最简单的输出方法是用print语句,你可以给它传递零个或多个用逗号隔开的表达式。 2.读取键盘输入   Python提供了两个内置函数从标准输入读入一行文本,默认的标准输入是键盘。   raw_input input 3.raw_input函数   raw_input([prompt])...

Python——继承

Python的继承是多继承机制,一个子类可以同时有多个直接父类;继承可以得到父类定义的方法,子类就可以复用父类的方法。 一、继承的语法 子类:实现继承的类。 父类(基类、超类):被继承的类。 子类继承父类是在定义子类时,将多个父类放在子类之后的圆括号内,如果定义类时,未指定这个类的直接父类,则默认继承object类,所以object类是所有类的父类(直接父...