Python下Select模块以及IO多路复用

select模块以及IO多路复用

前言

Python中的select模块专注于I/O多路复用,提供了select, poll, epoll三个方法(其中后两个在Linux中可用,windows仅支持select),另外也提供了kqueue方法(freeBSD系统).

select()的机制中提供一fd_set的数据结构,实际上是一long类型的数组, 每一个数组元素都能与一打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成, 当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读或可写。

select主要用于socket通信当中,能监视我们需要的文件描述符变化。

非阻塞式I/O编程特点

  • 如果发现一个I/O有输入,读取的过程中,另外一个也有了输入,这时候不会产生任何反应.这就需要你的程序语句去用到select函数的时候才知道有数据输入。
  • 程序去select的时候,如果没有数据输入,程序会一直等待,直到有数据为止,也就是程序中无需循环和sleep。

Select在Socket编程中还是比较重要的,可是对于初学Socket的人来说都不太爱用Select写程序,他们只是习惯写诸如connectacceptrecvrecvfrom这样的阻塞程序(所谓阻塞方式block,顾名思义,就是进程或是线程执行到这些函数时必须等待某个事件的发生,如果事件没有发生,进程或线程就被阻塞,函数不能立即返回)。

可是使用Select就可以完成非阻塞(所谓非阻塞方式non-block,就是进程或线程执行此函数时不必非要等待事件的发生,一旦执行肯定返回,以返回值的不同来反映函数的执行情况,如果事件发生则与阻塞方式相同,若事件没有发生,则返回一个代码来告知事件未发生,而进程或线程继续执行,所以效率较高)方式工作的程序,它能够监视我们需要监视的文件描述符的变化情况——读写或是异常。

Select方法

基本原理

进程指定内核监听哪些文件描述符(最多监听1024个fd)的哪些事件,当没有文件描述符事件发生时,进程被阻塞;当一个或者多个文件描述符事件发生时,进程被唤醒。

当我们调用select()时:

  1. 上下文切换转换为内核态
  2. 将fd从用户空间复制到内核空间
  3. 内核遍历所有fd,查看其对应事件是否发生
  4. 如果没发生,将进程阻塞,当设备驱动产生中断或者timeout时间后,将进程唤醒,再次进行遍历
  5. 返回遍历后的fd
  6. 将fd从内核空间复制到用户空间

select函数方法参数

1
fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])

参数

可接受四个参数(前三个必须):

  • rlist: wait until ready for reading
  • wlist: wait until ready for writing
  • xlist: wait for an “exceptional condition”
  • timeout: 超时时间

返回值:三个列表

select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表

  1. 当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
  2. 当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
  3. 当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
  4. 当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化.当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

示例

示例1:模拟select,同时监听多个端口

  • 服务端
服务端select_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# coding=utf-8
"""模拟select,同时监听多个端口"""

import socket
import select

HOST = ''
PORT1, PORT2, PORT3 = 8001, 8002, 8003
BUFSIZ = 1024
ADDR1, ADDR2, ADDR3 = (HOST, PORT1), (HOST, PORT2), (HOST, PORT3)

ss1 = socket.socket()
ss1.bind(ADDR1)
ss1.listen()

ss2 = socket.socket()
ss2.bind(ADDR2)
ss2.listen()

ss3 = socket.socket()
ss3.bind(ADDR3)
ss3.listen()

inputs = [ss1, ss2, ss3]

while True:
r_list, w_list, e_list = select.select(inputs,[],inputs,1)
for ss in r_list:
# conn表示每一个连接对象
conn, address = ss.accept()
conn.sendall(bytes('hello', encoding='utf-8'))
conn.close()

for ss in e_list:
inputs.remove(ss)
  • 客户端
客户端1select_client1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding=utf-8
"""客户端1"""

import socket

HOST = 'localhost'
PORT = 8001
BUFSIZ = 1024
ADDR = (HOST, PORT)


cs = socket.socket()
cs.connect(ADDR)

msg = cs.recv(BUFSIZ)

print(msg.decode('utf-8'))

cs.close()

客户端2select_client2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding=utf-8
"""客户端2"""

import socket

HOST = 'localhost'
PORT = 8002
BUFSIZ = 1024
ADDR = (HOST, PORT)


cs = socket.socket()
cs.connect(ADDR)

msg = cs.recv(BUFSIZ)

print(msg.decode('utf-8'))

cs.close()

运行server端和client端,客户端1,2均能连接。
但是以上程序并不能同时对客户端的输入同时响应处理(两个客户端连接都没关闭的情况下),下面就来介绍I/O多路复用的例子

示例2:IO多路复用–使用socket模拟多线程,并实现读写分离

  • 服务端
服务端select_multi_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# coding=utf-8
"""使用socket模拟多线程,使多用户可以同时连接"""

import socket
import select
import queue
from time import ctime

HOST = ''
PORT = 8001
BUFSIZ = 1024
ADDR = (HOST, PORT)

# 创建连接
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#ss.setblocking(False)

ss.bind(ADDR)
ss.listen(5)

inputs = [ss, ]
outputs = []
message_dict = {}

while inputs:
print('waiting for the next event...')
r_list, w_list, e_list = select.select(inputs, outputs, inputs, 10)

for s in r_list:
# 判断当前触发的是不是服务端对象,当触发的是服务端对象时,说明有新客户端连接进来了
if s is ss:
# 表示有新用户来连接
conn, addr = s.accept()
print("connection from", addr)
# 将客户端对象也加入到监听的列表中,当客户端发消息时select将触发
#conn.setblocking(0)
inputs.append(conn)
# 为连接的客户端单独创建一个消息队列,用来保存客户端发送的消息
message_dict[conn] = queue.Queue()
else:
# 有老用户发消息
try:
data_bytes = s.recv(BUFSIZ)
# 客户端未断开
#if data_bytes != '':
data = data_bytes.decode('utf-8')
print('received "%s" from %s' % (data, s.getpeername()))
# 将收到的消息放到相对应的socket客户端的消息列表中
message_dict[s].put(data)
# 将需要进行回复操作socket放到outputs列表中,让select监听
if s not in outputs:
outputs.append(s)
except Exception as e:
# else:
# 客户端断开了连接(或出现其他异常),将客户端的监听从inputs列表中移除
print('closing', addr)
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# 移除相应socket客户端对象的消息队列
del message_dict[s]

# 处理发送消息列表
for s in w_list:
try:
# 如果消息队列中有消息,从消息队列中获取要发送的消息
message_queue = message_dict.get(s)
send_data = ''
if message_queue is not None:
send_data = message_queue.get_nowait()
else:
# 客户端连接断开了
print('has closed')
except queue.Empty:
# 客户端连接断开了
print(s.getpeername())
outputs.remove(s)
else:
# 处理消息
if message_queue is not None:
# 把接收到的数据加上时间戳再返回
s.send(("[%s] %s" % (ctime(), send_data)).encode('utf-8'))
else:
print("has closed")

# 处理异常情况
for s in e_list:
print("exception condition on", s.getpeername())
inputs.remove(s)
if s in outputs:
outputs.remove(s)

s.close()

del message_dict[s]
  • 客户端
客户端select_multi_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# coidng=utf-8
"""客户端"""

import socket

HOST = 'localhost'
PORT = 8001
BUFSIZ = 1024
ADDR = (HOST, PORT)


sock_num = 2
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for _ in range(sock_num)]

msgs = ["Hello", "I'm Robot", "Bye"]

print("connecting to %s port %s..." % ADDR)
# 连接到服务器
for s in socks:
s.connect(ADDR)

for index, msg in enumerate(msgs):
for s in socks:
print('%s: sending "%s" %d' % (s.getpeername(), msg, index))
s.send(msg.encode('utf-8'))

for s in socks:
data = s.recv(BUFSIZ).decode("utf-8")
print('%s: received "%s"' % (s.getpeername(),data))
# 接收到一个回复后就断开连接,我们就可以看看服务器端是如何处理之后的请求的
if data != "":
print('closing socket', s.getsockname())
s.close()
  • 运行结果

分别运行服务端和客户端程序:

服务端结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ python select_multi_server.py
waiting for the next event...
connection from ('127.0.0.1', 9078)
waiting for the next event...
connection from ('127.0.0.1', 9079)
received "HelloI'm Robot" from ('127.0.0.1', 9078)
waiting for the next event...
received "Bye" from ('127.0.0.1', 9078)
received "HelloI'm RobotBye" from ('127.0.0.1', 9079)
waiting for the next event...
waiting for the next event...
('127.0.0.1', 9078)
('127.0.0.1', 9079)
waiting for the next event...
closing ('127.0.0.1', 9079)
waiting for the next event...
received "" from ('127.0.0.1', 9079)
waiting for the next event...
received "" from ('127.0.0.1', 9079)
waiting for the next event...
closing ('127.0.0.1', 9079)
has closed
has closed
waiting for the next event...

客户端结果
1
2
3
4
5
6
7
8
9
10
11
12
$ python select_multi_client.py
connecting to localhost port 8001...
('127.0.0.1', 8001): sending "Hello" 0
('127.0.0.1', 8001): sending "Hello" 0
('127.0.0.1', 8001): sending "I'm Robot" 1
('127.0.0.1', 8001): sending "I'm Robot" 1
('127.0.0.1', 8001): sending "Bye" 2
('127.0.0.1', 8001): sending "Bye" 2
('127.0.0.1', 8001): received "[Wed Oct 31 09:41:05 2018] HelloI'm RobotBye"
closing socket ('127.0.0.1', 9078)
('127.0.0.1', 8001): received "[Wed Oct 31 09:41:05 2018] HelloI'm RobotBye"
closing socket ('127.0.0.1', 9079)

多次运行程序,你会发现客户端程序返回结果里的received后面的略有不同,你发现其中的原因了吗!

select、poll、epoll区别

select, poll, epoll 都是I/O多路复用的具体的实现,之所以有这三个存在,其实是因为他们出现是有先后顺序的。
I/O多路复用这个概念被提出来以后, select是第一个实现 (1983 左右在BSD里面实现的)。

select

select 被实现以后,很快就暴露出了很多问题:

  • select 会修改传入的参数数组,这个对于一个需要调用很多次的函数,是非常不友好的。
  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  • select 如果任何一个sock(I/O stream)出现了数据,select仅仅会返回,但是并不会告诉你是那个sock上有数据,于是你只能自己一个一个的找,)每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  • select 只能监视1024个链接,linux 定义在头文件中的,参见FD_SETSIZE
  • select 不是线程安全的,如果你把一个sock加入到select, 然后突然另外一个线程发现,尼玛,这个sock不用,要收回。对不起,这个select 不支持的,如果你丧心病狂的竟然关掉这个sock, select的标准行为是。。呃。。不可预测的,

于是14年以后(1997年)一帮人又实现了poll, poll 修复了select的很多问题

poll

  • poll 去掉了1024个链接的限制,于是要多少链接呢, 主人你开心就好。
  • poll 从设计上来说,不再修改传入数组,不过这个要看你的平台了,所以行走江湖,还是小心为妙。

其实拖14年那么久也不是效率问题, 而是那个时代的硬件实在太弱,一台服务器处理1千多个链接简直就是神一样的存在了,select很长段时间已经满足需求。

但是poll仍然不是线程安全的, 这就意味着,不管服务器有多强悍,你也只能在一个线程里面处理一组I/O流。你当然可以那多进程来配合了,不过然后你就有了多进程的各种问题。

于是5年以后, 在2002, 大神 Davide Libenzi 实现了epoll.

epoll

epoll 可以说是I/O 多路复用最新的一个实现,epoll 修复了pollselect绝大部分问题, 比如:

  • 对于每次需要将FD从用户态拷贝至内核态.epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
  • 同样epoll也没有1024的连接数限制
  • epoll 现在是线程安全的。
  • epoll 现在不仅告诉你sock组里面数据,还会告诉你具体哪个sock有数据,你不用自己去找了。

epoll的解决方案不像selectpoll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。

I/O多路复用知友有话说

select/poll, epoll总结

  1. selectpoll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。

  2. select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省不少的开销。

epoll示例1:简单时间戳服务器

  • 服务端
服务端epoll_simple_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import socket
import select
from time import ctime

s = socket.socket()
s.bind(('127.0.0.1',8888))
s.listen(5)
epoll_obj = select.epoll()
epoll_obj.register(s,select.EPOLLIN)
connections = {}
while True:
events = epoll_obj.poll()
for fd, event in events:
print("fd : {fd} | event : {event}".format(fd=fd, event=event))
if fd == s.fileno():
conn, addr = s.accept()
connections[conn.fileno()] = conn
epoll_obj.register(conn,select.EPOLLIN)
msg = conn.recv(200)
conn.sendall(('OK, first input --- [%s] %s'% (ctime(), msg.decode('utf-8'))).encode())
else:
try:
fd_obj = connections[fd]
msg = fd_obj.recv(200)
fd_obj.sendall(('[%s] %s'% (ctime(), msg.decode('utf-8'))).encode())
except BrokenPipeError:
epoll_obj.unregister(fd)
connections[fd].close()
del connections[fd]

s.close()
epoll_obj.close()
  • 客户端
客户端epoll_simple_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket

flag = 1
s = socket.socket()
s.connect(('127.0.0.1',8888))
while flag:
input_msg = input('input>>>')
if input_msg == '0':
break
s.sendall(input_msg.encode())
msg = s.recv(1024)
print(msg.decode())

s.close()

epoll示例2:读写分离的epoll

  • 服务端
服务端epoll_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
# coding=utf-8

import socket
import select
import queue
from time import ctime

#创建socket对象
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#设置IP地址复用
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#ip地址和端口号
SADDR = ("127.0.0.1", 8888)
#绑定IP地址
ss.bind(SADDR)
#监听,并设置最大连接数
ss.listen(10)
print ("服务器启动成功,监听IP:" , SADDR)
#服务端设置非阻塞
ss.setblocking(False)
#超时时间
timeout = 10
# bufsize
BUFSIZ = 1024
#创建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#注册服务器监听fd到等待读事件集合
epoll.register(ss.fileno(), select.EPOLLIN)
#保存连接客户端消息的字典,格式为{}
message_queues = {}
#文件句柄到所对应对象的字典,格式为{句柄:对象}
fd_to_socket = {ss.fileno():ss,}

while True:
print("等待活动连接......")
#轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
events = epoll.poll(timeout)
if not events:
print("epoll超时无活动连接,重新轮询......")
continue
print("有{num}个新事件,开始处理......".format(num=len(events)))

for fd, event in events:
socket = fd_to_socket[fd]
#如果活动socket为当前服务器socket,表示有新连接
if socket == ss:
connection, address = ss.accept()
print("新连接:" , address)
#新连接socket设置为非阻塞
connection.setblocking(False)
#注册新连接fd到待读事件集合
epoll.register(connection.fileno(), select.EPOLLIN)
#把新连接的文件句柄以及对象保存到字典
fd_to_socket[connection.fileno()] = connection
#以新连接的对象为键值,值存储在队列中,保存每个连接的信息
message_queues[connection] = queue.Queue()
#关闭事件
elif event & select.EPOLLHUP:
print('client close')
#在epoll中注销客户端的文件句柄
epoll.unregister(fd)
#关闭客户端的文件句柄
fd_to_socket[fd].close()
#在字典中删除与已关闭客户端相关的信息
del message_queues[fd_to_socket[fd]]
del fd_to_socket[fd]
#可读事件
elif event & select.EPOLLIN:
#接收数据
data = socket.recv(BUFSIZ)
if data:
data = data.decode("utf-8")
print("收到数据:{data} , 客户端:{client}".format(data=data,client=socket.getpeername()))
#将数据放入对应客户端的字典
message_queues[socket].put(data)
#修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
epoll.modify(fd, select.EPOLLOUT)
#可写事件
elif event & select.EPOLLOUT:
try:
#从字典中获取对应客户端的信息
msg = message_queues[socket].get_nowait()
except queue.Empty:
print(socket.getpeername() , " queue empty")
#修改文件句柄为读事件
epoll.modify(fd, select.EPOLLIN)
else:
print("发送数据: {data} 客户端:{client}".format(data=msg, client=socket.getpeername()))
#发送数据
socket.send(('[%s] %s'% (ctime(), msg)).encode())

#在epoll中注销服务端文件句柄
epoll.unregister(ss.fileno())
#关闭epoll
epoll.close()
#关闭服务器socket
ss.close()
  • 客户端
客户端epoll_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import socket

#创建客户端socket对象
cs = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#服务端IP地址和端口号元组
server_address = ('127.0.0.1',8888)
#客户端连接指定的IP地址和端口号
cs.connect(server_address)
BUFSIZE = 1024

while True:
#输入数据
data = input('input>')
if not data:
break
#客户端发送数据
cs.sendall(data.encode('utf-8'))
#客户端接收数据
server_data = cs.recv(BUFSIZE)
print('客户端收到的数据:',server_data.decode())

#关闭客户端socket
cs.close()

小结

本文总结了I/O多路复用的三种方式select、poll、epoll,并使用python下select模块实现了以其为基础的时间戳服务端和客户端。

-------------阅读完毕吐槽一番吧~-------------
0%