Akka系列(三):監(jiān)管與容錯

Akka作為一種成熟的生產(chǎn)環(huán)境并發(fā)解決方案,必須擁有一套完善的錯誤異常處理機制,本文主要講講Akka中的監(jiān)管和容錯。

監(jiān)管

看過我上篇文章的同學(xué)應(yīng)該對Actor系統(tǒng)的工作流程有了一定的了解Akka系列(二):Akka中的Actor系統(tǒng),它的很重要的概念就是分而治之,既然我們把任務(wù)分配給Actor去執(zhí)行,那么我們必須去監(jiān)管相應(yīng)的Actor,當(dāng)Actor出現(xiàn)了失敗,比如系統(tǒng)環(huán)境錯誤,各種異常,能根據(jù)我們制定的相應(yīng)監(jiān)管策略進行錯誤恢復(fù),就是后面我們會說到的容錯。

監(jiān)管者

既然有監(jiān)管這一事件,那必然存在著監(jiān)管者這么一個角色,那么在ActorSystem中是如何確定這種角色的呢?

我們先來看下ActorSystem中的頂級監(jiān)管者:

actor-syatem-guardian.png

一個actor系統(tǒng)在其創(chuàng)建過程中至少要啟動三個actor,如上圖所示,下面來說說這三個Actor的功能:

1./: 根監(jiān)管者

顧名思義,它是一個老大,它監(jiān)管著ActorSystem中所有的頂級Actor,頂級Actor有以下幾種:

  • /user: 是所有由用戶創(chuàng)建的頂級actor的監(jiān)管者;用ActorSystem.actorOf創(chuàng)建的actor在其下。
  • /system: 是所有由系統(tǒng)創(chuàng)建的頂級actor的監(jiān)管者,如日志監(jiān)聽器,或由配置指定在actor系統(tǒng)啟動時自動部署的actor。
  • /deadLetters: 是死信actor,所有發(fā)往已經(jīng)終止或不存在的actor的消息會被重定向到這里。
  • /temp:是所有系統(tǒng)創(chuàng)建的短時actor的監(jiān)管者,例如那些在ActorRef.ask的實現(xiàn)中用到的actor。
  • /remote: 是一個人造虛擬路徑,用來存放所有其監(jiān)管者是遠程actor引用的actor。

跟我們平常打交道最多的就是/user,它是我們在程序中用ActorSystem.actorOf創(chuàng)建的actor的監(jiān)管者,下面的容錯我們重點關(guān)心的就是它下面的失敗處理,其他幾種頂級Actor具體功能定義已經(jīng)給出,有興趣的也可以去了解一下。

根監(jiān)管者監(jiān)管著所有頂級Actor,對它們的各種失敗情況進行處理,一般來說如果錯誤要上升到根監(jiān)管者,整個系統(tǒng)就會停止。

2./user: 頂級actor監(jiān)管者

上面已經(jīng)講過/user是所有由用戶創(chuàng)建的頂級actor的監(jiān)管者,即用ActorSystem.actorOf創(chuàng)建的actor,我們可以自己制定相應(yīng)的監(jiān)管策略,但由于它是actor系統(tǒng)啟動時就產(chǎn)生的,所以我們需要在相應(yīng)的配置文件里配置,具體的配置可以參考這里Akka配置

3./system: 系統(tǒng)監(jiān)管者

/system所有由系統(tǒng)創(chuàng)建的頂級actor的監(jiān)管者,比如Akka中的日志監(jiān)聽器,因為在Akka中日志本身也是用Actor實現(xiàn)的,/system的監(jiān)管策略如下:對收到的除ActorInitializationExceptionActorKilledException之外的所有Exception無限地執(zhí)行重啟,當(dāng)然這也會終止其所有子actor。所有其他Throwable被上升到根監(jiān)管者,然后整個actor系統(tǒng)將會關(guān)閉。

用戶創(chuàng)建的普通actor的監(jiān)管:

上一篇文章介紹了Actor系統(tǒng)的組織結(jié)構(gòu),它是一種樹形結(jié)構(gòu),其實這種結(jié)構(gòu)對actor的監(jiān)管是非常有利的,Akka實現(xiàn)的是一種叫“父監(jiān)管”的形式,每一個被創(chuàng)建的actor都由其父親所監(jiān)管,這種限制使得actor的監(jiān)管結(jié)構(gòu)隱式符合其樹形結(jié)構(gòu),所以我們可以得出一個結(jié)論:

一個被創(chuàng)建的Actor肯定是一個被監(jiān)管者,也可能是一個監(jiān)管者,它監(jiān)管著它的子級Actor

監(jiān)管策略

上面我們對ActorSystem中的監(jiān)管角色有了一定的了解,那么到底是如何制定相應(yīng)的監(jiān)管策略呢?Akka中有以下4種策略:

  • 恢復(fù)下屬,保持下屬當(dāng)前積累的內(nèi)部狀態(tài)
  • 重啟下屬,清除下屬的內(nèi)部狀態(tài)
  • 永久地停止下屬
  • 升級失?。ㄑ乇O(jiān)管樹向上傳遞失?。?,由此失敗自己

這其實很好理解,下面是一個簡單例子:

 override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume  //恢復(fù)
      case _: NullPointerException => Restart //重啟
      case _: IllegalArgumentException => Stop //停止
      case _: Exception => Escalate  //向上級傳遞
    }

我們可以根據(jù)異常的不同使用不同監(jiān)管策略,在后面我會具體給出一個示例程序幫助大家理解。我們在實現(xiàn)自己的策略時,需要復(fù)寫Actor中的supervisorStrategy,因為Actor的默認監(jiān)管策略如下:

  final val defaultDecider: Decider = {
    case _: ActorInitializationException ? Stop
    case _: ActorKilledException         ? Stop
    case _: DeathPactException           ? Stop
    case _: Exception                    ? Restart
  }

它對除了它指定的異常進行停止,其他異常都是對下屬進行重啟。

Akka中有兩種類型的監(jiān)管策略:OneForOneStrategyAllForOneStrategy,它們的主要區(qū)別在于:

  • OneForOneStrategy: 該策略只會應(yīng)用到發(fā)生故障的子actor上。
  • AllForOneStrategy: 該策略會應(yīng)用到所有的子actor上。

我們一般都使用OneForOneStrategy來進行制定相關(guān)監(jiān)管策略,當(dāng)然你也可以根據(jù)具體需求選擇合適的策略。另外我們可以給我們的策略配置相應(yīng)參數(shù),比如上面maxNrOfRetries,withinTimeRange等,這里的含義是每分鐘最多進行10次重啟,若超出這個界限相應(yīng)的Actor將會被停止,當(dāng)然你也可以使用策略的默認配置,具體配置信息可以參考源碼。

監(jiān)管容錯示例

本示例主要演示Actor在發(fā)生錯誤時,它的監(jiān)管者會根據(jù)相應(yīng)的監(jiān)管策略進行不同的處理。源碼鏈接

因為這個例子比較簡單,這里我直接貼上相應(yīng)代碼,后面根據(jù)具體的測試用例來解釋各種監(jiān)管策略所進行的響應(yīng):

class Supervisor extends Actor {
  //監(jiān)管下屬,根據(jù)下屬拋出的異常進行相應(yīng)的處理
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume
      case _: NullPointerException => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception => Escalate
    }
  var childIndex = 0 //用于標(biāo)示下屬Actor的序號

  def receive = {
    case p: Props =>
      childIndex += 1
      //返回一個Child Actor的引用,所以Supervisor Actor是Child Actor的監(jiān)管者
      sender() ! context.actorOf(p,s"child${childIndex}")
  }
}

class Child extends Actor {
  val log = Logging(context.system, this)
  var state = 0
  def receive = {
    case ex: Exception => throw ex //拋出相應(yīng)的異常
    case x: Int => state = x //改變自身狀態(tài)
    case s: Command if s.content == "get" =>
      log.info(s"the ${s.self} state is ${state}")
      sender() ! state //返回自身狀態(tài)
  }
}

case class Command(  //相應(yīng)命令
    content: String,
    self: String
)

現(xiàn)在我們來看看具體的測試用例:
首先我們先構(gòu)建一個測試環(huán)境:

class GuardianSpec(_system: ActorSystem)
    extends TestKit(_system)
    with WordSpecLike
    with Matchers
    with ImplicitSender {

  def this() = this(ActorSystem("GuardianSpec"))

  "A supervisor" must {

    "apply the chosen strategy for its child" in {
        code here...
        val supervisor = system.actorOf(Props[Supervisor], "supervisor") //創(chuàng)建一個監(jiān)管者
        supervisor ! Props[Child]
        val child = expectMsgType[ActorRef] // 從 TestKit 的 testActor 中獲取回應(yīng)
    } 
  }
}

1.TestOne:正常運行

child ! 50 // 將狀態(tài)設(shè)為 50
child ! Command("get",child.path.name)
expectMsg(50)

正常運行,測試通過。

2.TestTwo:拋出ArithmeticException

child ! new ArithmeticException // crash it
child ! Command("get",child.path.name)
expectMsg(50)     

大家猜這時候測試會通過嗎?答案是通過,原因是根據(jù)我們制定的監(jiān)管策略,監(jiān)管者在面對子級Actor拋出ArithmeticException異常時,它會去恢復(fù)相應(yīng)出異常的Actor,并保持該Actor的狀態(tài),所以此時Actor的狀態(tài)值還是50,測試通過。

3.TestThree:拋出NullPointerException

child ! new NullPointerException // crash it harder
child ! "get"
expectMsg(50)   

這種情況下測試還會通過嗎?答案是不通過,原因是根據(jù)我們制定的監(jiān)管策略,監(jiān)管者在面對子級Actor拋出NullPointerException異常時,它會去重啟相應(yīng)出異常的Actor,其狀態(tài)會被清除,所以此時Actor的狀態(tài)值應(yīng)該是0,測試不通過。

4.TestFour:拋出IllegalArgumentException

supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]
child2 ! 100 // 將狀態(tài)設(shè)為 100
watch(child) // have testActor watch “child”
child ! new IllegalArgumentException // break it
expectMsgPF() {
  case Terminated(`child`) => (println("the child stop"))
}
child2 ! Command("get",child2.path.name)
expectMsg(100)   

這里首先我們又創(chuàng)建了一個Child Actor為child2,并將它的狀態(tài)置為100,這里我們監(jiān)控前面創(chuàng)建的child1,然后給其發(fā)送一個IllegalArgumentException的消息,讓其拋出該異常,測試結(jié)果:

the child stop
測試通過

從結(jié)果中我們可以看出,child在拋出IllegalArgumentException后,會被其監(jiān)管著停止,但監(jiān)管者下的其他Actor還是正常工作。

5.TestFive:拋出一個自定義異常

 watch(child2)
 child2 ! Command("get",child2.path.name) // verify it is alive
 expectMsg(100)
 supervisor ! Props[Child] // create new child
 val child3 = expectMsgType[ActorRef]
 child2 ! new Exception("CRASH") // escalate failure
 expectMsgPF() {
    case t @ Terminated(`child2`) if t.existenceConfirmed => (
       println("the child2 stop")
    )
}
child3 ! Command("get",child3.path.name)
expectMsg(0)  

這里首先我們又創(chuàng)建了一個Child Actor為child3,這里我們監(jiān)控前面創(chuàng)建的child2,然后給其發(fā)送一個Exception("CRASH")的消息,讓其拋出該異常,測試結(jié)果:

the child2 stop
測試不通過

很多人可能會疑惑為什么TestFour可以通過,這里就通不過不了呢?因為這里錯誤Actor拋出的異常其監(jiān)管者無法處理,只能將失敗上溯傳遞,而頂級actor的缺省策略是對所有的Exception情況(ActorInitializationException和ActorKilledException例外)進行重啟. 由于缺省的重啟指令會停止所有的子actor,所以我們這里的child3也會被停止。導(dǎo)致測試不通過。當(dāng)然這里你也可以復(fù)寫默認的重啟方法,比如:

override def preRestart(cause: Throwable, msg: Option[Any]) {}

這樣重啟相應(yīng)Actor時就不會停止其子級下的所有Actor了。

本文主要介紹了Actor系統(tǒng)中的監(jiān)管和容錯,這一部分內(nèi)容在Akka中也是很重要的,它與Actor的樹形組織結(jié)構(gòu)巧妙結(jié)合,本文大量參考了Akka官方文檔的相應(yīng)章節(jié),有興趣的同學(xué)可以點擊這里Akka docs。也可以下載我的示例程序,里面包含了一個官方的提供的容錯示例。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容