Android 重學(xué)系列 Handler與相關(guān)系統(tǒng)調(diào)用的剖析(下)

前言

上一篇文章,和大家講述了Handler的中使用到的eventfd系統(tǒng)調(diào)用原理。而本文將會著重剖析epoll系統(tǒng)調(diào)用,而整個handler核心的系統(tǒng)就是epoll。

如果遇到問題歡迎來到這里討論:http://m.itdecent.cn/p/d38b2970ff3f

正文

在聊epoll的原理之前,我們來看看epoll在Handler中的使用。
epoll的使用一般分為三個步驟:

  • 1.調(diào)用epoll_create構(gòu)建一個epoll的句柄
EpollFd = epoll_create(EPOLL_SIZE_HINT);
  • 2.調(diào)用epoll_ctl 注冊一個事件的監(jiān)聽,這個事件一般是文件描述符的數(shù)據(jù)是否發(fā)生變化
epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);

該方法的意思是,往mEpollFd句柄中注冊一個新的事件監(jiān)聽mWakeEventFd,把eventItem作為相關(guān)的數(shù)據(jù)傳入

  • 3.調(diào)用epoll_wait 阻塞當(dāng)前的循環(huán),直到監(jiān)聽到數(shù)據(jù)流發(fā)生變化,就釋放阻塞進行下一步
epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

該方法的意思就是指,當(dāng)前將會監(jiān)聽mEpollFd句柄,設(shè)定了最大的監(jiān)聽量以及超時事件。如果發(fā)生了某些監(jiān)聽對象發(fā)生了變化,則把相關(guān)變化的數(shù)據(jù)輸出到eventItems中。

知道了如何使用,我們就嘗試著剖析一下依照這個調(diào)用順序,解剖整個epoll的源碼原理。

首先我們需要有一個意識,那就是epoll本質(zhì)上和binder很相似, 但不是一個驅(qū)動,而是通過通過fs_initcall的方式,為內(nèi)核添加新的功能。

epoll的初始化

static int __init eventpoll_init(void)
{
    struct sysinfo si;

    si_meminfo(&si);
    /*
     * Allows top 4% of lomem to be allocated for epoll watches (per user).
     */
    max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
        EP_ITEM_COST;
    BUG_ON(max_user_watches < 0);

    /*
     * Initialize the structure used to perform epoll file descriptor
     * inclusion loops checks.
     */
    ep_nested_calls_init(&poll_loop_ncalls);

    /* Initialize the structure used to perform safe poll wait head wake ups */
    ep_nested_calls_init(&poll_safewake_ncalls);

    /* Initialize the structure used to perform file's f_op->poll() calls */
    ep_nested_calls_init(&poll_readywalk_ncalls);


    /* Allocates slab cache used to allocate "struct epitem" items */
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
            0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);

    /* Allocates slab cache used to allocate "struct eppoll_entry" */
    pwq_cache = kmem_cache_create("eventpoll_pwq",
            sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);

    return 0;
}
fs_initcall(eventpoll_init);

能看到在epoll在內(nèi)核啟動時候,會初始化如下幾個數(shù)據(jù)結(jié)構(gòu):

  • 1.poll_loop_ncalls 用于緩存循環(huán)查找epoll文件描述符時候的緩存路徑結(jié)構(gòu)
  • 2.poll_safewake_ncalls 保存著那些已經(jīng)加入到等待隊列那些可以安全喚醒的項。
  • 3.poll_readywalk_ncalls 已經(jīng)執(zhí)行了file的poll操作的文件描述符
  • 4.epi_cache 一個緩存的epitem隊列
  • 5.pwq_cache 一個緩存的eppoll_entry隊列

這些對象是做什么的什么的,稍后的解析就能明白了。

epoll_create源碼解析

文件:/bionic/libc/bionic/sys_epoll.cpp

int epoll_create(int size) {
  if (size <= 0) {
    errno = EINVAL;
    return -1;
  }
  return epoll_create1(0);
}

能看到在Android的epoll_create其實這個size設(shè)置的毫無意義,直接會調(diào)用epoll_create1這個系統(tǒng)調(diào)用,并且flag為0.接下來看看內(nèi)核中的方法。

文件:/fs/eventpoll.c

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;


    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;

    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
...
    ep->file = file;
    fd_install(fd, file);
    return fd;

...
}

我們一樣可以和eventfd系統(tǒng)調(diào)用進行比較,做的是事情如下:

  • 1.ep_alloc 初始化epoll需要的句柄以及所有的數(shù)據(jù)
    1. get_unused_fd_flags 獲取fdtable空閑的fd
    1. anon_inode_getfile 構(gòu)建一個名字為[eventpoll]的文件,并且把epoll對應(yīng)的文件操作設(shè)置到file結(jié)構(gòu)體中,把ep作為全局變量設(shè)置到file的私有數(shù)據(jù)中
    1. fd_install 把fd和file結(jié)構(gòu)體關(guān)聯(lián)起來。

ep_alloc

static int ep_alloc(struct eventpoll **pep)
{
    int error;
    struct user_struct *user;
    struct eventpoll *ep;

    user = get_current_user();
    error = -ENOMEM;
    ep = kzalloc(sizeof(*ep), GFP_KERNEL);
    if (unlikely(!ep))
        goto free_uid;

    spin_lock_init(&ep->lock);
    mutex_init(&ep->mtx);
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    INIT_LIST_HEAD(&ep->rdllist);
    ep->rbr = RB_ROOT;
    ep->ovflist = EP_UNACTIVE_PTR;
    ep->user = user;

    *pep = ep;

    return 0;

free_uid:
    free_uid(user);
    return error;
}

能看到這個過程實際上就是賦值eventpoll的過程。在這個過程為eventpoll初始化了如下的數(shù)據(jù):

  • 1.eventpoll 相關(guān)的線程的lock,mutex
  • 2.eventpoll 中的等待隊列頭poll_wait
  • 3.eventpoll 中文件描述符已經(jīng)處理過poll方法,其實就是準(zhǔn)備好的文件描述隊列
  • 4.eventpoll 中的紅黑樹根部
  • 5.ovflist 輸出到外界已經(jīng)發(fā)生變化的文件描述符
    1. user_struct 用于跟蹤進程用戶的信息

最后epollevent將會持有file結(jié)構(gòu)體。

最后這些epollevent會被設(shè)置為file中的中的私有數(shù)據(jù)。

到這里epoll需要注意初始化數(shù)據(jù)結(jié)構(gòu)就完成了,接下來看看epoll_ctl是怎么把需要監(jiān)聽的文件描述符設(shè)置到epoll中。

epoll_ctl 的系統(tǒng)調(diào)用

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    int error;
    int full_check = 0;
    struct fd f, tf;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    struct eventpoll *tep = NULL;

    error = -EFAULT;
    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto error_return;

    error = -EBADF;
    f = fdget(epfd);
...

    /* Get the "struct file *" for the target file */
    tf = fdget(fd);

    /* The target file descriptor must support poll */
    error = -EPERM;
    if (!tf.file->f_op->poll)
        goto error_tgt_fput;

    /* Check if EPOLLWAKEUP is allowed */
    if (ep_op_has_event(op))
        ep_take_care_of_epollwakeup(&epds);

....

    ep = f.file->private_data;

    mutex_lock_nested(&ep->mtx, 0);
    if (op == EPOLL_CTL_ADD) {
        if (!list_empty(&f.file->f_ep_links) ||
                        is_file_epoll(tf.file)) {
            full_check = 1;
            mutex_unlock(&ep->mtx);
            mutex_lock(&epmutex);
            if (is_file_epoll(tf.file)) {
                error = -ELOOP;
                if (ep_loop_check(ep, tf.file) != 0) {
                    clear_tfile_check_list();
                    goto error_tgt_fput;
                }
            } else
                list_add(&tf.file->f_tfile_llink,
                            &tfile_check_list);
            mutex_lock_nested(&ep->mtx, 0);
            if (is_file_epoll(tf.file)) {
                tep = tf.file->private_data;
                mutex_lock_nested(&tep->mtx, 1);
            }
        }
    }


    epi = ep_find(ep, tf.file, fd);

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        if (full_check)
            clear_tfile_check_list();
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    if (tep != NULL)
        mutex_unlock(&tep->mtx);
    mutex_unlock(&ep->mtx);

...
}

epoll_ctl把文件描述符添加到監(jiān)聽中大致分為以下3個步驟:

  • 1.首先從用戶空間拷貝相關(guān)的信息。設(shè)置f為epoll_ctl傳下來epoll句柄對應(yīng)的fd,tfd則是需要監(jiān)聽對象對應(yīng)的句柄。
  • 2.循環(huán)檢測監(jiān)聽對象的文件描述符是否出現(xiàn)嵌套深度過深
  • 3.處理epoll_ctl的操作標(biāo)志,如果是添加,調(diào)用ep_insert會把當(dāng)前文件描述添加到緩存中,并且調(diào)用文件描述符的poll方法。通過ep_find找到有相同的epoll則會報錯

值得注意的是后面兩點,我們著重剖析看看。

循環(huán)檢測每一個添加進來的監(jiān)聽對象的合法性

static LIST_HEAD(tfile_check_list);
    ep = f.file->private_data;

    mutex_lock_nested(&ep->mtx, 0);
    if (op == EPOLL_CTL_ADD) {
        if (!list_empty(&f.file->f_ep_links) ||
                        is_file_epoll(tf.file)) {
            full_check = 1;
            mutex_unlock(&ep->mtx);
            mutex_lock(&epmutex);
            if (is_file_epoll(tf.file)) {
                error = -ELOOP;
                if (ep_loop_check(ep, tf.file) != 0) {
                    clear_tfile_check_list();
                    goto error_tgt_fput;
                }
            } else
                list_add(&tf.file->f_tfile_llink,
                            &tfile_check_list);
            mutex_lock_nested(&ep->mtx, 0);
            if (is_file_epoll(tf.file)) {
                tep = tf.file->private_data;
                mutex_lock_nested(&tep->mtx, 1);
            }
        }
    }

當(dāng)前判斷是添加對象的操作則會處理一個額外的判斷。

  • epoll句柄對應(yīng)的file結(jié)構(gòu)體中f_ep_links隊列不為空,或者目標(biāo)監(jiān)聽的file結(jié)構(gòu)體中含有poll方法。

說明這種情況比較特殊一個可能是類似socket文件描述符一樣自身帶有著poll方法,一種可能是本身就是epoll對象,這樣就會出現(xiàn)一個環(huán),當(dāng)通知一個epoll有回喚醒另一個epoll對象,這個對象有可能繼續(xù)喚醒回來,出現(xiàn)一個死循環(huán)。

  • 都沒有,說明此時是一個普通的文件描述符,直接添加到tfile_check_list。

如果目標(biāo)文件有poll函數(shù)則把file中的私有數(shù)據(jù)賦值給tep。

我們先來注重看看第一種情況,其核心函數(shù)是ep_loop_check,一般常用是第一種情況。

ep_loop_check 檢測嵌套循環(huán)

/* Visited nodes during ep_loop_check(), so we can unset them when we finish */
static LIST_HEAD(visited_list);

#define EP_MAX_NESTS 4
static int ep_loop_check(struct eventpoll *ep, struct file *file)
{
    int ret;
    struct eventpoll *ep_cur, *ep_next;

    ret = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                  ep_loop_check_proc, file, ep, current);
    /* clear visited list */
    list_for_each_entry_safe(ep_cur, ep_next, &visited_list,
                            visited_list_link) {
        ep_cur->visited = 0;
        list_del(&ep_cur->visited_list_link);
    }
    return ret;
}

這段代碼就是為了處理下面這個問題,那就是如果epoll自己監(jiān)聽自己怎么辦?自己喚醒自己,接著繼續(xù)通知自己數(shù)據(jù)來了又要喚醒自己。而且如果一個epoll注冊多個相同的監(jiān)聽對象,豈不是會出現(xiàn)喚醒返回的結(jié)果出現(xiàn)重復(fù)的對象。

ep_call_nested
static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
              int (*nproc)(void *, void *, int), void *priv,
              void *cookie, void *ctx)
{
    int error, call_nests = 0;
    unsigned long flags;
    struct list_head *lsthead = &ncalls->tasks_call_list;
    struct nested_call_node *tncur;
    struct nested_call_node tnode;

    spin_lock_irqsave(&ncalls->lock, flags);


    list_for_each_entry(tncur, lsthead, llink) {
        if (tncur->ctx == ctx &&
            (tncur->cookie == cookie || ++call_nests > max_nests)) {
            /*
             * Ops ... loop detected or maximum nest level reached.
             * We abort this wake by breaking the cycle itself.
             */
            error = -1;
            goto out_unlock;
        }
    }

    /* Add the current task and cookie to the list */
    tnode.ctx = ctx;
    tnode.cookie = cookie;
    list_add(&tnode.llink, lsthead);

    spin_unlock_irqrestore(&ncalls->lock, flags);

    /* Call the nested function */
    error = (*nproc)(priv, cookie, call_nests);

    /* Remove the current task from the list */
    spin_lock_irqsave(&ncalls->lock, flags);
    list_del(&tnode.llink);
out_unlock:
    spin_unlock_irqrestore(&ncalls->lock, flags);

    return error;
}

為了弄懂整個方法,先標(biāo)記以下幾個關(guān)鍵的對象意味著什么:

  • priv 其實是被監(jiān)聽文件的file結(jié)構(gòu)體
  • cookie 是目前要把被監(jiān)聽對象添加到epoll句柄的那個epoll監(jiān)聽主體
  • nested_call_node 是nested_calls鏈表的子節(jié)點
  • ctx 上下文是指當(dāng)前執(zhí)行當(dāng)前系統(tǒng)調(diào)用對應(yīng)的task_struct進程是哪一個

分清楚這些對象之后,鏈表lsthead(nested_calls的tasks_call_list)的循環(huán)實際上是就是查找全局變量poll_loop_ncalls中每一個子節(jié)點中的是否存在一模一樣的進程,第二條件是判斷當(dāng)前的需要添加到的ep對象是否是同一個或者已經(jīng)添加了超過4次。

如果不滿足則會為當(dāng)前這個目標(biāo)file結(jié)構(gòu)體設(shè)置ctx以及cookie,并且添加到lsthead(nested_calls的tasks_call_list)保存起來,接著執(zhí)行上面?zhèn)飨聛淼姆椒ㄖ羔?,?dāng)執(zhí)行完之后就把剛加入的nested_call_node從poll_loop_ncalls刪除。為了更好的明白這段代碼的邏輯,我們再來看看這個方法指針ep_loop_check_proc

ep_loop_check_proc
static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
    int error = 0;
    struct file *file = priv;
    struct eventpoll *ep = file->private_data;
    struct eventpoll *ep_tovisit;
    struct rb_node *rbp;
    struct epitem *epi;

    mutex_lock_nested(&ep->mtx, call_nests + 1);
    ep->visited = 1;
    list_add(&ep->visited_list_link, &visited_list);
    for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
        epi = rb_entry(rbp, struct epitem, rbn);
        if (unlikely(is_file_epoll(epi->ffd.file))) {
            ep_tovisit = epi->ffd.file->private_data;
            if (ep_tovisit->visited)
                continue;
            error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                    ep_loop_check_proc, epi->ffd.file,
                    ep_tovisit, current);
            if (error != 0)
                break;
        } else {

            if (list_empty(&epi->ffd.file->f_tfile_llink))
                list_add(&epi->ffd.file->f_tfile_llink,
                     &tfile_check_list);
        }
    }
    mutex_unlock(&ep->mtx);

    return error;
}

這一段代碼其實就是不斷的遍歷掛在epoll監(jiān)聽主體的紅黑樹中每一子節(jié)點epitem結(jié)構(gòu)體,檢查每一個判斷到有poll方法的file結(jié)構(gòu)體,獲取里面的私有數(shù)據(jù)epitem,判斷是否已經(jīng)遍歷過了這個file結(jié)構(gòu)體。一般遍歷過的epitem,其visited就會為1.沒遍歷過的一般為0.

什么時候會出現(xiàn)1和0的差異呢?一般情況下,如果是一個沒有連接到任何一個epoll主體對象的epoll對象都為0,鏈接過則為1.這樣就能很好的區(qū)分出所有的epoll是否會出現(xiàn)鏈接監(jiān)聽重復(fù)了。

如果當(dāng)前的對應(yīng)epitem已經(jīng)訪問過了則查找下一個子節(jié)點,沒有訪問過,說明就要考慮一個特殊情況這個epoll或者說帶有這poll方法的file結(jié)構(gòu)體。這個情況就可能出現(xiàn)循環(huán)監(jiān)聽,因此需要不斷向著子節(jié)點查詢校驗。

當(dāng)找到一個沒有重寫poll的file結(jié)構(gòu)體。此時是把對應(yīng)節(jié)點下的f_tfile_llink拷貝到tfile_check_list。

為了減少遞歸次數(shù),使用了visit_list記錄已經(jīng)訪問過的文件,用visit標(biāo)志位避免重復(fù)判斷。

這個過程做了什么呢?其實這個過程就是為了解決循環(huán)嵌套監(jiān)聽做的努力?實際上沒有做什么,太過深入細節(jié)反而容易忘記初衷:

if (ep_loop_check(ep, tf.file) != 0) {
                    clear_tfile_check_list();
                    goto error_tgt_fput;
                }

其實就是在判斷當(dāng)前目標(biāo)file中到error不為0的時候會報錯,錯誤是什么時候返回的,就是在ep_call_nested的遍歷循環(huán)中,通過檢驗當(dāng)前進程屬否出現(xiàn)loop深度過深(超過4層),一旦超過則返回-1.也就是說不允許你嵌套監(jiān)聽過多層次。當(dāng)然如果遇到了自己監(jiān)聽自己就會稱為一個有向圖數(shù)據(jù)結(jié)構(gòu)一定會超出4層,一定會報錯。

因此我們可以得到一個epoll_ctl使用細節(jié),請不要自己監(jiān)聽自己,也不要epoll監(jiān)聽epoll的層數(shù)超過4層。

ep_insert 把當(dāng)前被監(jiān)聽對象插入到epoll對象中

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
             struct file *tfile, int fd, int full_check)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    long user_watches;
    struct epitem *epi;
    struct ep_pqueue epq;

    user_watches = atomic_long_read(&ep->user->epoll_watches);
    if (unlikely(user_watches >= max_user_watches))
        return -ENOSPC;
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;

    /* Item initialization follow here ... */
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;
    if (epi->event.events & EPOLLWAKEUP) {
        error = ep_create_wakeup_source(epi);
        if (error)
            goto error_create_wakeup_source;
    } else {
        RCU_INIT_POINTER(epi->ws, NULL);
    }

    /* Initialize the poll table using the queue callback */
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);


    revents = ep_item_poll(epi, &epq.pt);


    error = -ENOMEM;
    if (epi->nwait < 0)
        goto error_unregister;

    /*把file結(jié)構(gòu)體的 f_ep_links指針指給epoll的flink方便epoll查找*/
    spin_lock(&tfile->f_lock);
    list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
    spin_unlock(&tfile->f_lock);


    ep_rbtree_insert(ep, epi);

    /* now check if we've created too many backpaths */
    error = -EINVAL;
    if (full_check && reverse_path_check())
        goto error_remove_epi;

    /* We have to drop the new item inside our item list to keep track of it */
    spin_lock_irqsave(&ep->lock, flags);

    /* If the file is already "ready" we drop it inside the ready list */
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);

        /* Notify waiting tasks that events are available */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    spin_unlock_irqrestore(&ep->lock, flags);

    atomic_long_inc(&ep->user->epoll_watches);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 0;

... 
}

epitem的插入大致分為如下幾個步驟:

  • 1.初始化epitem中所有的數(shù)據(jù),rdllink,fllink,pwqlist這幾個隊列,設(shè)置epi中的event數(shù)據(jù)就是從系統(tǒng)調(diào)用傳下來的數(shù)據(jù)(eventItem),如果打開了EPOLLWAKEUP則創(chuàng)建一個wakeup_source注冊監(jiān)聽,用于避免系統(tǒng)沉睡(電量消耗檢測的關(guān)鍵之一,一般在AlarmManager和InputManager中使用)。
  • 2.調(diào)用被監(jiān)聽對象的poll方法
  • 3.把epi對象添加到ep的紅黑樹中
  • 4.如果file結(jié)構(gòu)體已經(jīng)準(zhǔn)備好了,就添加到準(zhǔn)備列表中。接著保持cpu避免沉睡,嘗試著喚醒對應(yīng)進程,判斷當(dāng)前的事件是可以走得通的。

其實重點是從第二點開始,我們先從第二點開始閱讀。

調(diào)用被監(jiān)聽對象的poll方法

struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};
    /* Initialize the poll table using the queue callback */
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    revents = ep_item_poll(epi, &epq.pt);

init_poll_funcptr 將方法ep_ptable_queue_proc賦值到poll_table中。接著調(diào)用ep_item_poll。

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
    pt->_key = epi->event.events;

    return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}

此時會調(diào)用目標(biāo)文件的poll方法。而這個目標(biāo)文件在此時是eventfd,換句話說調(diào)用的是eventfd的poll的方法。

文件:/fs/eventfd.c

static unsigned int eventfd_poll(struct file *file, poll_table *wait)
{
    struct eventfd_ctx *ctx = file->private_data;
    unsigned int events = 0;
    unsigned long flags;

    poll_wait(file, &ctx->wqh, wait);

    spin_lock_irqsave(&ctx->wqh.lock, flags);
    if (ctx->count > 0)
        events |= POLLIN;
    if (ctx->count == ULLONG_MAX)
        events |= POLLERR;
    if (ULLONG_MAX - 1 > ctx->count)
        events |= POLLOUT;
    spin_unlock_irqrestore(&ctx->wqh.lock, flags);

    return events;
}

在這個方法中有一個核心方法poll_wait。這個方法取出了file的私有數(shù)據(jù)拿到eventfd的上下文中的等待隊列頭,并且調(diào)用poll_table中的proc方法

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}

而這個方法剛好就是上面init_poll_funcptr初始化進來的ep_ptable_queue_proc。

ep_ptable_queue_proc

文件:/fs/eventpoll.c

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

這里出現(xiàn)了幾個等待隊列的頭部:

  • whead 是從eventfd傳下來的等待隊列
  • pwq 是eppoll_entry ,在這個結(jié)構(gòu)體中有包含著兩個等待隊列頭部,whead指代原來目標(biāo)被監(jiān)聽對象的等待隊列頭,這里指的是eventfd的等待隊列。把whead添加到pwq->wait隊列中。
struct eppoll_entry {
    /* List header used to link this structure to the "struct epitem" */
    struct list_head llink;

    /* The "base" pointer is set to the container "struct epitem" */
    struct epitem *base;

    /*
     * Wait queue item that will be linked to the target file wait
     * queue head.
     */
    wait_queue_t wait;

    /* The wait queue head that linked the "wait" wait queue item */
    wait_queue_head_t *whead;
};

如果nwait標(biāo)志位大于等于0(此時設(shè)置的是0),并且申請了pwq_cache一段內(nèi)存是成功的。此時將會初始化pwq中的等待頭,并且設(shè)置一個poll_wait的回調(diào)函數(shù);接著把eventfd中的等待隊列頭部添加到pwq的wait隊列,這樣就相當(dāng)于把epoll中的pwqlist和eventfd的等待隊列關(guān)聯(lián)起來;最后把當(dāng)前的pwq->llink添加到epi的pwqlist中。為后面回調(diào)作準(zhǔn)備。

這種設(shè)計十分常見,幾乎所有關(guān)于poll和epoll方法的重寫都是這樣設(shè)計的。需要重寫一個poll_wait的方法,把自己的等待隊列和上層調(diào)度者的等待隊列關(guān)聯(lián)起來,這樣一旦喚醒了該文件的等待隊列同時也會喚起上層調(diào)度者對應(yīng)的等待隊列。

在這個過程中做了一個很重要的事情,那就是設(shè)定了ep_poll_callback方法作為自定義喚醒方法。每當(dāng)想要喚醒掛起的進程將會執(zhí)行這個方法,我們稍后再聊。

把epi對象添加到ep的紅黑樹中

/*把file結(jié)構(gòu)體的 f_ep_links指針指給epoll的flink方便epoll查找*/
    spin_lock(&tfile->f_lock);
    list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
    spin_unlock(&tfile->f_lock);

    ep_rbtree_insert(ep, epi);
static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
{
    int kcmp;
    struct rb_node **p = &ep->rbr.rb_node, *parent = NULL;
    struct epitem *epic;

    while (*p) {
        parent = *p;
        epic = rb_entry(parent, struct epitem, rbn);
        kcmp = ep_cmp_ffd(&epi->ffd, &epic->ffd);
        if (kcmp > 0)
            p = &parent->rb_right;
        else
            p = &parent->rb_left;
    }
    rb_link_node(&epi->rbn, parent, p);
    rb_insert_color(&epi->rbn, &ep->rbr);
}

能看到這里僅僅只是一個很常規(guī)的紅黑樹添加的方法,找到合適的地方插入到eventpoll結(jié)構(gòu)體中的rbr最后進行左右旋轉(zhuǎn)平衡,對紅黑樹算法感興趣的可以閱讀我之前寫的紅黑樹一文。

校驗新增的epitem并添加到準(zhǔn)備隊列

 if (full_check && reverse_path_check())
        goto error_remove_epi;

    /* We have to drop the new item inside our item list to keep track of it */
    spin_lock_irqsave(&ep->lock, flags);

    /* If the file is already "ready" we drop it inside the ready list */
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);

        /* Notify waiting tasks that events are available */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    spin_unlock_irqrestore(&ep->lock, flags);

    atomic_long_inc(&ep->user->epoll_watches);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

在這個過程中主要做了兩件事情:

  • 1.reverse_path_check校驗是否會出現(xiàn)喚醒風(fēng)暴
  • 2.檢查是否出現(xiàn)遺漏的event監(jiān)聽時間,并添加到準(zhǔn)備隊列
reverse_path_check校驗是否會出現(xiàn)喚醒風(fēng)暴
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
static int path_count[PATH_ARR_SIZE];


static int path_count_inc(int nests)
{
    /* Allow an arbitrary number of depth 1 paths */
    if (nests == 0)
        return 0;

    if (++path_count[nests] > path_limits[nests])
        return -1;
    return 0;
}

static void path_count_init(void)
{
    int i;

    for (i = 0; i < PATH_ARR_SIZE; i++)
        path_count[i] = 0;
}

static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
{
    int error = 0;
    struct file *file = priv;
    struct file *child_file;
    struct epitem *epi;

    /* CTL_DEL can remove links here, but that can't increase our count */
    rcu_read_lock();
    list_for_each_entry_rcu(epi, &file->f_ep_links, fllink) {
        child_file = epi->ep->file;
        if (is_file_epoll(child_file)) {
            if (list_empty(&child_file->f_ep_links)) {
                if (path_count_inc(call_nests)) {
                    error = -1;
                    break;
                }
            } else {
                error = ep_call_nested(&poll_loop_ncalls,
                            EP_MAX_NESTS,
                            reverse_path_check_proc,
                            child_file, child_file,
                            current);
            }
            if (error != 0)
                break;
        } else {
            printk(KERN_ERR "reverse_path_check_proc: "
                "file is not an ep!\n");
        }
    }
    rcu_read_unlock();
    return error;
}

static int reverse_path_check(void)
{
    int error = 0;
    struct file *current_file;

    /* let's call this for all tfiles */
    list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
        path_count_init();
        error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                    reverse_path_check_proc, current_file,
                    current_file, current);
        if (error)
            break;
    }
    return error;
}

在上面的ep_loop_check是為了避免監(jiān)聽嵌套層級過深,這里則會判斷每一個層級是否過于龐大。對于epoll來說,為了避免每一層鏈接的帶有epoll監(jiān)聽對象過大,對每一層epoll都做了大小的限制。因為一旦一個層級放置監(jiān)聽數(shù)量過大的另一個epoll會導(dǎo)致一旦喚醒就會喚醒一場風(fēng)暴一樣,卷席整個系統(tǒng),導(dǎo)致性能急速下降。

為了處理這個問題,epoll對每一層做了如下的數(shù)量限制:

第0層1000,第1層500,第2層100,第3層50,第4層10

那么我們求一下總數(shù),一個epoll對象一共能夠監(jiān)聽能夠監(jiān)聽2.5*10^10 這么多。其實也足夠使用了。不過面向服務(wù)器開發(fā)的朋友,倒是有可能使用這么多,到底怎么把socket鏈接均勻的分布在不同epoll也是不錯的優(yōu)化點。

添加到準(zhǔn)備隊列中
 if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);

        /* Notify waiting tasks that events are available */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

能看到當(dāng)epi的rdllink沒有添加到ep->rdllist),則去添加;接著調(diào)用ep_pm_stay_awake,如果初始化了epitem中的ws對象,則會避免cpu沉睡,接著判斷ep所在的等待隊列是否存在,此時雖然初始化了,但是沒有加入到等待隊列中。同理poll_wait等待隊列,這個隊列一般是處理poll文件操作的。

從這里面可以得知,如果我們已經(jīng)在監(jiān)聽了,同時在注冊新的監(jiān)聽文件描述符同時,發(fā)生了事件事件變化,此時也會把相應(yīng)的監(jiān)聽對象添加到eventpoll的rdllist準(zhǔn)備隊列。其實準(zhǔn)備隊列就是指已經(jīng)監(jiān)聽到發(fā)生變化的文件描述符,準(zhǔn)備通過epoll_wait返回上層的核心數(shù)據(jù)結(jié)構(gòu)。

epoll_wait 系統(tǒng)調(diào)用等待觸發(fā)epoll監(jiān)聽回調(diào)

epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

把對應(yīng)的文件描述符注冊到epoll之后,接著就開始嘗試著阻塞監(jiān)聽epoll中所有注冊到epoll的文件描述符的數(shù)據(jù)變化。一旦發(fā)生變化則會調(diào)上來,而所有發(fā)生變化的對應(yīng)的事件就是eventItems中的數(shù)據(jù)。

文件:/fs/eventpoll.c

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    int error;
    struct fd f;
    struct eventpoll *ep;

    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;

    /* Verify that the area passed by the user is writeable */
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
        return -EFAULT;

    /* Get the "struct file *" for the eventpoll file */
    f = fdget(epfd);
...
    if (!is_file_epoll(f.file))
        goto error_fput;

    ep = f.file->private_data;

    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout);

...
    return error;
}

在這個系統(tǒng)調(diào)用中核心方法是ep_poll。

ep_poll

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;
    unsigned long flags;
    long slack = 0;
    wait_queue_t wait;
    ktime_t expires, *to = NULL;

    if (timeout > 0) {
        struct timespec end_time = ep_set_mstimeout(timeout);

        slack = select_estimate_accuracy(&end_time);
        to = &expires;
        *to = timespec_to_ktime(end_time);
    } else if (timeout == 0) {
        /*
         * Avoid the unnecessary trip to the wait queue loop, if the
         * caller specified a non blocking operation.
         */
        timed_out = 1;
        spin_lock_irqsave(&ep->lock, flags);
        goto check_events;
    }

fetch_events:
    spin_lock_irqsave(&ep->lock, flags);

    if (!ep_events_available(ep)) {
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&wait, current);
        __add_wait_queue_exclusive(&ep->wq, &wait);

        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            set_current_state(TASK_INTERRUPTIBLE);
            if (ep_events_available(ep) || timed_out)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            spin_unlock_irqrestore(&ep->lock, flags);
            if (!freezable_schedule_hrtimeout_range(to, slack,
                                HRTIMER_MODE_ABS))
                timed_out = 1;

            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }
check_events:
    /* Is it worth to try to dig for events ? */
    eavail = ep_events_available(ep);

    spin_unlock_irqrestore(&ep->lock, flags);

    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;

    return res;
}

首先這里先獲取timeout,如果是0則會理解進入到check_events標(biāo)簽對應(yīng)的代碼段,否則則會先進入fetch_events代碼段。先來弄清楚一般情況,設(shè)置了timeout的epoll_wait。

在fetch_events代碼段中做了如下的事情,先把之前在ep_alloc初始化好的ep->wq將會加入到當(dāng)前的進程的等待隊列中,進行超時等待。這個過程允許中斷打斷。等待被監(jiān)聽對象喚起當(dāng)前的進程。

接下來將會執(zhí)行check_events代碼段,這個代碼段會判斷當(dāng)前的準(zhǔn)備隊列是否為空,為空則沒有必要執(zhí)行,直接返回。不為空則調(diào)用ep_send_events。

在這里需要注意一點,此時epoll_wait使用eventpoll中的wq作為等待隊列進行進程的掛起處理。而這個對象實際上是可以被被監(jiān)聽對象打斷的,這個時候就來看看在epoll_ctl設(shè)置監(jiān)聽時候設(shè)置到epi->pwqlist中的自定義喚醒進程方法。

ep_poll_callback自定義喚醒方法

下面這個方法就是整個回調(diào)機制的核心

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;

....
    spin_lock_irqsave(&ep->lock, flags);

...

    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
            if (epi->ws) {
                __pm_stay_awake(ep->ws);
            }

        }
        goto out_unlock;
    }

    /* If this file is already in the ready list we exit soon */
    if (!ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake_rcu(epi);
    }


    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 1;
}

通過wait等待隊列反過來找epitem;接著把當(dāng)前的epitem添加到對應(yīng)eventpoll的ovflist鏈表中;把epi->rdllink添加到ep的ep->rdllist的尾部,因為是一個鏈表環(huán)所以也是頭部,解析來要查找什么發(fā)生了變化的file只需要讀取頭部即可。接著調(diào)用wake_up_locked打斷epoll_wait的循環(huán)。如果需要則調(diào)用ep_poll_safewake處理調(diào)用poll方法而在掛起的進程。

明白這點之后繼續(xù)看ep_send_events方法。

ep_send_events

static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;

    return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}

調(diào)用ep_scan_ready_list,并把ep_send_events_proc方法指針傳入。

static int ep_scan_ready_list(struct eventpoll *ep,
                  int (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
                  void *priv, int depth, bool ep_locked)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);

    ...
    spin_lock_irqsave(&ep->lock, flags);
    list_splice_init(&ep->rdllist, &txlist);
    ep->ovflist = NULL;
    spin_unlock_irqrestore(&ep->lock, flags);

    /*
     * Now call the callback function.
     */
    error = (*sproc)(ep, &txlist, priv);

    spin_lock_irqsave(&ep->lock, flags);
    /*
     * During the time we spent inside the "sproc" callback, some
     * other events might have been queued by the poll callback.
     * We re-insert them inside the main ready-list here.
     */
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {

        if (!ep_is_linked(&epi->rdllink)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);
        }
    }

    ep->ovflist = EP_UNACTIVE_PTR;


    list_splice(&txlist, &ep->rdllist);
    __pm_relax(ep->ws);

    if (!list_empty(&ep->rdllist)) {

        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);

    if (!ep_locked)
        mutex_unlock(&ep->mtx);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return error;
}

在這個過程中會把ovflist設(shè)置為NULL,這個只是一個標(biāo)志位告訴epoll它監(jiān)聽的文件出現(xiàn)了狀態(tài)的變化;接著把rdllist拷貝到txlist;然后會調(diào)用ep_send_events_proc方法拷貝變化的數(shù)據(jù)到用戶空間對應(yīng)的eventpoll對象中。最后再檢測一次是否還有ovflist中是否有遺漏的數(shù)據(jù)。

因為在從內(nèi)核拷貝到用戶空間這個行為并不是一個原子操作,可能出現(xiàn)異步的情況,這個時候可能還在ep_send_events_proc中進行數(shù)據(jù)的拷貝,此時又進行了回調(diào),因此需要最后再一次的校驗和添加到rdllist中并且再來一次wake_up_locked步驟重新執(zhí)行一次該行為。

ep_read_events_proc

最后再看看這個拷貝數(shù)據(jù)的核心方法

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                   void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;
    struct wakeup_source *ws;
    poll_table pt;

    init_poll_funcptr(&pt, NULL);

    /*
     * We can loop without lock because we are passed a task private list.
     * Items cannot vanish during the loop because ep_scan_ready_list() is
     * holding "mtx" during this call.
     */
    for (eventcnt = 0, uevent = esed->events;
         !list_empty(head) && eventcnt < esed->maxevents;) {
        epi = list_first_entry(head, struct epitem, rdllink);

        /*
         * Activate ep->ws before deactivating epi->ws to prevent
         * triggering auto-suspend here (in case we reactive epi->ws
         * below).
         *
         * This could be rearranged to delay the deactivation of epi->ws
         * instead, but then epi->ws would temporarily be out of sync
         * with ep_is_linked().
         */
        ws = ep_wakeup_source(epi);
        if (ws) {
            if (ws->active)
                __pm_stay_awake(ep->ws);
            __pm_relax(ws);
        }

        list_del_init(&epi->rdllink);

        revents = ep_item_poll(epi, &pt);

        /*
         * If the event mask intersect the caller-requested one,
         * deliver the event to userspace. Again, ep_scan_ready_list()
         * is holding "mtx", so no operations coming from userspace
         * can change the item.
         */
        if (revents) {
            if (__put_user(revents, &uevent->events) ||
                __put_user(epi->event.data, &uevent->data)) {
                list_add(&epi->rdllink, head);
                ep_pm_stay_awake(epi);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {

                list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
            }
        }
    }

    return eventcnt;
}

做了如下幾點事情:

  • 1.首先清除上一次殘留下來的準(zhǔn)備隊列。
  • 2.進入用戶空間對應(yīng)的epoll_event指針數(shù)組不斷向后循環(huán)。在循環(huán)中獲取從ep_poll_callback回調(diào)回來的eventpoll中的全局準(zhǔn)備隊列的頭部,不斷的通過__put_user從內(nèi)核空間拷貝到用戶空間中(__put_user 比起copy_to_user拷貝的數(shù)量要更小更快).能走到這個拷貝函數(shù)是因為再一次的調(diào)用poll方法,確認(rèn)對應(yīng)的文件描述符中的緩沖區(qū)是否返回正常的返回碼(大于0,一般的實現(xiàn)是指緩沖區(qū)中是否還有數(shù)據(jù)),又因為此時polltable中的方法是null,不會循環(huán)添加等待隊列。
  • 3.處理EPOLLET邊緣觸發(fā)和EPOLLLT水平觸發(fā)的區(qū)別。能看到在循環(huán)的末尾如果沒有打開EPOLLET(邊緣觸發(fā)),則會繼續(xù)把當(dāng)前的epi添加到eventpoll中的準(zhǔn)備隊列中。

這就是邊緣觸發(fā)和水平觸發(fā)實現(xiàn)核心,如果返回了正常的數(shù)據(jù),同時發(fā)現(xiàn)關(guān)閉了邊緣觸發(fā)的標(biāo)志位,則會繼續(xù)把當(dāng)前的epitem添加到全局的eventpoll中的準(zhǔn)備隊列,繼續(xù)下次讀取,知道對應(yīng)文件描述符中poll操作返回0為止。

這就是在內(nèi)核中怎么實現(xiàn)邊緣觸發(fā)(狀態(tài)變化不管數(shù)據(jù)緩沖區(qū)是否有數(shù)據(jù)就返回一次)和水平觸發(fā)(只要緩沖區(qū)還有數(shù)據(jù)則會不斷的觸發(fā)返回)。

總結(jié)

epoll之所以叫做eventpoll,其實核心是執(zhí)行每一個注冊到epoll中文件描述符的poll方法。每當(dāng)調(diào)用每一個文件描述符的poll的方法時候,就會把自己的等待行為和被監(jiān)聽事件鏈接到一起,同時被打斷回調(diào)。

epoll第一次閱讀源碼必定會頭大(作者就是這樣),因為里面有數(shù)個等待隊列,每個等待隊列做的事情都不一樣,如果沒注意到這些等待隊列做的事情,邏輯將會混亂。

里面主要包含三個等待隊列:

  • eventpoll.wq 這個是主要的等待隊列,當(dāng)調(diào)用epoll_wait的時候,就是用這個等待隊列進行進程的調(diào)度掛起。于此同時在ep_poll_callback中,將會獲取到ep對象并且打斷這個等待隊列的掛起

  • eventpoll.pwqlist 主要就是為了和被監(jiān)聽的文件描述符poll中的等待隊列關(guān)聯(lián)起來。一旦進行被監(jiān)聽事件發(fā)生了需要打斷poll方法的等待隊列掛起操作時候?qū)?,通知到epoll的回調(diào)

  • eventpoll.poll_wait 這個等待隊列一般是用在epoll本身的poll方法中。

在這個過程中,每監(jiān)聽一個事件,就會在當(dāng)前進程的eventpoll的文件描述符中的eventpoll的紅黑樹中增加一個epitem對象。

每當(dāng)被監(jiān)聽事件被喚起時候,將會帶著自定義喚起事件一起喚起整個epoll。在回調(diào)中,會把自己添加到eventpoll的準(zhǔn)備隊列中,后面的掃描拷貝。

下面是epoll的設(shè)計圖:


epoll設(shè)計模型.png

思考

最后再來解決一下開篇的疑問,為什么在Handler的Looper是一個死循環(huán)不會卡死?因為Handler的核心是epoll,epoll是一個異步回調(diào)監(jiān)聽事件變化狀態(tài)的系統(tǒng)調(diào)用,當(dāng)沒有事件的時候?qū)M入到進程掛起。

為什么Handler不會引起ANR?ANR的管理是在AMS中進行一個Handler的延時事件,當(dāng)?shù)竭_時間后還沒有拆出掉這個handler事件將會爆出ANR異常。而Hander整個過程并沒有涉及到這些流程?返回來說,ANR是依賴于Handler的機制?只有會專門開一篇聊anr。

當(dāng)我們徹底理解epoll的源碼實現(xiàn)之后,讓我們反過來想想Handler為什么這么設(shè)計?Handler為了在整個Android系統(tǒng)來說可說是一個命脈。除去從Zygote到四大組件的啟動的流程,幾乎其他所有的操作都是在mainhandler中完成的,無論是點擊觸摸事件,還是View的繪制。

只要寫過OpenGL就知道,一個顯示系統(tǒng)也好,操作系統(tǒng)也好,所有的操作都不可避免的需要在一個循環(huán)中處理,只有在一個循環(huán)中才能保證系統(tǒng)源源不斷的運行。但是并不是所有的事件都必須在所有的循環(huán)中都檢測一遍執(zhí)行一遍,這樣就太消耗性能。相反,如果有一個系統(tǒng)調(diào)用可以做到只需要更新需要更新的系統(tǒng)事件,其他時候就把資源讓渡給更加需要的地方,才是一個系統(tǒng)的合理設(shè)計。

也是基于這個思路,在poll,select之上就誕生了epoll系統(tǒng)調(diào)用。這個系統(tǒng)調(diào)用通過回調(diào)很靈活的解決了資源調(diào)度的合理分配。

而Handler作為整個系統(tǒng)的運行命脈使用epoll能夠監(jiān)聽大量的事件,且能對所有事件的變化快速的反應(yīng)過來,這是一個極好的設(shè)計。于此同時,因為epoll在內(nèi)核中會拷貝數(shù)據(jù),為了加速epoll的喚醒速度,Android特定設(shè)計了一個喚醒文件描述符eventfd作為Handler的喚醒標(biāo)志。在上一篇文章閱讀eventfd的源碼,可以知道這是一個用戶態(tài)內(nèi)核態(tài)進程線程之間快速的通知,而且數(shù)據(jù)量永遠超不過一個long型,對于put_user來說這簡直就是天大的喜事,因為它設(shè)計出來就是為了拷貝基礎(chǔ)類型。

基于這種回調(diào)的方式,Handler才會被稱為異步工具。沒工作的時候就通過nativePollOnce掛起進程,讓渡資源。

我們拋開Android系統(tǒng),把眼光放在整個領(lǐng)域上。如網(wǎng)絡(luò)編程,所有百萬級別的服務(wù)器全部都是用了epoll系統(tǒng)調(diào)用進行設(shè)計。為什么?很直觀的一個數(shù)據(jù),我們使用select或者poll把socket放在一個池子中每一次有數(shù)據(jù)變化了就進行循環(huán)檢測每一個socket中數(shù)據(jù)流的變化,這樣就會出現(xiàn)O(n)的時間消耗,雖然O(n)在算法中屬于比較好的時間復(fù)雜度,但是量一大就會變成一百萬一次循環(huán),這是不可能接受的。因此如果有辦法快速得知哪些socket出現(xiàn)了變化并且快速處理,這是一個從O(n)下降到O(1)級別的優(yōu)化,讓服務(wù)器承載更大網(wǎng)絡(luò)連接成為了可能。

而在Android中,也有類似的優(yōu)化。如騰訊開源的mars就是通過epoll進一步優(yōu)化整個網(wǎng)絡(luò)鏈接。

所以為什么說無論是前端也好,后端也好。我喜歡說一句話,殊途同歸。只要開發(fā)是基于某個平臺核心進行的,那么必須要對該平臺的核心有一定的理解,才能做得更好。

后話

接下來再補充一篇之前寫遺漏的Application的初始化與綁定,接下來就讓我們開啟Android渲染系統(tǒng)。對了,在這里我稍微宣揚一下我的個人博客,有興趣可以來這里看看,會同步更新移植文章,可能有其他更多雜談。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容