Python 并发总结,多线程,多进程,异步IO

摘要:
1.测量函数运行时importTimedefprofile(func):defwrapper(*args,**kwargs):importTimestart=时间。time()函数(*args,**kwargs)结束=时间。time()打印“成本:{}”。格式(结束-开始)returnwrapper@profiledeffib(n):如果n˂=2:返回
1 测量函数运行时间
importtime
defprofile(func):
    def wrapper(*args, **kwargs):
        importtime
        start =time.time()
        func(*args, **kwargs)
        end   =time.time()
        print 'COST: {}'.format(end -start)
    returnwrapper
 
@profile
deffib(n):
    if n<= 2:
        return 1
    return fib(n-1) + fib(n-2)
 
fib(35)
 

2 启动多个线程,并等待完成
2.1 使用threading.enumerate()
importthreading
for i in range(2):
    t = threading.Thread(target=fib, args=(35,))
    t.start()
main_thread =threading.currentThread()
 
for t inthreading.enumerate():
    if t ismain_thread:
        continuet.join()
2.2 先保存启动的线程
threads =[]
for i in range(5):
    t = Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()
for t inthreads:
    t.join()

3 使用信号量,限制同时能有几个线程访问临界区
from threading importSemaphore
importtime
 
sema = Semaphore(3)
 
deffoo(tid):
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * 2time.sleep(wt)
        print('{} release sema'.format(tid))
 

4 锁,相当于信号量为1的情况
from threading importThread Lock
value =0
lock =Lock()
defgetlock():
    globallock
    with lock:
        new = value + 1time.sleep(0.001)
        value = new

5 可重入锁RLock
acquire() 可以不被阻塞的被同一个线程调用多次,release()需要和acquire()调用次数匹配才能释放锁

6 条件 Condition
一个线程发出信号,另一个线程等待信号
常用于生产者-消费者模型
importtime
importthreading
 
defconsumer(cond):
    t =threading.currentThread()
    with cond:
        cond.wait()
        print("{}: Resource is available to sonsumer".format(t.name))
 
defproducer(cond):
    t =threading.currentThread()
    with cond:
        print("{}: Making resource available".format(t.name))
        cond.notifyAll()
 
condition =threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))
 
c1.start()
c2.start()
p.start()

7 事件 Event
感觉和Condition 差不多
importtime
importthreading
from random importrandint
 
TIMEOUT = 2
 
defconsumer(event, l):
    t =threading.currentThread()
    while 1:
        event_is_set =event.wait(TIMEOUT)
        ifevent_is_set:
            try:
                integer =l.pop()
                print '{} popped from list by {}'.format(integer, t.name)
                event.clear()  #重置事件状态
            except IndexError:  #为了让刚启动时容错
                pass
 
defproducer(event, l):
    t =threading.currentThread()
    while 1:
        integer = randint(10, 100)
        l.append(integer)
        print '{} appended to list by {}'.format(integer, t.name)
        event.set()  #设置事件
        time.sleep(1)
 
event =threading.Event()
l =[]
 
threads =[]
 
for name in ('consumer1', 'consumer2'):
    t = threading.Thread(name=name, target=consumer, args=(event, l))
    t.start()
    threads.append(t)
 
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
 
 
for t inthreads:
    t.join()

8 线程队列
线程队列有task_done() 和 join()
标准库里的例子
往队列内放结束标志,注意do_work阻塞可能无法结束,需要用超时
importqueue
defworker():
    whileTrue:
        item =q.get()
        if item isNone:
            breakdo_work(item)
        q.task_done()
q =queue.Queue()
threads =[]
for i inrange(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)
for item insource():
    q.put(item)
q.join()
for i inrange(num_worker_threads):
    q.put(None)
for t inthreads:
    t.join()

9 优先级队列 PriorityQueue
importthreading
from random importrandint
from queue importPriorityQueue
 
q =PriorityQueue()
 
defdouble(n):
    return n * 2
 
defproducer():
    count =0
    while 1:
        if count > 5:
            breakpri = randint(0, 100)
        print('put :{}'.format(pri))
        q.put((pri, double, pri))  #(priority, func, args)
        count += 1
 
defconsumer():
    while 1:
        ifq.empty():
            breakpri, task, arg =q.get()
        print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
        q.task_done()
        time.sleep(0.1)
 
t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

10 线程池
当线程执行相同的任务时用线程池
10.1 multiprocessing.pool 中的线程池
from multiprocessing.pool importThreadPool
pool = ThreadPool(5)
pool.map(lambda x: x**2, range(5))
10.2 multiprocessing.dummy
from multiprocessing.dummy import Pool
10.3 concurrent.futures.ThreadPoolExecutor
fromconcurrent.futures improt ThreadPoolExecutor
from concurrent.futures importas_completed
importurllib.request
 
URLS = ['http://www.baidu.com', 'http://www.hao123.com']
 
defload_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        returnconn.read()
 
with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url inURLS}
    for future inas_completed(future_to_url):
        url =future_to_url[future]
        try:
            data =future.result()
        execpt Exception as exc:
            print("%r generated an exception: %s" %(url, exc))
        else:
            print("%r page is %d bytes" %(url, len(data)))
 

11 启动多进程,等待多个进程结束
importmultiprocessing
jobs =[]
for i in range(2):
    p = multiprocessing.Process(target=fib, args=(12,))
    p.start()
    jobs.append(p)
for p injobs:
    p.join()
 

12 进程池
12.1 multiprocessing.Pool
from multiprocessing importPool
pool = Pool(2)
pool.map(fib, [36] * 2)
12.2 concurrent.futures.ProcessPoolExecutor
from concurrent.futures importProcessPoolExecutor
importmath
 
PRIMES = [ 112272535095293, 112582705942171]
 
defis_prime(n):
    if n < 2:
        returnFalse
    if n == 2:
        returnTrue
    if n % 2 ==0:
        returnFalse
    sqrt_n =int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i ==0:
            returnFalse
    returnTrue
 
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        for number, prime inzip(PRIMES, executor.map(is_prime, PRIMES)):
            print("%d is prime: %s" %(number, prime))
 

13 asyncio
13.1 最基本的示例,单个任务
importasyncio
 
async defhello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")
 
loop =asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
13.2 最基本的示例,多个任务
importasyncio
 
async defhello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")
 
loop =asyncio.get_event_loop()
tasks =[hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
13.3 结合httpx 执行多个任务并接收返回结果
httpx 接口和 requests基本一致
importasyncio
importhttpx
 
 
async defget_url():
    r = await httpx.get("http://www.baidu.com")
    returnr.status_code
 
 
loop =asyncio.get_event_loop()
tasks = [get_url() for i in range(10)]
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
 
 
for num, result in zip(range(10), results):
    print(num, result)
 

免责声明:文章转载自《Python 并发总结,多线程,多进程,异步IO》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇VC++中使用用户自定义消息及自定制窗口技巧SQL Server2000中死锁经验总结 &amp;lt;转&amp;gt;下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

python-3.6.2安装

1、下载python-3.6.2-amd64.exe安装包 官网下载地址:https://www.python.org/ 2、Python安装,双击傻瓜式安装(用英文路径,不要有空格), 特别要注意勾上Add Python 3.6 to PATH(勾选后,不需要再设置环境变量,如果在安装时漏掉了勾选Add Python 3.7 to PATH,那就要手动把...

【python标准库学习】thread,threading(一)多线程的介绍和使用

在单个程序中我们经常用多线程来处理不同的工作,尤其是有的工作需要等,那么我们会新建一个线程去等然后执行某些操作,当做完事后线程退出被回收。当一个程序运行时,就会有一个进程被系统所创建,同时也会有一个线程运行,这个线程就是主线程main,在主线程中所创建的新的线程都是子线程,子线程通常都是做一些辅助的事。python中提供了thread和threading两...

利用python将ip转换为10进制

def int2ip(num): data = [] for i in range(4): num ,extra = divmod(num, 256) data.insert(0, str(extra)) return ".".join(data) def ip2int(astr): ipnum = astr.split(".") num = 0 for...

用Python实现gmail邮箱服务,实现两个邮箱之间的绑定(中)

  这篇博客,主要讲解用Python实现邮箱服务的几个需要学习的模块:E-mail Compotion and Decoding(邮件生成和解析)、SMTP、POP、IMAP   如上篇博客所讲,我学习过程参考《Foundations of Python3 Network Programming. 2nd Edition》,代码部分借鉴了其中的例子,但绝对...

python的调试

在python脚本中可以用pdb进行调试,具体方法如下: 1.使用run语句调试,格式为: run(statement[,globals[,locals]]) 参数含义如下 statement:要调试的语句块,以字符串的形式globals:可选参数,设置statement运行的全局环境变量locals:可选参数,设置statement运行的局部环境变量 以...

python科学计算库-pandas

------------恢复内容开始------------ 1、基本概念 在数据分析工作中,Pandas 的使用频率是很高的,一方面是因为 Pandas 提供的基础数据结构 DataFrame 与 json 的契合度很高,转换起来就很方便。另一方面,如果我们日常的数据清理工作不是很复杂的话,你通常用几句 Pandas 代码就可以对数据进行规整。 Pand...