前言
上一篇文章,和大家講述了Handler的中使用到的eventfd系統(tǒng)調(diào)用原理。而本文將會著重剖析epoll系統(tǒng)調(diào)用,而整個handler核心的系統(tǒng)就是epoll。
如果遇到問題歡迎來到這里討論:http://m.itdecent.cn/p/d38b2970ff3f
正文
在聊epoll的原理之前,我們來看看epoll在Handler中的使用。
epoll的使用一般分為三個步驟:
- 1.調(diào)用epoll_create構(gòu)建一個epoll的句柄
EpollFd = epoll_create(EPOLL_SIZE_HINT);
- 2.調(diào)用epoll_ctl 注冊一個事件的監(jiān)聽,這個事件一般是文件描述符的數(shù)據(jù)是否發(fā)生變化
epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
該方法的意思是,往mEpollFd句柄中注冊一個新的事件監(jiān)聽mWakeEventFd,把eventItem作為相關(guān)的數(shù)據(jù)傳入
- 3.調(diào)用epoll_wait 阻塞當(dāng)前的循環(huán),直到監(jiān)聽到數(shù)據(jù)流發(fā)生變化,就釋放阻塞進行下一步
epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
該方法的意思就是指,當(dāng)前將會監(jiān)聽mEpollFd句柄,設(shè)定了最大的監(jiān)聽量以及超時事件。如果發(fā)生了某些監(jiān)聽對象發(fā)生了變化,則把相關(guān)變化的數(shù)據(jù)輸出到eventItems中。
知道了如何使用,我們就嘗試著剖析一下依照這個調(diào)用順序,解剖整個epoll的源碼原理。
首先我們需要有一個意識,那就是epoll本質(zhì)上和binder很相似, 但不是一個驅(qū)動,而是通過通過fs_initcall的方式,為內(nèi)核添加新的功能。
epoll的初始化
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
/*
* Allows top 4% of lomem to be allocated for epoll watches (per user).
*/
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
/*
* Initialize the structure used to perform epoll file descriptor
* inclusion loops checks.
*/
ep_nested_calls_init(&poll_loop_ncalls);
/* Initialize the structure used to perform safe poll wait head wake ups */
ep_nested_calls_init(&poll_safewake_ncalls);
/* Initialize the structure used to perform file's f_op->poll() calls */
ep_nested_calls_init(&poll_readywalk_ncalls);
/* Allocates slab cache used to allocate "struct epitem" items */
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
/* Allocates slab cache used to allocate "struct eppoll_entry" */
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
fs_initcall(eventpoll_init);
能看到在epoll在內(nèi)核啟動時候,會初始化如下幾個數(shù)據(jù)結(jié)構(gòu):
- 1.poll_loop_ncalls 用于緩存循環(huán)查找epoll文件描述符時候的緩存路徑結(jié)構(gòu)
- 2.poll_safewake_ncalls 保存著那些已經(jīng)加入到等待隊列那些可以安全喚醒的項。
- 3.poll_readywalk_ncalls 已經(jīng)執(zhí)行了file的poll操作的文件描述符
- 4.epi_cache 一個緩存的epitem隊列
- 5.pwq_cache 一個緩存的eppoll_entry隊列
這些對象是做什么的什么的,稍后的解析就能明白了。
epoll_create源碼解析
文件:/bionic/libc/bionic/sys_epoll.cpp
int epoll_create(int size) {
if (size <= 0) {
errno = EINVAL;
return -1;
}
return epoll_create1(0);
}
能看到在Android的epoll_create其實這個size設(shè)置的毫無意義,直接會調(diào)用epoll_create1這個系統(tǒng)調(diào)用,并且flag為0.接下來看看內(nèi)核中的方法。
文件:/fs/eventpoll.c
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
...
ep->file = file;
fd_install(fd, file);
return fd;
...
}
我們一樣可以和eventfd系統(tǒng)調(diào)用進行比較,做的是事情如下:
- 1.ep_alloc 初始化epoll需要的句柄以及所有的數(shù)據(jù)
- get_unused_fd_flags 獲取fdtable空閑的fd
- anon_inode_getfile 構(gòu)建一個名字為[eventpoll]的文件,并且把epoll對應(yīng)的文件操作設(shè)置到file結(jié)構(gòu)體中,把ep作為全局變量設(shè)置到file的私有數(shù)據(jù)中
- fd_install 把fd和file結(jié)構(gòu)體關(guān)聯(lián)起來。
ep_alloc
static int ep_alloc(struct eventpoll **pep)
{
int error;
struct user_struct *user;
struct eventpoll *ep;
user = get_current_user();
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
if (unlikely(!ep))
goto free_uid;
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
return 0;
free_uid:
free_uid(user);
return error;
}
能看到這個過程實際上就是賦值eventpoll的過程。在這個過程為eventpoll初始化了如下的數(shù)據(jù):
- 1.eventpoll 相關(guān)的線程的lock,mutex
- 2.eventpoll 中的等待隊列頭poll_wait
- 3.eventpoll 中文件描述符已經(jīng)處理過poll方法,其實就是準(zhǔn)備好的文件描述隊列
- 4.eventpoll 中的紅黑樹根部
- 5.ovflist 輸出到外界已經(jīng)發(fā)生變化的文件描述符
- user_struct 用于跟蹤進程用戶的信息
最后epollevent將會持有file結(jié)構(gòu)體。
最后這些epollevent會被設(shè)置為file中的中的私有數(shù)據(jù)。
到這里epoll需要注意初始化數(shù)據(jù)結(jié)構(gòu)就完成了,接下來看看epoll_ctl是怎么把需要監(jiān)聽的文件描述符設(shè)置到epoll中。
epoll_ctl 的系統(tǒng)調(diào)用
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
struct eventpoll *tep = NULL;
error = -EFAULT;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
error = -EBADF;
f = fdget(epfd);
...
/* Get the "struct file *" for the target file */
tf = fdget(fd);
/* The target file descriptor must support poll */
error = -EPERM;
if (!tf.file->f_op->poll)
goto error_tgt_fput;
/* Check if EPOLLWAKEUP is allowed */
if (ep_op_has_event(op))
ep_take_care_of_epollwakeup(&epds);
....
ep = f.file->private_data;
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) {
if (!list_empty(&f.file->f_ep_links) ||
is_file_epoll(tf.file)) {
full_check = 1;
mutex_unlock(&ep->mtx);
mutex_lock(&epmutex);
if (is_file_epoll(tf.file)) {
error = -ELOOP;
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tf.file->f_tfile_llink,
&tfile_check_list);
mutex_lock_nested(&ep->mtx, 0);
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
}
}
}
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx);
...
}
epoll_ctl把文件描述符添加到監(jiān)聽中大致分為以下3個步驟:
- 1.首先從用戶空間拷貝相關(guān)的信息。設(shè)置f為epoll_ctl傳下來epoll句柄對應(yīng)的fd,tfd則是需要監(jiān)聽對象對應(yīng)的句柄。
- 2.循環(huán)檢測監(jiān)聽對象的文件描述符是否出現(xiàn)嵌套深度過深
- 3.處理epoll_ctl的操作標(biāo)志,如果是添加,調(diào)用ep_insert會把當(dāng)前文件描述添加到緩存中,并且調(diào)用文件描述符的poll方法。通過ep_find找到有相同的epoll則會報錯
值得注意的是后面兩點,我們著重剖析看看。
循環(huán)檢測每一個添加進來的監(jiān)聽對象的合法性
static LIST_HEAD(tfile_check_list);
ep = f.file->private_data;
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) {
if (!list_empty(&f.file->f_ep_links) ||
is_file_epoll(tf.file)) {
full_check = 1;
mutex_unlock(&ep->mtx);
mutex_lock(&epmutex);
if (is_file_epoll(tf.file)) {
error = -ELOOP;
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tf.file->f_tfile_llink,
&tfile_check_list);
mutex_lock_nested(&ep->mtx, 0);
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
}
}
}
當(dāng)前判斷是添加對象的操作則會處理一個額外的判斷。
- epoll句柄對應(yīng)的file結(jié)構(gòu)體中f_ep_links隊列不為空,或者目標(biāo)監(jiān)聽的file結(jié)構(gòu)體中含有poll方法。
說明這種情況比較特殊一個可能是類似socket文件描述符一樣自身帶有著poll方法,一種可能是本身就是epoll對象,這樣就會出現(xiàn)一個環(huán),當(dāng)通知一個epoll有回喚醒另一個epoll對象,這個對象有可能繼續(xù)喚醒回來,出現(xiàn)一個死循環(huán)。
- 都沒有,說明此時是一個普通的文件描述符,直接添加到tfile_check_list。
如果目標(biāo)文件有poll函數(shù)則把file中的私有數(shù)據(jù)賦值給tep。
我們先來注重看看第一種情況,其核心函數(shù)是ep_loop_check,一般常用是第一種情況。
ep_loop_check 檢測嵌套循環(huán)
/* Visited nodes during ep_loop_check(), so we can unset them when we finish */
static LIST_HEAD(visited_list);
#define EP_MAX_NESTS 4
static int ep_loop_check(struct eventpoll *ep, struct file *file)
{
int ret;
struct eventpoll *ep_cur, *ep_next;
ret = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, file, ep, current);
/* clear visited list */
list_for_each_entry_safe(ep_cur, ep_next, &visited_list,
visited_list_link) {
ep_cur->visited = 0;
list_del(&ep_cur->visited_list_link);
}
return ret;
}
這段代碼就是為了處理下面這個問題,那就是如果epoll自己監(jiān)聽自己怎么辦?自己喚醒自己,接著繼續(xù)通知自己數(shù)據(jù)來了又要喚醒自己。而且如果一個epoll注冊多個相同的監(jiān)聽對象,豈不是會出現(xiàn)喚醒返回的結(jié)果出現(xiàn)重復(fù)的對象。
ep_call_nested
static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
int (*nproc)(void *, void *, int), void *priv,
void *cookie, void *ctx)
{
int error, call_nests = 0;
unsigned long flags;
struct list_head *lsthead = &ncalls->tasks_call_list;
struct nested_call_node *tncur;
struct nested_call_node tnode;
spin_lock_irqsave(&ncalls->lock, flags);
list_for_each_entry(tncur, lsthead, llink) {
if (tncur->ctx == ctx &&
(tncur->cookie == cookie || ++call_nests > max_nests)) {
/*
* Ops ... loop detected or maximum nest level reached.
* We abort this wake by breaking the cycle itself.
*/
error = -1;
goto out_unlock;
}
}
/* Add the current task and cookie to the list */
tnode.ctx = ctx;
tnode.cookie = cookie;
list_add(&tnode.llink, lsthead);
spin_unlock_irqrestore(&ncalls->lock, flags);
/* Call the nested function */
error = (*nproc)(priv, cookie, call_nests);
/* Remove the current task from the list */
spin_lock_irqsave(&ncalls->lock, flags);
list_del(&tnode.llink);
out_unlock:
spin_unlock_irqrestore(&ncalls->lock, flags);
return error;
}
為了弄懂整個方法,先標(biāo)記以下幾個關(guān)鍵的對象意味著什么:
- priv 其實是被監(jiān)聽文件的file結(jié)構(gòu)體
- cookie 是目前要把被監(jiān)聽對象添加到epoll句柄的那個epoll監(jiān)聽主體
- nested_call_node 是nested_calls鏈表的子節(jié)點
- ctx 上下文是指當(dāng)前執(zhí)行當(dāng)前系統(tǒng)調(diào)用對應(yīng)的task_struct進程是哪一個
分清楚這些對象之后,鏈表lsthead(nested_calls的tasks_call_list)的循環(huán)實際上是就是查找全局變量poll_loop_ncalls中每一個子節(jié)點中的是否存在一模一樣的進程,第二條件是判斷當(dāng)前的需要添加到的ep對象是否是同一個或者已經(jīng)添加了超過4次。
如果不滿足則會為當(dāng)前這個目標(biāo)file結(jié)構(gòu)體設(shè)置ctx以及cookie,并且添加到lsthead(nested_calls的tasks_call_list)保存起來,接著執(zhí)行上面?zhèn)飨聛淼姆椒ㄖ羔?,?dāng)執(zhí)行完之后就把剛加入的nested_call_node從poll_loop_ncalls刪除。為了更好的明白這段代碼的邏輯,我們再來看看這個方法指針ep_loop_check_proc
ep_loop_check_proc
static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct eventpoll *ep = file->private_data;
struct eventpoll *ep_tovisit;
struct rb_node *rbp;
struct epitem *epi;
mutex_lock_nested(&ep->mtx, call_nests + 1);
ep->visited = 1;
list_add(&ep->visited_list_link, &visited_list);
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
if (unlikely(is_file_epoll(epi->ffd.file))) {
ep_tovisit = epi->ffd.file->private_data;
if (ep_tovisit->visited)
continue;
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, epi->ffd.file,
ep_tovisit, current);
if (error != 0)
break;
} else {
if (list_empty(&epi->ffd.file->f_tfile_llink))
list_add(&epi->ffd.file->f_tfile_llink,
&tfile_check_list);
}
}
mutex_unlock(&ep->mtx);
return error;
}
這一段代碼其實就是不斷的遍歷掛在epoll監(jiān)聽主體的紅黑樹中每一子節(jié)點epitem結(jié)構(gòu)體,檢查每一個判斷到有poll方法的file結(jié)構(gòu)體,獲取里面的私有數(shù)據(jù)epitem,判斷是否已經(jīng)遍歷過了這個file結(jié)構(gòu)體。一般遍歷過的epitem,其visited就會為1.沒遍歷過的一般為0.
什么時候會出現(xiàn)1和0的差異呢?一般情況下,如果是一個沒有連接到任何一個epoll主體對象的epoll對象都為0,鏈接過則為1.這樣就能很好的區(qū)分出所有的epoll是否會出現(xiàn)鏈接監(jiān)聽重復(fù)了。
如果當(dāng)前的對應(yīng)epitem已經(jīng)訪問過了則查找下一個子節(jié)點,沒有訪問過,說明就要考慮一個特殊情況這個epoll或者說帶有這poll方法的file結(jié)構(gòu)體。這個情況就可能出現(xiàn)循環(huán)監(jiān)聽,因此需要不斷向著子節(jié)點查詢校驗。
當(dāng)找到一個沒有重寫poll的file結(jié)構(gòu)體。此時是把對應(yīng)節(jié)點下的f_tfile_llink拷貝到tfile_check_list。
為了減少遞歸次數(shù),使用了visit_list記錄已經(jīng)訪問過的文件,用visit標(biāo)志位避免重復(fù)判斷。
這個過程做了什么呢?其實這個過程就是為了解決循環(huán)嵌套監(jiān)聽做的努力?實際上沒有做什么,太過深入細節(jié)反而容易忘記初衷:
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
其實就是在判斷當(dāng)前目標(biāo)file中到error不為0的時候會報錯,錯誤是什么時候返回的,就是在ep_call_nested的遍歷循環(huán)中,通過檢驗當(dāng)前進程屬否出現(xiàn)loop深度過深(超過4層),一旦超過則返回-1.也就是說不允許你嵌套監(jiān)聽過多層次。當(dāng)然如果遇到了自己監(jiān)聽自己就會稱為一個有向圖數(shù)據(jù)結(jié)構(gòu)一定會超出4層,一定會報錯。
因此我們可以得到一個epoll_ctl使用細節(jié),請不要自己監(jiān)聽自己,也不要epoll監(jiān)聽epoll的層數(shù)超過4層。
ep_insert 把當(dāng)前被監(jiān)聽對象插入到epoll對象中
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
if (epi->event.events & EPOLLWAKEUP) {
error = ep_create_wakeup_source(epi);
if (error)
goto error_create_wakeup_source;
} else {
RCU_INIT_POINTER(epi->ws, NULL);
}
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = ep_item_poll(epi, &epq.pt);
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
/*把file結(jié)構(gòu)體的 f_ep_links指針指給epoll的flink方便epoll查找*/
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (full_check && reverse_path_check())
goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* If the file is already "ready" we drop it inside the ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
...
}
epitem的插入大致分為如下幾個步驟:
- 1.初始化epitem中所有的數(shù)據(jù),rdllink,fllink,pwqlist這幾個隊列,設(shè)置epi中的event數(shù)據(jù)就是從系統(tǒng)調(diào)用傳下來的數(shù)據(jù)(eventItem),如果打開了EPOLLWAKEUP則創(chuàng)建一個wakeup_source注冊監(jiān)聽,用于避免系統(tǒng)沉睡(電量消耗檢測的關(guān)鍵之一,一般在AlarmManager和InputManager中使用)。
- 2.調(diào)用被監(jiān)聽對象的poll方法
- 3.把epi對象添加到ep的紅黑樹中
- 4.如果file結(jié)構(gòu)體已經(jīng)準(zhǔn)備好了,就添加到準(zhǔn)備列表中。接著保持cpu避免沉睡,嘗試著喚醒對應(yīng)進程,判斷當(dāng)前的事件是可以走得通的。
其實重點是從第二點開始,我們先從第二點開始閱讀。
調(diào)用被監(jiān)聽對象的poll方法
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = ep_item_poll(epi, &epq.pt);
init_poll_funcptr 將方法ep_ptable_queue_proc賦值到poll_table中。接著調(diào)用ep_item_poll。
static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
pt->_key = epi->event.events;
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}
此時會調(diào)用目標(biāo)文件的poll方法。而這個目標(biāo)文件在此時是eventfd,換句話說調(diào)用的是eventfd的poll的方法。
static unsigned int eventfd_poll(struct file *file, poll_table *wait)
{
struct eventfd_ctx *ctx = file->private_data;
unsigned int events = 0;
unsigned long flags;
poll_wait(file, &ctx->wqh, wait);
spin_lock_irqsave(&ctx->wqh.lock, flags);
if (ctx->count > 0)
events |= POLLIN;
if (ctx->count == ULLONG_MAX)
events |= POLLERR;
if (ULLONG_MAX - 1 > ctx->count)
events |= POLLOUT;
spin_unlock_irqrestore(&ctx->wqh.lock, flags);
return events;
}
在這個方法中有一個核心方法poll_wait。這個方法取出了file的私有數(shù)據(jù)拿到eventfd的上下文中的等待隊列頭,并且調(diào)用poll_table中的proc方法
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
而這個方法剛好就是上面init_poll_funcptr初始化進來的ep_ptable_queue_proc。
ep_ptable_queue_proc
文件:/fs/eventpoll.c
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
這里出現(xiàn)了幾個等待隊列的頭部:
- whead 是從eventfd傳下來的等待隊列
- pwq 是eppoll_entry ,在這個結(jié)構(gòu)體中有包含著兩個等待隊列頭部,whead指代原來目標(biāo)被監(jiān)聽對象的等待隊列頭,這里指的是eventfd的等待隊列。把whead添加到pwq->wait隊列中。
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
struct list_head llink;
/* The "base" pointer is set to the container "struct epitem" */
struct epitem *base;
/*
* Wait queue item that will be linked to the target file wait
* queue head.
*/
wait_queue_t wait;
/* The wait queue head that linked the "wait" wait queue item */
wait_queue_head_t *whead;
};
如果nwait標(biāo)志位大于等于0(此時設(shè)置的是0),并且申請了pwq_cache一段內(nèi)存是成功的。此時將會初始化pwq中的等待頭,并且設(shè)置一個poll_wait的回調(diào)函數(shù);接著把eventfd中的等待隊列頭部添加到pwq的wait隊列,這樣就相當(dāng)于把epoll中的pwqlist和eventfd的等待隊列關(guān)聯(lián)起來;最后把當(dāng)前的pwq->llink添加到epi的pwqlist中。為后面回調(diào)作準(zhǔn)備。
這種設(shè)計十分常見,幾乎所有關(guān)于poll和epoll方法的重寫都是這樣設(shè)計的。需要重寫一個poll_wait的方法,把自己的等待隊列和上層調(diào)度者的等待隊列關(guān)聯(lián)起來,這樣一旦喚醒了該文件的等待隊列同時也會喚起上層調(diào)度者對應(yīng)的等待隊列。
在這個過程中做了一個很重要的事情,那就是設(shè)定了ep_poll_callback方法作為自定義喚醒方法。每當(dāng)想要喚醒掛起的進程將會執(zhí)行這個方法,我們稍后再聊。
把epi對象添加到ep的紅黑樹中
/*把file結(jié)構(gòu)體的 f_ep_links指針指給epoll的flink方便epoll查找*/
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
ep_rbtree_insert(ep, epi);
static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
{
int kcmp;
struct rb_node **p = &ep->rbr.rb_node, *parent = NULL;
struct epitem *epic;
while (*p) {
parent = *p;
epic = rb_entry(parent, struct epitem, rbn);
kcmp = ep_cmp_ffd(&epi->ffd, &epic->ffd);
if (kcmp > 0)
p = &parent->rb_right;
else
p = &parent->rb_left;
}
rb_link_node(&epi->rbn, parent, p);
rb_insert_color(&epi->rbn, &ep->rbr);
}
能看到這里僅僅只是一個很常規(guī)的紅黑樹添加的方法,找到合適的地方插入到eventpoll結(jié)構(gòu)體中的rbr最后進行左右旋轉(zhuǎn)平衡,對紅黑樹算法感興趣的可以閱讀我之前寫的紅黑樹一文。
校驗新增的epitem并添加到準(zhǔn)備隊列
if (full_check && reverse_path_check())
goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* If the file is already "ready" we drop it inside the ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
在這個過程中主要做了兩件事情:
- 1.reverse_path_check校驗是否會出現(xiàn)喚醒風(fēng)暴
- 2.檢查是否出現(xiàn)遺漏的event監(jiān)聽時間,并添加到準(zhǔn)備隊列
reverse_path_check校驗是否會出現(xiàn)喚醒風(fēng)暴
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
static int path_count[PATH_ARR_SIZE];
static int path_count_inc(int nests)
{
/* Allow an arbitrary number of depth 1 paths */
if (nests == 0)
return 0;
if (++path_count[nests] > path_limits[nests])
return -1;
return 0;
}
static void path_count_init(void)
{
int i;
for (i = 0; i < PATH_ARR_SIZE; i++)
path_count[i] = 0;
}
static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct file *child_file;
struct epitem *epi;
/* CTL_DEL can remove links here, but that can't increase our count */
rcu_read_lock();
list_for_each_entry_rcu(epi, &file->f_ep_links, fllink) {
child_file = epi->ep->file;
if (is_file_epoll(child_file)) {
if (list_empty(&child_file->f_ep_links)) {
if (path_count_inc(call_nests)) {
error = -1;
break;
}
} else {
error = ep_call_nested(&poll_loop_ncalls,
EP_MAX_NESTS,
reverse_path_check_proc,
child_file, child_file,
current);
}
if (error != 0)
break;
} else {
printk(KERN_ERR "reverse_path_check_proc: "
"file is not an ep!\n");
}
}
rcu_read_unlock();
return error;
}
static int reverse_path_check(void)
{
int error = 0;
struct file *current_file;
/* let's call this for all tfiles */
list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
path_count_init();
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
reverse_path_check_proc, current_file,
current_file, current);
if (error)
break;
}
return error;
}
在上面的ep_loop_check是為了避免監(jiān)聽嵌套層級過深,這里則會判斷每一個層級是否過于龐大。對于epoll來說,為了避免每一層鏈接的帶有epoll監(jiān)聽對象過大,對每一層epoll都做了大小的限制。因為一旦一個層級放置監(jiān)聽數(shù)量過大的另一個epoll會導(dǎo)致一旦喚醒就會喚醒一場風(fēng)暴一樣,卷席整個系統(tǒng),導(dǎo)致性能急速下降。
為了處理這個問題,epoll對每一層做了如下的數(shù)量限制:
第0層1000,第1層500,第2層100,第3層50,第4層10
那么我們求一下總數(shù),一個epoll對象一共能夠監(jiān)聽能夠監(jiān)聽2.5*10^10 這么多。其實也足夠使用了。不過面向服務(wù)器開發(fā)的朋友,倒是有可能使用這么多,到底怎么把socket鏈接均勻的分布在不同epoll也是不錯的優(yōu)化點。
添加到準(zhǔn)備隊列中
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
能看到當(dāng)epi的rdllink沒有添加到ep->rdllist),則去添加;接著調(diào)用ep_pm_stay_awake,如果初始化了epitem中的ws對象,則會避免cpu沉睡,接著判斷ep所在的等待隊列是否存在,此時雖然初始化了,但是沒有加入到等待隊列中。同理poll_wait等待隊列,這個隊列一般是處理poll文件操作的。
從這里面可以得知,如果我們已經(jīng)在監(jiān)聽了,同時在注冊新的監(jiān)聽文件描述符同時,發(fā)生了事件事件變化,此時也會把相應(yīng)的監(jiān)聽對象添加到eventpoll的rdllist準(zhǔn)備隊列。其實準(zhǔn)備隊列就是指已經(jīng)監(jiān)聽到發(fā)生變化的文件描述符,準(zhǔn)備通過epoll_wait返回上層的核心數(shù)據(jù)結(jié)構(gòu)。
epoll_wait 系統(tǒng)調(diào)用等待觸發(fā)epoll監(jiān)聽回調(diào)
epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
把對應(yīng)的文件描述符注冊到epoll之后,接著就開始嘗試著阻塞監(jiān)聽epoll中所有注冊到epoll的文件描述符的數(shù)據(jù)變化。一旦發(fā)生變化則會調(diào)上來,而所有發(fā)生變化的對應(yīng)的事件就是eventItems中的數(shù)據(jù)。
文件:/fs/eventpoll.c
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;
/* Get the "struct file *" for the eventpoll file */
f = fdget(epfd);
...
if (!is_file_epoll(f.file))
goto error_fput;
ep = f.file->private_data;
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
...
return error;
}
在這個系統(tǒng)調(diào)用中核心方法是ep_poll。
ep_poll
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
unsigned long flags;
long slack = 0;
wait_queue_t wait;
ktime_t expires, *to = NULL;
if (timeout > 0) {
struct timespec end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
fetch_events:
spin_lock_irqsave(&ep->lock, flags);
if (!ep_events_available(ep)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!freezable_schedule_hrtimeout_range(to, slack,
HRTIMER_MODE_ABS))
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
check_events:
/* Is it worth to try to dig for events ? */
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
首先這里先獲取timeout,如果是0則會理解進入到check_events標(biāo)簽對應(yīng)的代碼段,否則則會先進入fetch_events代碼段。先來弄清楚一般情況,設(shè)置了timeout的epoll_wait。
在fetch_events代碼段中做了如下的事情,先把之前在ep_alloc初始化好的ep->wq將會加入到當(dāng)前的進程的等待隊列中,進行超時等待。這個過程允許中斷打斷。等待被監(jiān)聽對象喚起當(dāng)前的進程。
接下來將會執(zhí)行check_events代碼段,這個代碼段會判斷當(dāng)前的準(zhǔn)備隊列是否為空,為空則沒有必要執(zhí)行,直接返回。不為空則調(diào)用ep_send_events。
在這里需要注意一點,此時epoll_wait使用eventpoll中的wq作為等待隊列進行進程的掛起處理。而這個對象實際上是可以被被監(jiān)聽對象打斷的,這個時候就來看看在epoll_ctl設(shè)置監(jiān)聽時候設(shè)置到epi->pwqlist中的自定義喚醒進程方法。
ep_poll_callback自定義喚醒方法
下面這個方法就是整個回調(diào)機制的核心
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
....
spin_lock_irqsave(&ep->lock, flags);
...
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
通過wait等待隊列反過來找epitem;接著把當(dāng)前的epitem添加到對應(yīng)eventpoll的ovflist鏈表中;把epi->rdllink添加到ep的ep->rdllist的尾部,因為是一個鏈表環(huán)所以也是頭部,解析來要查找什么發(fā)生了變化的file只需要讀取頭部即可。接著調(diào)用wake_up_locked打斷epoll_wait的循環(huán)。如果需要則調(diào)用ep_poll_safewake處理調(diào)用poll方法而在掛起的進程。
明白這點之后繼續(xù)看ep_send_events方法。
ep_send_events
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}
調(diào)用ep_scan_ready_list,并把ep_send_events_proc方法指針傳入。
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
...
spin_lock_irqsave(&ep->lock, flags);
list_splice_init(&ep->rdllist, &txlist);
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
ep->ovflist = EP_UNACTIVE_PTR;
list_splice(&txlist, &ep->rdllist);
__pm_relax(ep->ws);
if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!ep_locked)
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
在這個過程中會把ovflist設(shè)置為NULL,這個只是一個標(biāo)志位告訴epoll它監(jiān)聽的文件出現(xiàn)了狀態(tài)的變化;接著把rdllist拷貝到txlist;然后會調(diào)用ep_send_events_proc方法拷貝變化的數(shù)據(jù)到用戶空間對應(yīng)的eventpoll對象中。最后再檢測一次是否還有ovflist中是否有遺漏的數(shù)據(jù)。
因為在從內(nèi)核拷貝到用戶空間這個行為并不是一個原子操作,可能出現(xiàn)異步的情況,這個時候可能還在ep_send_events_proc中進行數(shù)據(jù)的拷貝,此時又進行了回調(diào),因此需要最后再一次的校驗和添加到rdllist中并且再來一次wake_up_locked步驟重新執(zhí)行一次該行為。
ep_read_events_proc
最后再看看這個拷貝數(shù)據(jù)的核心方法
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);
/*
* Activate ep->ws before deactivating epi->ws to prevent
* triggering auto-suspend here (in case we reactive epi->ws
* below).
*
* This could be rearranged to delay the deactivation of epi->ws
* instead, but then epi->ws would temporarily be out of sync
* with ep_is_linked().
*/
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
list_del_init(&epi->rdllink);
revents = ep_item_poll(epi, &pt);
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) {
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
}
return eventcnt;
}
做了如下幾點事情:
- 1.首先清除上一次殘留下來的準(zhǔn)備隊列。
- 2.進入用戶空間對應(yīng)的epoll_event指針數(shù)組不斷向后循環(huán)。在循環(huán)中獲取從ep_poll_callback回調(diào)回來的eventpoll中的全局準(zhǔn)備隊列的頭部,不斷的通過__put_user從內(nèi)核空間拷貝到用戶空間中(__put_user 比起copy_to_user拷貝的數(shù)量要更小更快).能走到這個拷貝函數(shù)是因為再一次的調(diào)用poll方法,確認(rèn)對應(yīng)的文件描述符中的緩沖區(qū)是否返回正常的返回碼(大于0,一般的實現(xiàn)是指緩沖區(qū)中是否還有數(shù)據(jù)),又因為此時polltable中的方法是null,不會循環(huán)添加等待隊列。
- 3.處理EPOLLET邊緣觸發(fā)和EPOLLLT水平觸發(fā)的區(qū)別。能看到在循環(huán)的末尾如果沒有打開EPOLLET(邊緣觸發(fā)),則會繼續(xù)把當(dāng)前的epi添加到eventpoll中的準(zhǔn)備隊列中。
這就是邊緣觸發(fā)和水平觸發(fā)實現(xiàn)核心,如果返回了正常的數(shù)據(jù),同時發(fā)現(xiàn)關(guān)閉了邊緣觸發(fā)的標(biāo)志位,則會繼續(xù)把當(dāng)前的epitem添加到全局的eventpoll中的準(zhǔn)備隊列,繼續(xù)下次讀取,知道對應(yīng)文件描述符中poll操作返回0為止。
這就是在內(nèi)核中怎么實現(xiàn)邊緣觸發(fā)(狀態(tài)變化不管數(shù)據(jù)緩沖區(qū)是否有數(shù)據(jù)就返回一次)和水平觸發(fā)(只要緩沖區(qū)還有數(shù)據(jù)則會不斷的觸發(fā)返回)。
總結(jié)
epoll之所以叫做eventpoll,其實核心是執(zhí)行每一個注冊到epoll中文件描述符的poll方法。每當(dāng)調(diào)用每一個文件描述符的poll的方法時候,就會把自己的等待行為和被監(jiān)聽事件鏈接到一起,同時被打斷回調(diào)。
epoll第一次閱讀源碼必定會頭大(作者就是這樣),因為里面有數(shù)個等待隊列,每個等待隊列做的事情都不一樣,如果沒注意到這些等待隊列做的事情,邏輯將會混亂。
里面主要包含三個等待隊列:
eventpoll.wq 這個是主要的等待隊列,當(dāng)調(diào)用epoll_wait的時候,就是用這個等待隊列進行進程的調(diào)度掛起。于此同時在ep_poll_callback中,將會獲取到ep對象并且打斷這個等待隊列的掛起
eventpoll.pwqlist 主要就是為了和被監(jiān)聽的文件描述符poll中的等待隊列關(guān)聯(lián)起來。一旦進行被監(jiān)聽事件發(fā)生了需要打斷poll方法的等待隊列掛起操作時候?qū)?,通知到epoll的回調(diào)
eventpoll.poll_wait 這個等待隊列一般是用在epoll本身的poll方法中。
在這個過程中,每監(jiān)聽一個事件,就會在當(dāng)前進程的eventpoll的文件描述符中的eventpoll的紅黑樹中增加一個epitem對象。
每當(dāng)被監(jiān)聽事件被喚起時候,將會帶著自定義喚起事件一起喚起整個epoll。在回調(diào)中,會把自己添加到eventpoll的準(zhǔn)備隊列中,后面的掃描拷貝。
下面是epoll的設(shè)計圖:

思考
最后再來解決一下開篇的疑問,為什么在Handler的Looper是一個死循環(huán)不會卡死?因為Handler的核心是epoll,epoll是一個異步回調(diào)監(jiān)聽事件變化狀態(tài)的系統(tǒng)調(diào)用,當(dāng)沒有事件的時候?qū)M入到進程掛起。
為什么Handler不會引起ANR?ANR的管理是在AMS中進行一個Handler的延時事件,當(dāng)?shù)竭_時間后還沒有拆出掉這個handler事件將會爆出ANR異常。而Hander整個過程并沒有涉及到這些流程?返回來說,ANR是依賴于Handler的機制?只有會專門開一篇聊anr。
當(dāng)我們徹底理解epoll的源碼實現(xiàn)之后,讓我們反過來想想Handler為什么這么設(shè)計?Handler為了在整個Android系統(tǒng)來說可說是一個命脈。除去從Zygote到四大組件的啟動的流程,幾乎其他所有的操作都是在mainhandler中完成的,無論是點擊觸摸事件,還是View的繪制。
只要寫過OpenGL就知道,一個顯示系統(tǒng)也好,操作系統(tǒng)也好,所有的操作都不可避免的需要在一個循環(huán)中處理,只有在一個循環(huán)中才能保證系統(tǒng)源源不斷的運行。但是并不是所有的事件都必須在所有的循環(huán)中都檢測一遍執(zhí)行一遍,這樣就太消耗性能。相反,如果有一個系統(tǒng)調(diào)用可以做到只需要更新需要更新的系統(tǒng)事件,其他時候就把資源讓渡給更加需要的地方,才是一個系統(tǒng)的合理設(shè)計。
也是基于這個思路,在poll,select之上就誕生了epoll系統(tǒng)調(diào)用。這個系統(tǒng)調(diào)用通過回調(diào)很靈活的解決了資源調(diào)度的合理分配。
而Handler作為整個系統(tǒng)的運行命脈使用epoll能夠監(jiān)聽大量的事件,且能對所有事件的變化快速的反應(yīng)過來,這是一個極好的設(shè)計。于此同時,因為epoll在內(nèi)核中會拷貝數(shù)據(jù),為了加速epoll的喚醒速度,Android特定設(shè)計了一個喚醒文件描述符eventfd作為Handler的喚醒標(biāo)志。在上一篇文章閱讀eventfd的源碼,可以知道這是一個用戶態(tài)內(nèi)核態(tài)進程線程之間快速的通知,而且數(shù)據(jù)量永遠超不過一個long型,對于put_user來說這簡直就是天大的喜事,因為它設(shè)計出來就是為了拷貝基礎(chǔ)類型。
基于這種回調(diào)的方式,Handler才會被稱為異步工具。沒工作的時候就通過nativePollOnce掛起進程,讓渡資源。
我們拋開Android系統(tǒng),把眼光放在整個領(lǐng)域上。如網(wǎng)絡(luò)編程,所有百萬級別的服務(wù)器全部都是用了epoll系統(tǒng)調(diào)用進行設(shè)計。為什么?很直觀的一個數(shù)據(jù),我們使用select或者poll把socket放在一個池子中每一次有數(shù)據(jù)變化了就進行循環(huán)檢測每一個socket中數(shù)據(jù)流的變化,這樣就會出現(xiàn)O(n)的時間消耗,雖然O(n)在算法中屬于比較好的時間復(fù)雜度,但是量一大就會變成一百萬一次循環(huán),這是不可能接受的。因此如果有辦法快速得知哪些socket出現(xiàn)了變化并且快速處理,這是一個從O(n)下降到O(1)級別的優(yōu)化,讓服務(wù)器承載更大網(wǎng)絡(luò)連接成為了可能。
而在Android中,也有類似的優(yōu)化。如騰訊開源的mars就是通過epoll進一步優(yōu)化整個網(wǎng)絡(luò)鏈接。
所以為什么說無論是前端也好,后端也好。我喜歡說一句話,殊途同歸。只要開發(fā)是基于某個平臺核心進行的,那么必須要對該平臺的核心有一定的理解,才能做得更好。
后話
接下來再補充一篇之前寫遺漏的Application的初始化與綁定,接下來就讓我們開啟Android渲染系統(tǒng)。對了,在這里我稍微宣揚一下我的個人博客,有興趣可以來這里看看,會同步更新移植文章,可能有其他更多雜談。