3. Netty source code analysis: EventLoop

1. Overview of EventLoop's capabilities

In the previous article we analyzed the core capabilities of EventLoopGroup: executing tasks, registering Channels, and scheduling its executors. Today we look at EventLoop. First, here is EventLoop's class hierarchy:

[Figure: EventLoop class diagram]

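As the diagram shows, the EventLoop interface extends EventLoopGroup. Why would EventLoop extend EventLoopGroup? From the previous article we know that the main job of an EventLoopGroup is to manage the EventLoops and dispatch work to them, and that most of its other functionality is actually handed off to the EventLoops it manages. By extending EventLoopGroup, EventLoop inherits the entry points for task execution, graceful shutdown, and Channel registration.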
Besides EventLoopGroup, EventLoop also extends the EventExecutor interface. Let's look at what EventExecutor declares:

/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup {

    /**
     * Returns a reference to itself.
     */
    @Override
    EventExecutor next();

    /**
     * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
     */
    EventExecutorGroup parent();

    /**
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     */
    boolean inEventLoop();

    /**
     * Return {@code true} if the given {@link Thread} is executed in the event loop,
     * {@code false} otherwise.
     */
    boolean inEventLoop(Thread thread);

    /**
     * Return a new {@link Promise}.
     */
    <V> Promise<V> newPromise();

    /**
     * Create a new {@link ProgressivePromise}.
     */
    <V> ProgressivePromise<V> newProgressivePromise();

    /**
     * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    <V> Future<V> newSucceededFuture(V result);

    /**
     * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    <V> Future<V> newFailedFuture(Throwable cause);
}

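The interface's header comment tells us that EventExecutor is a special EventExecutorGroup that adds some convenient methods for checking whether a thread is running inside the event loop. We covered the capabilities of EventExecutorGroup in the previous article, so we won't repeat them here. Let's go through the important methods of EventExecutor:
First, the EventExecutor next() method: an EventExecutor can only execute events and has nothing to dispatch among, so this method simply returns the object itself. EventExecutorGroup parent(), by contrast, returns the EventExecutorGroup this EventExecutor belongs to.
Next are the two overloaded inEventLoop methods, which check whether a thread (the current thread, or the one passed in) is executing in the event loop.
Then come two methods that create Promises. If you are unsure what a Promise is for, it is worth reading up on it; we will analyze the internal implementation in a later article.
Finally, there is a pair of methods that create Futures. As their comments say, they create a Future that is already marked as succeeded or failed. Every FutureListener added to such a Future is notified directly, and every blocking method returns immediately without blocking.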
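Netty's Future and Promise types are not reproduced here, but the JDK's CompletableFuture behaves analogously once it is already complete, which makes the "listeners notified directly, blocking calls return at once" behavior easy to see. This is a sketch using the JDK type as a stand-in, not Netty's API; CompletedFutureDemo is a made-up name, and the "callback runs on the calling thread" behavior is what the OpenJDK implementation does for non-async callbacks on a completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Analogy only: JDK CompletableFuture standing in for Netty's succeeded/failed Futures.
class CompletedFutureDemo {
    // Returns the name of the thread that ran the "listener".
    static String listenerThread() {
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);
        AtomicReference<String> ranOn = new AtomicReference<>();
        // The future is already complete, so this non-async callback runs
        // immediately on the calling thread, before thenAccept returns.
        done.thenAccept(v -> ranOn.set(Thread.currentThread().getName()));
        return ranOn.get();
    }

    // join() on an already completed future returns without blocking.
    static int joinWithoutBlocking() {
        return CompletableFuture.completedFuture(42).join();
    }

    // A future completed exceptionally plays the role of newFailedFuture(cause).
    static boolean failedIsExceptional() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return failed.isCompletedExceptionally();
    }
}
```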

EventLoop also extends OrderedEventExecutor, which in turn directly extends EventExecutor and declares no methods of its own. Its header comment, however, tells us that OrderedEventExecutor is a marker interface: it guarantees that all submitted tasks are processed in order and serially. So we can rely on any class that implements OrderedEventExecutor to preserve the order of its tasks and to run only one task at a time.

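At this point we know EventLoop's core capability: it is a task executor that supports graceful shutdown and guarantees that submitted tasks run in order, one after another. Next, we use a concrete implementation, NioEventLoop, to get a more tangible feel for what EventLoop does.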
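To make "ordered, serial, gracefully stoppable" concrete, here is a minimal sketch of a single-threaded executor with an inEventLoop() check. MiniEventLoop and its methods are hypothetical names for illustration only; Netty's SingleThreadEventExecutor is considerably more involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, heavily simplified model of an ordered, single-threaded executor.
// It is NOT Netty's SingleThreadEventExecutor and ignores many details
// (tasks racing with shutdown, scheduled tasks, rejection policies, ...).
class MiniEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread;
    private volatile boolean shuttingDown;

    MiniEventLoop() {
        thread = new Thread(this::run, "mini-event-loop");
        thread.start();
    }

    // Same idea as EventExecutor.inEventLoop(): is the caller the loop's own thread?
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (shuttingDown) {
            throw new IllegalStateException("event loop is shutting down");
        }
        taskQueue.add(task);
    }

    // Stop accepting new tasks, let already queued tasks finish, then wait for the thread.
    void shutdownGracefully() {
        shuttingDown = true;
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        try {
            // A single thread takes tasks one at a time in FIFO order: ordered and serial.
            while (!shuttingDown || !taskQueue.isEmpty()) {
                Runnable task = taskQueue.poll(10, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only one thread ever runs tasks, task code can touch loop-owned state without locks, which is the property the rest of NioEventLoop builds on.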

The role of EventLoop in Netty, seen through NioEventLoop

Let's start with NioEventLoop's constructor:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

Following the call chain upward, we find that this constructor ends up calling the constructor of its ancestor class SingleThreadEventExecutor:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

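Note the line this.executor = ThreadExecutorMap.apply(executor, this);. The executor argument passed into this constructor is the ThreadPerTaskExecutor that, as mentioned in the previous article, NioEventLoopGroup passes in when it creates each NioEventLoop. Instead of assigning it to the field directly, the constructor wraps it with ThreadExecutorMap.apply(executor, this). Let's see what that does: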

    // Relevant parts of the ThreadExecutorMap class

    private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Executor() {
            @Override
            public void execute(final Runnable command) {
                executor.execute(apply(command, eventExecutor));
            }
        };
    }

    public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Runnable() {
            @Override
            public void run() {
                setCurrentEventExecutor(eventExecutor);
                try {
                    command.run();
                } finally {
                    setCurrentEventExecutor(null);
                }
            }
        };
    }

    private static void setCurrentEventExecutor(EventExecutor executor) {
        mappings.set(executor);
    }
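The same decoration can be reproduced with plain JDK types. In this sketch ExecutorBinding is a hypothetical class, a JDK ThreadLocal stands in for FastThreadLocal, and a plain Object stands in for the EventExecutor; the point is only the wrap, bind, run, unbind pattern.

```java
import java.util.concurrent.Executor;

// Hypothetical re-creation of the ThreadExecutorMap idea with a plain JDK ThreadLocal.
// "owner" stands in for the EventExecutor that should be visible while a task runs.
final class ExecutorBinding {
    private static final ThreadLocal<Object> CURRENT = new ThreadLocal<>();

    private ExecutorBinding() { }

    static Object current() {
        return CURRENT.get();
    }

    // Wrap an Executor so that every task it runs is wrapped as well.
    static Executor apply(Executor executor, Object owner) {
        return command -> executor.execute(apply(command, owner));
    }

    // Bind the owner before the task runs and always unbind it afterwards.
    static Runnable apply(Runnable command, Object owner) {
        return () -> {
            CURRENT.set(owner);
            try {
                command.run();
            } finally {
                CURRENT.set(null);
            }
        };
    }
}
```

Because the binding is cleared in finally, code running outside a wrapped task never sees a stale owner, even if the task throws.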

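As we can see, Executor apply(final Executor executor, final EventExecutor eventExecutor) creates a new Executor. That Executor still delegates execution to the Executor passed in, but it wraps each task in a static proxy: just before the task runs, the EventExecutor responsible for it is bound to a FastThreadLocal held by ThreadExecutorMap, and right after the task finishes it is unbound again. FastThreadLocal is Netty's own utility for per-thread values. So where does FastThreadLocal get its speed compared with the JDK's ThreadLocal? Let's pull out its set method (you should already know how the JDK's ThreadLocal is implemented before reading on):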

    // FastThreadLocal.set
    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }

And InternalThreadLocalMap.get():

    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
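The fast/slow split in InternalThreadLocalMap.get() can be modeled in a few lines. ThreadMap and MapHolderThread below are hypothetical stand-ins for InternalThreadLocalMap and FastThreadLocalThread: the fast path is a plain field read on a known Thread subclass, and any other thread falls back to a JDK ThreadLocal.

```java
// Hypothetical model of the fast/slow lookup in InternalThreadLocalMap.get().
final class ThreadMap {
    // Slow path: threads we did not create keep their map in a JDK ThreadLocal.
    private static final ThreadLocal<ThreadMap> SLOW = new ThreadLocal<>();

    static ThreadMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof MapHolderThread) {
            return fastGet((MapHolderThread) thread);
        }
        return slowGet();
    }

    // Fast path: a plain field read on our own Thread subclass, no hashing at all.
    private static ThreadMap fastGet(MapHolderThread thread) {
        ThreadMap map = thread.map;
        if (map == null) {
            thread.map = map = new ThreadMap();
        }
        return map;
    }

    private static ThreadMap slowGet() {
        ThreadMap map = SLOW.get();
        if (map == null) {
            map = new ThreadMap();
            SLOW.set(map);
        }
        return map;
    }
}

// Hypothetical stand-in for FastThreadLocalThread: the map lives in a field.
class MapHolderThread extends Thread {
    ThreadMap map; // only read and written by this thread itself

    MapHolderThread(Runnable task) {
        super(task);
    }
}
```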

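When FastThreadLocal fetches its map, it checks whether the current thread is a FastThreadLocalThread. If it is, it calls fastGet(FastThreadLocalThread thread), which reads the InternalThreadLocalMap stored directly on the thread (creating it if absent). If not, it calls slowGet(), which looks up the InternalThreadLocalMap bound to the current thread in a static JDK ThreadLocal and creates one if there is none. Once it has the InternalThreadLocalMap, how does it store the value? Here is the logic in FastThreadLocal that actually performs the assignment: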

    // FastThreadLocal.setKnownNotUnset, called from set()
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        // index is the slot this FastThreadLocal owns in the InternalThreadLocalMap
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }

    // InternalThreadLocalMap.setIndexedVariable
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }

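So InternalThreadLocalMap is internally just an array, and each FastThreadLocal records, in its index member variable, the position of its per-thread value in that array. The index is assigned when the FastThreadLocal is constructed, taken from a static, incrementing counter inside InternalThreadLocalMap. Compared with the JDK's ThreadLocalMap, which stores entries in a hash table, this approach has a clear advantage: getting and setting a per-thread value needs no hash computation, and there are no hash collisions to resolve. That does give a real performance gain over ThreadLocalMap. It is one example of Netty's attention to performance; later we will see many more optimizations in the details.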
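A minimal sketch of the index-based idea, assuming a per-thread Object[] and an index handed out by a static counter. IndexedLocal is a hypothetical name; to keep the example short, the per-thread array is itself located through a JDK ThreadLocal, which a real design would replace with a field on a custom Thread subclass.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of index-based thread-local storage in the spirit of
// FastThreadLocal + InternalThreadLocalMap.
final class IndexedLocal<V> {
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    private static final Object UNSET = new Object();
    // Per-thread backing array (looked up via ThreadLocal only for brevity).
    private static final ThreadLocal<Object[]> SLOTS =
            ThreadLocal.withInitial(() -> newSlots(8));

    // Fixed at construction: each IndexedLocal owns one slot in every thread's array.
    private final int index = NEXT_INDEX.getAndIncrement();

    private static Object[] newSlots(int size) {
        Object[] slots = new Object[size];
        Arrays.fill(slots, UNSET);
        return slots;
    }

    void set(V value) {
        Object[] slots = SLOTS.get();
        if (index >= slots.length) {
            // Grow to the next power of two above index, keeping existing values.
            int newSize = Integer.highestOneBit(index) << 1;
            Object[] grown = Arrays.copyOf(slots, newSize);
            Arrays.fill(grown, slots.length, newSize, UNSET);
            SLOTS.set(grown);
            slots = grown;
        }
        slots[index] = value; // direct array store: no hashing, no collision handling
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] slots = SLOTS.get();
        if (index >= slots.length || slots[index] == UNSET) {
            return null;
        }
        return (V) slots[index];
    }
}
```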

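We have spent a lot of space on NioEventLoop's constructor to show that Netty's performance work lives in many small details. Now let's continue with the rest of the constructor, where we will see another Netty optimization. You need to be familiar with Java NIO beforehand, or the following will be hard to follow.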

NioEventLoop has the following member variables:

    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

The constructor shows how these members are initialized.
First, the constructor calls openSelector() to produce a SelectorTuple, then assigns the SelectorTuple's selector and unwrappedSelector to the corresponding NioEventLoop fields. Here is openSelector():

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // create the plain JDK Selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            // optimization disabled: just wrap the plain Selector in a SelectorTuple and return it
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }
        // the optimization starts here
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // look up the relevant fields of SelectorImpl via reflection
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            // write our set into both fields directly through Unsafe, using the field offsets
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
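The reflection fallback in openSelector() boils down to "find two fields, make them accessible, write the same replacement set into both". The sketch below does that against a hypothetical ToySelector class rather than sun.nio.ch.SelectorImpl, because on Java 9+ the JDK class lives in a module that may refuse plain reflective access (which is why the code above tries the Unsafe path first). FieldSwap is likewise a made-up name.

```java
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy stand-in for sun.nio.ch.SelectorImpl.
class ToySelector {
    protected Set<String> selectedKeys = new HashSet<>();
    protected Set<String> publicSelectedKeys = Collections.unmodifiableSet(selectedKeys);

    // The "selector" writes into whatever set the field currently holds.
    void markReady(String key) {
        selectedKeys.add(key);
    }
}

final class FieldSwap {
    private FieldSwap() { }

    // Replace both fields with the same instance, like the reflection fallback above.
    static void install(ToySelector selector, Set<String> replacement)
            throws ReflectiveOperationException {
        Field selected = ToySelector.class.getDeclaredField("selectedKeys");
        Field published = ToySelector.class.getDeclaredField("publicSelectedKeys");
        selected.setAccessible(true);
        published.setAccessible(true);
        selected.set(selector, replacement);
        published.set(selector, replacement);
    }
}
```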

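In short, openSelector() checks whether the optimization is enabled and can be applied; if so, it replaces two fields of the newly created Selector, selectedKeys and publicSelectedKeys, with a SelectedSelectionKeySet object. For what selectedKeys and publicSelectedKeys mean, see the Selector API; we won't repeat it here. Why reassign these two fields, and why does doing so count as an optimization? First, let's see what the two fields are inside the Selector: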

    // excerpt from SelectorImpl
    protected Set<SelectionKey> selectedKeys = new HashSet();
    this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);

So the original selectedKeys and publicSelectedKeys are ultimately backed by a HashSet. And what is the SelectedSelectionKeySet that replaces them? Let's take a look:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }
        return true;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        // omitted
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
        // omitted
    }
}
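Since iterator() and increaseCapacity() are omitted above, here is a self-contained sketch of the same idea: an append-only array behind the Set interface, doubling when full and reset in bulk. AppendOnlyKeySet is a hypothetical, generic stand-in rather than Netty's class, and its growth policy is an assumption.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, generic version of the SelectedSelectionKeySet idea:
// append-only array, no uniqueness check, bulk reset between select rounds.
final class AppendOnlyKeySet<T> extends AbstractSet<T> {
    private Object[] keys = new Object[4];
    private int size;

    @Override
    public boolean add(T key) {
        if (key == null) {
            return false;
        }
        keys[size++] = key;                        // no contains() check: duplicates allowed
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // double the capacity when full
        }
        return true;
    }

    @Override
    public boolean contains(Object o) {
        return false;                              // always "not present", as in the original
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[idx++];
            }
        };
    }

    // Null out used slots so keys can be garbage collected, then start again at index 0.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }
}
```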

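SelectedSelectionKeySet extends AbstractSet, but internally it hardly qualifies as a Set: its add method does nothing to guarantee that elements are unique! Why do this? Let's first look at how the JDK adds elements to the selectedKeys set. The snippet below is decompiled from WindowsSelectorImpl because the source was not available, so the variables have names like var9 and var10, but that does not get in the way of following the logic: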

                            if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
                                if (var9.clearedCount != var1) {
                                    if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
                                } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
                                    var9.updateCount = var1;
                                    ++var6;
                                }

                                var9.clearedCount = var1;
                            } else {
                                if (var9.clearedCount != var1) {
                                    var10.channel.translateAndSetReadyOps(var4, var10);
                                    if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                        WindowsSelectorImpl.this.selectedKeys.add(var10);
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
                                } else {
                                    var10.channel.translateAndUpdateReadyOps(var4, var10);
                                    if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                        WindowsSelectorImpl.this.selectedKeys.add(var10);
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
                                }

                                var9.clearedCount = var1;
                            }

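Before adding a key to selectedKeys, the JDK checks whether selectedKeys already contains it. If it does, it only updates the ready operations of that key and does not add it again; only when the key is absent does it add it. SelectedSelectionKeySet's contains method, however, always returns false, which means the same SelectionKey can be added to selectedKeys more than once. What is the effect of duplicate SelectionKeys? Before Netty handles a ready SelectionKey, it checks the key's ready state, and after the event has been handled the ready state is removed, so duplicates cause no harm here. Then what do we gain by replacing the HashSet (which is backed by a HashMap) with a plain array? First, NioEventLoop extends SingleThreadEventLoop, so we can infer that selectedKeys is only ever operated on by a single thread. Why does single-threaded access to an array help? The elements sit contiguously in memory, which makes good use of the CPU cache, and with only one thread there is no false sharing. In addition, Netty clears the array only after it has processed all ready SelectionKeys, and only then calls select again. So adding never has to search for a free slot; elements are simply appended in order. This is far cheaper than adding to and removing from a HashSet (in a small test of my own, removing elements from a HashMap with 10,000 entries took roughly ten times as long as from an array of the same size).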
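The argument that duplicates are harmless can be checked with a small model. The Key class and its boolean ready flag are hypothetical, standing in for a SelectionKey's ready state as described above; the loop handles a key only while it is ready and clears the flag afterwards, so a key that appears twice in the array is handled once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of processing an array of ready keys that may contain duplicates.
final class DuplicateKeyDemo {
    static final class Key {
        final String name;
        boolean ready = true;

        Key(String name) {
            this.name = name;
        }
    }

    static List<String> process(Key[] keys, int size) {
        List<String> handled = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            Key k = keys[i];
            keys[i] = null;          // release the slot as we go
            if (!k.ready) {
                continue;            // duplicate entry: already handled in this round
            }
            handled.add(k.name);
            k.ready = false;         // the event has been consumed
        }
        return handled;
    }
}
```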

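We have spent a great deal of space on EventLoop's constructor, mainly to show that Netty's performance optimizations really are everywhere, pursued by every means available. This is one reason Netty is so widely used: many high-performance, high-throughput middleware projects, such as RocketMQ and Spark, use Netty for communication. When reading the source of a high-performance framework like Netty, we should pay attention to these optimization details. Only then can we understand what makes such a framework good and how to use it correctly to get the most out of it.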

Let's continue with NioEventLoop's main logic, starting with the run() method:

    @Override
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

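run() is NioEventLoop's implementation of the template method declared in SingleThreadEventExecutor. As we can see, run() is an endless loop. What happens inside it? First, it calls selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) to obtain the select strategy. Let's look at the SelectStrategy interface first: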

public interface SelectStrategy {

    /**
     * Indicates a blocking select should follow.
     */
    int SELECT = -1;
    /**
     * Indicates the IO loop should be retried, no blocking select to follow directly.
     */
    int CONTINUE = -2;
    /**
     * Indicates the IO loop to poll for new events without blocking.
     */
    int BUSY_WAIT = -3;

    /**
     * The {@link SelectStrategy} can be used to steer the outcome of a potential select
     * call.
     *
     * @param selectSupplier The supplier with the result of a select result.
     * @param hasTasks true if tasks are waiting to be processed.
     * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
     *         the next step should be to not select but rather jump back to the IO loop and try
     *         again. Any value >= 0 is treated as an indicator that work needs to be done.
     */
    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

SelectStrategy defines three select strategies: SELECT, CONTINUE, and BUSY_WAIT. Netty ships a default implementation, DefaultSelectStrategy, which computes the strategy like this:

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

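If the EventLoop's task queue is empty, it returns SELECT, i.e. a blocking select. If there are tasks, it returns the result of selectNow() on the NioEventLoop's Selector, the number of ready IO events, which is none of the three strategy constants. In other words, a non-blocking selectNow has already been performed at this point: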

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    int selectNow() throws IOException {
        try {
            return selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }
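The strategy calculation, and how run() dispatches on the result, can be mirrored without Netty. StrategyDemo and ThrowingIntSupplier are hypothetical names (Netty's own IntSupplier has a get() that may throw, hence the local interface); the constants and the hasTasks ? selectNow() : SELECT rule are taken from the code above, and describe() merely labels what each branch of run() does.

```java
// Hypothetical mirror of SelectStrategy / DefaultSelectStrategy and run()'s switch.
final class StrategyDemo {
    interface ThrowingIntSupplier {
        int get() throws Exception;
    }

    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    private StrategyDemo() { }

    // Same rule as DefaultSelectStrategy.calculateStrategy.
    static int calculateStrategy(ThrowingIntSupplier selectNow, boolean hasTasks) throws Exception {
        return hasTasks ? selectNow.get() : SELECT;
    }

    // What run() does with each result, reduced to a label.
    static String describe(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "loop again";
            case BUSY_WAIT:   // NIO has no busy-wait, so it falls through to SELECT
            case SELECT:
                return "blocking select";
            default:          // >= 0: number of ready keys returned by selectNow()
                return "process " + strategy + " ready keys, then run tasks";
        }
    }
}
```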

Once the strategy is known, the corresponding branch runs. CONTINUE skips the rest of the loop body and goes back to choosing a strategy. NIO does not support busy-waiting, so BUSY_WAIT is handled exactly like SELECT: both call select(wakenUp.getAndSet(false)). Before going further, we need to understand what the member variable wakenUp means:

    /**
     * Boolean that controls determines if a blocked Selector.select should
     * break out of its selection process. In our case we use a timeout for
     * the select method and the select method will block for that time unless
     * waken up.
     */
    private final AtomicBoolean wakenUp = new AtomicBoolean();

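wakenUp controls whether a blocked Selector.select should break out of its selection. NioEventLoop always calls Selector.select with a timeout, and the call blocks for that long unless it is woken up; when wakenUp is true, the select loop does not call Selector.select again after it returns.
With the meaning of wakenUp clear, let's look at the logic of NioEventLoop's select method: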

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            // Deadline for the blocking select: the current time plus the delay until the nearest scheduled task submitted to this EventLoop is due
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                // Blocking time in ms. Adding 500000L rounds the nanoseconds to the nearest millisecond. Once selectDeadLineNanos has passed, the method ends
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // In other words: if a task was added and wants Selector#wakeup to make the Selector return early from the blocking select, that cannot happen without the check below; select would only return after blocking for timeoutMillis.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
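The timeout arithmetic at the top of the loop, and the check that decides whether Selector.select returned before its timeout, can be isolated as below. SelectTimeout is a hypothetical helper. Adding 500000 ns before dividing by 1000000 rounds to the nearest millisecond rather than always rounding up: 0.4 ms becomes 0 (so the loop does a selectNow() and exits), while 0.5 ms becomes 1.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the timeout arithmetic in NioEventLoop.select(boolean).
final class SelectTimeout {
    private SelectTimeout() { }

    // Adding 0.5 ms before the integer division rounds to the nearest millisecond.
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    // True when select returned before timeoutMillis had elapsed: the case that keeps
    // selectCnt growing toward SELECTOR_AUTO_REBUILD_THRESHOLD if nothing was selected.
    static boolean returnedEarly(long startNanos, long endNanos, long timeoutMillis) {
        return endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < startNanos;
    }
}
```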

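Let's walk through this code. (Keep in mind that, in principle, this select method is only called when the EventLoop's task queue is empty: SelectStrategy returns SELECT only when hasTasks() is false, and wakenUp is set to false by getAndSet(false) right before NioEventLoop.select is called.) First, it computes the longest time the select may block, timeoutMillis. Then it checks hasTasks(), i.e. whether tasks have been added to the EventLoop. If so, new tasks arrived after SelectStrategy chose the strategy. To keep them from waiting until the blocking select finishes, it does the following: if wakenUp is false, it sets it to true, performs a non-blocking selector.selectNow(), and exits NioEventLoop.select; otherwise it goes on to the blocking Selector.select(timeoutMillis).

After Selector.select(timeoutMillis) returns, the loop exits NioEventLoop.select if any of the following holds: there are ready IO events; wakenUp was already true before NioEventLoop.select was called (oldWakenUp); wakenUp is true now; the EventLoop has tasks waiting to run immediately; or the EventLoop has scheduled tasks. If none applies, it checks whether the current thread has been interrupted and exits the loop if so. Finally, it checks whether the elapsed time reached the Selector.select timeout, i.e. whether Selector.select returned because it timed out. If it did, selectCnt is reset to 1 and the loop continues. If not, it checks whether selectCnt has reached SELECTOR_AUTO_REBUILD_THRESHOLD; if so, it rebuilds the Selector and exits the loop. This part works around a JDK 6-era NIO bug in which select() keeps returning prematurely with nothing selected, so we can set it aside for now. That is the whole of NioEventLoop.select.

The flow looks messy. To make sense of it, we first need to understand the lifecycle of the wakenUp field. There is only one method that lets outside code modify wakenUp: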

    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

This method is protected, so it is only visible to its own package and to subclasses: it is an internal Netty method rather than a public API. We can search for the places where it is called:

(Figure: call sites of the wakeup method)

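We can see that it is called, among other places, on the shutdown path: the purpose is to wake Selector.select out of its blocking call when the EventLoop shuts down.
Sharp-eyed readers may notice that wakenUp is set to false before NioEventLoop.select is called. Why? So that when outside code calls NioEventLoop.wakeup, wakenUp.compareAndSet(false, true) succeeds and selector.wakeup() can then be called to wake the Selector.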
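The reset-then-CAS pattern can be modeled without a real Selector. In this minimal sketch the class WakeupGate is hypothetical, and the wakeSelector callback stands in for selector.wakeup(), so that we can count how often the wakeup would actually be issued.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the wakenUp flag; wakeSelector stands in for selector.wakeup(). */
class WakeupGate {
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Runnable wakeSelector;

    WakeupGate(Runnable wakeSelector) {
        this.wakeSelector = wakeSelector;
    }

    /** What the event loop does right before it blocks in select(): re-arm the flag. */
    void beforeSelect() {
        wakenUp.set(false);
    }

    /** Same shape as NioEventLoop.wakeup(boolean inEventLoop). */
    void wakeup(boolean inEventLoop) {
        // Only a caller outside the loop thread, and only the first one per select, wakes the selector.
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            wakeSelector.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        WakeupGate gate = new WakeupGate(calls::incrementAndGet);
        gate.beforeSelect();
        gate.wakeup(false); // CAS false -> true succeeds: selector would be woken
        gate.wakeup(false); // CAS fails: no second wakeup
        gate.wakeup(true);  // called from the loop thread itself: ignored
        System.out.println(calls.get());
    }
}
```

Without the reset in beforeSelect, the flag would stay true after the first wakeup and every later external wakeup would be silently dropped.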

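With this in mind, let's look at NioEventLoop.select again. What it really does is call Selector.select to block while waiting for ready IO events, while remaining able to respond to a wakeup at any moment. If scheduled tasks have been added to the NioEventLoop, NioEventLoop.select runs at most until the nearest scheduled task is due; if there are no scheduled tasks, it runs for at most 1 s. Seen this way, isn't it much simpler?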
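A minimal sketch of this timeout rule, assuming the 1 s fallback described above. SelectTimeout and its parameters are hypothetical names, and an empty OptionalLong means "no scheduled task". Adding 0.5 ms before the integer division rounds to the nearest millisecond, which is also how Netty's select computes timeoutMillis; a result of 0 means the loop should not block at all (Netty falls back to selectNow() in that case).

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: how long may the event loop block in select()? */
final class SelectTimeout {
    // Upper bound when nothing is scheduled (the 1 s described in the text).
    static final long NO_SCHEDULED_TASK_NANOS = TimeUnit.SECONDS.toNanos(1);

    /**
     * @param nowNanos          current System.nanoTime()-style timestamp
     * @param nextDeadlineNanos deadline of the nearest scheduled task, if any
     * @return the select timeout in milliseconds, rounded to the nearest millisecond
     */
    static long timeoutMillis(long nowNanos, OptionalLong nextDeadlineNanos) {
        long delayNanos = nextDeadlineNanos.isPresent()
                ? Math.max(0L, nextDeadlineNanos.getAsLong() - nowNanos) // overdue task: don't block
                : NO_SCHEDULED_TASK_NANOS;
        // +0.5 ms so that e.g. 1.6 ms rounds to 2 ms instead of truncating to 1 ms
        return (delayNanos + 500_000L) / 1_000_000L;
    }
}
```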

A careful reader might ask: why is NioEventLoop.select limited to run no longer than until the next scheduled task is due? Let's keep that question in mind and continue with NioEventLoop.run.

After the select operation ends, run() checks the wakenUp flag again; if it is true, it calls selector.wakeup() so that the next selector.select does not block.
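The behaviour relied on here comes from java.nio.channels.Selector itself: if wakeup() is called while no selection operation is in progress, the next select call returns immediately. This small self-contained demo (PendingWakeupDemo is a hypothetical name) shows it with a plain JDK Selector.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

/** Hypothetical demo: a wakeup() issued before select() is remembered by the Selector. */
final class PendingWakeupDemo {
    /** Returns roughly how many milliseconds select(5000) blocked after an earlier wakeup(). */
    static long blockedMillisAfterWakeup() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();      // no select() in progress yet: the wakeup stays pending
            long start = System.nanoTime();
            selector.select(5_000); // returns at once instead of waiting 5 s
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select blocked for ~" + blockedMillisAfterWakeup() + " ms");
    }
}
```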
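Next, run() reads the current ioRatio. We mentioned this parameter earlier: it sets the share of time that IO operations get within the whole event-processing cycle. The logic is as follows. If ioRatio is 100, processSelectedKeys() runs first and then runAllTasks(), with no limit on the time split. If ioRatio is not 100, processSelectedKeys() runs first and its execution time is recorded; from that, the time left for tasks is computed and used to bound runAllTasks. What do these two methods do? Here is the answer up front: processSelectedKeys() handles all ready selectedKeys, i.e. the ready IO events, while the two overloads of runAllTasks execute all submitted tasks. Now we can see why NioEventLoop.select is limited to run no longer than until the next scheduled task is due: IO handling and task execution happen on the same thread, so without that limit the thread could still be blocked in select when the next scheduled task needs to run!
Next, let's look at how processSelectedKeys() and runAllTasks handle IO events and submitted tasks respectively, starting with processSelectedKeys():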
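The time split can be written as a one-line formula. In Netty's NioEventLoop.run the task budget is ioTime * (100 - ioRatio) / ioRatio, so the IO phase gets roughly ioRatio percent of each cycle. IoRatioBudget below is a hypothetical helper that only does this arithmetic; returning Long.MAX_VALUE for ioRatio = 100 is this sketch's way of saying "no limit". With the default ioRatio of 50, 10 ms of IO allows 10 ms of tasks; with ioRatio = 80 it allows 10 × 20 / 80 = 2.5 ms.

```java
/** Hypothetical helper reproducing the ioRatio arithmetic of NioEventLoop.run. */
final class IoRatioBudget {
    /**
     * Time budget for runAllTasks(timeout) after an IO phase that took ioTimeNanos.
     * IO gets ioRatio% of the cycle, tasks get the remaining (100 - ioRatio)%.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100]: " + ioRatio);
        }
        if (ioRatio == 100) {
            // the ioRatio == 100 branch calls runAllTasks() without a timeout
            return Long.MAX_VALUE;
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```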

    private void processSelectedKeys() {
        // If the optimized Selector is in use (selectedKeys backed by an array), loop over the array; otherwise iterate the Set from selector.selectedKeys().
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
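The optimized path walks an array by index instead of iterating a Set, and nulls each slot as it goes so that a closed Channel's key can be garbage-collected. ArrayKeySet below is a hypothetical, simplified container in that spirit, not Netty's class; it keeps an Object[] internally so that callers never see an unchecked generic array.

```java
import java.util.Arrays;
import java.util.function.Consumer;

/** Hypothetical array-backed key container: keys are only added, then walked by index. */
final class ArrayKeySet<T> {
    private Object[] keys;
    private int size;

    ArrayKeySet(int initialCapacity) {
        keys = new Object[Math.max(1, initialCapacity)];
    }

    void add(T key) {
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size * 2); // grow by doubling
        }
        keys[size++] = key;
    }

    int size() {
        return size;
    }

    /** Walk the keys by index, nulling each slot first so processed keys can be GC'ed. */
    @SuppressWarnings("unchecked")
    void processAndClear(Consumer<? super T> handler) {
        for (int i = 0; i < size; i++) {
            T key = (T) keys[i];
            keys[i] = null;
            handler.accept(key);
        }
        size = 0;
    }

    boolean isSlotCleared(int index) {
        return keys[index] == null;
    }
}
```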

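The method first checks whether the Selector has been optimized by replacing its Set of selected keys with an array, which we covered when analyzing the NioEventLoop constructor. We only look at the optimized method here; both versions have the same logic and differ only in how they iterate. The whole process walks over the ready SelectionKeys and hands each one to one of the two processSelectedKey overloads, chosen by the type of the SelectionKey's attachment object. Never mind when the attachment gets added; that will come up in a later article. Let's look at the logic of these two methods: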

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

    private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
        int state = 0;
        try {
            task.channelReady(k.channel(), k);
            state = 1;
        } catch (Exception e) {
            k.cancel();
            invokeChannelUnregistered(task, k, e);
            state = 2;
        } finally {
            switch (state) {
            case 0:
                k.cancel();
                invokeChannelUnregistered(task, k, null);
                break;
            case 1:
                if (!k.isValid()) { // Cancelled by channelReady()
                    invokeChannelUnregistered(task, k, null);
                }
                break;
            }
        }
    }

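processSelectedKey(SelectionKey k, AbstractNioChannel ch): let's skip the validation at the start of the method and focus on the ready-event handling below it. First, the method gets the ready operations and checks whether they include the connect event. If they do, it removes OP_CONNECT from the SelectionKey's interest set (otherwise Selector.select would keep returning without blocking) and hands the event to the unsafe of the ready AbstractNioChannel by calling unsafe.finishConnect(); we won't analyze that logic in this article. It then checks whether the ready operations include write, read or accept. A write-ready event is delegated to unsafe.forceFlush(), and a read or accept event (or readyOps == 0, a workaround for a possible JDK spin-loop bug) is delegated to unsafe.read().
processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task): this one is simple. It calls the NioTask's channelReady method; if that throws, it cancels the key and calls channelUnregistered, and it also calls channelUnregistered if channelReady itself cancelled the key. From this we can guess that NioTask is a callback for ready IO events.
Handling ready IO events is simple. Next, let's see how submitted tasks are handled. We only look at the overload that accepts a timeout, protected boolean runAllTasks(long timeoutNanos):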
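The order in which the first overload tests the readyOps bits matters: connect is finished before any read or write is attempted. This hypothetical helper, ReadyOpsDispatch, only decodes a readyOps value into the sequence of unsafe calls it would trigger, using the real java.nio SelectionKey constants; the action strings are illustrative labels, not Netty API.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder: which unsafe calls would a given readyOps value trigger, in order? */
final class ReadyOpsDispatch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // the real code first clears OP_CONNECT from interestOps, then finishes the connect
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush"); // flush queued buffers first to free memory
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read"); // readyOps == 0 also triggers read: the spin-loop workaround
        }
        return actions;
    }
}
```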

    protected boolean runAllTasks(long timeoutNanos) {
        // Move scheduled tasks whose deadline has passed into the regular (non-scheduled) task queue
        fetchFromScheduledTaskQueue();
        // Take a task from the regular (non-scheduled) task queue
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

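First, scheduled tasks whose execution time has arrived are moved into the regular (non-scheduled) task queue, and a task is then taken from that queue. If there is none, afterRunningAllTasks() is called; this is an extension hook whose contents we skip here. If there is a task, the loop starts. Each iteration calls safeExecute to run the task, which simply runs it inside a try-catch so that an exception cannot stop the loop. The count of executed tasks is then incremented by one, and the method checks whether that count ANDed with 0x3F is 0, i.e. whether it is a multiple of 64 (the clock is read only every 64 tasks because nanoTime() is relatively expensive). If so, it checks whether the elapsed time has exceeded the configured timeout; if it has, the loop ends and afterRunningAllTasks() is called. Otherwise it fetches the next task, and the loop also ends as soon as the queue is empty. This logic is fairly simple too.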
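A minimal sketch of the time-budgeted loop, assuming a plain single-threaded queue. BudgetedTaskRunner is a hypothetical class; it leaves out the scheduled-task queue and afterRunningAllTasks(), and its try-catch is a simplified stand-in for safeExecute. Because the clock is read only every 64 tasks, the budget can be overshot by up to 63 tasks.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical single-threaded runner mirroring runAllTasks(long timeoutNanos). */
final class BudgetedTaskRunner {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void submit(Runnable task) {
        tasks.add(task);
    }

    int pending() {
        return tasks.size();
    }

    /** Runs queued tasks until the queue is empty or timeoutNanos has elapsed; returns how many ran. */
    long runAllTasks(long timeoutNanos) {
        Runnable task = tasks.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        long runTasks = 0;
        for (;;) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // simplified stand-in for safeExecute: a failing task must not end the loop
                System.err.println("task failed: " + e);
            }
            runTasks++;
            // read the clock only every 64 tasks (runTasks is a multiple of 64)
            if ((runTasks & 0x3F) == 0 && System.nanoTime() >= deadline) {
                break;
            }
            task = tasks.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }
}
```

With a zero budget, 200 queued tasks stop after the first check at task 64; a generous budget then drains the remaining 136.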

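That completes our analysis of NioEventLoop.run. In one sentence, run handles ready IO events and submitted tasks. So when is run called? Following the call chain, we find that it is triggered by execute(Runnable task) in SingleThreadEventExecutor, the parent class of NioEventLoop: the first call starts the event loop's thread, and that thread then calls run(). Note that run() is an abstract method that subclasses override; what we analyzed above is NioEventLoop's implementation.
This essentially concludes our analysis of NioEventLoop's key code.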
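To make "the first execute() starts the thread, which then runs run()" concrete, here is a heavily simplified sketch. MiniSingleThreadExecutor and TaskOnlyLoop are hypothetical names, not Netty classes; the real SingleThreadEventExecutor also tracks shutdown states, scheduled tasks and wakeups, all omitted here. run() is left abstract, mirroring how NioEventLoop supplies its own loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical miniature: one thread, started lazily by the first execute(), running run(). */
abstract class MiniSingleThreadExecutor {
    protected final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    public void execute(Runnable task) {
        taskQueue.add(task);
        if (started.compareAndSet(false, true)) {
            // only the first execute() creates the thread; that thread calls run()
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            thread = t;
            t.start();
        }
    }

    public boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    /** Template method: subclasses decide what the event loop does. */
    protected abstract void run();
}

/** A loop that only runs tasks; NioEventLoop would also select and process IO events here. */
class TaskOnlyLoop extends MiniSingleThreadExecutor {
    @Override
    protected void run() {
        try {
            for (;;) {
                Runnable task = taskQueue.poll(1, TimeUnit.SECONDS);
                if (task != null) {
                    task.run();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // leave the loop when interrupted
        }
    }
}
```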

3. Recap

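In this article we analyzed NioEventLoop. Besides executing submitted tasks, NioEventLoop also listens for IO events on registered Channels, and it can use ioRatio to control how execution time is split between the two. All of this is done by its run() method.
NioEventLoop's role in Netty is therefore clear: it is the component that actually executes the work. On top of EventLoop, Netty provides an abstract class, SingleThreadEventLoop, which also extends SingleThreadEventExecutor. This gives SingleThreadEventLoop an open template method, run(), through which subclasses implement their own task-processing logic. NioEventLoop implements run() to gain the ability to handle both submitted tasks and ready IO events.
In the next article, we will look at how Netty ties all these components together.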

© Copyright belongs to the author. For reprints or content partnerships, please contact the author.
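Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.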
