python之线程与线程池

摘要:
#进程是资源分配的最小单位,线程是CPU调度的最小单位。每个进程中至少有一个线程。运行是指启动后和结束前的线程,不包括启动前和终止后的线程。
# 进程是资源分配的最小单位,线程是CPU调度的最小单位.每一个进程中至少有一个线程。
# 传统的不确切使用线程的程序称为只含有一个线程或单线程程序,而可以使用线程的程序被称为多线程程序,在程序中使用一个线程的方法
# 被称为多线程
# 线程的模块:
# thread >> 实现线程的低级接口
# threading>>> 可以提供高级方法

# 同一进程下的各个线程是可以共享该进程的所有的资源的,各个线程之间是可以相互影响的

1.线程创建的两种方式,与进程创建的两种方式基本
python之线程与线程池第1张python之线程与线程池第2张
from threading import Thread
from multiprocessing import Process
import time
def fucn1(n):
    time.sleep(1)
    print('XXXXXXXXXXXXX',n)
if __name__ == '__main__':
    # p = Process(target=fucn1,args=(1,))
    t = Thread(target=fucn1,args=(1,))
    print(t1.isAlive())#返回线程是否活动的
    print(t1.getName())# 返回线程名
    t1.setName()#设置线程名
    t.start()#开启线程的速度非常快
    t.join() #等待子线程运行结束之后才进行下面的代码
    print('主线程结束')
方式1
python之线程与线程池第3张python之线程与线程池第4张
class gg(Thread):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self):
        print('xxx')
        print(self.n)

if __name__ == '__main__':
    t1 = gg(66)
    t1.start()


与进程创建运行相似
方式2
# threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果

1.1
python之线程与线程池第5张python之线程与线程池第6张
from  threading import Thread
from multiprocessing import Process
import time
def jisuan():
    for i in range(100000):
        i+= 1
    # print(i)
def pro():
    p = Process(target=jisuan)
    p1 = Process(target=jisuan)
    p.start()
    p1.start()
    p1.join()
    p.join()

def threading():
    t1 = Thread(target=jisuan)
    t2 = Thread(target=jisuan)
    t1.start()
    t2.start()

if __name__ == '__main__':
    t1 = time.time()
    pro()
    t2 = time.time()

    t3 = time.time()
    threading()
    t4 = time.time()
    print(t2-t1)#0.24815773963928223
    print(t4-t3)#0.01202702522277832
线程与进程之间的效率对比

1.2

python之线程与线程池第7张python之线程与线程池第8张
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())

if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread())#主线程对象
    print(threading.current_thread().getName()) #主线程名称
    print(threading.current_thread().ident) #主线程ID
    print(threading.get_ident()) #主线程ID
    print(threading.enumerate()) #连同主线程在内有两个运行的线程
    print(threading.active_count())
    print('主线程/主进程')

#     '''
#     打印结果:
#     <_MainThread(MainThread, started 14104)>
#     MainThread
#     14104
#     [<_MainThread(MainThread, started 14104)>, <Thread(Thread-1, started 17976)>]
#     主线程/主进程
#     Thread-1
#     '''
一些不常用的方法

1.3

python之线程与线程池第9张python之线程与线程池第10张
# 一个主线程要等待所有的非守护线程结束才结束
# (主线程的代码执行完之后主线程并没有结束,而要等待所有的非守护进程执行完并返回结果后才结束)

#主进程默认是在执行完代码之后,相当于结束了,并不关心所有的子进程的执行结果,只是关心所有的子进程是否结束的的信号,
# 接收到所有子进程结束的信号之后,主进程(程序)才结束
import time
from threading import Thread
def func():
    time.sleep(3)
    print('任务1')

def func1():
    time.sleep(2)
    print('任务2')

if __name__ == '__main__':
    t1 = Thread(target=func)
    t2 = Thread(target=func1)
    t1.daemon = True
    t1.start()
    t2.start()
    print('主线程结束')
#结果
'''
主线程结束
任务2

'''
守护进程

1.4
python之线程与线程池第11张python之线程与线程池第12张
from threading import Thread
from multiprocessing import Process
import time
a = 100
def fucn1():
    global a
    a -= 1 #等同于
    temp = a
    time.sleep(0.001)#验证关键点
    temp = temp -1
    a  = temp
    time.sleep(5)

if __name__ == '__main__':


    gg = []
    for i in range(100):
        t = Thread(target=fucn1)
        t.start()#开启线程的速度非常快
        gg.append(t)
        print(t.is_alive())
    [tt.join() for tt in gg]
    print(a)
    print('主线程结束')


# 线程共享进程的数据,由于数据是共享的也会有数据的不安全的情况(数据混乱),
# 但是由于线程的创建的速度非常快,如果加上系统的线程不多的话,
# 效果不明显
# 解决共享数据不安全: 加锁 ,对取值和修改值的的操作开始加锁(与多进程加锁一样)
验证线程之间的数据是共享的,但是也存在数据的安全的问题
python之线程与线程池第13张python之线程与线程池第14张
信号量,事件等与进程的操作方法一样
# Semaphore管理一个内置的计数器,
#   每当调用acquire()时内置计数器-1;
#   调用release() 时内置计数器+1;
#   计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
#
#   实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
# # 基本代码如下
# def func(sm):
#     sm.acquire()
#     # 赋值或修改的代码
#     sm.release()
# if __name__ == '__main__':
#     sm=Semaphore(5)
#     t = Thread(target=func)

# 事件与进程的一样,
# event.isSet() 查看等待的状态不一样
# event.wait():如果 event.isSet()==False将阻塞线程;
# event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
# event.clear():恢复event的状态值为False。
与进程功能基本一样的相关说明

 线程队列

python之线程与线程池第15张python之线程与线程池第16张
# 线程队列
# 使用import queue,用法与进程Queue一样,直接引入,不用通过threading 模块引入

# 1>class queue.Queue(maxsize=0) #先进先出
import queue
# q = queue.Queue(3)#创建一个容量为3的队列
# q.put(1)
# q.put(2)
# q.put(3)#在队列塞满三个元素后,如果继续塞元素,就会进入一个阻塞的状态
# # 但是如果使用q.put_nowait()塞元素的话,到塞满之后再塞的话,就会直接抛出队列已满的异常,
# # 不会进入阻塞的状态,与q.get_nowait()相似
# print(q.get())#1
# print(q.get())#2
# print(q.get())#3
# #按照添加的顺序进行输出
# print(q.get())#>>>>>>>>>取到第四个的时候,队列已经是空的了,如果使用这个的话,就会进入
# # 阻塞的状态
# print(q.get_nowait())#>>>>>>但是如果取到第四个使用这个的话,不会进入阻塞的状态,直接
# #抛出异常
#
#
# # 2>class queue.LifoQueue(maxsize=0) #先进后出
# q = queue.LifoQueue(3)
# q.put(1)
# q.put(2)
# q.put(None)
# #
# # # 取值的时候输出为
# print(q.get())#None
# print(q.get())#2
# print(q.get())#1
#
# # 3>class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
# q = queue.PriorityQueue(4)
# # 在设置的时候,元组的方式进行添加  如:(优先级,元素),
# # 优先级通过使用数字来表示,数字越小优先级越高
# #如果优先级一样,就会按照元素的ASCIll顺序进行输出,相同优先级的两个元素能够进行比较(同优先级的两个元素必须是同种类型的)
# #字典类型的东西不能进行比较
# q.put((-10,1))
# q.put((-10,3))
# q.put((1,20))
# q.put((2,'我'))
#
# #按照优先级进行输出
# print(q.get())#(-10, 1)
# print(q.get())#(-10, 3)
# print(q.get())#(1, 20)
# print(q.get())#(2, '我')
#
#
# # 这三队列是安全的,不存在多个线程抢占同一资源或数据的情况
线程三种队列的使用
线程池
  submit的使用
python之线程与线程池第17张python之线程与线程池第18张
源代码欣赏
import threading
import os
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, thread_name_prefix=''): #初始化方法,设置线程池的最大线程数量
        if max_workers is None:#线程池的默认设置
            max_workers = (os.cpu_count() or 1) * 5默认设置的线程数是CPU核数的5倍
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()#创建线程锁
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):#创建一个线程,并异步提交任务
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):#调整线程池的数量
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        num_threads = len(self._threads)
        if num_threads < self._max_workers: #创建线程的过程
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
class ThreadPoolExecutor的源码欣赏
1.1 submit的基本使用
python之线程与线程池第19张python之线程与线程池第20张
# 常用基本方法
# class ThreadPoolExecutor():
#     def submit(self, fn, *args, **kwargs):#创建一个线程,并异步提交任务
#         pass
#     def shutdown(self,wait=True):#相当于进程池中的p.close() 和p.join()
#         pass
#     #wait = True ,等待池内所有任务执行完毕回收完资源后才继续
#     #wait = False,立即返回,并不会等待池内的任务执行完毕
#     # 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
#     # submit和map必须在shutdown之前
#通过

# import time
# import threading
# from concurrent.futures import ThreadPoolExecutor
# def func(i):
#     time.sleep(2)
#     print('%s打印的:'%(threading.get_ident()),)
#     return i*i
#
# tpool = ThreadPoolExecutor(max_workers= 5)
#
# t_lst = []
# for  i in range(5):
#     t = tpool.submit(func,i)#异步提交任务,与apply_async 相似,返回的也是一个结果对象
#     t_lst.append(t)
# tpool.shutdown()
# for a in t_lst:
#     print('>>',a.result())#获取
线程池submit的基本使用

1.2 map的基本使用

1.2.1 源码欣赏

python之线程与线程池第21张python之线程与线程池第22张
def map(self, fn, *iterables, timeout=None, chunksize=1):
    if timeout is not None:
        end_time = timeout + time.time()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():#生成器
        try:
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()
map方法的源码欣赏

1.2.2 map的基本使用(验证使用过程)

python之线程与线程池第23张python之线程与线程池第24张
# 简单使用
from concurrent.futures import ThreadPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s is running'%(threading.get_ident()))
    # time.sleep(random.randint(1,2))
    time.sleep(10)#测试for循环取值的时候,如果执行的子线程还没有执行完的时候的情况
    return n**2
if __name__ == '__main__':
    t_pool   =  ThreadPoolExecutor(max_workers=3)
    s = t_pool.map(task,range(1,5))#map取代for + sumbit
    print(s)  #<generator object Executor.map.<locals>.result_iterator at 0x000000C536C2B0F8>
    for i in s :
        print(i)
    # print([i for i in s])#
    print('主程序结束')

'''
#前面4个瞬间就出来
7252 is running 
3084 is running
7824 is running
<generator object Executor.map.<locals>.result_iterator at 0x000000AF18AAA2B0>

7252 is running #延迟大概10s后后面4个瞬间出来
1
4
9

16#延迟10s后两个瞬间出来
主程序结束
'''
map的简单使用

1.3 submit回调函数的应用

python之线程与线程池第25张python之线程与线程池第26张
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
回调函数的应用

1.4 线程与进程之间的性能测试

python之线程与线程池第27张python之线程与线程池第28张
# 进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势

# 现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,
# 甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本机为4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上
        p=Thread(target=work) #耗时2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))
# # I/O密集型:多线程效率高
from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本机为4核
    start=time.time()
    for i in range(4):
        p=Process(target=work) #耗时5s多
        p=Thread(target=work) #耗时18s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))
#
# # 计算密集型:多进程效率高
# 多线程用于IO密集型,如socket,爬虫,web
# 多进程用于计算密集型,如金融分析
线程与进程之家你的性能测试

1.5 线程的使用补充

# 线程提供了一种便利的能够同时处理多个请求的高效的服务器
# 多线程服务器基本有着同样的体系结构, :主线程负责侦听请求的线程
# 当它收到一个请求的时候,一个新的工作者线程就会被建立起来,处理该客户端
# 的请求,当客户端断开连接时候,工作者线程会终止

# 线程池被设计成一个线程同时只为一个客户服务,但是在服务结束之后
# 线程并不终止,线程池中的线程要么是事先全部建立起来,要么是在需要的时候被建立起来
# 在客户端断开连接的时候,线程并不终止,而是保持着,等待为更多的连接提供服务
#
# 线程池通常包含:
# 1.一个主要的侦听线程来接收和分派客户端的连接
# 2.一些工作者线程用来处理客户端请求
# 3.一个线程管理系统用来处理那些意外终止的线程
#





免责声明:文章转载自《python之线程与线程池》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Docker学习笔记之保存和共享镜像开发者自述:我是这样学习 GAN 的下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Python机器学习(5)——朴素贝叶斯分类器

朴素贝叶斯分类器是一个以贝叶斯定理为基础,广泛应用于情感分类领域的优美分类器。本文我们尝试使用该分类器来解决上一篇文章中影评态度分类。 1、贝叶斯定理 假设对于某个数据集,随机变量C表示样本为C类的概率,F1表示测试样本某特征出现的概率,套用基本贝叶斯公式,则如下所示: 上式表示对于某个样本,特征F1出现时,该样本被分为C类的条件概率。那么如何用上式来...

iOS开发日记16-通知栏扩展 (App Extension)

今天博主有一个App Extension的需求,遇到了一些困难点,在此和大家分享,希望能够共同进步. 总览 扩展 (Extension) 是 iOS 8 和 OSX 10.10 加入的一个非常大的功能点,开发者可以通过系统提供给我们的扩展接入点 (Extension point) 来为系统特定的服务提供某些附加的功能。对于 iOS 来说,可以使用的扩展接入...

Centos下堡垒机Jumpserver V3.0环境部署完整记录(2)-配置篇

前面已经介绍了Jumpserver V3.0的安装,基于这篇安装文档,下面说下Jumpserver安装后的的功能使用: 一、jumpserver的启动 Jumpserver的启动和重启 [root@test-vm001 install]# /opt/jumpserver/service.sh start/restart 二、按照Jumpserver部署过...

IO文件

一:常见的函数输入输出函数 1.打印到屏幕   print:   最简单的输出方法是用print语句,你可以给它传递零个或多个用逗号隔开的表达式。 2.读取键盘输入   Python提供了两个内置函数从标准输入读入一行文本,默认的标准输入是键盘。   raw_input input 3.raw_input函数   raw_input([prompt])...

Python基础知识

作者:地球的外星人君链接:https://www.zhihu.com/question/20336475/answer/197317130来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 1 算法 1.1 字符串 1.1.1 正则表达式 re 【标准库】 提供基于正则的匹配和替换。 1.1.2 字符集 chardet Home...

Python实现自动连接密码本破解wifi

在闲余时间尝试了利用Python实现使用本地无线网卡,自动连接wifi,读取密码本中的密码,迭代尝试密码连接破解wifi,话不多说,代码随上,密码可以从网上下载,也可参考我的密码本: 链接:https://pan.baidu.com/s/1xNEKvurhs6SgdXlCmu_new 提取码:6666 在执行该脚本前,一定要安装pywifi包,--pip...