IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程

通俗理解(摘自网上一大神)

这些名词比较绕口,理解涵义就好。一个epoll场景:一个酒吧服务员(一个线程),前面趴了一群醉汉,突然一个吼一声“倒酒”(事件),你小跑过去给他倒一杯,然后随他去吧,突然又一个要倒酒,你又过去倒上,就这样一个服务员服务好多人,有时没人喝酒,服务员处于空闲状态,可以干点别的玩玩手机。至于epoll与select,poll的区别在于后两者的场景中醉汉不说话,你要挨个问要不要酒,没时间玩手机了。io多路复用大概就是指这几个醉汉共用一个服务员。

三个函数

1、select

进程指定内核监听哪些文件描述符(最多监听1024个fd)的哪些事件,当没有文件描述符事件发生时,进程被阻塞;当一个或者多个文件描述符事件发生时,进程被唤醒。

当我们调用select()时:

  1 上下文切换转换为内核态

  2 将fd从用户空间复制到内核空间

  3  内核遍历所有fd,查看其对应事件是否发生

  4  如果没发生,将进程阻塞,当设备驱动产生中断或者timeout时间后,将进程唤醒,再次进行遍历

  5 返回遍历后的fd

  6  将fd从内核空间复制到用户空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
  
参数: 可接受四个参数(前三个必须)
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an “exceptional condition”
timeout: 超时时间
 
返回值:三个列表
  
select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表
1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化
   当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

在服务端我们可以看到,我们需要不停的调用select, 这就意味着:

  1  当文件描述符过多时,文件描述符在用户空间与内核空间进行copy会很费时

  2  当文件描述符过多时,内核对文件描述符的遍历也很浪费时间

  3  select最大仅仅支持1024个文件描述符

参考:http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.html

2、poll

参考:http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html

3、epoll

参考:http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html

epoll是select和poll改进后的结果,相比下epoll具有以下优点:

1、支持一个进程打开的socket描述符(FD)不受限制(仅受限于操作系统的最大文件句柄数)

select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024,epoll并没有这个限制,它所支持的FD上限是操作系统的最大文件句柄数,这个数字远远大于1024

2、I/O效率不会随着FD数目的增加而线性下降

 epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时,会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次

传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,由于网络延时或者链路空闲,任一时刻只有少部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作-这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的,那么,只有“活跃”的socket才会主动的去调用callback函数,其他idle状态socket则不会。在这点上,epoll实现了一个伪AIO

3、使用mmap加速内核与用户空间的消息传递

epoll会在epoll_ctl时把指定的fd遍历一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd

无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存复制就显得非常重要,epoll是通过内核和用户空间mmap使用同一块内存实现。

4、epoll的API更加简单

用来克服select/poll缺点的方法不只有epoll,epoll只是一种Linux的实现方案。在freeBSD下有kqueue,而dev/poll是最古老的Solaris的方案,使用难度依次递增。但epoll更加简单。

epoll详解(python中)

 Python中的select模块专注于I/O多路复用,提供了select  poll  epoll三个方法(其中后两个在Linux中可用,windows仅支持select),另外也提供了kqueue方法(freeBSD系统)

select.epoll(sizehint=-1, flags=0) 创建epoll对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
epoll.close()
Close the control file descriptor of the epoll object.关闭epoll对象的文件描述符
 
epoll.closed
True if the epoll object is closed.检测epoll对象是否关闭
 
epoll.fileno()
Return the file descriptor number of the control fd.返回epoll对象的文件描述符
 
epoll.fromfd(fd)
Create an epoll object from a given file descriptor.根据指定的fd创建epoll对象
 
epoll.register(fd[, eventmask])
Register a fd descriptor with the epoll object.向epoll对象中注册fd和对应的事件
 
epoll.modify(fd, eventmask)
Modify a registered file descriptor.修改fd的事件
 
epoll.unregister(fd)
Remove a registered file descriptor from the epoll object.取消注册
 
epoll.poll(timeout=-1, maxevents=-1)
Wait for events. timeout in seconds (float)阻塞,直到注册的fd事件发生,会返回一个dict,格式为:{(fd1,event1),(fd2,event2),……(fdn,eventn)}

事件:

1
2
3
4
5
6
7
8
9
10
11
12
EPOLLERR = 8               ----发生错误
EPOLLET = 2147483648       ----默认为水平触发,设置该事件后则边缘触发
EPOLLHUP = 16              ----挂起状态
EPOLLIN = 1                ----可读
EPOLLMSG = 1024            ----忽略
EPOLLONESHOT = 1073741824  ----一次性行为。在退出一个事件后,FD内部禁用
EPOLLOUT = 4               ----可写
EPOLLPRI = 2               ----紧急可读
EPOLLRDBAND = 128          ----读取优先
EPOLLRDNORM = 64           ----相当于epollin
EPOLLWRBAND = 512          ----写入优先
EPOLLWRNORM = 256          ----相当于epollout

水平触发和边缘触发:

Level_triggered(水平触发,有时也称条件触发):当被监控的文件描述符上有可读写事件发生时,epoll.poll()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll.poll()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!!! 优点很明显:稳定可靠

Edge_triggered(边缘触发,有时也称状态触发):当被监控的文件描述符上有可读写事件发生时,epoll.poll()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll.poll()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!缺点:某些条件下不可靠

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训 服务端

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训 客户端

 

实战代码:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训 server

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

# /usr/bin/env python# coding:utf-8import socketimport sys, osimport threadingimport timeimport loggingimport multiprocessingimport randomimport datetimeimport hashlib# hashlib.md5(open(fileName,'rb').read()).hexdigest()import pdb

sys.path.append('../')import exceptions# 导入任务调度模块from apscheduler.schedulers.blocking import BlockingSchedulerfrom sqldb import SQLdbfrom mylog import MyLog as Logfrom communication_packet import Communication_Packetclass Client(object):    # Handle message type
    HANDLE_GENERAL = 1  # 处理普通的应答包
    HANDLE_INSERT = 2  # 所有数据安装传递过来的进行数据库插入操作
    HANDLE_UPDATE = 3  # 对传递过来的数据进行更新操作,需要传递更新条件
    HANDLE_INERT_UPDATE = 4
    HANDLE_FILE = 5    def __init__(self, IP='112.33.9.154', Port=11366, blackdir=None, db=None):
        self.tasksched = BlockingScheduler()  # 任务调度器
        self.serverIP = IP  # 设置服务器IP
        self.serverPort = Port  # 设置服务器端口
        if db is not None:
            self.db = db        else:
            self.db = SQLdb()  # 初始化数据库用于数据库同步
        self.lock = threading.Lock()        # 使用默认模式:debug模式
        self.log = Log()
        self.log.openConsole()  # 打开控制端输出
        self.logger = self.log.getLog()

        self.EOF = '\n\r\n'  # Set EOF flag

        if blackdir is None:
            self.basepath = './blacklist/'
        else:
            self.basepath = blackdir

        self.sysdbFirstFlag = False
        self.key = None  # using to calculate token
        self.encryption = None
        self.detect_piont_name = 'xxxx'
        self.detect_piont_serial_number = 'xxxxxx'

    def set_DB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)    def set_first_synclog_flag(self, flag):
        self.synclog_flag = flag    def setBlacklistDir(self, filedir=None):        # Set blacklist dir
        if filedir is None:
            self.basepath = './blacklist/'
        else:
            self.basepath = filedir    def createNewBlackListPath(self):        # blacklistdir if not exists,create it
        if os.path.exists(self.basepath):            pass
        else:            try:
                os.mkdir(self.basepath)            except:                raise
        nowtime = datetime.datetime.now().strftime('%Y_%b_%d_%H_%M_%S')
        filename = 'blacklist_' + nowtime + '.txt'  # 根据文件扩展名
        filepath = self.basepath + filename        return filepath    def handle_file_func(self, content, filename):        try:
            content_data = eval(db_content)        except Exception, e:
            self.logger.error('-----handle_file_func:    eval business_content error!-----')            return False        else:            # Open file for write data
            try:
                w_file = file(filename, 'w')                for data_item in content_data:
                    w_file.write(str(data))                else:
                    w_file.close()            except Exception, e:
                self.logger.error('-----handle_file_func:    write data to file Error!------')                return False            else:                return True    def reap(self):        # 回收可回收的进程,使用多进程的时候调用...可能不用
        while True:            try:
                result = os.waitpid(-1, os.WNOHANG)                if not result[0]: break
            except:                break
            self.logger.info("reaped child process %d" % result[0])    def __connect_server(self, IP, Port):        '''
        连接远程服务器反正通讯套接字        '''
        # Creat a TCP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        # Connet the socket to the port where the server is listening
        server_address = (IP, Port)
        self.logger.info('connecting to %s port %s' % server_address)        try:
            sock.connect(server_address)        except:
            self.logger.error('connecting to %s port %s error!' % server_address)            raise
        return sock    def __get_tasks(self):    # Get task from db,return an list

    def scheduler_tasks(self, task_list=None):        # start scheduler to sched the tasks
        pass

    def calculateTime(self, starttime=None, intervalMinutes=None):        if not starttime:
            nowtime = datetime.datetime.now()        else:
            nowtime = starttime        if intervalMinutes:
            interval = datetime.timedelta(minutes=intervalMinutes)            return nowtime - interval        else:            return nowtime    def calc_md5(self, data):        return hashlib.md5(data)).hexdigest()    def validating_message_token(self, receving_data):
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        message = receving_data.lstrip().rstrip(suffix_md5)
        cur_md5 = self.calc_md5(message)        if message_md5 == cur_md5:            return True, message        else:            return False, message        pass

    def read_sync_state(self):
        qeury_sql = "select * from sync_state;"
        DB_ERROR = False
        with self.lock:
            sync_status = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error        if DB_ERROR:            raise
        if sync_status:
            sync_status_dict = dict()            for sync_s in sync_status:
                sync_status_dict[sync_s[0]] = sync_s            return sync_status_dict        else:            return None    def read_sync_contol_configure(self, read_flag=False):        # Read the control configuration from loacal db,if have not,we sync it from server,then read it again
        qeury_sql = "select * from sync_control;"
        DB_ERROR = False        # set table name
        table_name = 'sync_control'
http://www.cnblogs.com/luxiaojun/p/7084360.html

网友评论