引言
本篇文章并不會講解并行流的使用,因?yàn)樗氖褂煤芎唵?,網(wǎng)上的資料也非常的多,正是因?yàn)樗褂蒙系暮唵危拍軒椭脩羝帘未罅考?xì)節(jié),實(shí)現(xiàn)惰性計(jì)算,自動并行化等高級功能。本文將剖析流與并行流的執(zhí)行細(xì)節(jié),讀完本文后,你看到流代碼就能想象出它具體的執(zhí)行流程。在此之前,筆者已經(jīng)通讀了它的源碼與相關(guān)文檔。
先來看一段簡單的流操作:
List<Widget> widgets = new ArrayList<>();
widgets.add(ofWidget(Color.RED, 2));
widgets.add(ofWidget(Color.WHITE, 3));
widgets.add(ofWidget(Color.RED, 5));
widgets.add(ofWidget(Color.RED, 4));
int sum = widgets.stream()
.filter(b -> b.getColor() == Color.RED)
.mapToInt(b -> b.getWeight())
.sum();
下面我將帶著大家逐步想象出它的執(zhí)行流程。
Spliterator
是不是覺得這個(gè)單詞的拼寫有點(diǎn)像 Split + Iterator? 其實(shí)它本質(zhì)上就是一個(gè)可以通過 trySplit 方法指導(dǎo)并行流進(jìn)行數(shù)據(jù)切分的 迭代器(Iterator)。
就像 Java 的集合類都會通過 iterator() 返回自己的 Iterator 一樣,到了Java8,它們也同時(shí)定義了一個(gè) spliterator() 方法返回自己的 Spliterator。
有兩種方式訪問 Spliterator 中數(shù)據(jù):
-
boolean tryAdvance(Consumer<? super T> action):使用action處理該 Spliterator 中下一個(gè)元素,并且向下移動一個(gè)元素。如果有元素可以處理的話就返回true,否則返回false -
void forEachRemaining(Consumer<? super T> action):對于 Spliterator 中剩下的所有元素,逐個(gè)使用action處理,并移動到 Spliterator 的最后
所有的流都是以一個(gè) Spliterator 作為數(shù)據(jù)源的,通過 StreamSupport.stream 就可以將一個(gè) Spliterator 轉(zhuǎn)換成一個(gè)流,上面的例子中所調(diào)用的 Collection.stream 就是這樣實(shí)現(xiàn)的:
default Stream<E> stream() {
// false 表示是串行流,并行流則傳 true
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
Spliterator 和 Iterator 也可以非常方便地互相轉(zhuǎn)換:
- Spliterator 轉(zhuǎn) Iterator:
Spliterators.iterator(spliterator) - Iterator 轉(zhuǎn) Spliterator:
Spliterators.spliteratorUnknownSize(iterator)
JDK 有一些 Stream 的實(shí)現(xiàn)其實(shí)就是用 Iterator 轉(zhuǎn)的,比如 BufferedReader.lines() 返回的流:
public Stream<String> lines() {
Iterator<String> iter = new Iterator<String>() {
String nextLine = null;
@Override
public boolean hasNext() {
if (nextLine != null) {
return true;
} else {
try {
nextLine = readLine();
return (nextLine != null);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
@Override
public String next() {
if (nextLine != null || hasNext()) {
String line = nextLine;
nextLine = null;
return line;
} else {
throw new NoSuchElementException();
}
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}
后面我們會學(xué)到,并行流是通過切分?jǐn)?shù)據(jù)源的方式,正常集合類的 trySplit() 實(shí)現(xiàn)都是比較高效的(每次切分一半出去),但是如果 Spliterator 是通過 Iterator 轉(zhuǎn)換過來的話,Iterator 本身又沒有切分功能,那它是怎么切分的呢?
因?yàn)榇藭r(shí) Spliterator 一組切多大比較合適,就只能非常粗暴地每 1024 個(gè)分一組了(事實(shí)上這個(gè)數(shù)字還會不斷增大,第二組就是 2048 個(gè),第三組 3072 .....,至于為什么是這個(gè)數(shù)字,我也不太清楚)。
由此可以看出,如果 Iterator 里面的數(shù)據(jù)本來就不是非常多的話,轉(zhuǎn)成 Spliterator 后所生成的并行流,可能達(dá)不到你想要的效果,甚至和串行流無差。
Pipeline
繼續(xù)之前的那個(gè)例子:
int sum = widgets.stream()
.filter(b -> b.getColor() == Color.RED)
.mapToInt(b -> b.getWeight())
.sum();
為了支持這種鏈?zhǔn)秸{(diào)用,除了最后的 sum 方法外,其他每個(gè)方法返回的都是 Stream 類型的對象,但是 Stream 只是一個(gè)接口而已,它具體返回的是什么呢?
答案就是 Pipeline,它們實(shí)現(xiàn)了 Stream,你在 sum 之前的一切調(diào)用都是在構(gòu)建一個(gè)由 Pipeline 組成的鏈表鏈表,而沒有進(jìn)行真正的計(jì)算,因此稱之為“惰性”。stream() 返回的是一個(gè)頭接點(diǎn),而 filter 和 mapToInt 都是在尾部添加節(jié)點(diǎn),并返回尾節(jié)點(diǎn)(就是剛添加的那個(gè)節(jié)點(diǎn))的引用。
看一下類圖你就會更加明白:

為了讓圖能夠簡潔一些,我畫的不怎么正式,除了給正常對象使用的 ReferencePipeline(實(shí)現(xiàn)了 Stream) ,Java 還為每種基本類型定義了一個(gè) Pipeline,就是圖中 IntPipeline(實(shí)現(xiàn)了 IntStream), LongePipeline(實(shí)現(xiàn)了 LongStream) 以及省略號后面的一堆,在源碼中將這些不同的 Pipeline 稱為流的“形狀”(Shape),為什么要給每種基本類型都定義一種 Pipeline 呢?據(jù)說是為了避免頻繁的拆箱裝箱影響性能。
每種形狀的 Pipeline 還分別定義了三個(gè)子類,分別是 StatelessOp, StatefulOp 和 Head:
-
StatelessOp:無狀態(tài)操作,比如filter,map這種每個(gè)元素都可以單獨(dú)計(jì)算 -
StatefulOp:有狀態(tài)操作,比如sorted,distinct這種需要考慮前后元素的 -
Head:顧名思義,一般作為 Pipeline 的頭節(jié)點(diǎn)
對照著之前的示例代碼:
-
widgets.stream()返回的就是一個(gè)頭節(jié)點(diǎn),即ReferencePipeline.Head -
filter()是無狀態(tài)操作,返回的其實(shí)就是ReferencePipeline.StatelessOp -
mapToInt()雖然也是無狀態(tài)操作,但它的流的形狀是基本類型 int,所以返回的是IntPipeline.StatelessOp
假如把 sum 之前構(gòu)建流的代碼單獨(dú)抽出來:
IntStream myStream = widgets.stream()
.filter(b -> b.getColor() == Color.RED)
.mapToInt(b -> b.getWeight())
最后構(gòu)建出 myStream 看上去就像下面這樣:

如果你還想進(jìn)一步確認(rèn),可以點(diǎn)開 ReferencePipeline.filter 的源代碼看下:
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// 返回了一個(gè) StatelessOp
// 將 this 作為構(gòu)造函數(shù)的第一個(gè)參數(shù),表示以當(dāng)前的這個(gè) Pipeline 作為上游,鏈接起來
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
注意它在返回 StatelessOp 的同時(shí)還重寫了 opWrapSink 方法,記住這個(gè)方法,它引出了流中的下一個(gè)重要概念-Sink。
Sink
之前的示例代碼中,sum 調(diào)用之前,我們只是在不斷地組裝 Pipeline 鏈表而已,沒有做任何實(shí)質(zhì)性地計(jì)算。所有的計(jì)算都是在 sum 調(diào)用中完成。下面我們來深入 它的執(zhí)行細(xì)節(jié)。
首先會根據(jù) Pipeline 鏈表反向構(gòu)建出一個(gè) Sink 鏈表(通過 Pipeline 的 opWrapSink 方法):

這部分的核心代碼位于 AbstractPipeline.wrapSink 中:
// 在之前示例代碼中,傳入的 sink 就是 ReducingSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
看著是不是有點(diǎn)眼熟,這不就是單鏈表逆序的代碼嗎?
至于為什么最后是一個(gè) ReducingSink 節(jié)點(diǎn)呢?因?yàn)?sum 其實(shí)調(diào)用的是 reduce(如果不理解 reduce 的話,這里簡要說一下,其實(shí)就是將 (((1+2)+3)+4) 這樣的運(yùn)算應(yīng)用于 [1,2,3,4],其中 + 可以換成任意自定義運(yùn)算):
public final int sum() {
return reduce(0, Integer::sum);
}
最后一個(gè) Sink 節(jié)點(diǎn)究竟是什么取決于 TerminalOp 是什么,ReduceOp 對應(yīng)的就是 ReducingSink,其實(shí) TerminalOp 總共就只有四種,而且顧名思義:
ForEachOpReduceOpMatchOpFindOp
最后我們再來明確一下 Sink 接口的具體含義,它最重要的三個(gè)方法就是 begin,end 和 accept 方法。
begin(int)用于通知 Sink,表示要開始一批數(shù)據(jù)傳輸了,讓 Sink 提前做好準(zhǔn)備,int 參數(shù)表示這一批數(shù)據(jù)的大小,-1 表示大小未知,這個(gè)信息可能對于 filter,map 等操作沒有什么用,但是對于 sort 操作則可以提前分配好數(shù)組大小。這個(gè) begin 也是會順著 Sink 鏈條一路調(diào)用下去的,比如經(jīng)過一個(gè) filter 之后,原本確定的大小就變成了 -1:

begin 調(diào)用完成后,Sink 就從初始化狀態(tài)(initial state)進(jìn)入激活狀態(tài)(active state),只有在激活狀態(tài)下,才可以調(diào)用 Sink 的 accept 函數(shù)處理數(shù)據(jù),正是在這個(gè) accept 函數(shù)里,各個(gè)操作實(shí)現(xiàn)了自己的核心邏輯,比如 filter 操作只有在接收到的數(shù)據(jù)滿足斷言時(shí)才會傳遞給下游 Sink 的 accept 方法。
在這一批數(shù)據(jù)處理完畢后,調(diào)用 end 方法,讓 Sink 重新從激活狀態(tài)返回初始化狀態(tài),一個(gè)簡單的示意圖如下:

串行流執(zhí)行
串行流的最終執(zhí)行位于 AbstractPipeline.copyInto 方法中:
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
wrappedSink 就是之前構(gòu)建好的 Sink 鏈,就只需要將 spliterator 作為數(shù)據(jù)源,源源不斷地向 Sink 鏈中灌數(shù)據(jù)就可以了,forEachRemaining 方法其實(shí)就是在對 spliterator 中的每個(gè)元素調(diào)用 wrappedSink.accept 方法,這樣數(shù)據(jù)經(jīng)過層層處理和過濾最終會在 ReducingSink 中沉淀。

為什么一定要有 begin 和 end 調(diào)用呢?看起來直接調(diào)用 forEachRemaining 就行了啊。
在串行流中確實(shí)看不出有什么用,因?yàn)榇辛饕话憔褪?forEachRemaining 一路遍歷到結(jié)束,并不會存在數(shù)據(jù)分批,這個(gè)約定其實(shí)是設(shè)計(jì)給并行流的,串行流的實(shí)現(xiàn)只是遵守一下約定而已。
流短路
思考下面一段流代碼:
Optional<Integer> find = Arrays.asList(0, 1, 2, 3)
.stream()
.filter(i -> i % 2 == 1)
.findFirst();
System.out.println(find.get()); // 1
上面一段代碼找出列表中第一個(gè)奇數(shù),邏輯肯定是沒有問題的,但是處于程序員的本能,你會思考它的性能是否存在問題,它究竟是會把 0,1,2,3 整個(gè)遍歷一遍,還是遍歷找到 1 后就停止?這對于長列表有很大的性能影響。
流的開發(fā)者當(dāng)然早就想到了這個(gè)優(yōu)化,所以上面這段代碼中,流會發(fā)生短路,遇到 1 后流就停止了。
還是看 AbstractPipeline.copyInto 方法:
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
//... 正常流計(jì)算
}
else {
// 短路流計(jì)算
copyIntoWithCancel(wrappedSink, spliterator);
}
}
進(jìn)入 copyIntoWithCancel:
AbstractPipeline p = AbstractPipeline.this;
// 找到最上層的 Pipeline,也就是 Head
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
其實(shí) copyIntoWithCancel 里面和正常流計(jì)算的邏輯差別也不是太大,區(qū)別就在于數(shù)據(jù)來源于 Head 節(jié)點(diǎn)的 forEachWithCancel,而不是直接調(diào)的 Spliterator 的 forEachRemaining。當(dāng)然, Head 的數(shù)據(jù)也還是來自于 Spliterator:
@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
和之前直接用 forEachRemaining 直接迭代完全部的數(shù)據(jù)不同,這里使用 tryAdvance 每次迭代一個(gè)元素,就是為了讓流支持短路,短路操作對應(yīng)的 Sink 會在需要短路時(shí)讓 cancellationRequested() 返回 true,讓整個(gè) while 循環(huán)退出,剩下的流元素就不迭代了。
findFirst 短路操作對應(yīng)的 Sink 就是 FindSink:
private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
boolean hasValue;
T value;
FindSink() {} // Avoid creation of special accessor
@Override
public void accept(T value) {
if (!hasValue) {
hasValue = true;
this.value = value;
}
}
@Override
public boolean cancellationRequested() {
return hasValue;
}
//...
}
非常簡單,只要 Sink 接收到值就會將 hasValue 置為 true,下一次迭代的時(shí)候 cancellationRequested() 方法返回 true 就會將整個(gè)流停掉,實(shí)現(xiàn)短路。
流中除了 FindOp 這個(gè)短路操作外,還提供了下面幾個(gè)短路操作:
-
MatchOp:包括anyMath和allMatch操作 -
SliceOp:其實(shí)就是limit操作
并行流
鋪墊了這么一堆,終于到文章的主題--并行流了。在集合類上調(diào)用 parallelStream 或者在串行流上調(diào)用 parallel 都可以獲得一個(gè)并行流,其余的代碼沒有任何區(qū)別。
并行流能夠?qū)⒋胁僮鞑⑿谢?,其?shí)也沒用什么特別高級的方法,就是最簡單的數(shù)據(jù)并行,之前在將 Spliterator 時(shí)也提到過,就是多次調(diào)用 trySplit 方法,將數(shù)據(jù)切成很多份,然后對每一份數(shù)據(jù)調(diào)用串行流的邏輯,每一份都得出一個(gè)結(jié)果,如果是 forEach 的話,到這里就結(jié)束了,對于 reduce 操作,還需要將每一份的結(jié)果再合并起來。
注意:因?yàn)?"惰性" 的緣故,我上面說的這一切都是在調(diào)用流的終結(jié)函數(shù)時(shí)發(fā)生,調(diào)用中間操作時(shí),還是和串行流一樣,構(gòu)建 Pipeline 鏈表

所以 reduce 才有一個(gè)叫做 combiner 的參數(shù),它就用來合并每一份結(jié)果的邏輯,如果不理解上圖,是很難理解這個(gè)參數(shù)的含義的:
// 統(tǒng)計(jì)字符流中 '{' 的數(shù)量
int num = Stream.of('{','}','{','}').parallel()
.reduce(0,
// accumulator 負(fù)責(zé)累計(jì)出一份數(shù)據(jù)的結(jié)果
(acc, ch) -> ch == '{'? ++acc: acc,
// combiner 負(fù)責(zé)將并行的多份結(jié)果整合成最終結(jié)果
Integer::sum);
需要注意的是,reduce 的 identity 參數(shù)應(yīng)該是一個(gè)無狀態(tài)的“值”,而不應(yīng)該是一個(gè)有狀態(tài)的對象(比如,不能是一個(gè) ArrayList),因?yàn)樗鼤还蚕淼剿胁l(fā)的任務(wù)中,會出現(xiàn)意想不到的結(jié)果。如果你非要想放一個(gè)有狀態(tài)的對象的話,可以考慮 collect,下一小節(jié)中我們再詳細(xì)介紹 collect 的概念與原理。
明眼的小伙伴們一眼就能看出來,剛剛我們說的并行流原理其實(shí)只適用于全都是 StatelessOp 的情況,如果流中函數(shù) sorted 這種有狀態(tài)操作的話,哪能這么完美互不相干地切分?jǐn)?shù)據(jù)啊?
我們以下面的流為例:
Stream.of(4, 3, 2, 1).parallel()
.filter(i -> i % 2 == 0)
.sorted()
.map(i -> i * 2)
.reduce(Integer::sum);
其實(shí)這種情況的處理也很簡單,從 sorted 處,將流切成兩半,第一部分將數(shù)據(jù)收集完后,使用 Arrays.parallelSort 將數(shù)據(jù)排好后,再作為結(jié)果傳遞給第二部分,第二部分將其作為數(shù)據(jù)源,計(jì)算出最終的結(jié)果,一個(gè)簡單的示意圖如下:

注意:上圖的這個(gè)流程,依舊都是在流的終結(jié)函數(shù)(即 reduce)中發(fā)生
其他 StatefulOp,比如 distinct,也都是采用這種方式處理的。
collect 和 Collectors
官方文檔稱 collect 為 “有狀態(tài)” 的 reduce,collect 方法其實(shí)產(chǎn)生的也是一個(gè) reduceOp:
List<Integer> list = Stream.of(1,2,3,4,5).parallel()
.collect(ArrayList::new,
ArrayList::add,
ArrayList::addAll);
這三個(gè)參數(shù)也和 reduce 的 identity, accumulator 和 combiner 的含義差不多,只不過第一個(gè)參數(shù)稱為 supplier,其實(shí)就是“惰性”的 identity,對于每一份數(shù)據(jù),調(diào)用一次 supplier.get 得到一個(gè)單獨(dú) identity,于是就避免了之前 reduce 中 identity 多線程共享的問題。
如果每次把流專成 List 都要寫上面這么多代碼的話,未免太煩人了。所以流的開發(fā)者通過 Collectors 類提供了很多現(xiàn)成的 Collector,使用 collect(Collectors.toList()) 和上面的代碼效果一樣。
Collector 作為一個(gè)接口,其中的 supplier,accumulator 和 combiner 方法的含義和三參數(shù) collect 方法中同名參數(shù)含義也是類似的。
Collectors 比較有趣的地方是,很多方法提供了一個(gè) downstream 參數(shù),用于將多個(gè) Collector 整合在一起,可以實(shí)現(xiàn)一些類似于 SQL 的申明式數(shù)據(jù)處理。
比如“查詢每個(gè)部門工資最高的員工”,在 SQL 中我們可以寫做:
select department, max(salary) from person group by department;
利用 Collectors 組合功能,我們也能達(dá)到類似的效果:
class Person {
String department;
int salary;
public Person(String department, int salary) {
this.department = department;
this.salary = salary;
}
public String getDepartment() {
return department;
}
public int getSalary() {
return salary;
}
}
List<Person> persions = Arrays.asList(new Person("finance", 10000),
new Person("technique", 8999),
new Person("finance", 7888),
new Person("technique", 15662));
Map<String, Optional<Person>> collect = persions.stream()
.collect(
// group by departmant
groupingBy(Person::getDepartment,
reducing(
// max(salary)
BinaryOperator.maxBy(Comparator.comparingInt(Person::getSalary)))
)
);
如何給流拓展其他操作
在使用過程中你會發(fā)現(xiàn) Java 流還是缺少很多方便操作的,比如“壓縮”兩個(gè)流,或者獲取流中每個(gè)元素的索引等等,在 Guava 的 Streams 工具類中,補(bǔ)充了很多這樣的方法:
public class Pair {
int i;
int j;
public Pair(int i, int j) {
this.i = i;
this.j = j;
}
@Override
public String toString() {
return "Pair{" +
"i=" + i +
", j=" + j +
'}';
}
}
// 利用 zip 講兩個(gè)整形流壓縮成 Pair 流
// Pair(1, 10)
// Pair(2, 20)
Stream<Integer> iStream = Stream.of(1,2);
Stream<Integer> jStream = Stream.of(10, 20);
Streams.zip(iStream, jStream, Pair::new).forEach(System.out::println);
// 給流加上索引
// a:0
// b:1
Streams.mapWithIndex(Stream.of("a", "b"),
(str, index) -> str + ":" + index).forEach(System.out::println);
如果你想自己定義一些流操作可以參考 Streams 里面這些方法的實(shí)現(xiàn),如果你點(diǎn)開他們,會發(fā)現(xiàn)也沒有什么神奇的。
zip 就是先把兩個(gè)流的 iterator 取出來,把這兩個(gè) iterator 合成一個(gè) Spliterator 后,把 Spliterator 轉(zhuǎn)換成 Stream 后返回:
public static <A, B, R> Stream<R> zip(
Stream<A> streamA, Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
//... 省略一些校驗(yàn)代碼
// 先將兩個(gè)流的 iterator 取出來
Iterator<A> itrA = Spliterators.iterator(splitrA);
Iterator<B> itrB = Spliterators.iterator(splitrB);
// 組成新 iterator,把新 iterator 轉(zhuǎn)換成
return StreamSupport.stream(
new AbstractSpliterator<R>(
Math.min(splitrA.estimateSize(), splitrB.estimateSize()), characteristics) {
@Override
public boolean tryAdvance(Consumer<? super R> action) {
if (itrA.hasNext() && itrB.hasNext()) {
action.accept(function.apply(itrA.next(), itrB.next()));
return true;
}
return false;
}
},
isParallel)
.onClose(streamA::close)
.onClose(streamB::close);
}
而 mapWithIndex 大概就是先把流的 iterator 取出來后,組合一些邏輯成為 Spliterator,再轉(zhuǎn)換成一個(gè) Stream 返回:
public static <T, R> Stream<R> mapWithIndex(
Stream<T> stream, FunctionWithIndex<? super T, ? extends R> function) {
//...
Spliterator<T> fromSpliterator = stream.spliterator();
Iterator<T> fromIterator = Spliterators.iterator(fromSpliterator);
return StreamSupport.stream(
new AbstractSpliterator<R>(
fromSpliterator.estimateSize(),
fromSpliterator.characteristics() & (Spliterator.ORDERED | Spliterator.SIZED)) {
long index = 0;
@Override
public boolean tryAdvance(Consumer<? super R> action) {
if (fromIterator.hasNext()) {
action.accept(function.apply(fromIterator.next(), index++));
return true;
}
return false;
}
},
isParallel)
.onClose(stream::close);
//...
}