浅谈Python-IO多路复用(select、poll、epoll模式)

摘要:
答案是I/O复用模型使用select、poll和epoll函数。这些函数也会阻止该过程。但与阻塞I/O不同,这两个函数可以同时阻塞多个I/O操作。

1. 什么是IO多路复用

  在传统socket通信中,存在两种基本的模式,

  第一种是同步阻塞IO,其线程在遇到IO操作时会被挂起,直到数据从内核空间复制到用户空间才会停止,因为对CPython来说,很多socket相关函数均是与内核函数(系统调用)密切相关的,比如fctl与ioctl,那么采用这种模式就会存在CPU资源利用率变低,具体的模式图如下:

  浅谈Python-IO多路复用(select、poll、epoll模式)第1张

  第二种模式是异步非阻塞IO(异步:当遇到IO操作时立即返回。非阻塞:线程不会被挂起),这一种模式采用轮询的方式,在调用Windows Sockets API函数时,调用函数会立即返回。大多数情况下,这些函数调用都会调用“失败”,并返回WSAEWOULDBLOCK错误代码。说明请求的操作在调用期间内没有时间完成。通常,应用程序需要重复调用该函数,直到获得成功返回代码。其模式图如下:

  浅谈Python-IO多路复用(select、poll、epoll模式)第2张

  其实以上两种IO模式相比,异步非阻塞IO需要更多的错误及异常处理,但是对于一些收发时间不固定,收发数据量不均匀,连接数量较多的情况下,还是具有较高的性能的。

  那么如何在单进程环境下更加高效地处理多个网络连接呢?答案就是采用IO多路复用模型

  I/O复用模型会用到select、poll、epoll函数,这几个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。

  浅谈Python-IO多路复用(select、poll、epoll模式)第3张

  可以看出由操作系统来管理socket连接实例,当有数据报准备好时,操作系统库函数向用户上层程序发送指示,程序在接收之后,才进行IO操作,并返回成功标志,可以概括为两次调用,两次返回。

2. select、poll、epoll

  select: 

  系统库函数:int select(int maxfdpl, fd_set * readset, fd_set *writeset, fd_set *exceptset, const struct timeval * tiomeout)

  注:单个进程能够监听端口的最大数量在/proc/sys/fs/file-max中可以查看,32位机默认1024,64位机默认2048.

  select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:

  1、 单个进程可监视的fd数量被限制,即能监听端口的大小有限。 一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.

  2、 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低:当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询,这正是epoll与kqueue做的。

  3、需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大

   

  poll:

  poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

  它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:

  1、大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。                                                                                                                                     

  2、poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

  epoll:  

  epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就需态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知

  epoll的优点:

  1、没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口);

  2、效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

  3、 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。
 
 
  
  select、poll、epoll 区别总结:(来自:https://blog.csdn.net/u013408431/article/details/67632468#t3

  1、支持一个进程所能打开的最大连接数

select

单个进程所能打开的最大连接数有FD_SETSIZE宏定义,其大小是32个整数的大小(在32位的机器上,大小就是32*32,同理64位机器上FD_SETSIZE为32*64),当然我们可以对进行修改,然后重新编译内核,但是性能可能会受到影响,这需要进一步的测试。

poll

poll本质上和select没有区别,但是它没有最大连接数的限制,原因是它是基于链表来存储的

epoll

虽然连接数有上限,但是很大,1G内存的机器上可以打开10万左右的连接,2G内存的机器可以打开20万左右的连接

2、FD剧增后带来的IO效率问题

select

因为每次调用时都会对连接进行线性遍历,所以随着FD的增加会造成遍历速度慢的“线性下降性能问题”。

poll

同上

epoll

因为epoll内核中实现是根据每个fd上的callback函数来实现的,只有活跃的socket才会主动调用callback,所以在活跃socket较少的情况下,使用epoll没有前面两者的线性下降的性能问题,但是所有socket都很活跃的情况下,可能会有性能问题。

3、 消息传递方式

select

内核需要将消息传递到用户空间,都需要内核拷贝动作

poll

同上

epoll

epoll通过内核和用户空间共享一块内存来实现的。

3. select实现c/s通信

  浅谈Python-IO多路复用(select、poll、epoll模式)第4张

  服务器端:(在写队列中,调用Queue对象get_nowait方法时,可能会抛出Queue.Empty的异常,需要做异常处理)

import socket
# import threading
import select
import queue

HOST, PORT = "localhost", 8020
address = (HOST, PORT)

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(10)
print("server is listening...")

# server套接字处理连接,其余套接字处理读操作
inputs = [server, ]
outputs = []
exceptions = []
# 消息接收
msg = {}

def handle_read(readable:list):
    """处理socket新建连接及数据读入"""
    for read_socket in readable:
        if read_socket is server:
            # 新建socket连接(有新用户加入)
            sock, addr = read_socket.accept()
            print("({}):connect successfully...".format(addr))
            sock.setblocking(False)
            inputs.append(sock)
            msg[sock] = queue.Queue()
        else:
            # 已建立连接的socket有消息接收
            # 此时该socket实例已被添加,直接收数据
            data = read_socket.recv(1024)
            if data:
                print("({0}) message: {1}".format(read_socket.getpeername(), data.decode("utf8")))
                # 将消息压入消息队列中
                msg[read_socket].put(data)
                if read_socket not in outputs:
                    outputs.append(read_socket)
            else:
                # socket断开连接
                print("({0}):close successfully...".format(read_socket.getpeername()))

                # 清空消息发送队列,以及输入输出队列
                inputs.remove(read_socket)
                if read_socket in outputs:
                    outputs.remove(read_socket)
                read_socket.close()
                del msg[read_socket]


def handle_write(writable: list):
    """处理socket消息发送"""
    for write_socket in writable:
        # get_nowait可能出现queue.Empty异常
        try:
            cur_writable_queue = msg.get(write_socket, None)
            if cur_writable_queue:
                # 有消息则却出消息并转发
                cur_w_data = cur_writable_queue.get_nowait()
                write_socket.send(cur_w_data)
            else:
                # 没有消息,则退出
                outputs.remove(write_socket)
        except queue.Empty:
            pass


def handle_exception(exceptional:list):
    """处理异常"""
    for e in exceptional:
        print("({0}) connect failed...".format(e.getpeername()))
        inputs.remove(e)
        if e in outputs:
            outputs.remove(e)
        if msg.get(e, None):
            del msg[e]
        e.close()


# server存在则循环监听, 事件循环的方式
while inputs:
    # 开启select监听
    readable, writable, exceptional = select.select(inputs, outputs, exceptions)
    handle_read(readable)
    handle_write(writable)
    handle_exception(exceptional)

  客户端(异步非阻塞IO):

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(0)

try:
    client.connect(("localhost", 8020))
except BlockingIOError:
    pass

while True:
    response = input("回复服务器:").encode("utf8")
    client.send(response)
    if response=="exit":
        break

    # 非阻塞I/O轮询方式
    while True:
        try:
            data = client.recv(1024)
        except BlockingIOError as e:
            pass
        else:
            if data:
                data = data.decode("utf8")
                break

    print("收到来自服务器的消息:%s" % data)

client.close()

  运行结果:

  服务器与第一个客户端建立连接

  浅谈Python-IO多路复用(select、poll、epoll模式)第5张

  服务器与第一个客户端通信:

  浅谈Python-IO多路复用(select、poll、epoll模式)第6张

  浅谈Python-IO多路复用(select、poll、epoll模式)第7张

  服务器与第二个客户端通信:

  浅谈Python-IO多路复用(select、poll、epoll模式)第8张

  浅谈Python-IO多路复用(select、poll、epoll模式)第9张

  浅谈Python-IO多路复用(select、poll、epoll模式)第10张

4. 使用DefaultSelector自适应操作系统默认IO多路复用模式

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from urllib.parse import urlparse
import socket

selector = DefaultSelector()
urls = []
stop = False

class HTTPSelector(object):
    """使用select或epoll完成http请求"""

    def __init__(self, url):
        self.url = url
        self.domain = urlparse(url).netloc
        self.path = urlparse(url).path
        self.data = b""
        urls.append(self.url)
        if self.path == "":
            self.path = "/"

        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置为非阻塞
        self.client.setblocking(0)

        try:
            self.client.connect((self.domain, 80))
        except BlockingIOError:
            pass

        # 注册写事件
        selector.register(self.client.fileno(), EVENT_WRITE, self.connect)


    def connect(self, key):
        """连接http服务器"""

        # 解除注册写事件, 如果未解除则出现异常
        selector.unregister(key.fd)
        request_data = """GET {0} HTTP/1.1
Host: {1}
Connection: close

""".format(self.path, self.domain).encode("utf8")
        self.client.send(request_data)
        # 注册读事件
        selector.register(self.client.fileno(), EVENT_READ, self.read)

    def read(self, key):
        """接收http响应"""
        data = b""
        # 这里没有使用循环读取响应数据,原因在于select仅处理socket文件描述符状态发生变化
        # 的socket实例,此外,该程序只有一个client实例,所以其接收到的数据是属于整个HTML数据的一部分,
        # 就需要数据累加
        # while 1:
        #     try:
        #         cur_data = self.client.recv(1024)
        #     except BlockingIOError as e:
        #         pass
        #     else:
        #         if cur_data:
        #             data += cur_data
        #         else:
        #             break
        cur_data = self.client.recv(1024)
        if cur_data:
            self.data += cur_data
        else:
            selector.unregister(key.fd)
            data = data.decode("utf8")
            html_data = data.split("

")[1]
            print(html_data)
            urls.remove(self.url)
            if not urls:
                global stop
                stop = True
            self.client.close()

def loop():
    # 1.selector本身不支持register模式
    # 需要手动开启事件循环,需要由程序员自己来完成
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)

if __name__ == '__main__':
    test = HTTPSelector("https://www.baidu.com")
    loop()

 浅谈Python-IO多路复用(select、poll、epoll模式)第11张

免责声明:文章转载自《浅谈Python-IO多路复用(select、poll、epoll模式)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇PDO详解Perl命令行常见用法及技巧下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

node.js的net模块实现socket通信

本文实例讲述了通过node.js的net模块实现nodejs socket服务端和客户端简单通信功能,可以用作客户端对服务端的端口监听以及事件回执。 server端代码 var net = require('net'); //模块引入 var listenPort = 8080;//监听端口 var server = net.createServer(fu...

js获取select下拉框的value值和text文本值

介绍一种取下拉框值以及绑定下拉框数据的方法    这里用到的jquery-ui-multiselect插件  1、前台html代码 <span class="ModuleFormFieldLabel" style="float: left; padding-top: 3px;">品类:</span> <asp:Hidde...

网络技能大赛A卷测试

  这个测试对我来言有些难度,短时间内做不了太多。首先是思路的理清,登录后的界面有好几种,而且公文的状态也有好几种。理清思路就花了一些时间 然后大致的框架做了做,然后将用户的增删改查还有公文的增删改查写了写。登录界面也完成了,不过不同角色登陆后的界面还没来得及做。主要就是功能太多,运用不熟练 数据库      bean层的基本信息 package c...

mysql复制表和表结构

一、CREATE TABLE 方法 整表复制 # create table 新表 select * from 旧表;结构复制 # create table 新表 select * from 旧表 where 1<>1; 二、INSERT INTO 方法 得到建表语句 # show create table 旧表;新建表复制数据到新表 #...

MYSQL获取自增ID的四种方法

1. select max(id) from tablename    2.SELECT LAST_INSERT_ID() 函数    LAST_INSERT_ID 是与table无关的,如果向表a插入数据后,再向表b插入数据,LAST_INSERT_ID会改变。    在多用户交替插入数据的情况下max(id)显然不能用。这时就该使用LAST_INSER...

Js基础知识4-函数的三种创建、四种调用(及关于new function()的解释)

在js中,函数本身属于对象的一种,因此可以定义、赋值,作为对象的属性或者成为其他函数的参数。函数名只是函数这个对象类的引用。 函数定义 1 //函数的三种创建方法(定义方式) 2 function one(){ //函数声明语句,不属于任何对象,始终默认为全局对象 3 console.log(...