发一下牢骚和主题无关:
python模块分析- select 等待I/0实现
2013-05-212磁针石
#承接软件自动化实施与培训等gtalk:ouyangchongwu#gmail.comqq 37391319 博客:#版权全部,转载刊登请来函联系
# 深圳测试自动化python项目接单群113938272深圳会计软件测试兼职 6089740
#深圳地摊群 66250781武冈洞口城步新宁乡情群49494279
#自动化测试和python群组:
#参考资料:《The Python Standard Library by Example2011》
#
11.2 select – 等待I/O实现
select同时监控多个sockets,支撑网络服务和多个客户端通信。以下内容部分来源于:http://www.cnblogs.com/coser/archive/2012/01/06/2315216.html
该模块可以拜访大多数操作系统中的select()和poll()函数, Linux2.5+支撑的epoll()和大多数BSD支撑的kqueue()。请注意,在Windows上,它仅适用于socket,在其他操作系统上,它也适用于其他类型的文件(特别是在Unix上,它还可以用于管道)。它不能用于肯定常规文件是不是变化。
Select:synchronousI/O multiplexing. select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而停止后续的读写操作。Select是跨平台的。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方法晋升这一限制。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方法晋升这一限制。
Poll:wait for some event on a file descriptor。poll在1986年诞生于System VRelease 3,它和select在本质上没有多大差异,但是poll没有最大文件描述符数量的限制。poll和select同样存在一个缺点就是,包含大批文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是不是就绪,它的开销随着文件描述符数量的增长而线性增大。另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其停止IO操作,那么下次调用select()和poll()的时候 将再次呈文这些文件描述符,所以它们一般不会丧失就绪的消息,这类方法称为水平触发(Level Triggered)。不过poll其实不适用于windows平台。
Epoll:I/O eventnotification facility。Linux 2.5.44以后支撑,是性能最好的多路I/O就绪通知方法。epoll可以同时支撑水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚变为就绪状态,它只说一遍,如果我们没有采用行动,那么它将不会再次告知,这类方法称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相称庞杂。epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是现实的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次获得响应数量的文件描述符便可,这里也应用了内存映射(mmap)技巧,这样便彻底省掉了这些文 件描述符在系统调用时复制的开销。另一个本质的改良在于epoll采用基于事件的就绪通知方法。在select/poll中,进程只有在调用一定的方法后,内核才对全部监视的文件描述符停止扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制, 迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
Kqueue:暂不触及
11.2.1 应用select
select监控sockets,打开的文件,管道(任何有fileno()方法返回一个有效的文件描述符的东东),直到它们变成可读可写或发生通讯错误。它可以更容易地同时监控多个连接,比应用socket超时轮询更有效,因为监控在操作系网络层而不是python解析器。注意windows的select其实不支撑打开文件。
上面代码通过select监控多个连接。注意通过server.setblocking(0)设置为非阻塞模式。select()的参数为3个列表:第一列表为读取输入数据的对象;第2个接收要发送的数据,第3个存放errors。
import select
import socket
import sys
import Queue
# Create aTCP/IP socket
server =socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Bind thesocket to the port
server_address= ('localhost', 10000)
print>>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
# Listen forincoming connections
server.listen(5)
# Socketsfrom which we expect to read
inputs = [server ]
# Sockets towhich we expect to write
outputs = [ ]
# Outgoingmessage queues (socket:Queue)
message_queues= {}
while inputs:
# Wait for at least one of the sockets tobe ready for processing
print >>sys.stderr, 'waiting for thenext event'
readable, writable, exceptional =select.select(inputs,
outputs,
inputs)
# Handle inputs
for s in readable:
if s is server:
# A "readable" socket isready to accept a connection
connection, client_address =s.accept()
print >>sys.stderr, ' connection from', client_address
connection.setblocking(0)
inputs.append(connection)
# Give the connection a queue fordata we want to send
message_queues[connection] =Queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket hasdata
print >>sys.stderr,' received "%s" from %s' % \
(data, s.getpeername())
message_queues[s].put(data)
# Add output channel forresponse
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result asclosed connection
print >>sys.stderr,' closing', client_address
# Stop listening for input onthe connection
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg =message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stopchecking for writability.
print >>sys.stderr, ' ', s.getpeername(), 'queue empty'
outputs.remove(s)
else:
print >>sys.stderr, ' sending "%s" to %s' % \
(next_msg, s.getpeername())
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print >>sys.stderr, 'exceptioncondition on', s.getpeername()
# Stop listening for input on theconnection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
对应的客户端如下:
import socket
import sys
messages = ['This is the message. ',
'It will be sent ',
'in parts.',
]
server_address= ('localhost', 10000)
# Create aTCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET,socket.SOCK_STREAM),
]
# Connect thesocket to the port where the server is listening
print>>sys.stderr, 'connecting to %s port %s' % server_address
for s insocks:
s.connect(server_address)
for messagein messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending"%s"' % \
(s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received"%s"' % \
(s.getsockname(), data)
if not data:
print >>sys.stderr, 'closingsocket', s.getsockname()
s.close()
执行结果:
服务器端:
# pythonselect_echo_server.py
starting upon localhost port 10000
waiting forthe next event
connection from ('127.0.0.1', 52873)
waiting forthe next event
connection from ('127.0.0.1', 52874)
received "This is the message. "from ('127.0.0.1', 52873)
waiting forthe next event
received "This is the message. "from ('127.0.0.1', 52874)
sending "This is the message. " to('127.0.0.1', 52873)
waiting forthe next event
('127.0.0.1', 52873) queue empty
sending "This is the message. " to('127.0.0.1', 52874)
waiting forthe next event
('127.0.0.1', 52874) queue empty
waiting forthe next event
received "It will be sent " from('127.0.0.1', 52873)
received "It will be sent " from('127.0.0.1', 52874)
waiting forthe next event
sending "It will be sent " to('127.0.0.1', 52873)
sending "It will be sent " to('127.0.0.1', 52874)
waiting forthe next event
('127.0.0.1', 52873) queue empty
('127.0.0.1', 52874) queue empty
waiting forthe next event
received "in parts." from('127.0.0.1', 52873)
received "in parts." from('127.0.0.1', 52874)
waiting forthe next event
sending "in parts." to('127.0.0.1', 52873)
sending "in parts." to('127.0.0.1', 52874)
waiting forthe next event
('127.0.0.1', 52873) queue empty
('127.0.0.1', 52874) queue empty
waiting forthe next event
closing ('127.0.0.1', 52874)
closing ('127.0.0.1', 52874)
waiting forthe next event
客户端:
# pythonselect_echo_multiclient.py
connecting tolocalhost port 10000
('127.0.0.1',52873): sending "This is the message. "
('127.0.0.1',52874): sending "This is the message. "
('127.0.0.1',52873): received "This is the message. "
('127.0.0.1',52874): received "This is the message. "
('127.0.0.1',52873): sending "It will be sent "
('127.0.0.1',52874): sending "It will be sent "
('127.0.0.1',52873): received "It will be sent "
('127.0.0.1',52874): received "It will be sent "
('127.0.0.1',52873): sending "in parts."
('127.0.0.1',52874): sending "in parts."
('127.0.0.1',52873): received "in parts."
('127.0.0.1',52874): received "in parts."
11.2.2 增长超时
Select的第四个参数可以设置超时。
服务器端:
import select
import socket
import sys
import Queue
# Create aTCP/IP socket
server =socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Bind thesocket to the port
server_address= ('localhost', 10000)
print>>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
# Listen forincoming connections
server.listen(5)
# Socketsfrom which we expect to read
inputs = [server ]
# Sockets towhich we expect to write
outputs = [ ]
# Keep upwith the queues of outgoing messages
message_queues= {}
while inputs:
# Wait for at least one of the sockets tobe ready for processing
print >>sys.stderr, '\nwaiting forthe next event'
timeout = 1
readable, writable, exceptional =select.select(inputs,
outputs,
inputs,
timeout)
if not (readable or writable or exceptional):
print >>sys.stderr, ' timed out, do some other work here'
continue
# Handle inputs
for s in readable:
if s is server:
# A "readable" serversocket is ready to accept a connection
connection, client_address =s.accept()
print >>sys.stderr,'connection from', client_address
connection.setblocking(0)
inputs.append(connection)
# Give the connection a queue fordata we want to send
message_queues[connection] =Queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket hasdata
print >>sys.stderr,'received "%s" from %s' % \
(data, s.getpeername())
message_queues[s].put(data)
# Add output channel forresponse
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print >>sys.stderr,'closing', client_address
# Stop listening for input onthe connection
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg =message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stopchecking for writability.
print >>sys.stderr,s.getpeername(), 'queue empty'
outputs.remove(s)
else:
print >>sys.stderr, 'sending"%s" to %s' % \
(next_msg, s.getpeername())
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print >>sys.stderr, 'exceptioncondition on', s.getpeername()
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
客户端:
import socket
import sys
import time
# Create aTCP/IP socket
sock =socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect thesocket to the port where the server is listening
server_address= ('localhost', 10000)
print>>sys.stderr, 'connecting to %s port %s' % server_address
sock.connect(server_address)
time.sleep(1)
messages = ['Part one of the message.',
'Part two of the message.',
]
amount_expected= len(''.join(messages))
try:
# Send data
for message in messages:
print >>sys.stderr, 'sending"%s"' % message
sock.sendall(message)
time.sleep(1.5)
# Look for the response
amount_received = 0
while amount_received < amount_expected:
data = sock.recv(16)
amount_received += len(data)
print >>sys.stderr, 'received"%s"' % data
finally:
print >>sys.stderr, 'closing socket'
sock.close()
执行结果:
服务器端:
# python select_echo_server_timeout.py
waiting for the next event
timed out, do someother work here
waiting for the next event
connection from ('127.0.0.1', 52875)
waiting for the next event
received "Part one of the message." from('127.0.0.1', 52875)
waiting for the next event
sending "Part one of the message." to ('127.0.0.1',52875)
waiting for the next event
('127.0.0.1', 52875) queue empty
waiting for the next event
timed out, do someother work here
waiting for the next event
received "Part two of the message." from ('127.0.0.1',52875)
waiting for the next event
sending "Part two of the message." to ('127.0.0.1',52875)
waiting for the next event
('127.0.0.1', 52875) queue empty
waiting for the next event
timed out, do someother work here
waiting for the next event
closing ('127.0.0.1', 52875)
waiting for the next event
timed out, do someother work here
客户端:
# python select_echo_multiclient.py
connecting to localhost port 10000
('127.0.0.1', 52873): sending "This is the message."
('127.0.0.1', 52874): sending "This is the message."
('127.0.0.1', 52873): received "This is the message."
('127.0.0.1', 52874): received "This is the message."
('127.0.0.1', 52873): sending "It will be sent "
('127.0.0.1', 52874): sending "It will be sent "
('127.0.0.1', 52873): received "It will be sent "
('127.0.0.1', 52874): received "It will be sent "
('127.0.0.1', 52873): sending "in parts."
('127.0.0.1', 52874): sending "in parts."
('127.0.0.1', 52873): received "in parts."
('127.0.0.1', 52874): received "in parts."
[root@bogon select]# python select_echo_slow_client.py
connecting to localhost port 10000
sending "Part one of the message."
sending "Part two of the message."
received "Part one of the "
received "message.Part two"
received " of the message."
closing socket
11.2.3 poll
Poll和select类似,以下是服务器端:
import select
import socket
import sys
import Queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' %server_address
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
# Keep up with the queues of outgoing messages
message_queues = {}
# Do not block forever (milliseconds)
TIMEOUT = 1000
# Commonly used flag sets
READ_ONLY = ( select.POLLIN |
select.POLLPRI |
select.POLLHUP |
select.POLLERR )
READ_WRITE = READ_ONLY | select.POLLOUT
# Set up the poller
poller = select.poll()
poller.register(server, READ_ONLY)
# Map file descriptors to socket objects
fd_to_socket = { server.fileno(): server,
}
while True:
# Wait for at leastone of the sockets to be ready for processing
print>>sys.stderr, 'waiting for the next event'
events =poller.poll(TIMEOUT)
for fd, flag inevents:
# Retrieve theactual socket from its file descriptor
s =fd_to_socket[fd]
# Handle inputs
if flag &(select.POLLIN | select.POLLPRI):
if s isserver:
# Areadable socket is ready to accept a connection
connection, client_address = s.accept()
print>>sys.stderr, ' connection',client_address
connection.setblocking(0)
fd_to_socket[ connection.fileno() ] = connection
poller.register(connection, READ_ONLY)
# Givethe connection a queue for data to send
message_queues[connection] = Queue.Queue()
else:
data =s.recv(1024)
if data:
# Areadable client socket has data
print >>sys.stderr, ' received "%s" from %s' % \
(data, s.getpeername())
message_queues[s].put(data)
#Add output channel for response
poller.modify(s, READ_WRITE)
else:
#Interpret empty result as closed connection
print >>sys.stderr, ' closing', client_address
#Stop listening for input on the connection
poller.unregister(s)
s.close()
#Remove message queue
delmessage_queues[s]
elif flag &select.POLLHUP:
# Clienthung up
print>>sys.stderr, ' closing',client_address, '(HUP)'
# Stoplistening for input on the connection
poller.unregister(s)
s.close()
elif flag &select.POLLOUT:
# Socket isready to send data, if there is any to send.
try:
next_msg= message_queues[s].get_nowait()
exceptQueue.Empty:
# Nomessages waiting so stop checking
print>>sys.stderr, s.getpeername(), 'queue empty'
poller.modify(s, READ_ONLY)
else:
print>>sys.stderr, ' sending"%s" to %s' % \
(next_msg, s.getpeername())
s.send(next_msg)
elif flag &select.POLLERR:
print >>sys.stderr, ' exception on', s.getpeername()
# Stoplistening for input on the connection
poller.unregister(s)
s.close()
# Removemessage queue
delmessage_queues[s]
执行结果:
服务器端:
# ./select_poll_echo_server.py
starting up on localhost port 10000
waiting for the next event
waiting for the next event
waiting for the next event
waiting for the next event
connection('127.0.0.1', 52876)
waiting for the next event
connection('127.0.0.1', 52877)
received "This isthe message. " from ('127.0.0.1', 52876)
waiting for the next event
sending "This isthe message. " to ('127.0.0.1', 52876)
received "This isthe message. " from ('127.0.0.1', 52877)
waiting for the next event
('127.0.0.1', 52876) queue empty
sending "This isthe message. " to ('127.0.0.1', 52877)
waiting for the next event
('127.0.0.1', 52877) queue empty
waiting for the next event
received "It willbe sent " from ('127.0.0.1', 52876)
waiting for the next event
sending "It willbe sent " to ('127.0.0.1', 52876)
received "It willbe sent " from ('127.0.0.1', 52877)
waiting for the next event
('127.0.0.1', 52876) queue empty
sending "It willbe sent " to ('127.0.0.1', 52877)
waiting for the next event
('127.0.0.1', 52877) queue empty
waiting for the next event
received "inparts." from ('127.0.0.1', 52876)
received "inparts." from ('127.0.0.1', 52877)
waiting for the next event
sending "inparts." to ('127.0.0.1', 52876)
sending "inparts." to ('127.0.0.1', 52877)
waiting for the next event
('127.0.0.1', 52876) queue empty
('127.0.0.1', 52877) queue empty
waiting for the next event
closing ('127.0.0.1',52877)
closing ('127.0.0.1',52877)
waiting for the next event
客户端:
]# python select_echo_multiclient.py
connecting to localhost port 10000
('127.0.0.1', 52876): sending "This is the message."
('127.0.0.1', 52877): sending "This is the message."
('127.0.0.1', 52876): received "This is the message."
('127.0.0.1', 52877): received "This is the message."
('127.0.0.1', 52876): sending "It will be sent "
('127.0.0.1', 52877): sending "It will be sent "
('127.0.0.1', 52876): received "It will be sent "
('127.0.0.1', 52877): received "It will be sent "
('127.0.0.1', 52876): sending "in parts."
('127.0.0.1', 52877): sending "in parts."
('127.0.0.1', 52876): received "in parts."
('127.0.0.1', 52877): received "in parts."
11.2.4 其他
Epoll是poll的扩展,上例中的poll改成epoll也可以照样应用。
Kqueue是BSD内核队列,kevent是BSD内核event。
参考资料:
select(http://docs.python.org/library/select.html) The standard library documentation
for this module.
Socket Programming HOWTO (http://docs.python.org/howto/sockets.html)An
instructional guide by Gordon McMillan,included in the standard library
documentation.
socket (page 561) Low-level networkcommunication.
SocketServer (page 609) Framework forcreating network server applications.
asyncore (page 619) and asynchat (page629) Asynchronous I/O framework.select (page 594)
UNIX Network Programming, Volume 1: TheSockets Networking API, 3/E By
W. Richard Stevens, Bill Fenner, andAndrew M. Rudoff. Published by
Addison-Wesley Professional, 2004.ISBN-10: 0131411551.
文章结束给大家分享下程序员的一些笑话语录: 手机终究会变成PC,所以ip会比wm更加畅销,但是有一天手机强大到一定程度了就会发现只有wm的支持才能完美享受。就好比树和草,草长得再高也是草,时间到了条件成熟了树就会窜天高了。www.ishuo.cn