利用epoll实现异步IO

摘要:
因此,异步IO也称为事件驱动IO。但在此之前,我想谈谈select和epoll之间的区别。Epoll可以支持大量连接,但select有局限性,所以我决定使用Epoll。

  之前异步IO一直没搞明白,大致的理解就是在一个大的循环中,有两部分:第一部分是监听事件;第二部分是处理事件(通过添加回调函数的方式)。就拿网络通信来说,可以先通过调用 select 模块中的 select 监听各个 socket 。当 socket 有事件到来时,针对相应的事件做出处理,就这么一直循环下去。所以异步IO也被称为事件驱动IO。原理其实我说得太简单了,所以我会以一个例子来说明一切。不过在这之前我还是要说一下 select 和 epoll 的区别。

一、IO多路服用的select

  IO多路复用相对于阻塞式和非阻塞式的好处就是它可以监听多个 socket ,并且不会消耗过多资源。当用户进程调用 select 时,它会监听其中所有 socket 直到有一个或多个 socket 数据已经准备好,否则就一直处于阻塞状态。select的缺点在于单个进程能够监视的文件描述符的数量存在最大限制,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的的开销也线性增长。同时,由于网络响应时间的延迟使得大量的tcp链接处于非常活跃状态,但调用select()会对所有的socket进行一次线性扫描,所以这也浪费了一定的开销。不过它的好处还有就是它的跨平台特性。

利用epoll实现异步IO第1张

二、 异步IO的epoll

  epoll的优点就是完全的异步,你只需要对其中 poll 函数注册相应的 socket 和事件,就可以完全不管。当有时间发生时,数据已经从内核态拷贝到用户态,也就是完全没有阻塞。

利用epoll实现异步IO第2张

三、基于epoll的聊天室程序

  说了这么多,我决定还是用epoll写一个多人聊天程序。epoll可以支持大量连接,select却有限制,所以这就是我决定用epoll的原因。首先看服务器程序:

 1 import socket, select
 2 # 服务端
 3 
 4 serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 5 serverSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
 6 serverSock.bind(('127.0.0.1', 8001))
 7 serverSock.listen(5)
 8 serverSock.setblocking(False)
 9 
10 EOL = bytes('

', 'utf-8')
11 QUIT = bytes('
', 'utf-8')
12 epoll = select.epoll()
13 epoll.register(serverSock.fileno(), select.EPOLLIN)
14 print('注册事件:%s'%serverSock.fileno())
15 
16 try:
17     connections = {}; requests = {}; responses = {}
18     while True:
19         events = epoll.poll(1)
20         for fileno, event in events:
21 #             print('event:%s fileno:%s'%(event, fileno))
22             if fileno == serverSock.fileno():
23                 clientSock, address = serverSock.accept()
24                 print('连接到客户端: %s:%s'%(address[0], address[1]))
25                 clientSock.setblocking(False)
26                 connections[clientSock.fileno()] = (clientSock, address)
27                 epoll.register(clientSock.fileno(), select.EPOLLOUT) # socket只能注册输入或输出一个,不能同时注册
28                 requests[clientSock.fileno()] = bytes('', 'utf-8')
29                 responses[clientSock.fileno()] = bytes('你已连接到服务器,IP为{}:{}

'.format(*serverSock.getsockname()),
30                                          'utf-8')
31             elif event & select.EPOLLIN:
32                 requests[fileno] += connections[fileno][0].recv(1024)
33                 if requests[fileno].endswith(EOL):
34                     msg = str(requests[fileno], 'utf-8')
35                     msg = '来自{}的消息:{}'.format(connections[fileno][1], msg[:-2])
36                     requests[fileno] = b''
37                     #print(msg)
38                     for i in responses:
39                         if i == fileno:
40                             continue
41                         responses[i] += bytes(msg, 'utf-8')
42                         epoll.modify(i, select.EPOLLOUT)
43                 if QUIT in requests[fileno]:
44                     epoll.modify(fileno, select.EPOLLOUT) 
45                     
46             elif event & select.EPOLLOUT:
47                 #print('开始发送消息:%s'%str(responses[fileno], 'utf-8'))
48                 bytesSend = connections[fileno][0].send(responses[fileno])
49                 responses[fileno] = responses[fileno][bytesSend:]
50                 #print('发送完成')
51                 if responses[fileno] == b'':
52                     epoll.modify(fileno, select.EPOLLIN)
53                 if QUIT in requests[fileno]:
54                     epoll.modify(fileno, 0)
55                     connections[fileno][0].shutdown(socket.SHUT_RDWR)
56                     
57             elif event & select.EPOLLHUP:
58                 epoll.unregister(fileno)
59                 connections[fileno][0].close()
60                 del connections[fileno]
61 finally:
62     epoll.unregister(serverSock.fileno())
63     epoll.close()
64     serverSock.close()
65     print('已退出服务端程序')

注意,我首先定义了两个终止符:EOL表示这段话已经发完了;QUIT表示客户端想要退出。客户端的程序有点让我为难,既要在命令行输入又要同时保证能输出别人发过来的消息,所有我只好用了prompt_toolkit再加上一个线程。如下:

 1 import socket, prompt_toolkit, select
 2 import threading, queue
 3 
 4 
 5 class Client:
 6     def __init__(self, sock):
 7         self.sock = sock
 8         self.want_to_send = False
 9         self.want_to_recv = True
10         self._msg = queue.Queue()
11         
12     def fileno(self):
13         return self.sock.fileno()
14     
15     def handle_recv(self):
16         print('接受消息..')
17         msg = self.sock.recv(1024)
18         print(str(msg, 'utf-8'))
19         
20     def handle_send(self):
21         msg = self._msg.get()
22         if msg == '
':
23             self.want_to_send = False
24             self.want_to_recv = False
25         self.sock.sendall(bytes(msg, 'utf-8'))
26         self.want_to_send = False
27             
28 def handle_sock(want_to_send, want_to_recv, sock):
29     print('开始处理消息...')
30     want_to_recv.append(sock.fileno())
31     while True:
32         if sock.want_to_send:
33             if not want_to_send:
34                 want_to_send.append(myclient.fileno())
35         else:
36             want_to_send.clear()
37         can_recv, can_send, _ = select.select(want_to_recv, want_to_send, [], 1)
38         if can_recv:
39             sock.handle_recv()
40         if can_send:
41             sock.handle_send()
42         if not (sock.want_to_send or sock.want_to_recv):
43             print('正停止客户端连接...')
44             break
45         if sock._msg.qsize():
46             sock.want_to_send = True
47 
48             
49 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50 s.connect(('127.0.0.1',8001))
51 
52 myclient = Client(s)
53 want_to_send = []
54 want_to_recv = []
55     
56 
57             
58 t = threading.Thread(target=handle_sock,
59                      args=(want_to_send, want_to_recv, myclient),
60                      daemon=True)
61 t.start()
62 
63 try:
64     while True:
65         messages = prompt_toolkit.shortcuts.prompt('

>>> ',patch_stdout=True)
66         myclient._msg.put(messages+'

')
67 except KeyboardInterrupt:
68     myclient._msg.put('
')
69 finally:
70     t.join()
71     myclient.sock.close()
72     print('网络已断开')

我的服务器跑在 jupyter 上,客户端跑在命令行上,效果如下:

利用epoll实现异步IO第3张

  客户端接受和发送消息都是互不影响的,这样就实现了一个多人聊天的功能。而且服务器使用的是epoll,所以哪怕是成千上万的人同时在线也没有任何压力。至于怎么测试暂时还没想到办法。

免责声明:文章转载自《利用epoll实现异步IO》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇《汇编语言(第三版)》标志寄存器java List接口一下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

oracle审计

1、什么是审计审计(Audit)用于监视用户所执行的数据库操作,并且Oracle会将审计跟踪结果存放到OS文件(默认位置为$ORACLE_BASE/admin/$ORACLE_SID/adump/)或数据库(存储在system表空间中的SYS.AUD$表中,可通过视图dba_audit_trail查看)中。默认情况下审计是没有开启的。不管你是否打开数据库的...

socket---异常

Socket异常 客户端异常 1 java.net.ConnectException: Connection refused: connect。 该异常发生在客户端进行new Socket(ip, port)操作时,该异常发生的原因是或者具有ip地址的机器不能找到(也就是说从当前机器不存在到指定ip路由),或者是该ip存在,但找不到指定的端口进行监听。...

oracle正则表达式regexp_like的用法详解

ORACLE中的支持正则表达式的函数主要有下面四个:1,REGEXP_LIKE :与LIKE的功能相似2,REGEXP_INSTR :与INSTR的功能相似3,REGEXP_SUBSTR :与SUBSTR的功能相似4,REGEXP_REPLACE :与REPLACE的功能相似它们在用法上与Oracle SQL 函数LIKE、INSTR、SUBSTR 和RE...

Python学习—数据库篇之索引

一、索引简介 索引,是数据库中专门用于帮助用户快速查询数据的一种数据结构。类似于字典中的目录,查找字典内容时可以根据目录查找到数据的存放位置,然后直接获取即可,对于索引,会保存在额外的文件中。在mysql数据库中,索引是按照B树的结构来进行存储的。                              30                      ...

MyBatis

为什么使用mybatis: 在知道为什么使用mybatis之前,我们先了解java如何进行jdbc访问数据库的。第一是从连接池取出或者自己创建Connection对象,第二是从Connection对象中创建出Statement对象,第三,根据Statement对象去执行SQL语句,第四,获取执行SQL语句的返回结果并处理,第五,关闭数据库。在这几个步骤中,...

网络技能大赛A卷测试

  这个测试对我来言有些难度,短时间内做不了太多。首先是思路的理清,登录后的界面有好几种,而且公文的状态也有好几种。理清思路就花了一些时间 然后大致的框架做了做,然后将用户的增删改查还有公文的增删改查写了写。登录界面也完成了,不过不同角色登陆后的界面还没来得及做。主要就是功能太多,运用不熟练 数据库      bean层的基本信息 package c...