帶你看穿 Java 流與并行流的底層執(zhí)行流程

引言


本篇文章并不會講解并行流的使用,因?yàn)樗氖褂煤芎唵?,網(wǎng)上的資料也非常的多,正是因?yàn)樗褂蒙系暮唵危拍軒椭脩羝帘未罅考?xì)節(jié),實(shí)現(xiàn)惰性計(jì)算,自動并行化等高級功能。本文將剖析流與并行流的執(zhí)行細(xì)節(jié),讀完本文后,你看到流代碼就能想象出它具體的執(zhí)行流程。在此之前,筆者已經(jīng)通讀了它的源碼與相關(guān)文檔。

先來看一段簡單的流操作:

        List<Widget> widgets = new ArrayList<>();
        widgets.add(ofWidget(Color.RED, 2));
        widgets.add(ofWidget(Color.WHITE, 3));
        widgets.add(ofWidget(Color.RED, 5));
        widgets.add(ofWidget(Color.RED, 4));
        int sum = widgets.stream()
                .filter(b -> b.getColor() == Color.RED)
                .mapToInt(b -> b.getWeight())
                .sum();

下面我將帶著大家逐步想象出它的執(zhí)行流程。

Spliterator


是不是覺得這個(gè)單詞的拼寫有點(diǎn)像 Split + Iterator? 其實(shí)它本質(zhì)上就是一個(gè)可以通過 trySplit 方法指導(dǎo)并行流進(jìn)行數(shù)據(jù)切分的 迭代器(Iterator)。

就像 Java 的集合類都會通過 iterator() 返回自己的 Iterator 一樣,到了Java8,它們也同時(shí)定義了一個(gè) spliterator() 方法返回自己的 Spliterator。

有兩種方式訪問 Spliterator 中數(shù)據(jù):

  • boolean tryAdvance(Consumer<? super T> action):使用 action 處理該 Spliterator 中下一個(gè)元素,并且向下移動一個(gè)元素。如果有元素可以處理的話就返回 true,否則返回 false
  • void forEachRemaining(Consumer<? super T> action):對于 Spliterator 中剩下的所有元素,逐個(gè)使用 action 處理,并移動到 Spliterator 的最后

所有的流都是以一個(gè) Spliterator 作為數(shù)據(jù)源的,通過 StreamSupport.stream 就可以將一個(gè) Spliterator 轉(zhuǎn)換成一個(gè)流,上面的例子中所調(diào)用的 Collection.stream 就是這樣實(shí)現(xiàn)的:

    default Stream<E> stream() {
        // false 表示是串行流,并行流則傳 true
        return StreamSupport.stream(spliterator(), false);
    }
    
    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

Spliterator 和 Iterator 也可以非常方便地互相轉(zhuǎn)換:

  • Spliterator 轉(zhuǎn) Iterator:Spliterators.iterator(spliterator)
  • Iterator 轉(zhuǎn) Spliterator: Spliterators.spliteratorUnknownSize(iterator)

JDK 有一些 Stream 的實(shí)現(xiàn)其實(shí)就是用 Iterator 轉(zhuǎn)的,比如 BufferedReader.lines() 返回的流:

    public Stream<String> lines() {
        Iterator<String> iter = new Iterator<String>() {
            String nextLine = null;

            @Override
            public boolean hasNext() {
                if (nextLine != null) {
                    return true;
                } else {
                    try {
                        nextLine = readLine();
                        return (nextLine != null);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
            }

            @Override
            public String next() {
                if (nextLine != null || hasNext()) {
                    String line = nextLine;
                    nextLine = null;
                    return line;
                } else {
                    throw new NoSuchElementException();
                }
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
    }

后面我們會學(xué)到,并行流是通過切分?jǐn)?shù)據(jù)源的方式,正常集合類的 trySplit() 實(shí)現(xiàn)都是比較高效的(每次切分一半出去),但是如果 Spliterator 是通過 Iterator 轉(zhuǎn)換過來的話,Iterator 本身又沒有切分功能,那它是怎么切分的呢?

因?yàn)榇藭r(shí) Spliterator 一組切多大比較合適,就只能非常粗暴地每 1024 個(gè)分一組了(事實(shí)上這個(gè)數(shù)字還會不斷增大,第二組就是 2048 個(gè),第三組 3072 .....,至于為什么是這個(gè)數(shù)字,我也不太清楚)。

由此可以看出,如果 Iterator 里面的數(shù)據(jù)本來就不是非常多的話,轉(zhuǎn)成 Spliterator 后所生成的并行流,可能達(dá)不到你想要的效果,甚至和串行流無差。

Pipeline


繼續(xù)之前的那個(gè)例子:

        int sum = widgets.stream()
                .filter(b -> b.getColor() == Color.RED)
                .mapToInt(b -> b.getWeight())
                .sum();

為了支持這種鏈?zhǔn)秸{(diào)用,除了最后的 sum 方法外,其他每個(gè)方法返回的都是 Stream 類型的對象,但是 Stream 只是一個(gè)接口而已,它具體返回的是什么呢?

答案就是 Pipeline,它們實(shí)現(xiàn)了 Stream,你在 sum 之前的一切調(diào)用都是在構(gòu)建一個(gè)由 Pipeline 組成的鏈表鏈表,而沒有進(jìn)行真正的計(jì)算,因此稱之為“惰性”。stream() 返回的是一個(gè)頭接點(diǎn),而 filtermapToInt 都是在尾部添加節(jié)點(diǎn),并返回尾節(jié)點(diǎn)(就是剛添加的那個(gè)節(jié)點(diǎn))的引用。

看一下類圖你就會更加明白:

Pipeline 類圖

為了讓圖能夠簡潔一些,我畫的不怎么正式,除了給正常對象使用的 ReferencePipeline(實(shí)現(xiàn)了 Stream) ,Java 還為每種基本類型定義了一個(gè) Pipeline,就是圖中 IntPipeline(實(shí)現(xiàn)了 IntStream), LongePipeline(實(shí)現(xiàn)了 LongStream) 以及省略號后面的一堆,在源碼中將這些不同的 Pipeline 稱為流的“形狀”(Shape),為什么要給每種基本類型都定義一種 Pipeline 呢?據(jù)說是為了避免頻繁的拆箱裝箱影響性能。

每種形狀的 Pipeline 還分別定義了三個(gè)子類,分別是 StatelessOp, StatefulOpHead

  • StatelessOp:無狀態(tài)操作,比如 filter, map 這種每個(gè)元素都可以單獨(dú)計(jì)算
  • StatefulOp:有狀態(tài)操作,比如 sorted,distinct 這種需要考慮前后元素的
  • Head:顧名思義,一般作為 Pipeline 的頭節(jié)點(diǎn)

對照著之前的示例代碼:

  • widgets.stream() 返回的就是一個(gè)頭節(jié)點(diǎn),即 ReferencePipeline.Head
  • filter() 是無狀態(tài)操作,返回的其實(shí)就是 ReferencePipeline.StatelessOp
  • mapToInt() 雖然也是無狀態(tài)操作,但它的流的形狀是基本類型 int,所以返回的是 IntPipeline.StatelessOp

假如把 sum 之前構(gòu)建流的代碼單獨(dú)抽出來:

        IntStream myStream = widgets.stream()
                .filter(b -> b.getColor() == Color.RED)
                .mapToInt(b -> b.getWeight())

最后構(gòu)建出 myStream 看上去就像下面這樣:

Pipeline 鏈表

如果你還想進(jìn)一步確認(rèn),可以點(diǎn)開 ReferencePipeline.filter 的源代碼看下:

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        // 返回了一個(gè) StatelessOp
        // 將 this 作為構(gòu)造函數(shù)的第一個(gè)參數(shù),表示以當(dāng)前的這個(gè) Pipeline 作為上游,鏈接起來
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

注意它在返回 StatelessOp 的同時(shí)還重寫了 opWrapSink 方法,記住這個(gè)方法,它引出了流中的下一個(gè)重要概念-Sink。

Sink


之前的示例代碼中,sum 調(diào)用之前,我們只是在不斷地組裝 Pipeline 鏈表而已,沒有做任何實(shí)質(zhì)性地計(jì)算。所有的計(jì)算都是在 sum 調(diào)用中完成。下面我們來深入 它的執(zhí)行細(xì)節(jié)。

首先會根據(jù) Pipeline 鏈表反向構(gòu)建出一個(gè) Sink 鏈表(通過 Pipeline 的 opWrapSink 方法):

反向構(gòu)建 Sink 鏈表

這部分的核心代碼位于 AbstractPipeline.wrapSink 中:

    // 在之前示例代碼中,傳入的 sink 就是 ReducingSink
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

看著是不是有點(diǎn)眼熟,這不就是單鏈表逆序的代碼嗎?

至于為什么最后是一個(gè) ReducingSink 節(jié)點(diǎn)呢?因?yàn)?sum 其實(shí)調(diào)用的是 reduce(如果不理解 reduce 的話,這里簡要說一下,其實(shí)就是將 (((1+2)+3)+4) 這樣的運(yùn)算應(yīng)用于 [1,2,3,4],其中 + 可以換成任意自定義運(yùn)算):

    public final int sum() {
        return reduce(0, Integer::sum);
    }

最后一個(gè) Sink 節(jié)點(diǎn)究竟是什么取決于 TerminalOp 是什么,ReduceOp 對應(yīng)的就是 ReducingSink,其實(shí) TerminalOp 總共就只有四種,而且顧名思義:

  • ForEachOp
  • ReduceOp
  • MatchOp
  • FindOp

最后我們再來明確一下 Sink 接口的具體含義,它最重要的三個(gè)方法就是 begin,endaccept 方法。

begin(int)用于通知 Sink,表示要開始一批數(shù)據(jù)傳輸了,讓 Sink 提前做好準(zhǔn)備,int 參數(shù)表示這一批數(shù)據(jù)的大小,-1 表示大小未知,這個(gè)信息可能對于 filter,map 等操作沒有什么用,但是對于 sort 操作則可以提前分配好數(shù)組大小。這個(gè) begin 也是會順著 Sink 鏈條一路調(diào)用下去的,比如經(jīng)過一個(gè) filter 之后,原本確定的大小就變成了 -1

Spliterator 數(shù)據(jù)源

begin 調(diào)用完成后,Sink 就從初始化狀態(tài)(initial state)進(jìn)入激活狀態(tài)(active state),只有在激活狀態(tài)下,才可以調(diào)用 Sink 的 accept 函數(shù)處理數(shù)據(jù),正是在這個(gè) accept 函數(shù)里,各個(gè)操作實(shí)現(xiàn)了自己的核心邏輯,比如 filter 操作只有在接收到的數(shù)據(jù)滿足斷言時(shí)才會傳遞給下游 Sink 的 accept 方法。

在這一批數(shù)據(jù)處理完畢后,調(diào)用 end 方法,讓 Sink 重新從激活狀態(tài)返回初始化狀態(tài),一個(gè)簡單的示意圖如下:

Spliterator 數(shù)據(jù)源

串行流執(zhí)行


串行流的最終執(zhí)行位于 AbstractPipeline.copyInto 方法中:

            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();

wrappedSink 就是之前構(gòu)建好的 Sink 鏈,就只需要將 spliterator 作為數(shù)據(jù)源,源源不斷地向 Sink 鏈中灌數(shù)據(jù)就可以了,forEachRemaining 方法其實(shí)就是在對 spliterator 中的每個(gè)元素調(diào)用 wrappedSink.accept 方法,這樣數(shù)據(jù)經(jīng)過層層處理和過濾最終會在 ReducingSink 中沉淀。

Spliterator 數(shù)據(jù)源

為什么一定要有 beginend 調(diào)用呢?看起來直接調(diào)用 forEachRemaining 就行了啊。

在串行流中確實(shí)看不出有什么用,因?yàn)榇辛饕话憔褪?forEachRemaining 一路遍歷到結(jié)束,并不會存在數(shù)據(jù)分批,這個(gè)約定其實(shí)是設(shè)計(jì)給并行流的,串行流的實(shí)現(xiàn)只是遵守一下約定而已。

流短路


思考下面一段流代碼:

        Optional<Integer> find = Arrays.asList(0, 1, 2, 3)
                .stream()
                .filter(i -> i % 2 == 1)
                .findFirst();
        System.out.println(find.get());   // 1

上面一段代碼找出列表中第一個(gè)奇數(shù),邏輯肯定是沒有問題的,但是處于程序員的本能,你會思考它的性能是否存在問題,它究竟是會把 0,1,2,3 整個(gè)遍歷一遍,還是遍歷找到 1 后就停止?這對于長列表有很大的性能影響。

流的開發(fā)者當(dāng)然早就想到了這個(gè)優(yōu)化,所以上面這段代碼中,流會發(fā)生短路,遇到 1 后流就停止了。

還是看 AbstractPipeline.copyInto 方法:

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            //... 正常流計(jì)算
        }
        else {
            // 短路流計(jì)算
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

進(jìn)入 copyIntoWithCancel

        AbstractPipeline p = AbstractPipeline.this;
        // 找到最上層的 Pipeline,也就是 Head
        while (p.depth > 0) {
            p = p.previousStage;
        }
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        p.forEachWithCancel(spliterator, wrappedSink);
        wrappedSink.end();

其實(shí) copyIntoWithCancel 里面和正常流計(jì)算的邏輯差別也不是太大,區(qū)別就在于數(shù)據(jù)來源于 Head 節(jié)點(diǎn)的 forEachWithCancel,而不是直接調(diào)的 Spliterator 的 forEachRemaining。當(dāng)然, Head 的數(shù)據(jù)也還是來自于 Spliterator:

    @Override
    final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

和之前直接用 forEachRemaining 直接迭代完全部的數(shù)據(jù)不同,這里使用 tryAdvance 每次迭代一個(gè)元素,就是為了讓流支持短路,短路操作對應(yīng)的 Sink 會在需要短路時(shí)讓 cancellationRequested() 返回 true,讓整個(gè) while 循環(huán)退出,剩下的流元素就不迭代了。

findFirst 短路操作對應(yīng)的 Sink 就是 FindSink

    private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
        boolean hasValue;
        T value;

        FindSink() {} // Avoid creation of special accessor

        @Override
        public void accept(T value) {
            if (!hasValue) {
                hasValue = true;
                this.value = value;
            }
        }

        @Override
        public boolean cancellationRequested() {
            return hasValue;
        }

        //...
    }

非常簡單,只要 Sink 接收到值就會將 hasValue 置為 true,下一次迭代的時(shí)候 cancellationRequested() 方法返回 true 就會將整個(gè)流停掉,實(shí)現(xiàn)短路。

流中除了 FindOp 這個(gè)短路操作外,還提供了下面幾個(gè)短路操作:

  • MatchOp:包括 anyMathallMatch 操作
  • SliceOp:其實(shí)就是 limit 操作

并行流


鋪墊了這么一堆,終于到文章的主題--并行流了。在集合類上調(diào)用 parallelStream 或者在串行流上調(diào)用 parallel 都可以獲得一個(gè)并行流,其余的代碼沒有任何區(qū)別。

并行流能夠?qū)⒋胁僮鞑⑿谢?,其?shí)也沒用什么特別高級的方法,就是最簡單的數(shù)據(jù)并行,之前在將 Spliterator 時(shí)也提到過,就是多次調(diào)用 trySplit 方法,將數(shù)據(jù)切成很多份,然后對每一份數(shù)據(jù)調(diào)用串行流的邏輯,每一份都得出一個(gè)結(jié)果,如果是 forEach 的話,到這里就結(jié)束了,對于 reduce 操作,還需要將每一份的結(jié)果再合并起來。

注意:因?yàn)?"惰性" 的緣故,我上面說的這一切都是在調(diào)用流的終結(jié)函數(shù)時(shí)發(fā)生,調(diào)用中間操作時(shí),還是和串行流一樣,構(gòu)建 Pipeline 鏈表

并行流示意圖

所以 reduce 才有一個(gè)叫做 combiner 的參數(shù),它就用來合并每一份結(jié)果的邏輯,如果不理解上圖,是很難理解這個(gè)參數(shù)的含義的:

// 統(tǒng)計(jì)字符流中 '{' 的數(shù)量
int num = Stream.of('{','}','{','}').parallel()
                .reduce(0,
                        // accumulator 負(fù)責(zé)累計(jì)出一份數(shù)據(jù)的結(jié)果
                        (acc, ch) -> ch == '{'? ++acc: acc,
                        // combiner 負(fù)責(zé)將并行的多份結(jié)果整合成最終結(jié)果
                        Integer::sum);

需要注意的是,reduceidentity 參數(shù)應(yīng)該是一個(gè)無狀態(tài)的“值”,而不應(yīng)該是一個(gè)有狀態(tài)的對象(比如,不能是一個(gè) ArrayList),因?yàn)樗鼤还蚕淼剿胁l(fā)的任務(wù)中,會出現(xiàn)意想不到的結(jié)果。如果你非要想放一個(gè)有狀態(tài)的對象的話,可以考慮 collect,下一小節(jié)中我們再詳細(xì)介紹 collect 的概念與原理。

明眼的小伙伴們一眼就能看出來,剛剛我們說的并行流原理其實(shí)只適用于全都是 StatelessOp 的情況,如果流中函數(shù) sorted 這種有狀態(tài)操作的話,哪能這么完美互不相干地切分?jǐn)?shù)據(jù)啊?

我們以下面的流為例:

Stream.of(4, 3, 2, 1).parallel()
                .filter(i -> i % 2 == 0)
                .sorted()
                .map(i -> i * 2)
                .reduce(Integer::sum);

其實(shí)這種情況的處理也很簡單,從 sorted 處,將流切成兩半,第一部分將數(shù)據(jù)收集完后,使用 Arrays.parallelSort 將數(shù)據(jù)排好后,再作為結(jié)果傳遞給第二部分,第二部分將其作為數(shù)據(jù)源,計(jì)算出最終的結(jié)果,一個(gè)簡單的示意圖如下:

帶有排序操作的流

注意:上圖的這個(gè)流程,依舊都是在流的終結(jié)函數(shù)(即 reduce)中發(fā)生

其他 StatefulOp,比如 distinct,也都是采用這種方式處理的。

collect 和 Collectors


官方文檔稱 collect 為 “有狀態(tài)” 的 reducecollect 方法其實(shí)產(chǎn)生的也是一個(gè) reduceOp

        List<Integer> list = Stream.of(1,2,3,4,5).parallel()
                .collect(ArrayList::new,
                        ArrayList::add,
                        ArrayList::addAll);

這三個(gè)參數(shù)也和 reduceidentity, accumulatorcombiner 的含義差不多,只不過第一個(gè)參數(shù)稱為 supplier,其實(shí)就是“惰性”的 identity,對于每一份數(shù)據(jù),調(diào)用一次 supplier.get 得到一個(gè)單獨(dú) identity,于是就避免了之前 reduceidentity 多線程共享的問題。

如果每次把流專成 List 都要寫上面這么多代碼的話,未免太煩人了。所以流的開發(fā)者通過 Collectors 類提供了很多現(xiàn)成的 Collector,使用 collect(Collectors.toList()) 和上面的代碼效果一樣。

Collector 作為一個(gè)接口,其中的 supplier,accumulatorcombiner 方法的含義和三參數(shù) collect 方法中同名參數(shù)含義也是類似的。

Collectors 比較有趣的地方是,很多方法提供了一個(gè) downstream 參數(shù),用于將多個(gè) Collector 整合在一起,可以實(shí)現(xiàn)一些類似于 SQL 的申明式數(shù)據(jù)處理。

比如“查詢每個(gè)部門工資最高的員工”,在 SQL 中我們可以寫做:

select department, max(salary) from person group by department;

利用 Collectors 組合功能,我們也能達(dá)到類似的效果:

class Person {
    String department;
    int salary;

    public Person(String department, int salary) {
        this.department = department;
        this.salary = salary;
    }

    public String getDepartment() {
        return department;
    }

    public int getSalary() {
        return salary;
    }
}

List<Person> persions = Arrays.asList(new Person("finance", 10000),
        new Person("technique", 8999),
        new Person("finance", 7888),
        new Person("technique", 15662));

Map<String, Optional<Person>> collect = persions.stream()
        .collect(
            // group by departmant
            groupingBy(Person::getDepartment,
                reducing(
                    // max(salary)
                    BinaryOperator.maxBy(Comparator.comparingInt(Person::getSalary)))
                )
        );

如何給流拓展其他操作


在使用過程中你會發(fā)現(xiàn) Java 流還是缺少很多方便操作的,比如“壓縮”兩個(gè)流,或者獲取流中每個(gè)元素的索引等等,在 Guava 的 Streams 工具類中,補(bǔ)充了很多這樣的方法:

public class Pair {
    int i;
    int j;

    public Pair(int i, int j) {
        this.i = i;
        this.j = j;
    }

    @Override
    public String toString() {
        return "Pair{" +
                "i=" + i +
                ", j=" + j +
                '}';
    }
}

// 利用 zip 講兩個(gè)整形流壓縮成 Pair 流
// Pair(1, 10)
// Pair(2, 20)
Stream<Integer> iStream = Stream.of(1,2);
Stream<Integer> jStream = Stream.of(10, 20);
Streams.zip(iStream, jStream, Pair::new).forEach(System.out::println);

// 給流加上索引
// a:0
// b:1
Streams.mapWithIndex(Stream.of("a", "b"),
        (str, index) -> str + ":" + index).forEach(System.out::println);

如果你想自己定義一些流操作可以參考 Streams 里面這些方法的實(shí)現(xiàn),如果你點(diǎn)開他們,會發(fā)現(xiàn)也沒有什么神奇的。

zip 就是先把兩個(gè)流的 iterator 取出來,把這兩個(gè) iterator 合成一個(gè) Spliterator 后,把 Spliterator 轉(zhuǎn)換成 Stream 后返回:

  public static <A, B, R> Stream<R> zip(
      Stream<A> streamA, Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
    //... 省略一些校驗(yàn)代碼
    // 先將兩個(gè)流的 iterator 取出來
    Iterator<A> itrA = Spliterators.iterator(splitrA);
    Iterator<B> itrB = Spliterators.iterator(splitrB);
    // 組成新 iterator,把新 iterator 轉(zhuǎn)換成
    return StreamSupport.stream(
            new AbstractSpliterator<R>(
                Math.min(splitrA.estimateSize(), splitrB.estimateSize()), characteristics) {
              @Override
              public boolean tryAdvance(Consumer<? super R> action) {
                if (itrA.hasNext() && itrB.hasNext()) {
                  action.accept(function.apply(itrA.next(), itrB.next()));
                  return true;
                }
                return false;
              }
            },
            isParallel)
        .onClose(streamA::close)
        .onClose(streamB::close);
  }

mapWithIndex 大概就是先把流的 iterator 取出來后,組合一些邏輯成為 Spliterator,再轉(zhuǎn)換成一個(gè) Stream 返回:

  public static <T, R> Stream<R> mapWithIndex(
      Stream<T> stream, FunctionWithIndex<? super T, ? extends R> function) {
      //...
      Spliterator<T> fromSpliterator = stream.spliterator();
      
      Iterator<T> fromIterator = Spliterators.iterator(fromSpliterator);
      return StreamSupport.stream(
                new AbstractSpliterator<R>(
                    fromSpliterator.estimateSize(),
                    fromSpliterator.characteristics() & (Spliterator.ORDERED | Spliterator.SIZED)) {
                  long index = 0;
  
                  @Override
                  public boolean tryAdvance(Consumer<? super R> action) {
                    if (fromIterator.hasNext()) {
                      action.accept(function.apply(fromIterator.next(), index++));
                      return true;
                    }
                    return false;
                  }
                },
                isParallel)
            .onClose(stream::close);
      //...
  }

End


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容