異步編程淺析

一.非阻塞和異步

借用知乎用戶嚴(yán)肅的回答
在此總結(jié)下,同步和異步是針對消息通信機(jī)制,同步代表一個client發(fā)出一個調(diào)用,不管是遠(yuǎn)程調(diào)用還是本地調(diào)用,在沒有得到結(jié)果之前就不返回,一直等到調(diào)用返回,就得到返回值了。
而異步調(diào)用恰恰相反,調(diào)用在發(fā)出之后,就直接返回了,沒有返回結(jié)果。但是結(jié)果怎么辦呢,這個就是本篇文章討論的內(nèi)容,一般被調(diào)用者會通過一系列的信號或者回調(diào)來將結(jié)果告訴調(diào)用者。
而對于阻塞非阻塞是在調(diào)用方在等待消息的時候的狀態(tài),不管是同步還是異步,調(diào)用請求發(fā)出后,如果調(diào)用發(fā)一直守候在那,占用著資源,那就是阻塞的,如果不管它了,先去做別的事情,那就是非阻塞的。
舉一個網(wǎng)絡(luò)上的例子:
老張愛喝茶,廢話不說,煮開水。出場人物:老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。
1 老張把水壺放到火上,立等水開。(同步阻塞)
老張覺得自己有點傻
2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞)
老張還是覺得自己有點傻,于是變高端了,買了把會響笛的那種水壺。水開之后,能大聲發(fā)出嘀~~~~的噪音。
3 老張把響水壺放到火上,立等水開。(異步阻塞)
老張覺得這樣傻等意義不大
4 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(異步非阻塞)
老張覺得自己聰明了。
所謂同步異步,只是對于水壺而言。普通水壺,同步;響水壺,異步。雖然都能干活,但響水壺可以在自己完工之后,提示老張水開了。這是普通水壺所不能及的。同步只能讓調(diào)用者去輪詢自己(情況2中),造成老張效率的低下。所謂阻塞非阻塞,僅僅對于老張而言。立等的老張,阻塞;看電視的老張,非阻塞。情況1和情況3中老張就是阻塞的,媳婦喊他都不知道。雖然3中響水壺是異步的,可對于立等的老張沒有太大的意義。所以一般異步是配合非阻塞使用的,這樣才能發(fā)揮異步的效用。

二.觀察者(Observer )模式

1.觀察者模式介紹

觀察者模式定義了一種一對多的依賴關(guān)系,讓多個觀察者對象同時監(jiān)聽某一個主題對象。這個主題對象在狀態(tài)發(fā)生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。
說得明白些就是有一個被觀察者,還有一些(可能不止一個)觀察者,觀察者通過注冊來綁定被觀察者,當(dāng)被觀察者有變化時,就會通知觀察者。這么一種模式,就叫做觀察者模式。
如下圖:

2.觀察者模式實現(xiàn)

在被觀察者內(nèi)部維護(hù)一個觀察者的數(shù)組,當(dāng)被觀察者改變時,執(zhí)行這個數(shù)組下的所有觀察者的notufy方法即可。java實現(xiàn)例子(簡單,無并發(fā))如下:

//抽象觀察者角色
public interface Watcher
{
    public void update(String str);

}
//抽象主題角色,watched:被觀察
public interface Watched
{
    public void addWatcher(Watcher watcher);

    public void removeWatcher(Watcher watcher);

    public void notifyWatchers(String str);

}

定義具體的觀察者和被觀察者

public class ConcreteWatcher implements Watcher
{

    @Override
    public void update(String str)
    {
        System.out.println(str);
    }

}
import java.util.ArrayList;
import java.util.List;

public class ConcreteWatched implements Watched
{
    // 存放觀察者
    private List<Watcher> list = new ArrayList<Watcher>();

    @Override
    public void addWatcher(Watcher watcher)
    {
        list.add(watcher);
    }

    @Override
    public void removeWatcher(Watcher watcher)
    {
        list.remove(watcher);
    }

    @Override
    public void notifyWatchers(String str)
    {
        // 自動調(diào)用實際上是主題進(jìn)行調(diào)用的
        for (Watcher watcher: list)
        {
            watcher.update(str);
        }
    }

}

上面代碼在被觀察者ConcreteWatched內(nèi)部維護(hù)了一個觀察者的list,當(dāng)被觀察者發(fā)生改變時,調(diào)用 notifyWatchers來調(diào)用所有的觀察者的方法。
當(dāng)然,各個語言有自己已經(jīng)實現(xiàn)好的觀察者模式代碼,不需要自己再額外編寫,并且語言內(nèi)部實現(xiàn)的模式考慮到了資源利用,并發(fā)處理,回收,異常處理等等其他情況,因此推薦使用系統(tǒng)自身的觀察者模式實現(xiàn)。

三.發(fā)布/訂閱(Publish/Subscribe)模式

發(fā)布/訂閱模式和觀察者模式很類似,都有一個數(shù)據(jù)產(chǎn)生方,都有一些數(shù)據(jù)接收方,它們還是有一些不一樣的地方。

如上圖,發(fā)布/訂閱模式有一個調(diào)度中心,發(fā)布者將消息發(fā)布到調(diào)度中心。訂閱者從調(diào)度中心訂閱,并且從調(diào)度中心獲得數(shù)據(jù)。這是一種不同的模式,觀察者模式強(qiáng)調(diào)對象的行為,發(fā)布/訂閱強(qiáng)調(diào)架構(gòu)和組件。
在大多數(shù)的情況下,可以將觀察者模式解耦成發(fā)布/訂閱模式,因此往往很多時候這兩種模式當(dāng)做一種模式,其實問題不大。

四.響應(yīng)式編程(Reactive programming)

響應(yīng)式編程(下面簡稱Rx)在如今的web框架中占的比例越來越多。響應(yīng)式編程的目標(biāo)是提供一致的編程接口, 幫助開發(fā)者更方便的處理異步數(shù)據(jù)流,使軟件開發(fā)更高效、更簡潔。Rx是一個多語言的實現(xiàn),已經(jīng)支持多種語言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的庫包括: RxJava 、 RxSwift 、Rx.NET、RxJS、RXRuby等等。
參考資料:
英文
中文
其實本質(zhì)上,Rx就是對觀察者模式進(jìn)行封裝,一方面使得其擁有基本的異步消息傳遞能力而不需要處理線程同步以及并發(fā)等問題,另外還具備了很多其他功能。
比如android里面的代碼(需要導(dǎo)入RxJava和RxAndroid):

  Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

可以指定訂閱者和被訂閱者的線程,是io線程還是mainThrad線程,另外還有onCompleted,onError,onNext三個回調(diào)方法,單憑這些,就基本能夠滿足異步的使用要求。
其他的一些Rx框架:
RxJava
RxAndroid
RxSwift
RxJs 4 and RxJs 5
Rx.Net
RxPy

五.js中異步編程的方法

1.回調(diào)函數(shù)(callback)

(1)介紹

函數(shù)A作為參數(shù)(函數(shù)引用)傳遞到另一個函數(shù)B中,并且這個函數(shù)B執(zhí)行函數(shù)A。我們就說函數(shù)A叫做回調(diào)函數(shù)。如果沒有名稱(函數(shù)表達(dá)式),就叫做匿名回調(diào)函數(shù)。
實際上,也就是把函數(shù)作為參數(shù)傳遞。

(2)示例

首先使用一個事例來演示js中的callback:

var i = 0;
function sleep(ms, callback) {
    setTimeout(function () {
        console.log('我執(zhí)行完啦!');
        i++;
        if (i >= 2) callback(new Error('i大于2'), null);
        else callback(null, i);
    }, ms);
}

sleep(3000, function (err,val) {
    if(err) console.log('出錯啦:'+err.message);
    else console.log(val);
})

上面,將callback函數(shù)通過高階函數(shù),參數(shù)的方式傳入進(jìn)去,然后再在里面直接調(diào)用,外面就能夠獲取到數(shù)據(jù)了。

(3)原理

將函數(shù)作為參數(shù)傳入另一個函數(shù),其實這個參數(shù)是一個指針入口,等另一個函數(shù)執(zhí)行完畢,會接下去執(zhí)行這個指針入口處的函數(shù),這樣數(shù)據(jù)就能連貫起來。
注意:回調(diào)的本質(zhì)還是發(fā)布訂閱模式,將函數(shù)通過入?yún)⒔尤耄喈?dāng)于訂閱了這個函數(shù)。
在node中,回調(diào)會在系統(tǒng)的事件循環(huán)中創(chuàng)建一個事件,系統(tǒng)會在每一個Tick訪問這個事件循環(huán),查看是否有回調(diào)產(chǎn)生,有的話執(zhí)行這個回調(diào)函數(shù)。而回調(diào)產(chǎn)生是系統(tǒng)層去產(chǎn)生的,在windows下是IOCP,linux是libuv實現(xiàn)的線程池產(chǎn)生。

(4)小結(jié)

回調(diào)是使用最廣泛的異步編程方式,但是其有幾個最大的缺點:
a.多層嵌套時,可讀性差,如下

step1(function (value1) {
step2(value1, function(value2) {
    step3(value2, function(value3) {
        step4(value3, function(value4) {
            // Do something with value4
        });
    });
});
});

b.異常處理無法在外部捕捉

try{
 setTimeout(function(){
    JSON.parse("{'a':'1'}")
    console.log("aaaa")
 },0)
}
catch(ex){
 console.log(ex); //不能catch到這個異常
}

c.流程不好控制
callback嵌套時的流程繁瑣,對于有依賴的項目不能夠獨立分出來,造成了性能浪費。
比如,當(dāng)C操作依賴于B操作和C操作,而B與A沒有依賴關(guān)系時,不用第三方庫(如async,eventproxy)的話,B與A本可以并行,卻串行了,性能有很大的提升空間。

2.事件(events)(nodejs)

(1)介紹

nodejs內(nèi)部含有events消息模塊,主要用來進(jìn)行消息的傳遞和接收

(2)示例

var events = require('events');//引入模塊
var  x =new events.EventEmitter();//創(chuàng)建實例
x.on('y', function(a,b,c){
 console.log('it\'s work1!'+a+b+c);
});//訂閱一個字段,可以是多個
x.emit('y','111','222', '3333');//發(fā)布一個字段
//注意:需要先訂閱再發(fā)布

(3)原理

events事件其實就是在本地維護(hù)一個key-value的數(shù)組,然后事件觸發(fā)時獲取數(shù)組中的訂閱者,然后運行訂閱者的方法。就是一個js版的發(fā)布訂閱模式。

(4)小結(jié)

events事件模型是發(fā)布訂閱模型在nodejs中的顯示應(yīng)用,當(dāng)然jquery中也有相應(yīng)的插件。引入events可以解決回調(diào)函數(shù)中嵌套過多的情況,還能解決異常不能被捕獲的情況。
雖然解決了嵌套過多的情況,但是每一次都需要發(fā)布和訂閱,會使內(nèi)存使用增多,以及代碼處處是訂閱的情況。

3.Promise

(1)介紹

Promises對象是CommonJS工作組提出的一種規(guī)范,目的是為異步編程提供統(tǒng)一接口。
Promise介紹
簡單說,它的思想是,每一個異步任務(wù)返回一個Promise對象,該對象有一個then方法,允許指定回調(diào)函數(shù)。比如,f1的回調(diào)函數(shù)f2,可以寫成:

f1().then(f2);

(2)示例

var i = 0;
//函數(shù)返回promise
function sleep(ms) {
    return new Promise(function (resolve, reject) {
        setTimeout(function () {
            console.log('我執(zhí)行好了');
            i++;
            if (i >= 2) reject(new Error('i>=2'));
            else resolve(i);
        }, ms);
    })
}

sleep(1000).then(function (val) {
    console.log(val);
    return sleep(1000)
}).then(function (val) {
    console.log(val);
    return sleep(1000)
}).then(function (val) {
    console.log(val);
    return sleep(1000)
}).catch(function (err) {
    console.log('出錯啦:' + err.message);
})

(3)原理

Promise本身是一個通過回調(diào)生成的狀態(tài)機(jī)模型,用兩個數(shù)組分別存成功隊列和失敗隊列,然后then就是向隊列中添加回調(diào)函數(shù),resolve和reject就是更改的狀態(tài),狀態(tài)改變并且觸發(fā)回調(diào)函數(shù)。

(4)小結(jié)

異步執(zhí)行的函數(shù)返回一個Promise對象,表明我只是給出一個承諾,不能立刻給你消息。等執(zhí)行完畢之后就跳回調(diào)用的地方,執(zhí)行then里面的函數(shù),并且將參數(shù)作為入?yún)⒎祷亍?br> Promise的好處是無論什么時候都能夠返回,回調(diào)函數(shù)會立馬執(zhí)行。另一方面采用鏈?zhǔn)教幚恚苊饬嘶卣{(diào)的函數(shù)嵌套。能夠catch所有的錯誤,因此不必?fù)?dān)心捕捉不到錯誤。與事件events相比,Promise里面的狀態(tài)(resolved,rejected)只要發(fā)生就固定了,不會改變,而事件events中必須及時去監(jiān)聽,如果錯過了,那就監(jiān)聽不到了。
Promise目前也有一些缺點,不能取消,
Promise從ES6提出,主流的瀏覽器和js環(huán)境基本都支持了Promise的特性,目前使用越來越廣泛。

4.async/await

(1)介紹

這中間其實還有一個異步方案Generator,但是自從async/await出來之后,跟Promise結(jié)合緊密,因此完全可以使用Promise+async/await來進(jìn)行js的終極異步方案了。
不得不說,javascript在這一步落后了C#一大截,不過不算晚。async/await已經(jīng)正式在ES7亮相。
async/await在node7.0中出現(xiàn),需要使用harmony模式運行,在7.6以上就能夠直接使用了。

(2)示例

定義一個異步函數(shù):

async function fn(){
return 0;
}

其實返回的就是一個Promise。
await寫在async中,此處promise其實就是C#中的Task,async和await和C#中的async/await使用一樣。
所以,只要是Task就能夠await,而不一定是async返回的函數(shù)才能await。比如,一個http request返回的是一個promise,就能夠進(jìn)行await進(jìn)行同步,或者一個settimeout的函數(shù),返回的是promise,也能使用await進(jìn)行同步,如下:

const request = require('request');

const options = {
  url: '******',
  headers: {
    'User-Agent': 'request'
  }
};

const getRepoData = () => {  //一個http request
  return new Promise((resolve, reject) => {
    request(options, (err, res, body) => {
      if (err) {
        reject(err);
      }
      resolve(body);
    });
  });
};

async function asyncFun() {//一個有http request的異步方法
 try {
    const value = await getRepoData();
    // ... 和上面的yield類似,如果有多個異步流程,可以放在這里,比如
    // const r1 = await getR1();
    // const r2 = await getR2();
    // const r3 = await getR3();
    // 每個await相當(dāng)于暫停,執(zhí)行await之后會等待它后面的函數(shù)(不是generator)返回值之后再執(zhí)行后面其它的await邏輯。
    return value;
  } catch (err) {
    console.log(err);
  }
}

asyncFun().then(x => console.log(`x: ${x}`)).catch(err => console.error(err));

(3)原理

async/await是一個語法糖,內(nèi)部原理還是和Promise一樣,使用回調(diào)和狀態(tài)機(jī)進(jìn)行控制,然后使用await進(jìn)行阻塞控制同步,達(dá)到控制流程的目的,只是。。這使用方法和習(xí)慣也太像C#了。。

(4)小結(jié)

使用Promise處理異步函數(shù),使用async/await處理異步函數(shù)的同步和步驟控制問題。async/await很好用,

5.RxJs

(1)介紹

RxJs是Rx家族的js版本,目前由ReactiveX組織維護(hù),github倉庫點此,它的第五版正在開發(fā),是第四版的重構(gòu)版本。
RxJs是Promise的高級版本,包含了Promise中一些沒有的特性,比如cancel屬性。

(2)示例

例子采用Rxjs 4

/* Get stock data somehow */
const source = getAsyncStockData();

const subscription = source
  .filter(quote => quote.price > 30)
  .map(quote => quote.price)
  .subscribe(
    price => console.log(`Prices higher than $30: ${price}`),
    err => console.log(`Something went wrong: ${err.message}`)
  );

/* When we're done */
subscription.dispose();

使用也很簡單,訂閱一個異步數(shù)據(jù)流source,然后采用builder的形式對數(shù)據(jù)進(jìn)行處理,簡潔明了。最后釋放資源。和其他Rx的使用類似。

(3)原理

RxJs是js中的觀察者模式,是比events更加高級的一種封裝。

(4)小結(jié)

RxJs在總體上是Promise的升級版,添加了cancel,可以emit多個值。一般的中小型項目中采用Promise+async/await已經(jīng)足夠,除非是一些大型項目,需要進(jìn)行一些復(fù)雜的操作,比如取消操作,多值傳遞等等。

6.總結(jié)

以上5種+Generator都是js中的異步處理方案,在條件允許下,盡量使用Promise+async/await進(jìn)行異步處理和流程控制,個人認(rèn)為的優(yōu)先級 async/await > Promise/Generator > events > callback

六.其他語言中的異步編程方法

其實語言之間相差不大,有些思想值得相互借鑒

1.python

(1)協(xié)程(coroutine)
(2)yield,生成器(Generator)
(3)yield from (從python3.3開始)
(4)asyncio模塊(從python3.4開始)
(5)async/await(從python3.5開始)//推薦
(6)附加:RxPy,看來有了async/await就沒必要了
于是乎~python從3.5版本開始也提供了async/await來支持原生協(xié)程

2.java

(1)Future 和 FutureTask,類似于js中的Promise,或者C#中的Task
(2)第三方框架Netty,就是實現(xiàn)了一整套從Future,到callback的異步框架
(3)RxJava
(4)第三方的事件,消息等等

3.C#中的異步編程

(1)async/await //推薦
無疑C#中的異步編程思想是超前的,在C#5.0版本就推出了async/await異步流程控制,配合Task的異步方法任務(wù),達(dá)到了異步編程的目的。
(2)Rx.Net

七.異步編程總結(jié)

1.未來的異步編程主要分為兩大類

(1)以async/await +FutureTask/Promise/Task為主的思想,由語言提供原生的代碼支持

(2)由Rx提供第三方的消息形式的發(fā)布訂閱模式。 目前在國內(nèi)還不溫不火。

希望越來越多的語言能夠體驗到async/await語法的便利之處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容