java8Stream原理深度解析

Java8 Stream原理深度解析

Author:Dorae
Date:2017年11月2日19:10:39
轉(zhuǎn)載請注明出處

由于切換博客,2018/09/20及之前文章可能會存在圖片無法查看的問題,請移步這里。


上一篇文章中簡要介紹了Java8的函數(shù)式編程,而在Java8中另外一個比較大且非常重要的改動就是Stream。在這篇文章中,將會對流的實現(xiàn)原理進行深度,解析,具體關(guān)于如何使用,請參考《Java8函數(shù)式編程》。

常用的流操作

在深入原理之前,我們有必要知道關(guān)于Stream的一些基礎(chǔ)知識,關(guān)于Stream的操作分類,如表1-1所示。

表1-1 Stream的常用操作分類(<font size=1>表格引自這里</font>)

表1-1

如表1-1中所示,Stream中的操作可以分為兩大類:中間操作與結(jié)束操作,中間操作只是對操作進行了記錄,只有結(jié)束操作才會觸發(fā)實際的計算(即惰性求值),這也是Stream在迭代大集合時高效的原因之一。中間操作又可以分為無狀態(tài)(Stateless)操作與有狀態(tài)(Stateful)操作,前者是指元素的處理不受之前元素的影響;后者是指該操作只有拿到所有元素之后才能繼續(xù)下去。結(jié)束操作又可以分為短路與非短路操作,這個應(yīng)該很好理解,前者是指遇到某些符合條件的元素就可以得到最終結(jié)果;而后者是指必須處理所有元素才能得到最終結(jié)果。

原理探秘

在探究Stream的執(zhí)行原理之前,我們先看如下兩段代碼(本文將以code_1為例進行說明):

code_1

public static void main(String[] args) {
    List<String> list = Lists.newArrayList(
            "bcd", "cde", "def", "abc");
    List<String> result = list.stream()
            //.parallel()
            .filter(e -> e.length() >= 3)
            .map(e -> e.charAt(0))
            //.peek(System.out :: println)
            //.sorted()
            //.peek(e -> System.out.println("++++" + e))
            .map(e -> String.valueOf(e))
            .collect(Collectors.toList());
    System.out.println("----------------------------");
    System.out.println(result);
}

code_2

public void targetMethod() {
    List<String> list = Lists.newArrayList(
            "bcd", "cde", "def", "abc");
    List<String> result = Lists.newArrayListWithCapacity(list.size());
    for (String str : list) {
        if (str.length() >= 3) {
            char e = str.charAt(0);
            String tempStr = String.valueOf(e);
            result.add(tempStr);
        }
    }
    System.out.println("----------------------------");
    System.out.println(result);
}

很明顯,在最終結(jié)果上而言,code_1與code_2是等價的。那么,Stream是怎么做的呢?顯然不是每次操作都進行迭代,因為這對于執(zhí)行時間與存儲中間變量來說都將是噩夢。

要解決的問題

顯然,如果code_2只對集合迭代了一次,也就是說相當(dāng)高效。那么這么做有沒有弊端?有!模板代碼、中間變量、不利于并行都是其存在的問題。但是按著code_2的思路可以知道有以下幾個問題需要解決:

  • 如何記錄每次操作?
  • 操作如何疊加?
  • 疊加后的操作如何執(zhí)行?
  • 最后的結(jié)果如何存儲?

包結(jié)構(gòu)分析

那么Stream是如何解決的呢?所謂源碼之下,無所遁形。那么,首先來看一下Stream包的結(jié)構(gòu)(如圖1-1所示)。

圖1-1

圖1-1 Stream包的結(jié)構(gòu)示意圖

其中各個部分的主要功能為:

  1. 主要是各種操作的工廠類、數(shù)據(jù)的存儲結(jié)構(gòu)以及收集器的工廠類等;
  2. 主要用于Stream的惰性求值實現(xiàn);
  3. Stream的并行計算框架;
  4. 存儲并行流的中間結(jié)果;
  5. 終結(jié)操作的定義;

我們單獨把第二部分拎出來用于說明Stream的惰性求值實現(xiàn),如圖1-2所示,Java8針對Int、long、double進行了優(yōu)化,主要用于頻繁的拆裝箱。我們以引用類型進行介紹,在圖中已經(jīng)標(biāo)為綠色。

  • BaseStream規(guī)定了流的基本接口,比如iterator、spliterator、isParallel等;
  • Stream中定義了map、filter、flatmap等用戶關(guān)注的常用操作;
  • PipelineHelper主要用于Stream執(zhí)行過程中相關(guān)結(jié)構(gòu)的構(gòu)建;
  • Head、StatelessOp、StatefulOp為ReferencePipeline中的內(nèi)部類。
圖1-2

圖1-2

<font color="red">操作如何記錄</font>

關(guān)于操作如何記錄,在JDK源碼注釋中多次用(操作)stage來標(biāo)識用戶的每一次操作,而通常情況下Stream的操作又需要一個回調(diào)函數(shù),所以一個完整的操作是由數(shù)據(jù)來源、操作、回調(diào)函數(shù)組成的三元組來表示。而在具體實現(xiàn)中,使用實例化的ReferencePipeline來表示,即圖1-2中的Head、StatelessOp、StatefulOp的實例。

如code_3、code_4所示為調(diào)用stream.map()的關(guān)鍵的兩個方法,在用戶
調(diào)用一系列操作后會形成如圖1-3所示的雙鏈表結(jié)構(gòu)。

圖1-3

圖1-3

code_3(ReferencePipeline.map())

@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

code_4(AbstractPipeline.AbstractPipeline())

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

<font color="red">如何疊加</font>

在上一步已經(jīng)在stage中記錄了每一步操作,此時并沒有執(zhí)行。但是stage只是保存了當(dāng)前的操作,并不能確定下一個stage需要何種操作,何種數(shù)據(jù),其實JDK為此定義了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四個接口(如表1-2所示,摘自這里),其中中間操作的子類中包含一個指向下游sink的指針。

表1-2

表1-2

現(xiàn)在轉(zhuǎn)向code_3,可以看出,在satge鏈中,每一步都包含了opWrapSink()。當(dāng)調(diào)用終結(jié)操作時,將會觸發(fā)code_5從最后一個stage(終結(jié)操作產(chǎn)生的satge)開始,遞歸產(chǎn)生圖1-4所示的結(jié)構(gòu)。

code_5(AbstractPipeline.wrapSink())

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
圖1-4

圖1-4

<font color="red">如何執(zhí)行</font>

所有的操作已經(jīng)形成了圖1-4的結(jié)構(gòu),接下來就會觸發(fā)code_6,此時結(jié)果就會產(chǎn)生對應(yīng)的結(jié)果啦!

code_6(AbstractPipelie.copyInto())

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

并行原理

那么,Stream是如何并行執(zhí)行的呢?其實產(chǎn)生stage鏈的過程和串行并沒有區(qū)別,只是在最終執(zhí)行時進行了相應(yīng)的調(diào)整,我們將code_1改變?yōu)閏ode_7

code_7

public static void main(String[] args) {
    List<String> list = Lists.newArrayList(
            "bcd", "cde", "def", "abc");
    List<String> result = list.stream()
            .parallel()
            .filter(e -> e.length() >= 3)
            //.map(e -> e.charAt(0))
            //.peek(System.out :: println)
            .sorted()
            //.peek(e -> System.out.println("++++" + e))
            .map(e -> String.valueOf(e))
            .collect(Collectors.toList());
    System.out.println("----------------------------");
    System.out.println(result);
}

那么最終產(chǎn)生的stage鏈與sink的結(jié)構(gòu)如圖1-5所示,因為此時stage鏈中有一個有狀態(tài)操作(sorted()),也就是說在這里必須處理完所有元素才能進行下一步操作。那么此時無論是并行還是串行,此時都會產(chǎn)生兩個sink鏈,也就是代表了兩次迭代,才產(chǎn)生了最終結(jié)果。

圖1-5

圖1-5

那么,究竟是如何并行的呢?其實當(dāng)調(diào)用collect操作時會調(diào)用code_8,其中的evaluateParallel()如code_9所示。

code_8(AbstractPipeline.evaluate())

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

code_9(ReduceOp.evaluateParallel())

@Override
    public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<P_IN> spliterator) {
        return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }

其實Stream的并行處理是基于ForkJoin框架的,相關(guān)類與接口的結(jié)構(gòu)如圖1-6所示。其中AbstractShortCircuitTask用于處理短路操作,其他相關(guān)操作類似,會產(chǎn)生對應(yīng)的Task。

圖1-6

圖1-6

關(guān)于code_8中獲取源Spliterator,如code_10所示,

code_10(AbstractPipeline.sourceSpliterator())

@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
    Spliterator<?> spliterator = null;
    if (sourceStage.sourceSpliterator != null) {
        spliterator = sourceStage.sourceSpliterator;
        sourceStage.sourceSpliterator = null;
    }
    else if (sourceStage.sourceSupplier != null) {
        spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
        sourceStage.sourceSupplier = null;
    }
    else {
        throw new IllegalStateException(MSG_CONSUMED);
    }

    if (isParallel() && sourceStage.sourceAnyStateful) {
        //如果是并行流并且有stage包含stateful操作
        //那么就會依次遍歷stage,直到遇到stateful stage時
        int depth = 1;
        for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
             u != e;
             u = p, p = p.nextStage) {

            int thisOpFlags = p.sourceOrOpFlags;
            if (p.opIsStateful()) {
                depth = 0;

                if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                    //如果有短路操作,則去除相應(yīng)標(biāo)記
                    thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                }
                //盡量以惰性求值的方式進行操作
                spliterator = p.opEvaluateParallelLazy(u, spliterator);

                thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                        ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                        : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
            }
            p.depth = depth++;
            p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
        }
    }

    if (terminalFlags != 0)  {
        // Apply flags from the terminal operation to last pipeline stage
        combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
    }

    return spliterator;
}

<font color="red">如何并行執(zhí)行</font>

關(guān)于各個task就行是如何并行執(zhí)行,其實最終調(diào)用的是code_11所示,對應(yīng)的流程如圖1-7所示,其中交替fork子節(jié)點是為了緩和數(shù)據(jù)分片不均造成的性能退化。

code_11(AbstractTask.compute())

@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}
圖1-7

圖1-7

影響并行流的因素

數(shù)據(jù)大??;源數(shù)據(jù)結(jié)構(gòu)(分割越容易越好),arraylist、數(shù)組比較好,hashSet、treeSet次之,linked最差;裝箱;核的數(shù)量(可使用);單元處理開銷(越大越好)

建議:

終結(jié)操作以外的操作,盡量避免副作用,避免突變基于堆棧的引用,或者在執(zhí)行過程中進行任何I/O;傳遞給流操作的數(shù)據(jù)源應(yīng)該是互不干擾(避免修改數(shù)據(jù)源)。

小結(jié)

本文主要探究了Stream的實現(xiàn)原理,并沒有涉及到具體的流操作的用法(讀者可以參考《java8函數(shù)式編程》),并且給出了使用Stream的部分建議。

參考文章

深入理解Java Stream流水線
Java 8 Stream探秘
java.util.stream 庫簡介

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容