Netty IO事件與任務(wù)處理

概述

Netty的IO事件分別為讀事件(OP_READ)、寫(xiě)事件(OP_WRITE)、接收事件(OP_ACCEPT)、連接事件(OP_CONNECT)。其中讀、寫(xiě)事件可以發(fā)生在客戶端與服務(wù)端。接收事件只發(fā)生在服務(wù)端,服務(wù)端啟動(dòng)后會(huì)注冊(cè)接收事件監(jiān)聽(tīng)客戶端連接。連接事件只發(fā)生在客戶端,客戶端啟動(dòng)時(shí)會(huì)連接服務(wù)端。Netty任務(wù)分為普通任務(wù)(通過(guò)execute(Runnable task) 執(zhí)行)與定時(shí)任務(wù)(通過(guò)schedule(Runnable task,long delay,TimeUnit unit)執(zhí)行)。無(wú)論是IO事件還是任務(wù),都是通過(guò)NioEventLoop中對(duì)應(yīng)的線程來(lái)進(jìn)行處理。

NioEventLoop創(chuàng)建過(guò)程

在實(shí)例化NioEventLoopGroup時(shí),默認(rèn)會(huì)創(chuàng)建2倍CPU核心數(shù)的NioEventLoop。對(duì)于bossGroup來(lái)說(shuō),雖然會(huì)創(chuàng)建這么多NioEventLoop,但是如果只綁定一個(gè)端口進(jìn)行事件監(jiān)聽(tīng),實(shí)際上只會(huì)用到一個(gè)NioEventLoop,也就是說(shuō)只有一個(gè)線程在循環(huán)處理事件與任務(wù)。

EventLoopGroup bossGroup = new NioEventLoopGroup();

NioEventLoopGroup UML圖:

NioEventLoopGroup無(wú)參構(gòu)造方法最終會(huì)調(diào)到下面的構(gòu)造方法:

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    /**
     * nThreads:創(chuàng)建線程的數(shù)量,如果不傳,后續(xù)會(huì)默認(rèn)為2倍CPU核心數(shù)
     * executor:默認(rèn)為null,在NioEventLoopGroup父類中會(huì)進(jìn)行初始化
     * selectorProvider:用于創(chuàng)建Java NIO的Selector對(duì)象
     * selectStrategyFactory:IO多路復(fù)用器策略工廠,值為DefaultSelectStrategyFactory
     * RejectedExecutionHandlers.reject():拒絕策略,當(dāng)線程池任務(wù)隊(duì)列滿了后在往其中添加任務(wù)會(huì)觸發(fā)該拒絕策略
     */
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

NioEventLoopGroup構(gòu)造方法中會(huì)調(diào)用其父類MultithreadEventLoopGroup的構(gòu)造方法,該構(gòu)造方法會(huì)初始化默認(rèn)的線程數(shù)量,常量DEFAULT_EVENT_LOOP_THREADS值為2倍CPU核心數(shù):

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

MultithreadEventLoopGroup構(gòu)造方法中會(huì)調(diào)用其父類MultithreadEventExecutorGroup的構(gòu)造方法,該構(gòu)造方法會(huì)初始化線程執(zhí)行選擇器工廠,常量DefaultEventExecutorChooserFactory.INSTANCE值為EventExecutorChooserFactory:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

最終會(huì)調(diào)用MultithreadEventExecutorGroup如下構(gòu)造方法,在該方法中會(huì)循環(huán)創(chuàng)建NioEventLoop:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    
    if (executor == null) {
        // 創(chuàng)建線程執(zhí)行器
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
        // 創(chuàng)建EventExecutor數(shù)組,用來(lái)保存NioEventLoop
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        // ...
        // 創(chuàng)建NioEventLoop
        children[i] = newChild(executor, args);
        // ...
    }
        // 創(chuàng)建線程選擇器
    chooser = chooserFactory.newChooser(children);
        // ...
}

創(chuàng)建線程執(zhí)行器

線程執(zhí)行器ThreadPerTaskExecutor#execute方法內(nèi)部使用ThreadFactory來(lái)創(chuàng)建并啟動(dòng)線程,其中ThreadFactory就是調(diào)用其構(gòu)造方法傳入的DefaultThreadFactory,DefaultThreadFactory#newThread方法會(huì)創(chuàng)建線程,并設(shè)置線程屬性,如線程名稱等:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // 使用線程工廠創(chuàng)建并啟動(dòng)線程
        threadFactory.newThread(command).start();
    }
}

創(chuàng)建NioEventLoop

調(diào)用NioEventLoopGroup#newChild方法進(jìn)行NioEventLoop的創(chuàng)建:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

方法內(nèi)部會(huì)調(diào)用NioEventLoop的構(gòu)造方法進(jìn)行創(chuàng)建:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // 調(diào)用父類構(gòu)造方法,創(chuàng)建任務(wù)隊(duì)列以及初始化父類屬性
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 創(chuàng)建IO多路復(fù)用器
    selector = openSelector();
    selectStrategy = strategy;
}

該構(gòu)造方法中有兩個(gè)比較重要的操作:一是調(diào)用父類構(gòu)造方法創(chuàng)建任務(wù)隊(duì)列等,二是調(diào)用openSelector方法創(chuàng)建IO多路復(fù)用器,我們先看openSelector方法:

private Selector openSelector() {
    final Selector selector;
    try {
        // 創(chuàng)建Java Nio的Selector對(duì)象
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
        // 是否禁用對(duì)Java Selector的優(yōu)化,如果禁用則標(biāo)識(shí)不優(yōu)化直接返回Java的Selector,默認(rèn)為false
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }
        // 創(chuàng)建SelectedSelectionKeySet,底層是數(shù)組實(shí)現(xiàn),用于替換Java Selector底層selectedKeys的數(shù)據(jù)結(jié)構(gòu)
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        // 通過(guò)反射獲取到Selector的實(shí)現(xiàn)類
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (ClassNotFoundException e) {
                    // ...
            }
        }
    });

    // ...

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 獲取selectedKeys、publicSelectedKeys屬性
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);
                                // 通過(guò)反射將新的key結(jié)構(gòu)替換原生結(jié)構(gòu)
                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                // ...
            }
        }
    });
        // 保存selectedKeys屬性,后續(xù)處理IO事件信息時(shí)可以直接通過(guò)該屬性獲取事件信息
    selectedKeys = selectedKeySet;

    return selector;
}

該方法會(huì)創(chuàng)建Java Selector,并通過(guò)反射替換其對(duì)應(yīng)的selectedKeys、publicSelectedKeys屬性。這里Netty對(duì)原生Selector數(shù)據(jù)結(jié)構(gòu)進(jìn)行了優(yōu)化,由原本HashSet實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)替換為了Netty基于數(shù)組實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu):SelectedSelectionKeySet。

接下來(lái)繼續(xù)跟進(jìn),該構(gòu)造方法中會(huì)調(diào)用父類的構(gòu)造方法,對(duì)父類屬性進(jìn)行初始化,先看下NioEventLoop UML圖:

NioEventLoop父類為SingleThreadEventLoop,在其父類構(gòu)造方法中會(huì)創(chuàng)建一個(gè)MpscQueue類型的隊(duì)列:

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    // 用來(lái)存放收尾工作的任務(wù)隊(duì)列
    tailTasks = newTaskQueue(maxPendingTasks);
}

SingleThreadEventLoop構(gòu)造方法中會(huì)調(diào)用其父類SingleThreadEventExecutor的構(gòu)造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    // 最終會(huì)調(diào)用AbstractEventExecutor構(gòu)造函數(shù)保存parent,此處parent即為NioEventLoopGroup
    super(parent);
    // 默認(rèn)false,當(dāng)且僅當(dāng)調(diào)用addTask(Runnable)將喚醒執(zhí)行線程
    this.addTaskWakesUp = addTaskWakesUp;
    // 最大等待任務(wù)數(shù),默認(rèn)值為Integer.MAX_VALUE
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 保存線程執(zhí)行器
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // 創(chuàng)建MpscQueue類型任務(wù)隊(duì)列
    taskQueue = newTaskQueue(this.maxPendingTasks);
    // 保存拒絕策略
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

NioEventLoop線程啟動(dòng)

服務(wù)端NioEventLoop中的線程啟動(dòng)是在channel注冊(cè)時(shí)觸發(fā)的,我們?cè)賮?lái)回顧下:

#AbstractChannel#AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ...
        // 判斷當(dāng)前線程是否為IO線程,因?yàn)楫?dāng)前線程為主線程,且此時(shí)IO線程還未創(chuàng)建,所以會(huì)走到else方法
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 異步執(zhí)行任務(wù)
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // ...
        }
    }
}

NioEventLoop#execute方法實(shí)現(xiàn)在其父類SingleThreadEventExecutor中:

#SingleThreadEventExecutor
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
        // 當(dāng)前線程為主線程,且此時(shí)IO線程還未創(chuàng)建,inEventLoop方法返回false
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        // 啟動(dòng)線程
        startThread();
        // 添加任務(wù)
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        // 喚醒線程
        wakeup(inEventLoop);
    }
}

execute方法會(huì)主要分為三步,第一步:調(diào)用startThread方法創(chuàng)建并啟動(dòng)線程;第二步:將任務(wù)添加到任務(wù)隊(duì)列中;第三步:?jiǎn)拘丫€程執(zhí)行任務(wù)。先看下startThread方法:

private void startThread() {
    // 線程是否尚未啟動(dòng)
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        // 將ST_NOT_STARTED設(shè)置為ST_STARTED
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            // 啟動(dòng)線程
            doStartThread();
        }
    }
}

startThread方法最終會(huì)調(diào)用doStartThread方法來(lái)啟動(dòng)線程:

private void doStartThread() {
    assert thread == null;
    // 使用線程執(zhí)行器創(chuàng)建并啟動(dòng)線程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 保存創(chuàng)建的線程
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 處理IO事件與異步任務(wù)
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // ...
            }
        }
    });
}

調(diào)用executor#execute方法創(chuàng)建并啟動(dòng)線程,這里的executor類型為:ThreadPerTaskExecutor,是實(shí)例化NioEventLoopGroup時(shí),在其父類MultithreadEventExecutorGroup中創(chuàng)建的,ThreadPerTaskExecutor#execute方法內(nèi)部通過(guò)線程工廠創(chuàng)建并啟動(dòng)線程。線程啟動(dòng)后主要做兩件事情,一:保存創(chuàng)建的線程,二:處理IO事件與異步任務(wù)(后面會(huì)講到)。

線程啟動(dòng)后,會(huì)調(diào)用addTask方法,將任務(wù)放到任務(wù)隊(duì)列中,等待線程處理:

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

addTask方法會(huì)嘗試將任務(wù)放入taskQueue中,如果放入失敗則會(huì)觸發(fā)拒絕策略。

任務(wù)添加到taskQueue中后,會(huì)調(diào)用wakeup方法喚醒線程,因?yàn)榇藭r(shí)線程可能因?yàn)闆](méi)有任務(wù)而進(jìn)入到阻塞狀態(tài),使用wakeup方法可以將線程從阻塞中喚醒,處理任務(wù)。

IO事件與任務(wù)處理流程

NioEventLoop中的線程啟動(dòng)后,會(huì)一直循環(huán)處理IO事件與異步任務(wù):

SingleThreadEventExecutor.this.run();

該方法會(huì)調(diào)用其子類NioEventLoop的run方法,看下NioEventLoop的run方法:

protected void run() {
    for (;;) {
        try {
            // 檢測(cè)IO事件
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            // ioRatio控制IO事件與非IO事件執(zhí)行時(shí)間占比
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 處理IO事件
                    processSelectedKeys();
                } finally {
                    // 處理普通任務(wù)與定時(shí)任務(wù)
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 根據(jù)比率計(jì)算非IO事件任務(wù)處理事件
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

IO事件與任務(wù)處理主要分為三步,第一:檢測(cè)IO事件,第二:處理IO事件,第三:處理普通任務(wù)與定時(shí)任務(wù)。

檢測(cè)IO事件

通過(guò)SelectStrategy#calculateStrategy方法計(jì)算走什么策略,SelectStrategy是在創(chuàng)建NioEventLoop時(shí)通過(guò)IO多路復(fù)用器策略工廠DefaultSelectStrategyFactory進(jìn)行創(chuàng)建的:

#DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

hasTasks值為true,標(biāo)識(shí)taskQueue或者tailQueue中有任務(wù),則調(diào)用IntSupplier#get方法,該方法內(nèi)部會(huì)調(diào)用Selector#selectNow方法,selectNow方法是一個(gè)非阻塞方法,不管有沒(méi)有IO事件都會(huì)立即返回。如果任務(wù)隊(duì)列中沒(méi)有任務(wù),則直接返回SelectStrategy.SELECT。

如果SelectStrategy#calculateStrategy方法返回SelectStrategy.SELECT,則會(huì)嘗試將wakenUp屬性設(shè)置為false,并調(diào)用select方法。因?yàn)镾elector#wakeup方法是一個(gè)比較耗時(shí)的操作,而用戶線程和IO線程都有可能操作該屬性,因此使用原子操作防止多個(gè)線程重復(fù)喚醒。接著看下select方法:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // 計(jì)算最近一次要執(zhí)行的定時(shí)任務(wù)的最后期限,如果定時(shí)任務(wù)隊(duì)列中沒(méi)有任務(wù)則返回當(dāng)前事件+1s
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            // 計(jì)算定時(shí)任務(wù)將要執(zhí)行的事件與當(dāng)前時(shí)間的時(shí)間差
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                // 如果時(shí)間差<0.5s,且沒(méi)有調(diào)用select方法阻塞過(guò),則調(diào)用selectNow方法,然后退出循環(huán)
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 如果任務(wù)隊(duì)列有任務(wù),且嘗試設(shè)置wakenUp屬性為true成功,則調(diào)用selectNow方法,然后退出循環(huán)
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
                        // 阻塞等待IO事件
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
                        // 如果有其中一種情況則退出循環(huán):有IO事件、外部線程喚醒、任務(wù)對(duì)立有任務(wù)、定時(shí)任務(wù)隊(duì)列有任務(wù)
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);
                                // 重建selector,解決JDK空輪詢bug
                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }
    } catch (CancelledKeyException e) {
        // ...
    }
}

IO事件處理

processSelectedKeys方法,用來(lái)處理所有IO事件:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

Netty默認(rèn)會(huì)開(kāi)啟對(duì)Selector的優(yōu)化,所以會(huì)進(jìn)入processSelectedKeysOptimized方法處理IO事件:

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        // 獲取事件
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;
                // 獲取attachment
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 處理IO事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        // ...
    }
}

最終會(huì)調(diào)用processSelectedKey方法進(jìn)行IO事件的處理:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // ...
    try {
        int readyOps = k.readyOps();
        // 處理OP_CONNECT事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 處理OP_WRITE事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 處理OP_READ、OP_ACCEPT事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

方法內(nèi)部會(huì)調(diào)用Unsafe類的方法進(jìn)行處理,以O(shè)P_ACCEPT事件為例,我們看下其read方法,OP_ACCEPT事件的read方法是在AbstractNioMessageChannel類的內(nèi)部類NioMessageUnsafe中:

#AbstractNioMessageChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    // 處理OP_ACCEPT事件
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // 觸發(fā)channelRead事件
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            // 觸發(fā)channelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

NioMessageUnsafe#read方法主要做了兩件事,一是:調(diào)用NioServerSocketChannel#doReadMessages方法處理事件,二是:調(diào)用ChannelPipeline發(fā)送channelRead、channelReadComplete事件。

NioServerSocketChannel#doReadMessages方法中會(huì)調(diào)用Java的ServerSocketChannel方法建立連接:

#NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

到此IO事件處理流程就結(jié)束了,真正的事件處理還是由不同的Unsafe類調(diào)用對(duì)應(yīng)的channel中的方法來(lái)進(jìn)行處理。

任務(wù)處理

runAllTasks方法用來(lái)處理普通任務(wù)與定時(shí)任務(wù):

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // 從定時(shí)任務(wù)隊(duì)列中獲取可調(diào)度的任務(wù)放入到taskQueue中
        fetchedAll = fetchFromScheduledTaskQueue();
        // 從taskQueue中取出任務(wù),執(zhí)行其run方法
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

runAllTasks方法還是比較容易理解,方法主要做了兩件事,一是:將可調(diào)度的定時(shí)任務(wù)從scheduledTaskQueue隊(duì)列放入到taskQueue隊(duì)列中,然后循環(huán)取出taskQueue中的任務(wù),執(zhí)行其run方法。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容