转载

Java NIO之EPollSelectorImpl详解

这是滴滴的架构师欧阳康同学写的,非常赞,从EPollSelectorImpl到OS层面实现的详细解释,可以让大家对Java NIO的实现有更完整的理解,强烈推荐。

本文简述JDK1.7的NIO在linux平台上的实现,对java NIO的一些核心概念如Selector,Channel,Buffer等,不会做过多解释,这些请参考JDK的文档。JDK 1.7 NIO Selector在linux平台上的实现类是sun.nio.ch.EPollSelectorImpl,这个类通过linux下的epoll系列系统调用实现NIO,因此在介绍这个类的实现之前,先介绍一下linux的epoll。epoll是poll/select系统调用的一个改进版本,能以更高的性能实现IO事件的检测和分发(主要归功于epoll的事件回调机制,下文详述),主要包含以下3个系统调用:


#include
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

上述函数中,epoll_create函数负责创建一个检测IO事件的epoll实例,size参数用于“暗示”操作系统事件队列的长度,在linux-2.6.32内核中,此参数被忽略。epoll_ctl函数用于管理文件描述符的事件集,使用此函数可以注册、修改、删除一个或多个事件。epoll_wait负责检测事件,这三个函数的详细描,请参阅epoll的man文档。

Java类sun.nio.ch.EPollSelectorImpl主要的功能都委托给sun.nio.ch. EPollArrayWrapper实现 (下文所引java代码反编译自linux版jdk_1.7.0_17/lib/rt.jar):

  package sun.nio.ch;  class EPollArrayWrapper{  private native int epollCreate();  private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);  private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;  }  

EPollArrayWrapper的三个native方法的实现代码可参阅openjdk7/jdk/src/solaris/native/sun/nio/ch/ EPollArrayWrapper.c,可看到这三个native方法正是对上述epoll系列系统调用的包装。(其他jdk的实现代码会有所不同,但归根结底都是对epoll系列系统调用的包装)。

EPollSelectorImpl. implRegister方法(Selector.register方法的具体实现),通过调用epoll_ctl向epoll实例中注册事件:

  protected void implRegister(SelectionKeyImpl paramSelectionKeyImpl) {       if (this.closed)         throw new ClosedSelectorException();       SelChImpl localSelChImpl = paramSelectionKeyImpl.channel;       this.fdToKey.put(Integer.valueOf(localSelChImpl.getFDVal()), paramSelectionKeyImpl);       this.pollWrapper.add(localSelChImpl);       this.keys.add(paramSelectionKeyImpl);  }  

上述方法中,除了向epoll实例注册事件外,还将注册的文件描述符(fd)与SelectionKey的对应关系添加到fdToKey中,这个map维护了文件描述符与SelectionKey的映射。每当向Selector中注册一个Channel时,向此map中添加一条记录,而当Channel.close、SelectionKey.cancel方法调用时,则从fdToKey中移除与Channel的fd相关联的SelectionKey,具体代码在EPollSelectorImpl.implDereg方法中:

  protected void implDereg(SelectionKeyImpl paramSelectionKeyImpl) throws IOException {       assert (paramSelectionKeyImpl.getIndex() >= 0);       SelChImpl localSelChImpl = paramSelectionKeyImpl.channel;       int i = localSelChImpl.getFDVal();       this.fdToKey.remove(Integer.valueOf(i));       this.pollWrapper.release(localSelChImpl);       paramSelectionKeyImpl.setIndex(-1);       this.keys.remove(paramSelectionKeyImpl);       this.selectedKeys.remove(paramSelectionKeyImpl);       deregister(paramSelectionKeyImpl);       SelectableChannel localSelectableChannel = paramSelectionKeyImpl.channel();       if ((!localSelectableChannel.isOpen()) && (!localSelectableChannel.isRegistered()))         ((SelChImpl)localSelectableChannel).kill();  }  

EPollSelectorImpl. doSelect(Selector.select方法的实现),则通过调用epoll_wait实现事件检测:

  protected int doSelect(long paramLong)       throws IOException     {       if (this.closed)         throw new ClosedSelectorException();       processDeregisterQueue();       try {         begin();         this.pollWrapper.poll(paramLong);       } finally {         end();       }       processDeregisterQueue();       int i = updateSelectedKeys();       if (this.pollWrapper.interrupted())       {         this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);         synchronized (this.interruptLock) {           this.pollWrapper.clearInterrupted();           IOUtil.drain(this.fd0);           this.interruptTriggered = false;         }       }       return i;     }  

此方法的主要流程概括如下:

1. 通过epoll_wait调用(this.pollWrapper.poll)获取已就绪的文件描述符集合

2. 通过fdToKey查找文件描述符对应的SelectionKey,并更新之,更新SelectionKey的具体代码在EPollSelectorImpl .updateSelectedKeys中:

  private int updateSelectedKeys()     {       int i = this.pollWrapper.updated;       int j = 0;       for (int k = 0; k < i; k++) {        int m = this.pollWrapper.getDescriptor(k);        SelectionKeyImpl localSelectionKeyImpl = (SelectionKeyImpl)this.fdToKey.get(Integer.valueOf(m));          if (localSelectionKeyImpl != null) {          int n = this.pollWrapper.getEventOps(k);          if (this.selectedKeys.contains(localSelectionKeyImpl)) {            if (localSelectionKeyImpl.channel.translateAndSetReadyOps(n, localSelectionKeyImpl))              j++;          }          else {            localSelectionKeyImpl.channel.translateAndSetReadyOps(n, localSelectionKeyImpl);            if ((localSelectionKeyImpl.nioReadyOps() & localSelectionKeyImpl.nioInterestOps()) != 0) {              this.selectedKeys.add(localSelectionKeyImpl);              j++;            }          }        }      }      return j;    } 

关于fdToKey,有几个问题:

一、为何fdToKey会变得非常大?由上述代码可知,fdToKey变得非常大的可能原因有2个:

1.注册到Selector上的Channel非常多,例如一个长连接服务器可能要同时维持数十万条连接;

2.过期或失效的Channel没有及时关闭,因而对应的记录会一直留在fdToKey中,时间久了就会越积越多;

二、为何fdToKey总是串行读取?fdToKey中记录的读取,是在select方法中进行的,而select方法一般而言总是单线程调用(Selector不是线程安全的)。

三、tcp发包堆积对导致fdToKey变大吗?一般而言不会,因为fdToKey只负责管理注册到Selector上的channel,与数据传输过程无关。当然,如果tcp发包堆积导致IO框架的空闲连接检测机制失效,无法及时检测并关闭空闲的连接,则有可能导致fdToKey变大。

下面聊一聊epoll系统调用的具体实现,它的实现代码在(linux-2.6.32.65)fs/eventpoll.c中(下文所引内核代码,由于较长,所以只贴出主流程,省略了错误处理及一些相对次要的细节如参数检查、并发控制等),先看epoll_create 系统调用的实现:

fs/eventpoll.c

  SYSCALL_DEFINE1(epoll_create, int, size)  {   if (size <= 0)   return -EINVAL;  return sys_epoll_create1(0); } 

SYSCALL_DEFINE1是一个宏,用于定义有一个参数的系统调用函数,上述宏展开后即成为:

int sys_epoll_create(int size)

这就是epoll_create系统调用的入口。至于为何要用宏而不是直接声明,主要是因为系统调用的参数个数、传参方式都有严格限制,最多六个参数, SYSCALL_DEFINE2 -SYSCALL_DEFINE6分别用来定义有2-6个参数的系统调用。由上述代码可知,epoll_create函数最终调用sys_epoll_create1实现具体功能,同时也可以看出size参数被忽略了。sys_epoll_create1的主要代码如下(省略了错误处理及一些次要的细节如参数检查等):

fs/eventpoll.c

  SYSCALL_DEFINE1(epoll_create1, int, flags)  {   int error, fd;   struct eventpoll *ep = NULL;   struct file *file;   error = ep_alloc(&ep);   file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,       O_RDWR | (flags & O_CLOEXEC));   fd_install(fd, file);   ep->file = file;   return fd;  }  

上述代码主要是分配一个struct eventpoll实例,并分配与此实例相关联的文件描述符,后续的epoll_ctl,epoll_wait调用通过此文件描述符引用此实例。struct eventpoll的结构如下:

fs/eventpoll.c

  struct eventpoll {   spinlock_t lock;   struct mutex mtx;   wait_queue_head_t wq;   wait_queue_head_t poll_wait;   struct list_head rdllist;   struct rb_root rbr;   struct epitem *ovflist;   struct user_struct *user;   struct file *file;   int visited;   struct list_head visited_list_link;  }  

上述数据结构的关键部分是:

1. 一个等待队列wq,epoll正是通过此等待队列实现的事件回调

2. 一个就绪列表rdllist,此列表以双链表的形式保存了已就绪的文件描述符

3. 一个红黑树rbr,用于保存已注册过的文件描述符,若重复注册相同的文件描述符,则会返回错误

等待队列是epoll系统调用的核心机制(不只是epoll,linux下事件的通知、回调等机制大都依赖于等待队列),在讲述epoll_ctl,epoll_wait的实现之前,先来看看等待队列。等待队列可以使一组进程/线程在等待某个事件时睡眠,当等待的事件发生时,内核会唤醒睡眠的进程/线程。注意,下文并不区分进程和线程,在linux下,进程和线程在调度这个意义下(调度就是指linux的进程调度,包括进程的切换、睡眠、唤醒等)并无差别。此机制可以类比java.lang.Object类的wait和notify/notifyAll方法,其中wait方法使线程睡眠,notify/notifyAll方法则唤醒睡眠的一个或全部线程。等待队列主要涉及两个数据结构:

include/linux/wait.h

  struct __wait_queue_head {   spinlock_t lock;   list_head task_list;  };  struct __wait_queue {   unsigned int flags;  #define WQ_FLAG_EXCLUSIVE 0x01   void *private;   wait_queue_func_t func;   struct list_head task_list;  };  

struct __wait_queue_head是队头结构,task_list 保存了添加到此队列上的元素,struct list_head是标准的linux双链表, 定义如下:

include/linux/list.h

  struct list_head {   struct list_head *next, *prev;  };  

注意,此结构既可以表示双链表的表头,也可以表示一个链表元素,且next,prev这两个指针可以指向任意数据结构。

struct __wait_queue是等待队列的元素结构,成员func是等待的进程被唤醒时执行的回调函数,其定义如下:

include/linux/wait.h

  typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);  

struct __wait_queue的成员task_list是一个链表元素用于将此结构放置到struct __wait_queue_head中(这和此结构的task_list成员含义是不同的,此成员的含义为双链表的头),private成员一般指向等待进程的task_struct实例(该结构成员很多,在此就不贴出了,只需要知道linux下每个进程都对应一个task_struct 实例)。

在使用上,等待队列主要涉及以下函数(或者宏):

include/linux/wait.h

__add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);

#define wait_event(wq, condition)

#define wake_up_xxx(x,…)

__add_wait_queue用于将一个进程添加到等待队列,wait_event是一个宏,它用于等待一个事件,当事件未发生时使等待的进程休眠,wake_up_xxx是一系列的宏,包括wake_up,wake_up_all,wake_up_locked,wake_up_interruptible等,负责唤醒休眠在某个事件上的一个或一组进程。关于等待队列的具体实现细节,由于牵涉较广(涉及到进程调度、中断处理等),这里不再详述,可以将add_wait_queue,wait_event类比java.lang.Object的wait方法,而wake_up则可以类比java.lang.Object的notify/notifyAll方法。

介绍完等待队列后,就可以进一步研究epoll_ctl的实现了,其代码实现中核心的部分是:

fs/eventpoll.c

  SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,    struct epoll_event __user *, event)  {   if (!tfile->f_op || !tfile->f_op->poll)    goto error_tgt_fput;   switch (op) {   case EPOLL_CTL_ADD:  error=ep_insert(ep, &epds, tfile, fd);    break;   case EPOLL_CTL_DEL:  error=ep_remove(ep, epi);    break;   case EPOLL_CTL_MOD:    error = ep_modify(ep, epi, &epds);    break;   }   return error;  }  

什么样的文件描述符可以注册?从那个if判断可以看出,只有文件描述符对应的文件实现了poll方法的才可以,一般而言,字符设备的文件都实现了此方法,网络相关的套接字也实现了此方法,而块设备文件例如ext2/ext3/ext4文件系统文件,都没有实现此方法。实现了poll方法的文件,对应于java NIO的java.nio.channels.SelectableChannel,这也是为何只有 SelectableChannel 才能注册到Selector上的原因。ep_insert,ep_remove,ep_modify分别对应事件的注册、删除、修改,我们以ep_insert为例,看一下事件注册的过程,其关键代码如下:

fs/eventpoll.c

  static int ep_insert(struct eventpoll *ep, struct epoll_event *event,         struct file *tfile, int fd)  {  init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);  revents = tfile->f_op->poll(tfile, &epq.pt);  ep_rbtree_insert(ep, epi);  if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {  list_add_tail(&epi->rdllink, &ep->rdllist);;  wake_up_locked(&ep->wq);  }  }  

上述代码的主要做的事是:

1. 绑定等待队列的回调函数ep_ptable_queue_proc

2. 调用对应文件的实例的poll方法,此方法的具体实现差别非常大,但绝大多数都会调用wait_event相关方法,在没有事件发生时,使进程睡眠,例如socket对应的实现(代码在net/ipv4/af_inet.c的tcp_poll方法,在此不再详述);

3. 若注册的事件已经发生,则将已就绪的文件描述符插入到eventpoll实例的就绪列表(list_add_tail(&epi->rdllink, &ep->rdllist);),并唤醒睡眠的进程(wake_up_locked(&ep->wq))

第1步绑定的回调函数ep_ptable_queue_proc,会在等待的事件发生时执行,其主要功能是将就绪的文件描述符插入到eventpoll实例的就绪列表(具体是通过ep_ptable_queue_proc绑定的另一个回调函数ep_poll_callback实现的):

fs/eventpoll.c

  static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){   if (!ep_is_linked(&epi->rdllink))    list_add_tail(&epi->rdllink, &ep->rdllist);  }  

最后看epoll_wait的实现,有了就绪队列,epoll_wait的实现就比较简单了,只需检查就绪队列是否为空,若为空,则在必要时睡眠或等待:

fs/eventpoll.c

  SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,    int, maxevents, int, timeout)  {   int error;   struct file *file;   struct eventpoll *ep;   file = fget(epfd);   ep = file->private_data;   error = ep_poll(ep, events, maxevents, timeout);   return error;  }  

此函数最终调用ep_poll完成其主要功能:

  static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,       int maxevents, long timeout)  {   retry:   if (list_empty(&ep->rdllist)) {    init_waitqueue_entry(&wait, current);    wait.flags |= WQ_FLAG_EXCLUSIVE;    __add_wait_queue(&ep->wq, &wait);

for (;;) {

set_current_state(TASK_INTERRUPTIBLE);

if (!list_empty(&ep->rdllist) || !jtimeout)

break;

if (signal_pending(current)) {

res = -EINTR;

break;

}

spin_unlock_irqrestore(&ep->lock, flags);

jtimeout = schedule_timeout(jtimeout);

spin_lock_irqsave(&ep->lock, flags);

}

__remove_wait_queue(&ep->wq, &wait);

set_current_state(TASK_RUNNING);

}

eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;

spin_unlock_irqrestore(&ep->lock, flags);

if (!res && eavail &&

!(res = ep_send_events(ep, events, maxevents)) && jtimeout)

goto retry;

return res;

}

上述代码主要是检查就绪队列是否为空,若为空时,则根据超时设置判断是否需要睡眠(__add_wait_queue)或等待(jtimeout = schedule_timeout(jtimeout);)。

综上所述,epoll系统调用通过等待队列,其事件检测(epoll_wait系统调用)的时间复杂度为O(n),其中n是“活跃”的文件描述符总数,所谓的活跃,是指在该文件描述符上有频繁的读写操作,而对比poll或select系统调用(其实现代码在fs/select.c中),其时间复杂度也是O(n),但这个n却是注册的文件描述符的总数。因此,当活跃的文件描述符占总的文件描述符比例较小时,例如,在长连接服务器的场景中,虽然同时可能需要维持数十万条长连接,但其中只有少数的连接是活跃的,使用epoll就比较合适。

=============================欢迎关注微信公众号:hellojavacases

公众号上发布的消息都存放在http://hellojava.info上。

原文  http://hellojava.info/?p=498
正文到此结束
Loading...