背景
微服務(wù)流行后,在我們項(xiàng)目開發(fā)過程中,一個(gè)服務(wù)經(jīng)常會(huì)調(diào)用N個(gè)微服務(wù),調(diào)用每個(gè)微服務(wù)可能需要幾百毫秒,試想,一個(gè)復(fù)雜的業(yè)務(wù)如果要調(diào)用上百的微服務(wù),如果各個(gè)服務(wù)同步執(zhí)行,可能就需要花費(fèi)好幾秒,試想:這些服務(wù)為什么不能并行運(yùn)行呢?
一個(gè)復(fù)雜的計(jì)算任務(wù),為什么不能分解成更小的任務(wù)單位,讓他們并行運(yùn)行呢?
本文通過以上兩個(gè)業(yè)務(wù)場(chǎng)景,比較各個(gè)實(shí)現(xiàn)方案的差異,在講解之前,我們先來了解下本文提到的RxJava。
案例
從一段最簡(jiǎn)單的服務(wù)開始:該服務(wù)需調(diào)用3個(gè)微服務(wù),每個(gè)微服務(wù)費(fèi)時(shí)250ms,三個(gè)微服務(wù)都獲取數(shù)據(jù)后返回給前端(該微服務(wù)三個(gè)服務(wù)分別是商品詳情,商品評(píng)論和推薦商品列表),如果按順序執(zhí)行,那么代碼是這樣的:
public static void main(String[] args) throws Exception {
long c = System.currentTimeMillis();
System.out.println("順序執(zhí)行:");
System.out.println(service("商品詳情微服務(wù)")+service("商品評(píng)論微服務(wù)")+service("推薦商品微服務(wù)"));
spendTime(c);
}
//模擬某個(gè)服務(wù)
private static String service(String srvName){
try {
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
return srvName+"\r\n";
}
private static void spendTime(long preTime) {
System.out.println("花費(fèi):" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
這段代碼毫無疑問,打印輸出:
花費(fèi):781 毫秒
改造一下,使用JDK8的CompletableFuture,3個(gè)微服務(wù)獨(dú)立線程運(yùn)行,都完成后通知主線程打印,代碼如下:
public static void main(String[] args) throws Exception {
final long cc = System.currentTimeMillis();
CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品詳情微服務(wù)"));
CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品評(píng)論微服務(wù)"));
CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推薦商品微服務(wù)"));
s1.thenCombine(s2, (i,j)->{
return i+j;
}).thenCombine(s3, (i,j)->{
System.out.println("使用JDK8的并行編程:");
System.out.println(i+j);
spendTime(cc);
return i+j;
});
}
以上代碼的執(zhí)行結(jié)果取決于3個(gè)微服務(wù)中最長(zhǎng)時(shí)間的那個(gè)服務(wù),相比原先速度有明顯提高:
花費(fèi):311 毫秒
那么以上的代碼使用RxJava怎么來寫呢?我們可以flatMap將服務(wù)分拆到各自獨(dú)立線程中去執(zhí)行,代碼如下:
private static String[] ss = {"商品詳情微服務(wù)","商品評(píng)論微服務(wù)","推薦商品微服務(wù)"};
public static void main(String[] args) throws Exception {
Observable.range(0,3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer t) throws Exception {
return Observable.just(t)
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
return service(ss[t]);
}
});
}
})
.reduce((s1,s2)->s1+s2)
.subscribe(s -> {
System.out.println("Observable:\r\n" + s);
spendTime(cc2);
});
}
花費(fèi):455 毫秒
RxJava模擬的針對(duì)每個(gè)數(shù)據(jù)項(xiàng)的并發(fā)操作調(diào)用時(shí)間上要比直接使用JDK8的API慢得多
第二個(gè)業(yè)務(wù)場(chǎng)景是將復(fù)雜的計(jì)算進(jìn)行拆分子計(jì)算任務(wù),然后將每個(gè)任務(wù)計(jì)算合并成最終計(jì)算結(jié)果,以下直接給出所有源碼,我們來看看幾種計(jì)算方式在耗時(shí)上的不同,復(fù)雜計(jì)算任務(wù)是:對(duì)1到210000000開根號(hào)求總和
package com.sumslack.rxjava;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
public class TestComputer {
private static final int MAX_I = 210000000;
private static void spendTime(long preTime) {
System.out.println("花費(fèi):" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
private static void spendTime(long preTime,String str) {
System.out.println("[" + str + "] 花費(fèi):" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
private static ExecutorService eService = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception{
int[] ss = new int[MAX_I];
for(int i=1;i<=MAX_I;i++) {
ss[i-1] = i;
}
long c = System.currentTimeMillis();
System.out.println(xx(0,MAX_I));
spendTime(c,"順序執(zhí)行");
final long cc5 = System.currentTimeMillis();
Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return Math.sqrt(t);
}
}).reduce((i,j)->i+j)
.subscribeOn(Schedulers.computation())
.subscribe(s -> {
spendTime(cc5,"Observable直接算");
});
final long cc = System.currentTimeMillis();
CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
return xx(0,MAX_I/2);
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return xx(MAX_I/2,MAX_I);
});
cf1.thenCombine(cf2, (i,j)->{
System.out.println(""+(i+j));
spendTime(cc,"CompletableFuture");
return i+j;
});
//也可以用:CompletableFuture.allOf(cf1,cf2).join();
c = System.currentTimeMillis();
Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
System.out.println(dd);
spendTime(cc,"stream");
c = System.currentTimeMillis();
Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
System.out.println(dd2);
spendTime(cc,"parallel stream");
final long cc2 = System.currentTimeMillis();
Observable.fromArray(0,1,2)
.flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
@Override
public ObservableSource<Double> apply(Integer t) throws Exception {
if(t%3==0) {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(0,MAX_I/3);
}
});
}else if(t%3==1) {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(MAX_I/3,MAX_I*2/3);
}
});
}else {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(MAX_I*2/3,MAX_I);
}
});
}
}
})
.reduce(new BiFunction<Double, Double, Double>() {
@Override
public Double apply(Double t1, Double t2) throws Exception {
return t1+t2;
}
})
.subscribe( s->{
System.out.println(s);
spendTime(cc2,"Observable");
});
Thread.sleep(100000);
}
private static double xx(int start,int end) {
double sum = 1;
for(int i=start;i<end;i++) {
sum += Math.sqrt(i+1);
}
return sum;
}
}
以下是費(fèi)時(shí)結(jié)果:
[順序執(zhí)行] 花費(fèi):1086 毫秒
[CompletableFuture] 花費(fèi):537 毫秒
[stream] 花費(fèi):1028 毫秒
[parallel stream] 花費(fèi):1305 毫秒
[Observable] 花費(fèi):461 毫秒
[Observable直接算] 花費(fèi):4265 毫秒
這里使用 RxJava 進(jìn)行計(jì)算任務(wù)分解求和是最快的,因?yàn)镴DK8并發(fā)編程我們分解的是兩個(gè)計(jì)算任務(wù),而RxJava分解成3個(gè)所致!
關(guān)于RxJava
RxJava 是 Reactive Extensions的Java實(shí)現(xiàn),通過使用Obserable/Flowable序列來構(gòu)建異步和基于事件的程序的庫,RxJava實(shí)現(xiàn)和擴(kuò)展了觀察者模式。
RxJava基于響應(yīng)式編程,是一種面向數(shù)據(jù)流和變化傳播的編程范式。傳統(tǒng)編程方式代碼都是順序執(zhí)行的,而響應(yīng)式編程是基于異步編程的,借助于CPU多核能力,提高運(yùn)行效率,降低延遲和阻塞,基于數(shù)據(jù)流模型,如一個(gè)函數(shù)可作用與數(shù)據(jù)流中的每項(xiàng),可變化傳播。在響應(yīng)式編程中,函數(shù)成為其第一等公民,同原型類型一樣,函數(shù)可作用與參數(shù),也可作為返回值。
RxJava基于函數(shù)式編程,傳統(tǒng)面向?qū)ο笫峭ㄟ^抽象出對(duì)象關(guān)系來解決問題,函數(shù)式編程是通過函數(shù)的組合來解決問題。
概念
-
Observable:被訂閱者,比如在安卓開發(fā)中,可能是某個(gè)數(shù)據(jù)源,數(shù)據(jù)源的變化要通知到UI,那么UI就是Observer,被訂閱者有冷熱之分,熱Observable無論有沒有訂閱者訂閱,事件流始終發(fā)送,而冷Observable則只有訂閱者訂閱事件流才開始發(fā)送數(shù)據(jù),它們之間是可以通過API相互轉(zhuǎn)化的,比如使用publish可以冷->熱,RefCount可以熱->冷; -
Observer:訂閱者;
RxJava編程
- 被訂閱者:用的做多的是
Observable,如果要支持背壓則使用Flowable,還可以使用Single(只要OnSuccess和onError,沒有onComplete),Completable(創(chuàng)建后不發(fā)射任何數(shù)據(jù),只有onComplete和onError)和Maybe(只發(fā)送0或1個(gè)數(shù)據(jù)); - 生命周期監(jiān)聽:Observable創(chuàng)建后可使用doXXX監(jiān)聽你說需要的生命周期回調(diào);
- 流的創(chuàng)建:create(使用一個(gè)函數(shù)從頭創(chuàng)建),just(指定值創(chuàng)建,最多10個(gè)),fromXXX(基于X類創(chuàng)建),repeat(特定數(shù)據(jù)重復(fù)N次創(chuàng)建),defer(直到有訂閱者訂閱時(shí)才創(chuàng)建),interval(每隔一段時(shí)間創(chuàng)建一個(gè)數(shù)據(jù)發(fā)送),timer(延遲一段時(shí)間后發(fā)送數(shù)據(jù));
- RxJava線程模型: 內(nèi)置多個(gè)線程控制器,包括single(定長(zhǎng)為1的線程池),newThread(啟動(dòng)新線程執(zhí)行),computation(大小為CPU核數(shù)線程池,一般用于密集型計(jì)算),io(適用IO操作),trampoline(直接在當(dāng)前線程運(yùn)行)和Schedulers.from(自定義);
- 變化操作符:map(數(shù)據(jù)轉(zhuǎn)型),flatMap(數(shù)據(jù)轉(zhuǎn)某個(gè)Observable后合并發(fā)送),scan(每個(gè)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序發(fā)送),groupBy(按Key分組拆分成多個(gè)Observable),buffer(打包發(fā)送),window,cast(強(qiáng)制轉(zhuǎn)換類型);
- 過濾操作:filter(按條件過濾),takeLast(只發(fā)送最后N個(gè)數(shù)據(jù)),last(只發(fā)送最后一個(gè)數(shù)據(jù)),lastOrDefault(只發(fā)送最后一個(gè)數(shù)據(jù),為Null發(fā)送默認(rèn)值),takeLastBuffer(將最后N個(gè)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)送),skip(跳過N個(gè)發(fā)送),skipLast(跳過最后N個(gè)),take(只發(fā)送開始的N個(gè)數(shù)據(jù)),first,takeFirst(只發(fā)送滿足條件的第一個(gè)數(shù)據(jù)),elementAt(只發(fā)送第N個(gè)數(shù)據(jù)),timeout(指定事件內(nèi)沒發(fā)送數(shù)據(jù),就發(fā)送異常),distinct(去重),ofType(只發(fā)送特定類型的數(shù)據(jù)),ignoreElements(丟失所有正常數(shù)據(jù),只發(fā)送錯(cuò)誤或完成通知),sample(一段時(shí)間內(nèi),只處理最后一個(gè)數(shù)據(jù)),throttleFirst(一段時(shí)間內(nèi),只處理第一個(gè)數(shù)據(jù)),debounce(發(fā)送一個(gè)數(shù)據(jù),開始計(jì)時(shí),到了規(guī)定時(shí)間沒有再發(fā)送數(shù)據(jù),則開始處理數(shù)據(jù));
- 條件操作和布爾操作符:all(發(fā)送的數(shù)據(jù)是否都滿足條件),contains(發(fā)送的數(shù)據(jù)是否包含某數(shù)據(jù)),amb(多個(gè)被訂閱者數(shù)據(jù)發(fā)送只發(fā)送首次被訂閱的那個(gè)數(shù)據(jù)流),defaultIfEmpty(如果原始被訂閱者沒有值,則發(fā)送一個(gè)默認(rèn)值),sequenceEquals(判定兩個(gè)數(shù)據(jù)流是否一樣,返回true或false),skipUtil(直到符合條件才發(fā)送),skipWhile(直到條件不符合才開始發(fā)送),takeUntil(滿足條件后不發(fā)送)和takeWhile(條件滿足的一直發(fā)送);
- 合并和連接操作符:merge(將多個(gè)被訂閱數(shù)據(jù)流合并),zip(將多個(gè)數(shù)據(jù)流結(jié)合發(fā)送,返回?cái)?shù)據(jù)流的數(shù)據(jù)個(gè)數(shù)是最少的那個(gè)),combineLastest(類似zip,任意被訂閱者開始發(fā)送數(shù)據(jù)時(shí)即發(fā)送,而zip要每個(gè)被訂閱者開始發(fā)送數(shù)據(jù)才發(fā)送),join(兩個(gè)被訂閱者結(jié)合合并,總數(shù)據(jù)項(xiàng)是M*N項(xiàng)),startWith(在數(shù)據(jù)序列開頭插入指定項(xiàng)),connect,靈活控制發(fā)送數(shù)據(jù)規(guī)則可使用push,refCount,replay(保證所有訂閱者收到相同數(shù)據(jù));
- 背壓:被訂閱者發(fā)送數(shù)據(jù)過快以至于訂閱者來不及處理的情況;
總結(jié)
對(duì)于復(fù)雜計(jì)算,你可以將計(jì)算任務(wù)分解成N個(gè)子計(jì)算任務(wù),交給多個(gè)線程處理并將結(jié)果合并后取得最終結(jié)果,對(duì)于服務(wù)業(yè)務(wù)的調(diào)用,你應(yīng)該清楚,哪些子任務(wù)可以并行運(yùn)行,哪些需要順序執(zhí)行,使用RxJava在代碼上可能更加直觀,也可以使用JDK8的CompletableFuture,其實(shí)JDK8的很多API參考了RxJava的實(shí)現(xiàn),兩者在寫法上非常的類似,響應(yīng)式編程相比傳統(tǒng)代碼的順序執(zhí)行在思路上有很大的不同,理解上也有一定的難度,希望通過本文讓您全面了解函數(shù)式編程的實(shí)現(xiàn)思路。