AKKA
akka基于actor模型, 是一個(gè)用于構(gòu)建可擴(kuò)展的彈性的快速響應(yīng)的應(yīng)用程序的平臺(tái);
actor模型:是一個(gè)并行計(jì)算模型。 它把a(bǔ)ctor作為基本元素來(lái)對(duì)待:未響應(yīng)一個(gè)接收到的消息,一個(gè)actor能夠自己做出一些決策,如創(chuàng)建更多的actor或者發(fā)送更多的消息

概念介紹
Actor:
actor是akka中最核心的概念,它是一個(gè)封裝了狀態(tài)和行為的對(duì)象,actor之間可以通過(guò)交換消息的方式進(jìn)行通信,每個(gè)actor都有自己的收件箱,通過(guò)actor能夠簡(jiǎn)化鎖及線(xiàn)程管理,actor具有如下特性:
- 提供了一種高級(jí)抽象,能夠簡(jiǎn)化在并發(fā)/并行應(yīng)用場(chǎng)景下的編程開(kāi)發(fā)
- 提供了異步非阻塞、高性能的事件驅(qū)動(dòng)編程模型
- 超輕量級(jí)事件處理(每GB堆內(nèi)存幾百萬(wàn)actor)
類(lèi)介紹
ActorSystem
在A(yíng)kka中,ActorSystem是一個(gè)重量級(jí)的結(jié)構(gòu),他需要分配多個(gè)線(xiàn)程,所以實(shí)際應(yīng)用中,actorSystem一般是單例對(duì)象,我們通過(guò)ActorSystem創(chuàng)建很多actor,負(fù)責(zé)創(chuàng)建和監(jiān)督actor
Actor
Actor負(fù)責(zé)通信,它包含一些重要的生命周期方法:
- preStart(): 在A(yíng)ctor對(duì)象構(gòu)造方法執(zhí)行后執(zhí)行
- receive(): 在actor的preStart方法執(zhí)行完成后執(zhí)行,用于接收消息,會(huì)被反復(fù)執(zhí)行
Demo
使用akka做一個(gè)簡(jiǎn)易的通信模型,實(shí)現(xiàn)一個(gè)主從結(jié)構(gòu)通信
Master
主對(duì)象類(lèi),即注冊(cè)中心,統(tǒng)計(jì)當(dāng)前在線(xiàn)的worker數(shù)目
package akkaDemo
import akka.actor._
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
/**
* @author phil.zhang
*/
class Master extends Actor {
// workMap
private val workerMap = new mutable.HashMap[Int, WorkerInfo]()
private val workerList = new ListBuffer[WorkerInfo]()
override def preStart(): Unit = {
println("master 已經(jīng)啟動(dòng)")
import context.dispatcher
// 循環(huán)檢查心跳
context.system.scheduler.schedule(0 millis, 10 seconds, self, Check)
}
override def receive: Receive = {
// 接受注冊(cè)信息并統(tǒng)計(jì)
case RegisterMessage(workId, memory, cores) => {
val info = new WorkerInfo(workId, memory, cores)
info.lastHeartBeatTime = System.currentTimeMillis()
workerMap.put(workId, info)
workerList += info
val size = workerList.size
println(info)
println(s"worker$workId 注冊(cè)成功,當(dāng)前worker共:$size")
sender ! RegisterdMessage("注冊(cè)成功")
}
// 檢查心跳
case Check => {
val now = System.currentTimeMillis()
val outTimeList = workerList.filter(worker => now - worker.lastHeartBeatTime > 5000)
outTimeList.foreach(workerInfo => {
workerList -= workerInfo
workerMap.remove(workerInfo.workerId)
println("移除" + workerInfo.workerId)
})
}
// 接受心跳后更新心跳時(shí)間
case SendHeartBeat(workId) => {
if (workerMap.contains(workId)) {
val workerInfo = workerMap(workId)
workerInfo.lastHeartBeatTime=System.currentTimeMillis()
}
}
}
}
object Master {
def main(args: Array[String]): Unit = {
val options = new Options()
options.addOption("h", true, "host")
options.addOption("p", true, "port")
val parser = new GnuParser()
val line = parser.parse(options, args)
val host = line.getOptionValue("h")
val port = line.getOptionValue("p").toInt
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config=ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("actorSystem", config)
val master = actorSystem.actorOf(Props(new Master), "master")
}
}
Worker
工作對(duì)象類(lèi), 向主類(lèi)注冊(cè),并保持心跳
package akkaDemo
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}
import scala.concurrent.duration._
/**
* @author phil.zhang
* @date 2020/2/6
*/
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{
var master:ActorSelection = _
override def preStart(): Unit = {
// actorSystem 是master的ActorSystem的名字, master是masterActor的名字
master=context.actorSelection(s"akka.tcp://actorSystem@$masterHost:$masterPort/user/master")
// 向主類(lèi)注冊(cè)
master ! RegisterMessage(1, memory, cores)
println("worker注冊(cè)")
}
override def receive: Receive = {
// 主類(lèi)注冊(cè)返回信息
case RegisterdMessage(message) => {
println("worker" + message)
import context.dispatcher
// 循環(huán)發(fā)起心跳
context.system.scheduler.schedule(0 millis, 2 seconds,self, HeartBeat)
}
// 發(fā)送心跳
case HeartBeat => {
master ! SendHeartBeat(1)
}
}
}
object Worker {
def main(args: Array[String]): Unit = {
val options = new Options()
options.addOption("mh",true, "master host")
options.addOption("mp",true, "master port")
options.addOption("h",true, "host")
options.addOption("p",true, "host")
options.addOption("m",true, "memory")
options.addOption("c",true, "cores")
val parser = new GnuParser()
val line = parser.parse(options, args)
val m_host = line.getOptionValue("mh")
val m_port = line.getOptionValue("mp")
val host = line.getOptionValue("h")
val port = line.getOptionValue("p")
val memory = line.getOptionValue("m").toInt
val cores = line.getOptionValue("c").toInt
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config=ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("workerActorSystem", config)
val worker = actorSystem.actorOf(Props(new Worker(memory,cores,m_host,m_port)),"worker")
}
}
WorkerInfo
工作對(duì)象信息類(lèi), 用于描述工作對(duì)象
package akkaDemo
/**
* @author phil.zhang
* @date 2020/2/6
*/
class WorkerInfo(val workerId:Int,val memory: Int,val cores:Int) {
// 用于記錄上次心跳時(shí)間
var lastHeartBeatTime:Long = _
override def toString = s"$workerId,$memory,$cores"
}
Message
定義了一些信息類(lèi)型
package akkaDemo
/**
* @author phil.zhang
* @date 2020/2/6
*/
trait Message extends Serializable{
}
// slave發(fā)給master的心跳信息
case class SendHeartBeat(workId: Int) extends Message
// slave發(fā)給master的注冊(cè)信息
case class RegisterMessage(workId: Int, memory: Int, cores: Int) extends Message
// master發(fā)給slave的注冊(cè)反饋信息
case class RegisterdMessage(message: String) extends Message
// master發(fā)給自己的檢查信息, 所以不需要序列化
case object Check
// slave發(fā)給自己的心跳信息,所以不需要序列化
case object HeartBeat
主要maven依賴(lài)
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.3</version>
</dependency>