chap8 高性能異步編程框架和中間件
-
Netty
Netty框架將網(wǎng)絡(luò)編程邏輯與業(yè)務(wù)邏輯處理分離開(kāi)來(lái),其內(nèi)部會(huì)自動(dòng)處理好網(wǎng)絡(luò)與異步處理邏輯,讓我們專心寫自己的業(yè)務(wù)處理邏輯。同時(shí),Netty的異步非阻塞能力與CompletableFuture結(jié)合可以讓我們輕松實(shí)現(xiàn)網(wǎng)絡(luò)請(qǐng)求的異步調(diào)用
Netty之所以能提供高性能網(wǎng)絡(luò)通信,其中一個(gè)原因是它使用Reactor線程模型。在Netty中,每個(gè)EventLoopGroup本身都是一個(gè)線程池,其中包含了自定義個(gè)數(shù)的NioEventLoop,每個(gè)NioEventLoop是一個(gè)線程,并且每個(gè)NioEventLoop里面持有自己的NIO Selector選擇器。在Netty中,客戶端持有一個(gè)EventLoopGroup用來(lái)處理網(wǎng)絡(luò)IO操作;在服務(wù)器端持有兩個(gè)EventLoopGroup,其中boss組是專門用來(lái)接收客戶端發(fā)來(lái)的TCP鏈接請(qǐng)求的,worker組是專門用來(lái)處理完成三次握手的鏈接套接字的網(wǎng)絡(luò)IO請(qǐng)求的
在Netty中,NioEventLoop是EventLoop的一個(gè)實(shí)現(xiàn),每個(gè)NioEventLoop中會(huì)管理自己的一個(gè)selector選擇器和監(jiān)控選擇器就緒事件的線程;每個(gè)Channel在整個(gè)生命周期中固定關(guān)聯(lián)到某一個(gè)NioEventLoop;但是,每個(gè)NioEventLoop中可以關(guān)聯(lián)多個(gè)Channel
ChannelPipeline:Netty中的ChannelPipeline類似于Tomcat容器中的Filter鏈,屬于設(shè)計(jì)模式中的責(zé)任鏈模式,其中鏈上的每個(gè)節(jié)點(diǎn)就是一個(gè)ChannelHandler。在Netty中,每個(gè)Channel有屬于自己的ChannelPipeline,管線中的處理器會(huì)對(duì)從Channel中讀取或者要寫入Channel中的數(shù)據(jù)進(jìn)行依次處理
每個(gè)NioEventLoopGroup里面包含了多個(gè)Nio EventLoop,每個(gè)NioEventLoop中包含了一個(gè)NIO Selector、一個(gè)隊(duì)列、一個(gè)線程;其中線程用來(lái)做輪詢注冊(cè)到Selector上的Channel的讀寫事件和對(duì)投遞到隊(duì)列里面的事件進(jìn)行處理
每個(gè)NioEventLoop中會(huì)管理好多客戶端發(fā)來(lái)的連接,并通過(guò)循環(huán)輪詢處理每個(gè)連接的讀寫事件
-
Netty之所以說(shuō)是異步非阻塞網(wǎng)絡(luò)框架,是因?yàn)橥ㄟ^(guò)NioSocketChannel的write系列方法向連接里面寫入數(shù)據(jù)時(shí)是非阻塞的,是可以馬上返回的(即使調(diào)用寫入的線程是我們的業(yè)務(wù)線程)
- 如果調(diào)用線程是IO線程,則會(huì)在IO線程上執(zhí)行寫入
- 如果發(fā)現(xiàn)調(diào)用線程不是IO線程,則會(huì)把寫入請(qǐng)求封裝為WriteTask并投遞到與其對(duì)應(yīng)的NioEventLoop中的隊(duì)列里面,然后等其對(duì)應(yīng)的NioEventLoop中的線程輪詢連接套接字的讀寫事件時(shí)捎帶從隊(duì)列里面取出來(lái)并執(zhí)行
- 每個(gè)NioSocketChannel對(duì)應(yīng)的讀寫事件都是在與其對(duì)應(yīng)的NioEvent Loop管理的單線程內(nèi)執(zhí)行的,不存在并發(fā),所以無(wú)須加鎖處理
使用Netty框架進(jìn)行網(wǎng)絡(luò)通信時(shí),當(dāng)我們發(fā)起請(qǐng)求后請(qǐng)求會(huì)馬上返回,而不會(huì)阻塞我們的業(yè)務(wù)調(diào)用線程;如果我們想要獲取請(qǐng)求的響應(yīng)結(jié)果,也不需要業(yè)務(wù)調(diào)用線程使用阻塞的方式來(lái)等待,而是當(dāng)響應(yīng)結(jié)果出來(lái)時(shí)使用IO線程異步通知業(yè)務(wù),由此可知,在整個(gè)請(qǐng)求-響應(yīng)過(guò)程中,業(yè)務(wù)線程不會(huì)由于阻塞等待而不能干其他事情
-
完成TCP三次握手的套接字應(yīng)該注冊(cè)到worker線程池中的哪一個(gè)NioEventLoop的Selector上
- 關(guān)于NioEventLoop的分配,采用輪詢?nèi)∧5姆绞絹?lái)進(jìn)行分配
-
如果NioEventLoop中的線程負(fù)責(zé)監(jiān)聽(tīng)注冊(cè)到Selector上的所有連接的讀寫事件和處理隊(duì)列里面的消息,那么會(huì)不會(huì)導(dǎo)致由于處理隊(duì)列里面任務(wù)耗時(shí)太長(zhǎng)導(dǎo)致來(lái)不及處理連接的讀寫事件
Netty默認(rèn)是采用時(shí)間均分策略來(lái)避免某一方處于饑餓狀態(tài)
處理所有注冊(cè)到當(dāng)前NioEventLoop的Selector上的所有連接套接字的讀寫事件
統(tǒng)計(jì)其耗時(shí),默認(rèn)情況下ioRatio為50
使用相同的時(shí)間來(lái)運(yùn)行隊(duì)列里面的任務(wù),也就是處理套接字讀寫事件與運(yùn)行隊(duì)列里面任務(wù)是使用時(shí)間片輪轉(zhuǎn)方式輪詢執(zhí)行
landon TODO runAllTasks中如何保證運(yùn)行隊(duì)列任務(wù)的時(shí)間和io一樣?任務(wù)執(zhí)行的時(shí)間不固定啊
// 從隊(duì)列里拿出一個(gè)任務(wù) Runnable task = pollTask(); ... // 設(shè)定deadline final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0 ... // 無(wú)限循環(huán) for (;;) { // 執(zhí)行任務(wù),任務(wù)計(jì)數(shù) safeExecute(task); runTasks ++; // landon-這里代碼是核心,注釋寫的也比較清楚,當(dāng)任務(wù)計(jì)數(shù)到了64個(gè)的時(shí)候才執(zhí)行一次檢測(cè),如果執(zhí)行時(shí)間超時(shí),則直接break,所以這里如果有某個(gè)任務(wù)執(zhí)行時(shí)間特別長(zhǎng),則是很有可能超過(guò)deadline的 // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } // landon-這里就是如果沒(méi)有到64個(gè)任務(wù),隊(duì)列就沒(méi)有,直接break.所以從代碼上看,如果有很多隊(duì)列的任務(wù)都很耗時(shí),但是又沒(méi)有超過(guò)64個(gè),則肯定會(huì)導(dǎo)致指定執(zhí)行隊(duì)列的時(shí)間過(guò)長(zhǎng)的,但是從設(shè)計(jì)上看,應(yīng)該任務(wù)都會(huì)很快執(zhí)行的 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } }
-
多個(gè)套接字注冊(cè)到同一個(gè)NioEventLoop的Selector上,使用單線程輪詢處理每個(gè)套接字上的事件,如果某一個(gè)套接字網(wǎng)絡(luò)請(qǐng)求比較頻繁,輪詢線程是不是會(huì)一直處理該套接字的請(qǐng)求,而使其他套接字請(qǐng)求得不到及時(shí)處理
- 默認(rèn)情況下maxMessagePerRead為16,所以對(duì)應(yīng)NioEventLoop管理的每個(gè)NioSocketChannel中的數(shù)據(jù),在一次事件循環(huán)內(nèi)最多連續(xù)讀取16次數(shù)據(jù),并不會(huì)一直讀取,這就有效避免了其他NioSocketChannel的請(qǐng)求事件得不到及時(shí)處理的情況
-
基于Netty與CompletableFuture實(shí)現(xiàn)RPC異步調(diào)用
-
thenCombine
兩個(gè)CompletionStage是并行執(zhí)行的,它們之間并沒(méi)有先后依賴順序,
other并不會(huì)等待先前的CompletableFuture執(zhí)行完畢后再執(zhí)行其實(shí)從功能上來(lái)講,它們的功能更類似
thenAcceptBoth,只不過(guò)thenAcceptBoth是純消費(fèi),它的函數(shù)參數(shù)沒(méi)有返回值,而thenCombine的函數(shù)參數(shù)fn有返回值landon,所以合并之后的任務(wù)執(zhí)行默認(rèn)線程應(yīng)該是不確定的
@Sharable注解是讓服務(wù)端所有接收的鏈接對(duì)應(yīng)的channel復(fù)用同一個(gè)NettyServerHandler實(shí)例,這里可以使用@Sharable方式是因?yàn)镹ettyServer Handler內(nèi)的處理是無(wú)狀態(tài)的,不會(huì)存在線程安全問(wèn)題
-
rpcSyncCall
- 創(chuàng)建一個(gè)CompletableFuture future
- 創(chuàng)建一個(gè)消息,有消息id,異步發(fā)送
- 保存future上下文,<id,future>
- future.get同步等待結(jié)果
- 在handler收到消息的時(shí)候,從上下文拿到future,調(diào)用future.complete
-
rpcAsyncCall
- 則不是調(diào)用future.get等待結(jié)果,而是直接返回
- 然后future.whenComplete指定回調(diào)
-
可以把異步調(diào)用改造為Reactive編程風(fēng)格,只需要把返回的CompletableFuture轉(zhuǎn)換為Flowable即可
使用defer,當(dāng)訂閱的時(shí)候才執(zhí)行rpc操作
-
future.whenComplete時(shí),發(fā)射結(jié)果,這里使用了ReplayProcessor創(chuàng)建含有一個(gè)元素的流
landon-這里用到了Processor
發(fā)起rpc調(diào)用后馬上返回了一個(gè)Flowable流對(duì)象,但這時(shí)真正的rpc調(diào)用還沒(méi)有發(fā)出去,等代碼3訂閱了流對(duì)象時(shí)才真正發(fā)起rpc調(diào)用
由于CompletableFuture是可以設(shè)置回調(diào)函數(shù)的,所以把其轉(zhuǎn)換為Reactive風(fēng)格編程很容易
-
-
Dubbo
-
概述
- Provider為服務(wù)提供者集群,服務(wù)提供者負(fù)責(zé)暴露提供的服務(wù),并將服務(wù)注冊(cè)到服務(wù)注冊(cè)中心
- Consumer為服務(wù)消費(fèi)者集群,服務(wù)消費(fèi)者通過(guò)RPC遠(yuǎn)程調(diào)用服務(wù)提供者提供的服務(wù)
- Registry負(fù)責(zé)服務(wù)注冊(cè)與發(fā)現(xiàn)
- Monitor為統(tǒng)計(jì)服務(wù)的調(diào)用次數(shù)和調(diào)用時(shí)間的監(jiān)控中心
- 服務(wù)提供者在啟動(dòng)時(shí)會(huì)將自己提供的服務(wù)注冊(cè)到服務(wù)注冊(cè)中心
- 服務(wù)消費(fèi)者在啟動(dòng)時(shí)會(huì)去服務(wù)注冊(cè)中心訂閱自己所需服務(wù)的地址列表,由服務(wù)注冊(cè)中心向它異步返回其所需服務(wù)接口的提供者的地址列表,再由服務(wù)消費(fèi)者根據(jù)路由規(guī)則和設(shè)置的負(fù)載均衡算法選擇一個(gè)服務(wù)提供者IP進(jìn)行調(diào)用
- 監(jiān)控平臺(tái)主要用來(lái)統(tǒng)計(jì)服務(wù)的調(diào)用次數(shù)和調(diào)用耗時(shí),服務(wù)消費(fèi)者和提供者,在內(nèi)存中累計(jì)調(diào)用次數(shù)和調(diào)用耗時(shí),并定時(shí)每分鐘發(fā)送一次統(tǒng)計(jì)數(shù)據(jù)到監(jiān)控中心,監(jiān)控中心則使用數(shù)據(jù)繪制圖表來(lái)顯示。監(jiān)控平臺(tái)不是分布式系統(tǒng)必須有的,但是這些數(shù)據(jù)有助于系統(tǒng)運(yùn)維和調(diào)優(yōu)。服務(wù)提供者和消費(fèi)者可以直接配置監(jiān)控平臺(tái)的地址,也可以通過(guò)服務(wù)注冊(cè)中心來(lái)獲取
-
dubbo的異步調(diào)用
- Dubbo框架中的異步調(diào)用是發(fā)生在服務(wù)消費(fèi)端的,異步調(diào)用實(shí)現(xiàn)基于NIO的非阻塞能力實(shí)現(xiàn)并行調(diào)用,服務(wù)消費(fèi)端不需要啟動(dòng)多線程即可完成并行調(diào)用多個(gè)遠(yuǎn)程服務(wù),相比多線程其開(kāi)銷較小
- 當(dāng)服務(wù)消費(fèi)端發(fā)起RPC調(diào)用時(shí)使用的是用戶線程(步驟1),請(qǐng)求會(huì)被轉(zhuǎn)換為IO線程(步驟2),具體向遠(yuǎn)程服務(wù)提供方發(fā)起遠(yuǎn)程調(diào)用
- 步驟2的IO線程使用NIO發(fā)起遠(yuǎn)程調(diào)用,用戶線程通過(guò)步驟3創(chuàng)建了一個(gè)Future對(duì)象,然后通過(guò)步驟4將其設(shè)置到RpcContext中
- 然后用戶線程則可以在某個(gè)時(shí)間從RpcContext中獲取設(shè)置的Future對(duì)象(步驟5),并且通過(guò)步驟6設(shè)置回調(diào)函數(shù),這樣用戶線程就返回了
- 當(dāng)服務(wù)提供方返回結(jié)果(步驟7)后,調(diào)用方線程模型中的線程池線程會(huì)把結(jié)果通過(guò)步驟8寫入Future,然后就會(huì)回調(diào)注冊(cè)的回調(diào)函數(shù)
- 調(diào)用線程異步調(diào)用發(fā)起后會(huì)馬上返回一個(gè)Future,并在Future上設(shè)置一個(gè)回調(diào)函數(shù),然后調(diào)用線程就可以忙自己的事情去了,不需要同步等待服務(wù)提供方返回結(jié)果。當(dāng)服務(wù)提供方返回結(jié)果時(shí),調(diào)用方的IO線程會(huì)把響應(yīng)結(jié)果傳遞給Dubbo框架內(nèi)部線程池中的線程,后者則會(huì)回調(diào)注冊(cè)的回調(diào)函數(shù),由此可見(jiàn),在整個(gè)過(guò)程中,發(fā)起異步調(diào)用的用戶線程是不會(huì)被阻塞的
- 首先考慮在一個(gè)線程(記為線程A)中通過(guò)RPC請(qǐng)求獲取服務(wù)B和服務(wù)C的數(shù)據(jù),然后基于兩者的結(jié)果做一些事情。在同步RPC調(diào)用情況下,線程A在調(diào)用服務(wù)B后需要等待服務(wù)B返回結(jié)果,才可以對(duì)服務(wù)C發(fā)起調(diào)用,等服務(wù)C返回結(jié)果后才可以結(jié)合服務(wù)B和服務(wù)C的結(jié)果做一件事
- 線程A同步獲取服務(wù)B的結(jié)果后,再同步調(diào)用服務(wù)C獲取結(jié)果,可見(jiàn)在同步調(diào)用的情況下,線程A必須按順序?qū)Χ鄠€(gè)服務(wù)請(qǐng)求進(jìn)行調(diào)用,因而調(diào)用線程必須等待,這顯然會(huì)浪費(fèi)資源。在Dubbo中,使用異步調(diào)用可以避免這個(gè)問(wèn)題。兩次異步遠(yuǎn)程過(guò)程調(diào)用,并行的
-
dubbo的異步執(zhí)行
Dubbo框架的異步執(zhí)行是發(fā)生在服務(wù)提供端的,在Provider端非異步執(zhí)行時(shí),其對(duì)調(diào)用方發(fā)來(lái)的請(qǐng)求的處理是在Dubbo內(nèi)部線程模型的線程池中的線程來(lái)執(zhí)行的,在Dubbo中服務(wù)提供方提供的所有服務(wù)接口都使用這一個(gè)線程池來(lái)執(zhí)行,所以當(dāng)一個(gè)服務(wù)執(zhí)行比較耗時(shí)時(shí),可能會(huì)占用線程池中的很多線程,這可能就會(huì)影響到其他服務(wù)的處理
Provider端異步執(zhí)行則將服務(wù)的處理邏輯從Dubbo內(nèi)部線程池切換到業(yè)務(wù)自定義線程,避免Dubbo線程池中線程被過(guò)度占用,有助于避免不同服務(wù)間的互相影響
Provider端異步執(zhí)行對(duì)節(jié)省資源和提升RPC響應(yīng)性能是沒(méi)有效果的,這是因?yàn)槿绻?wù)處理比較耗時(shí),雖然不是使用Dubbo框架的內(nèi)部線程,但還是需要業(yè)務(wù)自己的線程來(lái)處理,另外副作用還有會(huì)新增一次線程上下文切換(從Dubbo內(nèi)部線程池線程切換到業(yè)務(wù)線程)
Landon:這里指provider的線程模型,即從io線程到dubbo的內(nèi)部線程再到業(yè)務(wù)線程
和游戲服務(wù)器模型基本一致
注意:這個(gè)切換到業(yè)務(wù)線程執(zhí)行對(duì)節(jié)省資源和提升RPC響應(yīng)性能是沒(méi)有效果的,而客戶端異步調(diào)用這邊是有作用的,因?yàn)椴粫?huì)阻塞調(diào)用線程
-
Dubbo中提供了兩種異步處理方法
- 使用AsyncContext實(shí)現(xiàn)異步執(zhí)行
- 用RpcContext.startAsync()開(kāi)啟服務(wù)異步執(zhí)行,然后返回一個(gè)asyncContext
- 把服務(wù)處理任務(wù)提交到業(yè)務(wù)線程池后sayHello方法就直接返回了null
- 同時(shí)也釋放了Dubbo內(nèi)部線程池中的線程
- 具體業(yè)務(wù)處理邏輯則在自定義業(yè)務(wù)線程池內(nèi)執(zhí)行,任務(wù)內(nèi)首先執(zhí)行代碼2.2切換任務(wù)的上下文,這是因?yàn)镽pcContext.getContext()是ThreadLocal變量,不能跨線程,這里切換上下文就是為了把保存的上下文內(nèi)容設(shè)置到當(dāng)前線程內(nèi)
- 最后把任務(wù)執(zhí)行結(jié)果寫入異步上下文
- 基于CompletableFuture簽名的接口實(shí)現(xiàn)異步執(zhí)行
- 基于定義CompletableFuture簽名的接口實(shí)現(xiàn)異步執(zhí)行需要接口方法返回值為CompletableFuture
- 方法內(nèi)部使用CompletableFuture.supplyAsync讓本來(lái)應(yīng)由Dubbo內(nèi)部線程池中線程處理的服務(wù),轉(zhuǎn)為由業(yè)務(wù)自定義線程池中線程來(lái)處理,所以Dubbo內(nèi)部線程池線程會(huì)得到及時(shí)釋放
- 調(diào)用sayHello方法的線程是Dubbo線程模型線程池中的線程,而業(yè)務(wù)在bizThreadpool中的線程處理,所以代碼2.1保存了RpcContext對(duì)象(ThreadLocal變量),以便在業(yè)務(wù)處理線程中使用
- 使用AsyncContext實(shí)現(xiàn)異步執(zhí)行
-
Dubbo demo
-
mac安裝zookeeper
% brew info zookeeper zookeeper: stable 3.5.7 (bottled), HEAD Centralized server for distributed coordination of services https://zookeeper.apache.org/ Not installed From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/zookeeper.rb ==> Dependencies Build: ant ?, autoconf ?, automake ?, libtool ?, pkg-config ? ==> Options --HEAD Install HEAD version ==> Caveats To have launchd start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zkServer start // 注: 2020.03.26 看zk官網(wǎng)zk的版本是3.6.0 % brew install zookeeper /usr/local/Cellar/zookeeper/3.5.7 // 配置文件目錄 /usr/local/etc/zookeeper // 啟動(dòng)zk % zkServer start /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Starting zookeeper ... STARTED clientPort=2181 % zkServer status /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Client port found: 2181. Client address: localhost. Error contacting service. It is probably not running. // provider啟動(dòng)連zk報(bào)錯(cuò),Exception in thread "main" java.lang.IllegalStateException: zookeeper not connected // 查看日志 /usr/local/etc/zookeeper 有l(wèi)og4j.properties log4j.appender.zklog.File = /usr/local/var/log/zookeeper/zookeeper.log 2020-03-26 14:21:11 NIOServerCnxnFactory [ERROR] Thread Thread[main,5,main] died java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer; at org.apache.jute.BinaryOutputArchive.stringToByteBuffer(BinaryOutputArchive.java:77) 2020-03-26 14:17:27 NIOServerCnxnFactory [ERROR] Thread Thread[NIOWorkerThread-1,5,main] died java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer; % zkCli /usr/bin/java Connecting to localhost:2181 Welcome to ZooKeeper! % brew upgrade zookeeper // 臨時(shí)解決辦法 1. 手動(dòng)去下載zookeeper 3.5.7 2. 拷貝zookeeper-3.5.7.jar和zookeeper-jute-3.5.7.jar到/usr/local/Cellar/zookeeper/3.5.7/libexec 3. 刪除原有zookeeper-3.5.6-SNAPSHOT.jar - 猜測(cè)是是通過(guò)brew下載的zookeeper-3.5.6-SNAPSHOT.jar // 成功 % zkServer status /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone // 順便裝上maven % brew info maven maven: stable 3.6.3 Java-based project management https://maven.apache.org/ Conflicts with: mvnvm (because also installs a 'mvn' executable) Not installed From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/maven.rb ==> Dependencies Required: openjdk ? % brew install maven // 配合文件 /usr/local/Cellar/maven/3.6.3_1/libexec/conf -
provider application兩種方式
- startWithBootstrap和startWithExport,推薦前者
-
consumer application兩種方式
- runWithBootstrap和runWithRefer,推薦前者
- ReferenceConfig對(duì)象內(nèi)部封裝了所有通訊細(xì)節(jié),對(duì)象較重,請(qǐng)緩存復(fù)用
- ReferenceConfigCache.getCache().get(reference)
- 傳統(tǒng)的dubbo服務(wù)面向接口編程,如果需要調(diào)用其他服務(wù)則需要引入該服務(wù)對(duì)應(yīng)的接口
- 比如跨語(yǔ)言接口,所以dubbo支持api泛化調(diào)用,即invoke傳入接口名字,method和parameter等即可
- ReferenceConfig<GenericService>
-
異步調(diào)用
-
最簡(jiǎn)單的就是服務(wù)接口返回CompletableFuture,接口實(shí)現(xiàn)則是使用CompletableFuture.supplyAsync,傳入自定義線程池
- 第二種接口實(shí)現(xiàn)是使用Serverlet 3.0的異步接口
AsyncContext- 注意:Dubbo提供了一個(gè)類似Serverlet 3.0的異步接口
AsyncContext,在沒(méi)有CompletableFuture簽名接口的情況下,也可以實(shí)現(xiàn)Provider端的異步執(zhí)行 - 這個(gè)是指沒(méi)有CompletableFuture簽名接口
- 注意:Dubbo提供了一個(gè)類似Serverlet 3.0的異步接口
- 這種是服務(wù)端異步執(zhí)行。Provider端異步執(zhí)行將阻塞的業(yè)務(wù)從Dubbo內(nèi)部線程池切換到業(yè)務(wù)自定義線程,避免Dubbo線程池的過(guò)度占用,有助于避免不同服務(wù)間的互相影響。異步執(zhí)行無(wú)益于節(jié)省資源或提升RPC響應(yīng)性能,因?yàn)槿绻麡I(yè)務(wù)執(zhí)行需要阻塞,則始終還是要有線程來(lái)負(fù)責(zé)執(zhí)行。
- 第二種接口實(shí)現(xiàn)是使用Serverlet 3.0的異步接口
-
對(duì)于consumer
- 接口直接返回結(jié)果,實(shí)現(xiàn)也直接返回
- 但是consumer這邊,reference.setAsync(true)
- 這樣consumer這邊調(diào)用接口就直接返回了
- 然后通過(guò)RpcContext.getContext().getCompletableFuture()獲得future并設(shè)置complete回調(diào)
- 另外一種是不設(shè)置reference.setAsync(true)
- 而是直接context.asyncCall,傳入一個(gè)callable執(zhí)行同步方法,并返回CompletableFuture
- 這兩種相當(dāng)于接口的實(shí)現(xiàn)是同步的,但是在調(diào)用端執(zhí)行了異步,調(diào)用即返回然后回調(diào)
-
總結(jié)
consumer可以異步執(zhí)行,此時(shí)接口可以是同步的,可以直接返回,也可以用CompletableFuture.completedFuture包裝
provider可以異步執(zhí)行,接口必須要是CompletableFuture
-
兩者解決的問(wèn)題不同。如果provider端的服務(wù)比較耗時(shí),建議切到自定義業(yè)務(wù)線程池。而consumer端則通常都是異步的,不影響調(diào)用端
- 即兩端都可以異步,無(wú)論接口定義是什么樣子的
-
如果你只有這樣的同步服務(wù)定義,而又不喜歡RpcContext的異步使用方式。那還有一種方式,就是利用Java 8提供的default接口實(shí)現(xiàn),重載一個(gè)帶有帶有CompletableFuture簽名的方法
這個(gè)指的consumer端
https://github.com/apache/dubbo-async-processor#compiler-hacker-processer
-
在測(cè)試過(guò)程中無(wú)法找到AsyncSignal這個(gè)類,參數(shù)用了另外一個(gè)字段,測(cè)試倒是通過(guò)。所以猜測(cè)AsyncSignal只是示例,相當(dāng)于一個(gè)標(biāo)識(shí)符,‘specially designed to distinguish async method’
public interface GreetingsService { String sayHi(String name); // AsyncSignal is totally optional, you can use any parameter type as long as java allows your to do that. default CompletableFuture<String> sayHi(String name, AsyncSignal signal) { return CompletableFuture.completedFuture(sayHi(name)); } } -
另外一種實(shí)現(xiàn)是Compiler hacker processer,第一種是AsyncSignal
-
就是新寫一個(gè)方法,重命名,如sayHiAsync,返回CompletableFuture
public interface GreetingsService { String sayHi(String name); // Any name is ok default CompletableFuture<String> sayHiAsync(String name) { return CompletableFuture.completedFuture(sayHi(name)); } } 缺點(diǎn)是之前的方法就不再起作用
The essential part is to overwrite a new async method. Another way would be to generate a new method with a different name, for example,
sayHiAsync, then we can get rid ofAsyncSignal. But there's an obvious flaw of this approach, that is, all method level configurators and routers defined tosayHiwill not take effect anymore.
-
-
-
Disruptor
Disruptor是一個(gè)高性能的線程間消息傳遞庫(kù)
-
要理解Disruptor是什么,最好的方法是將它與目前你已經(jīng)很好地理解且與之非常相似的東西進(jìn)行比較,例如與Java的BlockingQueue進(jìn)行對(duì)比。與隊(duì)列一樣,Disruptor的目的也是在同一進(jìn)程內(nèi)的線程之間傳遞數(shù)據(jù)(例如消息或事件)
- Disruptor中的同一個(gè)消息會(huì)向所有消費(fèi)者發(fā)送,即多播能力(Multicast Event)
- 為事件預(yù)先分配內(nèi)存(Event Preallocation),避免運(yùn)行時(shí)因頻繁地進(jìn)行垃圾回收與內(nèi)存分配而增加開(kāi)銷
- 可選擇無(wú)鎖(Optionally Lock-free),使用兩階段協(xié)議,讓多個(gè)線程可同時(shí)修改不同元素
- 緩存行填充,避免偽共享(prevent false sharing)
Disruptor使用Sequence作為識(shí)別特定組件所在位置的方法。每個(gè)消費(fèi)者(EventProcessor)都像Disruptor本身一樣維護(hù)一個(gè)Sequence
Sequencer是Disruptor的真正核心。該接口的2個(gè)實(shí)現(xiàn)(單生產(chǎn)者和多生產(chǎn)者)實(shí)現(xiàn)了所有并發(fā)算法,用于在生產(chǎn)者和消費(fèi)者之間快速、正確地傳遞數(shù)據(jù)
Wait Strategy:等待策略,確定消費(fèi)者如何等待生產(chǎn)者將事件放入Disruptor
每個(gè)消費(fèi)者持有自己的當(dāng)前消費(fèi)序號(hào),由于是環(huán)形buffer,因而生產(chǎn)者寫入事件時(shí)要看序號(hào)最小的消費(fèi)者序號(hào),以避免覆蓋還沒(méi)有被消費(fèi)的事件
Disruptor具有多播能力(Multicast),這是Java中隊(duì)列和Disruptor之間最大的行為差異。當(dāng)有多個(gè)消費(fèi)者在同一個(gè)Disruptor上監(jiān)聽(tīng)事件時(shí),所有事件都會(huì)發(fā)布給所有消費(fèi)者,而Java隊(duì)列中的每個(gè)事件只會(huì)發(fā)送給某一個(gè)消費(fèi)者。Disruptor的行為旨在用于需要對(duì)同一數(shù)據(jù)進(jìn)行獨(dú)立的多個(gè)并行操作的情況
Disruptor的目標(biāo)之一是在低延遲環(huán)境中使用。在低延遲系統(tǒng)中,必須減少或移除運(yùn)行時(shí)內(nèi)存分配;在基于Java的系統(tǒng)中,目的是減少由于垃圾收集導(dǎo)致的系統(tǒng)停頓。為了支持這一點(diǎn),用戶可以預(yù)先為Disruptor中的事件分配其所需的存儲(chǔ)空間(也就是聲明Ring Buffer的大小)。在構(gòu)造Ring Buffer期間,EventFactory由用戶提供,并將在Disruptor的Ring Buffer中每個(gè)事件元素創(chuàng)建時(shí)被調(diào)用。將新數(shù)據(jù)發(fā)布到Disruptor時(shí),API將允許用戶獲取構(gòu)造的對(duì)象,以便調(diào)用方法或更新該存儲(chǔ)對(duì)象上的字段,Disruptor保證這些操作只要正確實(shí)現(xiàn)就是并發(fā)安全的
低延遲期望推動(dòng)的另一個(gè)關(guān)鍵實(shí)現(xiàn)細(xì)節(jié)是使用無(wú)鎖算法來(lái)實(shí)現(xiàn)Disruptor,所有內(nèi)存可見(jiàn)性和正確性保證都是使用內(nèi)存屏障(體現(xiàn)為volatile關(guān)鍵字)或CAS操作實(shí)現(xiàn)的。在Disruptor的實(shí)現(xiàn)中只有一種情況需要實(shí)際鎖定—當(dāng)使用BlockingWaitStrategy策略時(shí),這僅僅是為了使用條件變量,以便在等待新事件到達(dá)時(shí)parked消費(fèi)線程。許多低延遲系統(tǒng)將使用忙等待(busy-wait)來(lái)避免使用條件可能引起的抖動(dòng),但是大量在系統(tǒng)繁忙等待的操作可能導(dǎo)致性能顯著下降,尤其是在CPU資源嚴(yán)重受限的情況下
在JDK的BlockingQueue中添加或取出元素時(shí)是需要加獨(dú)占鎖的,通過(guò)鎖來(lái)保證多線程對(duì)底層共享的數(shù)據(jù)結(jié)構(gòu)進(jìn)行并發(fā)讀寫的線程安全性,使用鎖會(huì)導(dǎo)致同時(shí)只有一個(gè)線程可以向隊(duì)列添加或刪除元素。Disruptor則使用兩階段協(xié)議,讓多個(gè)線程可同時(shí)修改不同元素,需要注意,消費(fèi)元素時(shí)只能讀取到已經(jīng)提交的元素。在Disruptor中某個(gè)線程要訪問(wèn)Ring Buffer中某個(gè)序列號(hào)下對(duì)應(yīng)的元素時(shí),要先通過(guò)CAS操作獲取對(duì)應(yīng)元素的所有權(quán)(第一階段),然后通過(guò)序列號(hào)獲取對(duì)應(yīng)的元素對(duì)象并對(duì)其中的屬性進(jìn)行修改,最后再發(fā)布元素(第二階段),只有發(fā)布后的元素才可以被消費(fèi)者讀取。當(dāng)多個(gè)線程寫入元素時(shí),它們都會(huì)先執(zhí)行CAS操作,獲取到Ring buffer中的某一個(gè)元素的所有權(quán),然后可以并發(fā)對(duì)自己的元素進(jìn)行修改。注意,只有序列號(hào)小的元素發(fā)布后,后面的元素才可以發(fā)布。可知相比使用鎖,使用CAS大大減少了開(kāi)銷,提高了并發(fā)度
其實(shí)在單生產(chǎn)者的情況下Disruptor甚至可以省去CAS的開(kāi)銷,因?yàn)樵谶@種情況下,只有一個(gè)線程來(lái)請(qǐng)求修改Ring Buffer中的數(shù)據(jù),生產(chǎn)者的序列號(hào)使用普通的long型變量即可。在創(chuàng)建Disruptor時(shí)是可以指定是單生產(chǎn)者還是多生產(chǎn)者的,如果你的業(yè)務(wù)就是單生產(chǎn)者模型,那么創(chuàng)建Disruptor時(shí)指定生產(chǎn)者模式為ProducerType.SINGLE效果會(huì)更好
Disruptor中的Ring Buffer底層是一個(gè)地址連續(xù)的數(shù)組,數(shù)組內(nèi)相鄰的元素很容易會(huì)被放入同一個(gè)Cache行里,從而導(dǎo)致偽共享的出現(xiàn)。Disruptor則通過(guò)緩存行填充,讓數(shù)組中的每個(gè)元素獨(dú)占一個(gè)緩存行從而解決了偽共享問(wèn)題的出現(xiàn)。另外為了避免Ring Buffer中序列號(hào)(定位元素的游標(biāo))與其他元素共享緩存行,對(duì)其也進(jìn)行了緩存行填充,以提高訪問(wèn)序列號(hào)時(shí)緩存的命中率
-
關(guān)鍵實(shí)現(xiàn)原理
landon
- 目前看線程,每1個(gè)consumer handler都是固定線程的,依次排隊(duì)去消費(fèi)產(chǎn)生的事件。
- 有多個(gè)消費(fèi)者,每個(gè)消費(fèi)者有自己的sequence。因?yàn)槭黔h(huán)形buffer,當(dāng)緩沖期滿了,但還有沒(méi)有消費(fèi)的元素,那么此時(shí)生產(chǎn)者只能等。其實(shí)這個(gè)和阻塞隊(duì)列滿時(shí)是一樣的。那么什么時(shí)候等?只要判斷當(dāng)前很多消費(fèi)者的最小sequence,這里指在環(huán)里的位置,那么生產(chǎn)的當(dāng)前在環(huán)里的位置一一定小于這個(gè)位置。這個(gè)通常指生產(chǎn)者速度較快,過(guò)了一圈后,消費(fèi)者還沒(méi)有消費(fèi)完之前的元素
- 測(cè)試是buffer為4,依次放0,1,2,10,11,12
- 0號(hào)元素的消費(fèi)者h(yuǎn)andler耗時(shí),當(dāng)生產(chǎn)11(index 0)時(shí),則阻塞等待0號(hào)消耗完畢
- 關(guān)于ringbuffer
- 我們實(shí)現(xiàn)的ring buffer和大家常用的隊(duì)列之間的區(qū)別是,我們不刪除buffer中的數(shù)據(jù),也就是說(shuō)這些數(shù)據(jù)一直存放在buffer中,直到新的數(shù)據(jù)覆蓋他們
- 為什么要和ArrayBlockingQueue對(duì)比呢?這是因?yàn)閮蓚€(gè)底層的數(shù)據(jù)結(jié)構(gòu)類似,都是通過(guò)一個(gè)環(huán)形數(shù)組實(shí)現(xiàn)
-
https://colobu.com/2014/12/22/why-is-disruptor-faster-than-ArrayBlockingQueue/
- ArrayBlockingQueue通過(guò)ReentrantLock以及它的兩個(gè)condition來(lái)控制并發(fā)
- 壓入元素時(shí):如果數(shù)組已滿,則等待notFull,如果消費(fèi)者取出了元素,則會(huì)調(diào)用
notFull.signal();: - 這時(shí)put方法會(huì)被喚醒
- 取出元素時(shí):如果數(shù)組為空,則調(diào)用
notEmpty.await();等待, enqueue會(huì)調(diào)用notEmpty.signal();喚醒它: - 這種wait-notify(signal)也就是教科書(shū)上標(biāo)準(zhǔn)的處理隊(duì)列的方式
- 壓入元素時(shí):如果數(shù)組已滿,則等待notFull,如果消費(fèi)者取出了元素,則會(huì)調(diào)用
- RingBuffer使用了padding方式來(lái)提供CPU cache的命中率
- 如果producer生產(chǎn)的快,追上消費(fèi)者的時(shí)候
可以通過(guò)gatingSequences讓生產(chǎn)者等待消費(fèi)者消費(fèi)。
這個(gè)時(shí)候是通過(guò)LockSupport.parkNanos(1L);不停的循環(huán),直到有消費(fèi)者消費(fèi)掉一個(gè)或者多個(gè)事件 - 如果消費(fèi)者消費(fèi)的快,追上生產(chǎn)者的時(shí)候
這個(gè)時(shí)候由于消費(fèi)者將自己最后能處理的sequence寫回到光標(biāo)后sequence.set(availableSequence);, 如果生產(chǎn)者還沒(méi)有寫入一個(gè)事件, 那么它就會(huì)調(diào)用waitStrategy.waitFor
等待。 如果生產(chǎn)者publish一個(gè)事件,它會(huì)更改光標(biāo)的值:cursor.set(sequence);,然后通知等待的消費(fèi)者繼續(xù)處理waitStrategy.signalAllWhenBlocking(); - 在使用BlockingWaitStrategy情況下,其實(shí)這和ArrayBlockingQueue類似,因?yàn)锳rrayBlockingQueue也是通過(guò)Lock的方式等待。 性能測(cè)試結(jié)果顯示Disruptor在這種策略下性能比ArrayBlockingQueue要略好一點(diǎn),但是達(dá)不到10倍的顯著提升,大概兩倍左右。 這大概就是生產(chǎn)者使用不斷的LockSupport.parkNanos方式帶來(lái)的提升吧
- 但是如果換為YieldingWaitStrategy, CPU使用率差別不大,但是卻帶來(lái)了10倍的性能提升。 這是因?yàn)橄M(fèi)者不需sleep, 通過(guò)spin-yield方式降低延遲率,提高了吞吐率
- 多生產(chǎn)者時(shí)在請(qǐng)求下一個(gè)sequence時(shí)有競(jìng)爭(zhēng)的情況,所以通過(guò)
cursor.compareAndSet(current, next)的spin來(lái)實(shí)現(xiàn),直到成功的設(shè)置next才返回
- ArrayBlockingQueue通過(guò)ReentrantLock以及它的兩個(gè)condition來(lái)控制并發(fā)
- 偽共享
- 數(shù)據(jù)X、Y、Z被加載到同一Cache Line中,線程A在Core1修改X,線程B在Core2上修改Y。根據(jù)MESI大法,假設(shè)是Core1是第一個(gè)發(fā)起操作的CPU核,Core1上的L1 Cache Line由S(共享)狀態(tài)變成M(修改,臟數(shù)據(jù))狀態(tài),然后告知其他的CPU核,圖例則是Core2,引用同一地址的Cache Line已經(jīng)無(wú)效了;當(dāng)Core2發(fā)起寫操作時(shí),首先導(dǎo)致Core1將X寫回主存,Cache Line狀態(tài)由M變?yōu)镮(無(wú)效),而后才是Core2從主存重新讀取該地址內(nèi)容,Cache Line狀態(tài)由I變成E(獨(dú)占),最后進(jìn)行修改Y操作, Cache Line從E變成M。可見(jiàn)多個(gè)線程操作在同一Cache Line上的不同數(shù)據(jù),相互競(jìng)爭(zhēng)同一Cache Line,導(dǎo)致線程彼此牽制影響,變成了串行程序,降低了并發(fā)性。此時(shí)我們則需要將共享在多線程間的數(shù)據(jù)進(jìn)行隔離,使他們不在同一個(gè)Cache Line上,從而提升多線程的性能
-
akka
-
Akka是一個(gè)工具包,用于在JVM上構(gòu)建高并發(fā)、分布式、彈性、基于消息驅(qū)動(dòng)的應(yīng)用程序
- Akka基于Actor模型和Streams,讓我們可以構(gòu)建可伸縮的,并且可以高效使用服務(wù)器資源,使用多個(gè)服務(wù)器進(jìn)行擴(kuò)展的系統(tǒng)
- 在單臺(tái)計(jì)算機(jī)上可以處理高達(dá)每秒5000萬(wàn)條消息。內(nèi)存占用少;每GB堆可以創(chuàng)建約250萬(wàn)個(gè)actor(參與者)
-
鎖
- 使用鎖會(huì)嚴(yán)重影響并發(fā)度,使用鎖在現(xiàn)在CPU架構(gòu)中是一個(gè)比較昂貴的操作,因?yàn)楫?dāng)線程獲取鎖失敗后會(huì)把線程從用戶態(tài)切換到內(nèi)核態(tài)把線程掛起,稍后喚醒后又需要從內(nèi)核態(tài)切換到用戶態(tài)繼續(xù)運(yùn)行
- 獲取鎖失敗的調(diào)用線程會(huì)被阻塞掛起,因此它不能做任何有意義的事情。即使在桌面應(yīng)用程序中這也是不可取的,我們想要的是即使后臺(tái)有一個(gè)比較耗時(shí)的工作在運(yùn)行,也要保證系統(tǒng)對(duì)用戶的一部分請(qǐng)求有響應(yīng)。在后端應(yīng)用中,阻塞是完全浪費(fèi)資源的。另外可能有人認(rèn)為,雖然當(dāng)前線程阻塞了,但是我們可以通過(guò)啟動(dòng)新線程來(lái)彌補(bǔ)這一點(diǎn),需要注意,線程也是一種昂貴的資源,操作系統(tǒng)對(duì)線程個(gè)數(shù)是有限制的
- 鎖的存在帶來(lái)了新的威脅,即死鎖問(wèn)題
- 如果不使用足夠多的鎖,則不能保證多線程下對(duì)象中數(shù)據(jù)不受到破壞
- 如果在對(duì)象中每個(gè)數(shù)據(jù)訪問(wèn)時(shí)都加了鎖,則會(huì)導(dǎo)致系統(tǒng)性能下降,并且很容易導(dǎo)致死鎖
- 鎖只能在單JVM內(nèi)(本地鎖)很好地工作。當(dāng)涉及跨多臺(tái)機(jī)協(xié)調(diào)時(shí),只能使用分布式鎖。但是分布式鎖的效率比本地鎖低幾個(gè)數(shù)量級(jí),這是因?yàn)榉植际芥i協(xié)議需要跨多臺(tái)機(jī)在網(wǎng)絡(luò)上進(jìn)行多次往返通信,其造成的最大影響就是延遲
-
共享內(nèi)存
- 在現(xiàn)在計(jì)算機(jī)硬件架構(gòu)中,計(jì)算機(jī)系統(tǒng)中為了解決主內(nèi)存與CPU運(yùn)行速度的差距,在CPU與主內(nèi)存之間添加了一級(jí)或多級(jí)高速緩沖存儲(chǔ)器(Cache),每個(gè)Cache由多個(gè)Cache行組成,這些Cache一般是集成到CPU內(nèi)部的,所以也叫CPU Cache。當(dāng)我們寫入變量時(shí),實(shí)際是寫入當(dāng)前CPU的Cache中,而不是直接寫入主內(nèi)存中,并且當(dāng)前CPU核對(duì)自己Cache寫入的變量對(duì)其他CPU核是不可見(jiàn)的,這就是Java內(nèi)存模型中共享變量的內(nèi)存不可見(jiàn)問(wèn)題
-
堆棧
- 主調(diào)用線程需要在異步任務(wù)執(zhí)行完畢或者出異常時(shí)被通知,但是沒(méi)有調(diào)用堆??梢詡鬟f異常。異步任務(wù)執(zhí)行失敗的通知只能通過(guò)輔助方式完成,比如Future方式,將錯(cuò)誤碼寫到主調(diào)用線程所在的地方。如果沒(méi)有此通知,則主調(diào)用線程將永遠(yuǎn)不會(huì)收到失敗通知,并且任務(wù)將丟失
- 當(dāng)真的發(fā)生錯(cuò)誤時(shí),這種情況會(huì)變得更糟,當(dāng)異步工作線程遇到錯(cuò)誤時(shí)會(huì)導(dǎo)致最終陷入無(wú)法恢復(fù)的境地。異步線程當(dāng)前正在執(zhí)行的實(shí)際任務(wù)并沒(méi)有存放起來(lái)。實(shí)際上,由于到達(dá)頂部的異常使所有調(diào)用棧退出,任務(wù)狀態(tài)已經(jīng)完全丟失了
- 為了在當(dāng)前系統(tǒng)上實(shí)現(xiàn)任何有意義的并發(fā)性和提高性能,線程必須以高效的方式在彼此之間委派任務(wù),而不會(huì)阻塞。使用這種類型的任務(wù)委派并發(fā)(甚至在網(wǎng)絡(luò)/分布式計(jì)算中更是如此),基于調(diào)用堆棧的錯(cuò)誤處理會(huì)導(dǎo)致崩潰。因此需要引入新的顯式錯(cuò)誤信令機(jī)制,讓失敗成為域模型的一部分
- 具有工作委派的并發(fā)系統(tǒng)需要處理服務(wù)故障,并需要具有從故障中恢復(fù)的原則與方法。此類服務(wù)的客戶端需要注意,任務(wù)/消息可能會(huì)在重新啟動(dòng)期間丟失。即使沒(méi)有發(fā)生損失,由于先前排隊(duì)的任務(wù)(較長(zhǎng)的隊(duì)列)或者垃圾回收導(dǎo)致的延遲等,將會(huì)導(dǎo)致響應(yīng)可能會(huì)被任意延遲。面對(duì)這些情況,并發(fā)系統(tǒng)應(yīng)以超時(shí)的形式處理響應(yīng)截止日期
-
Actor模型解決了傳統(tǒng)編程模型的問(wèn)題
- 在Actor模型中每個(gè)Actor都有自己的地址,Actor之間通過(guò)地址相互通過(guò)消息通信。Actor的目的是處理消息,這些消息是從其他Actor發(fā)送給當(dāng)前Actor的。連接發(fā)送方和接收方Actor的是Actor的郵箱
- Akka中對(duì)失敗的處理使用了“讓它崩潰”的理念,這部分關(guān)鍵代碼被監(jiān)控者監(jiān)控著(每個(gè)Actor實(shí)際就是一個(gè)監(jiān)控者),監(jiān)控者的唯一職責(zé)是知道失敗后該干什么
- 另外Actor模型并不在意接收消息的是當(dāng)前JVM內(nèi)的Actor還是遠(yuǎn)端機(jī)器上的Actor,這允許我們基于許多計(jì)算機(jī)上構(gòu)建系統(tǒng)
- 使用消息傳遞避免鎖和阻塞
- Actor模型中組件之間的相互通信不再使用方法調(diào)用,而是通過(guò)發(fā)消息的方式進(jìn)行通信,使用發(fā)消息的方式,不會(huì)導(dǎo)致發(fā)消息的調(diào)用線程的執(zhí)行權(quán)轉(zhuǎn)移到消息接收者。每個(gè)Actor可以連續(xù)發(fā)消息,由于是異步的,不會(huì)被阻塞。因此在同等時(shí)間內(nèi)其可以完成更多工作
- 對(duì)于對(duì)象,當(dāng)調(diào)用其方法返回時(shí),它會(huì)釋放調(diào)用其線程的控制權(quán);Actor的行為與對(duì)象類似,當(dāng)接收者Actor接收到消息后,會(huì)對(duì)消息進(jìn)行反應(yīng),并在處理完消息后返回,所以Actor的執(zhí)行符合我們認(rèn)知中的執(zhí)行邏輯
- 傳遞消息和調(diào)用方法之間的重要區(qū)別是,消息沒(méi)有返回值。通過(guò)發(fā)送消息,Actor會(huì)將工作委托給另一個(gè)Actor。正如我們?cè)谡{(diào)用堆棧誤解中看到的那樣,如果期望返回值,則發(fā)送方Actor調(diào)用線程將需要阻塞或調(diào)用線程會(huì)執(zhí)行其他Actor的工作。相反,接收方會(huì)在回復(fù)消息中傳遞結(jié)果
- Actor對(duì)消息做出反應(yīng),就像對(duì)象對(duì)在其上的調(diào)用方法一樣。區(qū)別在于,接收消息的Actor是獨(dú)立于消息發(fā)送方Actor執(zhí)行的,是一次接一個(gè)地響應(yīng)傳入的消息,而不是多個(gè)線程并發(fā)執(zhí)行,因此不會(huì)破壞Actor內(nèi)部狀態(tài)和不變量。當(dāng)每個(gè)Actor都按順序處理發(fā)送給它的消息時(shí),不同的Actor會(huì)并發(fā)工作,因此Actor系統(tǒng)可以同時(shí)處理硬件支持的盡可能多的消息
- 由于每個(gè)Actor同時(shí)最多只能處理一條消息,因而可以保持Actor的不變性,而無(wú)須使用鎖等進(jìn)行同步
- 當(dāng)Actor收到消息時(shí),會(huì)發(fā)生以下情況
- Actor將消息添加到隊(duì)列的末尾
- 如果Actor沒(méi)有被安排執(zhí)行,則將其標(biāo)記為準(zhǔn)備執(zhí)行
- Actor系統(tǒng)框架內(nèi)的調(diào)度程序?qū)⒔邮赵揂ctor并開(kāi)始執(zhí)行它
- Actor從隊(duì)列的前面選擇消息
- Actor修改內(nèi)部狀態(tài),將消息發(fā)送給其他Actor
- Actor處于無(wú)調(diào)度、空閑狀態(tài)
- 為了實(shí)現(xiàn)上述行為,Actor需要具有下面特性
- 一個(gè)郵箱(用于存放發(fā)送者發(fā)來(lái)的消息)
- 行為(Actor的狀態(tài)、內(nèi)部變量等)
- 消息(代表信號(hào)的數(shù)據(jù)片段,類似于方法調(diào)用及其參數(shù))
- 執(zhí)行環(huán)境(一種使具有消息的Actor響應(yīng)并調(diào)用其消息處理代碼的機(jī)制)
- 地址(每個(gè)Actor有自己的地址)
- 其中,Actor的行為描述了其如何響應(yīng)消息(例如發(fā)送更多消息和/或更改狀態(tài))。執(zhí)行環(huán)境則編排了一個(gè)線程池,以透明地驅(qū)動(dòng)所有這些動(dòng)作
- 通過(guò)將執(zhí)行與信號(hào)分離(方法調(diào)用方式會(huì)轉(zhuǎn)換任務(wù)的執(zhí)行權(quán),消息傳遞則不會(huì))來(lái)保留封裝性
- 不需要鎖。只能通過(guò)消息修改Actor的內(nèi)部狀態(tài),而消息是順序處理的,以試圖消除保持不變性時(shí)的競(jìng)爭(zhēng)問(wèn)題
- 任何地方都沒(méi)有使用鎖,發(fā)送者也不會(huì)被阻塞??梢栽谑畮讉€(gè)線程上有效地調(diào)度數(shù)百萬(wàn)個(gè)Actor,從而充分發(fā)揮現(xiàn)代CPU的潛力
- Actor的狀態(tài)是本地的而不是共享的,更改和數(shù)據(jù)通過(guò)消息進(jìn)行傳遞,這與現(xiàn)代系統(tǒng)中內(nèi)存的實(shí)際工作方式相對(duì)應(yīng)
- 使用Actor優(yōu)雅地處理錯(cuò)誤
- 當(dāng)目標(biāo)Actor上運(yùn)行被代理的任務(wù)發(fā)生錯(cuò)誤時(shí),比如任務(wù)內(nèi)參數(shù)校驗(yàn)錯(cuò)誤或者執(zhí)行拋出了NPE異常等。在這種情況下,目標(biāo)Actor封裝的服務(wù)是完整的,只是任務(wù)執(zhí)行本身發(fā)生了錯(cuò)誤。目標(biāo)Actor應(yīng)該向消息發(fā)送方回復(fù)一條消息,提示錯(cuò)誤情況
- 當(dāng)服務(wù)本身遇到內(nèi)部錯(cuò)誤時(shí),Akka強(qiáng)制將所有Actor組織成樹(shù)狀層次結(jié)構(gòu),即創(chuàng)建另一個(gè)Actor的Actor成為該新Actor的父節(jié)點(diǎn)。這與操作系統(tǒng)將進(jìn)程組織到樹(shù)結(jié)構(gòu)中的方式非常相似。就像進(jìn)程一樣,當(dāng)一個(gè)Actor失敗時(shí),它的父Actor會(huì)收到通知,并且可以對(duì)失敗做出反應(yīng)。同樣,如果父Actor停止了,則其所有子Actor也將遞歸停止。這項(xiàng)服務(wù)被稱為監(jiān)督(supervisor),它是Akka的核心
https://developer.lightbend.com/guides/akka-quickstart-java/
-
demo
remote示例要引入akka-remote_2.13
-
https://doc.akka.io/docs/akka/current/remoting.html
- Classic Remoting (Deprecated)
- Artery Remoting instead
-
注意事項(xiàng)
-
默認(rèn)的配置文件
- 這是以前的remote方式,而新版默認(rèn)是用的Artery,默認(rèn)端口是25520
- 即使按照下面的配置文件指定端口也不會(huì),啟動(dòng)的時(shí)候看日志會(huì)看到是artery方式
- 需要關(guān)閉Artery:remote.artery.enabled = false
- 當(dāng)關(guān)閉此artery后,啟動(dòng)會(huì)報(bào)錯(cuò)'Classic remoting is enabled but Netty is not on the classpath'
- 需要:Classic remoting depends on Netty. This needs to be explicitly added as a dependency so that users not using classic remoting do not have to have Netty on the classpath:
akka { actor { // 遠(yuǎn)端的actor provider = "akka.remote.RemoteActorRefProvider" } remote { // tcp傳輸 enabled-transports = ["akka.remote.netty.tcp"] // ip和監(jiān)聽(tīng)端口 netty.tcp { hostname = "127.0.0.1" port = 2552 } } }
-
-
按照新的配置方式
- artery方式,則啟動(dòng)的actor順利按照指定的2552端口監(jiān)聽(tīng)
akka { actor { // 遠(yuǎn)端的actor provider = "akka.remote.RemoteActorRefProvider" allow-java-serialization = on } remote { artery { transport = tcp canonical.hostname = "127.0.0.1" canonical.port = 2552 } } } 另外遠(yuǎn)程要傳遞消息,做序列化,所以要打開(kāi)allow-java-serialization,否則序列化報(bào)錯(cuò)
另外ActorSelection指定遠(yuǎn)程的path的actor名字是遠(yuǎn)程通過(guò)actorOf創(chuàng)建的actor名字
Akka中每個(gè)Actor都有自己的地址,可以是本地的,也可以是遠(yuǎn)程的,對(duì)于遠(yuǎn)程的Actor,只需要將其地址配置好,就可以像本地Actor一樣使用了
-
-
rocketmq
-
RocketMQ主要由4部分組成,分別為NameServer集群、Broker集群、Producer集群和Consumer集群。每部分都可以進(jìn)行水平擴(kuò)展,而不會(huì)出現(xiàn)單點(diǎn)問(wèn)題
- NameServer集群:名稱服務(wù)集群,提供輕量級(jí)的服務(wù)發(fā)現(xiàn)與路由服務(wù),每個(gè)名稱服務(wù)器記錄了全部Broker的路由信息,并且提供相應(yīng)的讀寫服務(wù),支持快速存儲(chǔ)擴(kuò)展
- Broker集群:Broker集群,Broker通過(guò)提供輕量級(jí)的主題和隊(duì)列機(jī)制來(lái)維護(hù)消息存儲(chǔ)。它支持推和拉兩種模型,包含容錯(cuò)機(jī)制(2個(gè)副本或3個(gè)副本),并提供了強(qiáng)大的平滑峰值,提供積累數(shù)以億計(jì)的消息并保證其在原始時(shí)間順序的被消費(fèi)能力。此外,Broker也提供災(zāi)難恢復(fù)、豐富的度量統(tǒng)計(jì)和警報(bào)機(jī)制
- Producer集群:生產(chǎn)者集群,提供分布式部署,分布式的生產(chǎn)者發(fā)送消息到Broker集群,具體選擇哪一個(gè)Broker機(jī)器是通過(guò)一定的負(fù)載均衡策略來(lái)決定的,發(fā)送消息中支持故障快速恢復(fù),并且具有較低的延遲
- Consumer集群:消費(fèi)者集群,消費(fèi)者在推和拉模型中支持分布式部署。它還支持集群消費(fèi)和消息廣播。它提供實(shí)時(shí)消息訂閱機(jī)制,可以滿足大多數(shù)消費(fèi)者的需求
Broker在啟動(dòng)時(shí)會(huì)去連接NameServer,然后將topic信息注冊(cè)到NameServer, NameServer維護(hù)了所有topic的信息和對(duì)應(yīng)的Broker路由信息。Broker與NameServer之間是有心跳檢查的,NameServer發(fā)現(xiàn)Broker掛了后,會(huì)從注冊(cè)信息里面刪除該Broker,這類似Zookeeper實(shí)現(xiàn)的服務(wù)注冊(cè);Producer則需要配置NameServer的地址,然后定時(shí)從NameServer獲取對(duì)應(yīng)topic的路由信息(這個(gè)topic的消息應(yīng)該路由到那個(gè)Broker)。同時(shí)Producer與NameServer、Proudcer與Broker有心跳檢查;同樣,Consumer需要配置NameServer的地址,然后定時(shí)從NameServer獲取對(duì)應(yīng)topic的路由信息(應(yīng)該從那個(gè)Broker的消息隊(duì)列獲取消息)。同時(shí)Consumer與NameServer、Consumer與Broker有心跳檢查。
-
demo
-
rocketmq-all-4.7.0 編譯
% pwd /Users/landon30/2020/demo/rocketmq-logging 4.7.0 % mvn -Prelease-all -DskipTests clean install -U // 編譯報(bào)錯(cuò) [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.5.1:compile (default-compile) on project rocketmq-logging: Compilation failure: Compilation failure: [ERROR] 不再支持源選項(xiàng) 6。請(qǐng)使用 7 或更高版本。 [ERROR] 不再支持目標(biāo)選項(xiàng) 6。請(qǐng)使用 7 或更高版本。 修改 <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> 同理其他模塊 % pwd /Users/landon30/2020/demo/rocketmq-rocketmq-all-4.7.0/distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 // 需要設(shè)置JAVA_HOME環(huán)境變量 ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !! /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home vim ~/.bash_profile,編輯環(huán)境變量 // 啟動(dòng)nameserver % nohup sh bin/mqnamesrv & [1] 17557 appending output to nohup.out % tail -f ~/logs/rocketmqlogs/namesrv.log 2020-04-09 16:28:21 INFO main - tls.client.keyPassword = null 2020-04-09 16:28:21 INFO main - tls.client.certPath = null 2020-04-09 16:28:21 INFO main - tls.client.authServer = false 2020-04-09 16:28:21 INFO main - tls.client.trustCertPath = null 2020-04-09 16:28:21 INFO main - Using OpenSSL provider 2020-04-09 16:28:21 INFO main - SSLContext created for server 2020-04-09 16:28:22 INFO main - Try to start service thread:FileWatchService started:false lastThread:null 2020-04-09 16:28:22 INFO NettyEventExecutor - NettyEventExecutor service started 2020-04-09 16:28:22 INFO FileWatchService - FileWatchService service started 2020-04-09 16:28:22 INFO main - The Name Server boot success. serializeType=JSON 17589 NamesrvStartup // 啟動(dòng)broker 異常 // 為什么總報(bào) java.lang.NoSuchMethodError // 應(yīng)該是編譯問(wèn)題 比如編譯是用jdk9+編譯的,但是卻運(yùn)行在小于9的環(huán)境中 // 所以應(yīng)該是netty編譯環(huán)境的問(wèn)題 > nohup sh bin/mqbroker -n localhost:9876 & 2020-04-09 18:02:42 WARN brokerOutApi_thread_1 - registerBroker Exception, localhost:9876 org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <ocalhost/127.0.0.1:9876> failed at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:440) ~[rocketmq-remoting-4.7.0.jar:4.7.0] at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:373) ~[rocketmq-remoting-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI.registerBroker(BrokerOuterAPI.java:194) ~[rocketmq-broker-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI.access$000(BrokerOuterAPI.java:61) ~[rocketmq-broker-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI$1.run(BrokerOuterAPI.java:150) ~[rocketmq-broker-4.7.0.jar:4.7.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_241] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_241] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241] Caused by: io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer; at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:125) ~[netty-all-4.0.42.Final.jar:4.0.42.Final] https://github.com/eclipse/jetty.project/issues/3244 參考 2020-04-09 18:16:06 INFO main - The broker[landon30deMacBook-Pro.local, 10.2.182.240:10911] boot success. serializeType=JSON and name server is localhost:9876 // 參考 https://github.com/netty/netty/issues?q=java.lang.NoSuchMethodError // 臨時(shí)解決方案是將pom.xml中netty的版本從4.0.42切換到了4.0.43,解決 -
https://rocketmq.apache.org/docs/quick-start/
> unzip rocketmq-all-4.7.0-source-release.zip > cd rocketmq-all-4.7.0/ > mvn -Prelease-all -DskipTests clean install -U > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 Start Name Server > nohup sh bin/mqnamesrv & > tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success... Start Broker > nohup sh bin/mqbroker -n localhost:9876 & > tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...
-
-
consumer
- 同一個(gè)消費(fèi)集群的每臺(tái)機(jī)器中的實(shí)例名稱要一樣。然后設(shè)置了NameServer的地址為127.0.0.1:9876
- 從第一個(gè)消息的偏移量開(kāi)始消費(fèi),指定訂閱主題和主題下的tag
- 設(shè)置回調(diào)的消息處理
- 啟動(dòng)消費(fèi)實(shí)例,連接NameServer獲取Broker的地址,并與Broker進(jìn)行連接
-
producer sync
- 同一個(gè)生產(chǎn)者集群實(shí)例中的實(shí)例名稱要一致。然后設(shè)置了NameServer的地址為127.0.0.1:9876
- 啟動(dòng)生產(chǎn)者實(shí)例,然后實(shí)例就會(huì)去連接NameServer并獲取Broker的地址,然后生產(chǎn)者實(shí)例就會(huì)與Broker建立連接
- 創(chuàng)建Message消息實(shí)體,其中第一個(gè)參數(shù)為主題名稱,這里為TopicTest;第二個(gè)參數(shù)為Tag類型,這里為TagA;第三個(gè)參數(shù)為消息體內(nèi)容,是個(gè)二進(jìn)制數(shù)據(jù)。代碼4.2調(diào)用生產(chǎn)者實(shí)例的send方法同步發(fā)送消息,需要注意,這里同步意味著當(dāng)消息同步通過(guò)底層網(wǎng)絡(luò)通信投遞到TCP發(fā)送buffer后才會(huì)返回,整個(gè)過(guò)程中需要阻塞調(diào)用線程
- 調(diào)用線程調(diào)用RocketmqClient的send方法發(fā)送消息后,其內(nèi)部會(huì)首先創(chuàng)建一個(gè)ResponseFuture對(duì)象,并切換到IO線程把請(qǐng)求發(fā)送到Broker,接著調(diào)用線程會(huì)調(diào)用ResponseFuture的wait方法阻塞調(diào)用線程,等IO線程把請(qǐng)求寫入TCP發(fā)送Buffer后,IO線程會(huì)設(shè)置ResponseFuture對(duì)象說(shuō)請(qǐng)求已經(jīng)完成,然后調(diào)用線程就會(huì)從wait方法返回。需要注意的是,RocketMQ返回成功是指已經(jīng)把請(qǐng)求發(fā)送到了其TCP發(fā)送Buffer中,這時(shí)候請(qǐng)求還沒(méi)到Broker
-
producer.async
- 注意demo中,主線程不能結(jié)束,否則拋出異常RemotingConnectException,所以加了一個(gè)sleep
- 在發(fā)送消息的同時(shí)設(shè)置了一個(gè)CallBack,調(diào)用該方法后,該方法會(huì)馬上返回,然后等真的把消息投遞到Broker后,底層IO線程會(huì)回調(diào)設(shè)置的Callback來(lái)通知,消息已經(jīng)發(fā)送成功或者消息發(fā)送失敗的原因
- RocketMQ客戶端內(nèi)部把請(qǐng)求提交到線程池后就返回了。消息發(fā)送任務(wù)會(huì)被在線程池內(nèi)異步執(zhí)行,異步發(fā)送任務(wù)內(nèi)首先會(huì)創(chuàng)建一個(gè)ResponseFuture對(duì)象,然后切換到IO線程來(lái)具體發(fā)送請(qǐng)求,等IO線程將請(qǐng)求發(fā)送到TCP發(fā)送Buffer后,IO線程會(huì)設(shè)置ResponseFuture對(duì)象的值,然后ResponseFuture中保存的CallBack的執(zhí)行切換到線程池來(lái)執(zhí)行??芍褂卯惒桨l(fā)送消息方式調(diào)用線程不會(huì)被阻塞
-
chap9 Go語(yǔ)言的異步編程能力
-
Go語(yǔ)言概述
- 傳統(tǒng)的編程模型,比如經(jīng)常使用Java、C++、Python編程時(shí),多線程之間需要通過(guò)共享內(nèi)存(比如在堆上創(chuàng)建的共享變量)來(lái)通信。這時(shí)為保證線程安全,多線程共享的數(shù)據(jù)結(jié)構(gòu)需要使用鎖來(lái)保護(hù),多線程訪問(wèn)共享數(shù)據(jù)結(jié)構(gòu)時(shí)需要競(jìng)爭(zhēng)獲取鎖,只有獲取到鎖的線程才可以存取共享數(shù)據(jù)
- Go中不僅在語(yǔ)言層面提供了這種低級(jí)并發(fā)同步原語(yǔ)—鎖,比如互斥鎖、讀寫鎖、條件變量等,而且Go的并發(fā)原語(yǔ)—goroutine和channel提供了一種優(yōu)雅而獨(dú)特的結(jié)構(gòu)化開(kāi)發(fā)并發(fā)軟件的方式。Go鼓勵(lì)使用channel在goroutine之間傳遞對(duì)共享數(shù)據(jù)的引用,而不是明確地使用鎖來(lái)保護(hù)對(duì)共享數(shù)據(jù)的訪問(wèn)。這種方法確保在給定時(shí)間只有一個(gè)goroutine可以訪問(wèn)共享數(shù)據(jù)。這個(gè)理念被總結(jié)為:不要通過(guò)共享內(nèi)存來(lái)通信,而要通過(guò)通信來(lái)共享內(nèi)存
- Go中并發(fā)模型采用了通道,體現(xiàn)為CSP的一個(gè)變種。之所以選擇CSP,一方面是因?yàn)镚oogle的開(kāi)發(fā)工程師對(duì)它的熟悉程度,另一方面因?yàn)镃SP具有一種無(wú)須對(duì)其模型做任何深入的改變就能輕易添加到過(guò)程性編程模型中的特性
- 在其他語(yǔ)言,比如Java中線程模型的實(shí)現(xiàn)是一個(gè)操作系統(tǒng)內(nèi)核線程對(duì)應(yīng)著一個(gè)使用new Thread創(chuàng)建的線程,而由于操作系統(tǒng)線程個(gè)數(shù)是有限制的,所以限制了創(chuàng)建線程的個(gè)數(shù)。另外,當(dāng)線程執(zhí)行阻塞操作時(shí),線程要從用戶態(tài)切換到內(nèi)核態(tài)執(zhí)行,這個(gè)開(kāi)銷是比較大的;而在Go中線程模型則是一個(gè)操作系統(tǒng)線程對(duì)應(yīng)多個(gè)goroutine,用戶可以創(chuàng)建的goroutine個(gè)數(shù)只受內(nèi)存大小限制,而且上下文切換發(fā)生在用戶態(tài),切換速度比較快,并且開(kāi)銷比較小,所以Go中一臺(tái)機(jī)器可以創(chuàng)建百萬(wàn)個(gè)goroutine
- 在Java中編寫并發(fā)程序時(shí)需要在操作系統(tǒng)線程層面進(jìn)行考慮,但是在Go中,不需要考慮操作系統(tǒng)線程,而是需要站在goroutine和通道上進(jìn)行思考,當(dāng)然有時(shí)候也會(huì)在共享內(nèi)存上進(jìn)行思考
- 在Go中只需要在要異步執(zhí)行的方法前面加上go關(guān)鍵字,就可以讓方法與主goroutine并發(fā)運(yùn)行。另外結(jié)合goroutine和channel,可以方便地實(shí)現(xiàn)異步非阻塞回壓功能
-
Go語(yǔ)言的線程模型
-
一對(duì)一模型
- 這種線程模型下用戶線程與內(nèi)核線程是一一對(duì)應(yīng)的,當(dāng)從程序入口點(diǎn)(比如main函數(shù))啟動(dòng)后,操作系統(tǒng)就創(chuàng)建了一個(gè)進(jìn)程。這個(gè)main函數(shù)所在的線程就是主線程。在main函數(shù)內(nèi)當(dāng)我們使用高級(jí)語(yǔ)言創(chuàng)建一個(gè)用戶線程的時(shí)候,其實(shí)對(duì)應(yīng)創(chuàng)建了一個(gè)內(nèi)核線程
- 這種線程模型的優(yōu)點(diǎn)是,在多處理器上多個(gè)線程可以真正實(shí)現(xiàn)并行運(yùn)行,并且當(dāng)一個(gè)線程由于網(wǎng)絡(luò)IO等原因被阻塞時(shí),其他線程不受影響
- 缺點(diǎn)是由于一般操作系統(tǒng)會(huì)限制內(nèi)核線程的個(gè)數(shù),所以用戶線程的個(gè)數(shù)會(huì)受到限制。另外由于用戶線程與系統(tǒng)線程一一對(duì)應(yīng),當(dāng)用戶線程比如執(zhí)行IO操作(執(zhí)行系統(tǒng)調(diào)用)時(shí),需要從用戶態(tài)的用戶程序執(zhí)行切換到內(nèi)核態(tài)執(zhí)行內(nèi)核操作,然后等執(zhí)行完畢后又會(huì)從內(nèi)核態(tài)切換到用戶態(tài)執(zhí)行用戶程序,而這個(gè)切換操作開(kāi)銷是比較大的
- 另外這里提下,Java的線程模型就是使用的這種一對(duì)一的模型,所以Java中多線程對(duì)共享變量使用鎖同步時(shí)會(huì)導(dǎo)致獲取鎖失敗的線程進(jìn)行上下文切換,而JUC包提供的無(wú)鎖CAS操作則不會(huì)產(chǎn)生上下文切換
-
多對(duì)一模型
- 多對(duì)一模型是指多個(gè)用戶線程對(duì)應(yīng)一個(gè)內(nèi)核線程,同時(shí)同一個(gè)用戶線程只能對(duì)應(yīng)一個(gè)內(nèi)核線程,這時(shí)候?qū)?yīng)同一個(gè)內(nèi)核線程的多個(gè)用戶線程的上下文切換是由用戶態(tài)的運(yùn)行時(shí)線程庫(kù)來(lái)做的,而不是由操作系統(tǒng)調(diào)度系統(tǒng)來(lái)做的
- 這種模型的好處是由于上下文切換在用戶態(tài),因而切換速度很快,開(kāi)銷很??;另外,可創(chuàng)建的用戶線程的數(shù)量可以很多,只受內(nèi)存大小限制
- 這種模型由于多個(gè)用戶線程對(duì)應(yīng)一個(gè)內(nèi)核線程,當(dāng)該內(nèi)核線程對(duì)應(yīng)的一個(gè)用戶線程被阻塞掛起時(shí),該內(nèi)核線程對(duì)應(yīng)的其他用戶線程也不能運(yùn)行,因?yàn)檫@時(shí)候內(nèi)核線程已經(jīng)被阻塞掛起了。另外這種模型并不能很好地利用多核CPU進(jìn)行并發(fā)運(yùn)行
-
多對(duì)多模型
- 多對(duì)多模型則結(jié)合一對(duì)一和多對(duì)一模型的特點(diǎn),讓大量的用戶線程對(duì)應(yīng)少數(shù)幾個(gè)內(nèi)核線程
- 這時(shí)候每個(gè)內(nèi)核線程對(duì)應(yīng)多個(gè)用戶線程,每個(gè)用戶線程又可以對(duì)應(yīng)多個(gè)內(nèi)核線程,當(dāng)一個(gè)用戶線程阻塞后,其對(duì)應(yīng)的當(dāng)前內(nèi)核線程會(huì)被阻塞,但是被阻塞的內(nèi)核線程對(duì)應(yīng)的其他用戶線程可以切換到其他內(nèi)核線程上繼續(xù)運(yùn)行,所以多對(duì)多模型是可以充分利用多核CPU提升運(yùn)行效能的
- 另外多對(duì)多模型也對(duì)用戶線程個(gè)數(shù)沒(méi)有限制,理論上只要內(nèi)存夠用可以無(wú)限創(chuàng)建
-
Go線程模型屬于多對(duì)多線程模型
- 每個(gè)內(nèi)核線程對(duì)應(yīng)多個(gè)用戶線程,每個(gè)用戶線程又可以對(duì)應(yīng)多個(gè)內(nèi)核線程,當(dāng)一個(gè)用戶線程阻塞后,其對(duì)應(yīng)的當(dāng)前內(nèi)核線程會(huì)被阻塞,但是被阻塞的內(nèi)核線程對(duì)應(yīng)的其他用戶線程可以切換到其他內(nèi)核線程上繼續(xù)運(yùn)行,所以多對(duì)多模型是可以充分利用多核CPU提升運(yùn)行效能的
- Go中使用Go語(yǔ)句創(chuàng)建的goroutine可以認(rèn)為是輕量級(jí)的用戶線程。Go線程模型包含3個(gè)概念:內(nèi)核線程(M-Machine)、goroutine(G-Goroutine)和邏輯處理器(P-Processor)。在Go中每個(gè)邏輯處理器(P)會(huì)綁定到某一個(gè)內(nèi)核線程上,每個(gè)邏輯處理器(P)內(nèi)有一個(gè)本地隊(duì)列,用來(lái)存放Go運(yùn)行時(shí)分配的goroutine。在上面介紹的多對(duì)多線程模型中是操作系統(tǒng)調(diào)度線程在物理CPU上運(yùn)行,在Go中則是Go的運(yùn)行時(shí)調(diào)度goroutine在邏輯處理器(P)上運(yùn)行
- 在Go中存在兩級(jí)調(diào)度,一級(jí)是操作系統(tǒng)的調(diào)度系統(tǒng),該調(diào)度系統(tǒng)調(diào)度邏輯處理器占用CPU時(shí)間片運(yùn)行;一級(jí)是Go的運(yùn)行時(shí)調(diào)度系統(tǒng),該調(diào)度系統(tǒng)調(diào)度某個(gè)goroutine在邏輯處理上運(yùn)行
- 使用Go語(yǔ)句創(chuàng)建一個(gè)goroutine后,創(chuàng)建的goroutine會(huì)被放入Go運(yùn)行時(shí)調(diào)度器的全局運(yùn)行隊(duì)列中,然后Go運(yùn)行時(shí)調(diào)度器會(huì)把全局隊(duì)列中的goroutine分配給不同的邏輯處理器(P),分配的goroutine會(huì)被放到邏輯處理器(P)的本地隊(duì)列中,當(dāng)本地隊(duì)列中某個(gè)goroutine就緒后,待分配到時(shí)間片后就可以在邏輯處理器上運(yùn)行了
- 為了避免某些goroutine出現(xiàn)饑餓現(xiàn)象,被分配到某一個(gè)邏輯處理器(P)上的多個(gè)goroutine是分時(shí)在該邏輯處理器上運(yùn)行的,而不是獨(dú)占運(yùn)行直到結(jié)束
- goroutine內(nèi)部實(shí)現(xiàn)與在多個(gè)操作系統(tǒng)線程(OS線程)之間復(fù)用的協(xié)程(coroutine)一樣。如果一個(gè)goroutine阻塞OS線程,例如等待輸入,則該OS線程對(duì)應(yīng)的邏輯處理器(P)中的其他goroutine將遷移到其他OS線程,以便它們繼續(xù)運(yùn)行
- 假設(shè)goroutine1在執(zhí)行文件讀取操作,則goroutine1會(huì)導(dǎo)致內(nèi)核線程1阻塞,這時(shí)候Go運(yùn)行時(shí)調(diào)度器會(huì)把goroutine1所在的邏輯處理器1遷移到其他內(nèi)核線程上(這里是內(nèi)核線程2上),這時(shí)候邏輯處理器1上的goroutine2和goroutine3就不會(huì)受goroutine1的影響了。等goroutine1文件讀取操作完成后,goroutine1又會(huì)被Go運(yùn)行時(shí)調(diào)度系統(tǒng)重新放入邏輯處理器1的本地隊(duì)列
在Go中,使用go關(guān)鍵字跟上一個(gè)函數(shù),就創(chuàng)建了一個(gè)goroutine,每個(gè)goroutine可以認(rèn)為是一個(gè)輕量級(jí)的線程,其占用更少的堆棧空間
-
可以把通道理解為一個(gè)并發(fā)安全的隊(duì)列,生產(chǎn)者goroutine可以向通道里放入元素,消費(fèi)者goroutine可以從通道里獲取元素。
- 從隊(duì)列大小來(lái)看,通道可以分為有緩沖通道和無(wú)緩沖通道,無(wú)緩沖通道里最多有一個(gè)元素,有緩沖通道里面可以有很多元素
- 另外,通道還是有方向的
- 通道是可以關(guān)閉的
Go中以消息進(jìn)行通信的方式允許程序員安全地協(xié)調(diào)多個(gè)并發(fā)任務(wù),并且容易理解語(yǔ)義和控制流,這通常比其他語(yǔ)言(如Java)中的回調(diào)函數(shù)(callback)或共享內(nèi)存方式更優(yōu)雅簡(jiǎn)單
-
Go的并發(fā)原語(yǔ)使構(gòu)建流式數(shù)據(jù)管道變得很容易,從而使IO操作和多核CPU更加有效
- 管道是由一系列節(jié)點(diǎn)組成,這些節(jié)點(diǎn)使用通道連接起來(lái)。其中每個(gè)節(jié)點(diǎn)是一組運(yùn)行相同功能的goroutine,在每個(gè)階段goroutine從上游通道獲取元素,然后對(duì)該數(shù)據(jù)執(zhí)行某些操作,然后把操作后的結(jié)果再寫入下游的通道。除了第一個(gè)和最后一個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)可以有任意多個(gè)輸入和輸出通道,第一個(gè)節(jié)點(diǎn)有時(shí)候被稱為數(shù)據(jù)源或者生產(chǎn)者,最后一個(gè)節(jié)點(diǎn)被稱為數(shù)據(jù)終點(diǎn)或者消費(fèi)者
- 如果你對(duì)流式編程有經(jīng)驗(yàn)的話,可能會(huì)發(fā)現(xiàn)管道和反應(yīng)式庫(kù)比如RxJava中的流式編程很相似
- 借助Go中的并發(fā)原語(yǔ)goroutine與通道,可以非常方便地構(gòu)建異步非阻塞、具有回壓功能的程序
-