摘要:1.测量函数运行时importTimedefprofile(func):defwrapper(*args,**kwargs):importTimestart=时间。time()函数(*args,**kwargs)结束=时间。time()打印“成本:{}”。格式(结束-开始)returnwrapper@profiledeffib(n):如果n˂=2:返回
1 测量函数运行时间
importtime
defprofile(func):
def wrapper(*args, **kwargs):
importtime
start =time.time()
func(*args, **kwargs)
end =time.time()
print 'COST: {}'.format(end -start)
returnwrapper
@profile
deffib(n):
if n<= 2:
return 1
return fib(n-1) + fib(n-2)
fib(35)
2 启动多个线程,并等待完成
2.1 使用threading.enumerate()
importthreading
for i in range(2):
t = threading.Thread(target=fib, args=(35,))
t.start()
main_thread =threading.currentThread()
for t inthreading.enumerate():
if t ismain_thread:
continuet.join()
2.2 先保存启动的线程
threads =[]
for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()
for t inthreads:
t.join()
3 使用信号量,限制同时能有几个线程访问临界区
from threading importSemaphore
importtime
sema = Semaphore(3)
deffoo(tid):
with sema:
print('{} acquire sema'.format(tid))
wt = random() * 2time.sleep(wt)
print('{} release sema'.format(tid))
4 锁,相当于信号量为1的情况
from threading importThread Lock
value =0
lock =Lock()
defgetlock():
globallock
with lock:
new = value + 1time.sleep(0.001)
value = new
5 可重入锁RLock
acquire() 可以不被阻塞的被同一个线程调用多次,release()需要和acquire()调用次数匹配才能释放锁
6 条件 Condition
一个线程发出信号,另一个线程等待信号
常用于生产者-消费者模型
importtime
importthreading
defconsumer(cond):
t =threading.currentThread()
with cond:
cond.wait()
print("{}: Resource is available to sonsumer".format(t.name))
defproducer(cond):
t =threading.currentThread()
with cond:
print("{}: Making resource available".format(t.name))
cond.notifyAll()
condition =threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))
c1.start()
c2.start()
p.start()
7 事件 Event
感觉和Condition 差不多
importtime
importthreading
from random importrandint
TIMEOUT = 2
defconsumer(event, l):
t =threading.currentThread()
while 1:
event_is_set =event.wait(TIMEOUT)
ifevent_is_set:
try:
integer =l.pop()
print '{} popped from list by {}'.format(integer, t.name)
event.clear() #重置事件状态
except IndexError: #为了让刚启动时容错
pass
defproducer(event, l):
t =threading.currentThread()
while 1:
integer = randint(10, 100)
l.append(integer)
print '{} appended to list by {}'.format(integer, t.name)
event.set() #设置事件
time.sleep(1)
event =threading.Event()
l =[]
threads =[]
for name in ('consumer1', 'consumer2'):
t = threading.Thread(name=name, target=consumer, args=(event, l))
t.start()
threads.append(t)
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
for t inthreads:
t.join()
8 线程队列
线程队列有task_done() 和 join()
标准库里的例子
往队列内放结束标志,注意do_work阻塞可能无法结束,需要用超时
importqueue
defworker():
whileTrue:
item =q.get()
if item isNone:
breakdo_work(item)
q.task_done()
q =queue.Queue()
threads =[]
for i inrange(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item insource():
q.put(item)
q.join()
for i inrange(num_worker_threads):
q.put(None)
for t inthreads:
t.join()
9 优先级队列 PriorityQueue
importthreading
from random importrandint
from queue importPriorityQueue
q =PriorityQueue()
defdouble(n):
return n * 2
defproducer():
count =0
while 1:
if count > 5:
breakpri = randint(0, 100)
print('put :{}'.format(pri))
q.put((pri, double, pri)) #(priority, func, args)
count += 1
defconsumer():
while 1:
ifq.empty():
breakpri, task, arg =q.get()
print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
q.task_done()
time.sleep(0.1)
t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()
10 线程池
当线程执行相同的任务时用线程池
10.1 multiprocessing.pool 中的线程池
from multiprocessing.pool importThreadPool
pool = ThreadPool(5)
pool.map(lambda x: x**2, range(5))
10.2 multiprocessing.dummy
from multiprocessing.dummy import Pool
10.3 concurrent.futures.ThreadPoolExecutor
fromconcurrent.futures improt ThreadPoolExecutor
from concurrent.futures importas_completed
importurllib.request
URLS = ['http://www.baidu.com', 'http://www.hao123.com']
defload_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
returnconn.read()
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url inURLS}
for future inas_completed(future_to_url):
url =future_to_url[future]
try:
data =future.result()
execpt Exception as exc:
print("%r generated an exception: %s" %(url, exc))
else:
print("%r page is %d bytes" %(url, len(data)))
11 启动多进程,等待多个进程结束
importmultiprocessing
jobs =[]
for i in range(2):
p = multiprocessing.Process(target=fib, args=(12,))
p.start()
jobs.append(p)
for p injobs:
p.join()
12 进程池
12.1 multiprocessing.Pool
from multiprocessing importPool
pool = Pool(2)
pool.map(fib, [36] * 2)
12.2 concurrent.futures.ProcessPoolExecutor
from concurrent.futures importProcessPoolExecutor
importmath
PRIMES = [ 112272535095293, 112582705942171]
defis_prime(n):
if n < 2:
returnFalse
if n == 2:
returnTrue
if n % 2 ==0:
returnFalse
sqrt_n =int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i ==0:
returnFalse
returnTrue
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
for number, prime inzip(PRIMES, executor.map(is_prime, PRIMES)):
print("%d is prime: %s" %(number, prime))
13 asyncio
13.1 最基本的示例,单个任务
importasyncio
async defhello():
print("Hello world!")
await asyncio.sleep(1)
print("Hello again")
loop =asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
13.2 最基本的示例,多个任务
importasyncio
async defhello():
print("Hello world!")
await asyncio.sleep(1)
print("Hello again")
loop =asyncio.get_event_loop()
tasks =[hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
13.3 结合httpx 执行多个任务并接收返回结果
httpx 接口和 requests基本一致
importasyncio
importhttpx
async defget_url():
r = await httpx.get("http://www.baidu.com")
returnr.status_code
loop =asyncio.get_event_loop()
tasks = [get_url() for i in range(10)]
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
for num, result in zip(range(10), results):
print(num, result)