akka編程demo

AKKA

akka基于actor模型, 是一個(gè)用于構(gòu)建可擴(kuò)展的彈性的快速響應(yīng)的應(yīng)用程序的平臺(tái);
actor模型:是一個(gè)并行計(jì)算模型。 它把a(bǔ)ctor作為基本元素來(lái)對(duì)待:未響應(yīng)一個(gè)接收到的消息,一個(gè)actor能夠自己做出一些決策,如創(chuàng)建更多的actor或者發(fā)送更多的消息

image.png

概念介紹

Actor:

actor是akka中最核心的概念,它是一個(gè)封裝了狀態(tài)和行為的對(duì)象,actor之間可以通過(guò)交換消息的方式進(jìn)行通信,每個(gè)actor都有自己的收件箱,通過(guò)actor能夠簡(jiǎn)化鎖及線(xiàn)程管理,actor具有如下特性:

  • 提供了一種高級(jí)抽象,能夠簡(jiǎn)化在并發(fā)/并行應(yīng)用場(chǎng)景下的編程開(kāi)發(fā)
  • 提供了異步非阻塞、高性能的事件驅(qū)動(dòng)編程模型
  • 超輕量級(jí)事件處理(每GB堆內(nèi)存幾百萬(wàn)actor)

類(lèi)介紹

ActorSystem

在A(yíng)kka中,ActorSystem是一個(gè)重量級(jí)的結(jié)構(gòu),他需要分配多個(gè)線(xiàn)程,所以實(shí)際應(yīng)用中,actorSystem一般是單例對(duì)象,我們通過(guò)ActorSystem創(chuàng)建很多actor,負(fù)責(zé)創(chuàng)建和監(jiān)督actor

Actor

Actor負(fù)責(zé)通信,它包含一些重要的生命周期方法:

  • preStart(): 在A(yíng)ctor對(duì)象構(gòu)造方法執(zhí)行后執(zhí)行
  • receive(): 在actor的preStart方法執(zhí)行完成后執(zhí)行,用于接收消息,會(huì)被反復(fù)執(zhí)行

Demo

使用akka做一個(gè)簡(jiǎn)易的通信模型,實(shí)現(xiàn)一個(gè)主從結(jié)構(gòu)通信

Master

主對(duì)象類(lèi),即注冊(cè)中心,統(tǒng)計(jì)當(dāng)前在線(xiàn)的worker數(shù)目

package akkaDemo

import akka.actor._
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
/**
  * @author phil.zhang
  */
class Master extends Actor {

  // workMap
  private val workerMap = new mutable.HashMap[Int, WorkerInfo]()

  private val workerList = new ListBuffer[WorkerInfo]()

  override def preStart(): Unit = {
    println("master 已經(jīng)啟動(dòng)")

    import context.dispatcher
    // 循環(huán)檢查心跳
    context.system.scheduler.schedule(0 millis, 10 seconds, self, Check)
  }

  override def receive: Receive = {
    // 接受注冊(cè)信息并統(tǒng)計(jì)
    case RegisterMessage(workId, memory, cores) => {
      val info = new WorkerInfo(workId, memory, cores)
      info.lastHeartBeatTime = System.currentTimeMillis()
      workerMap.put(workId, info)
      workerList += info
      val size = workerList.size
      println(info)
      println(s"worker$workId 注冊(cè)成功,當(dāng)前worker共:$size")
      sender ! RegisterdMessage("注冊(cè)成功")
    }
      // 檢查心跳
    case Check => {
      val now = System.currentTimeMillis()
      val outTimeList = workerList.filter(worker => now - worker.lastHeartBeatTime > 5000)
      outTimeList.foreach(workerInfo => {
        workerList -= workerInfo
        workerMap.remove(workerInfo.workerId)
        println("移除" + workerInfo.workerId)
      })
    }
      // 接受心跳后更新心跳時(shí)間
    case SendHeartBeat(workId) => {
      if (workerMap.contains(workId)) {
        val workerInfo = workerMap(workId)
        workerInfo.lastHeartBeatTime=System.currentTimeMillis()
      }
    }
  }
}

object Master {

  def main(args: Array[String]): Unit = {
    val options = new Options()
    options.addOption("h", true, "host")
    options.addOption("p", true, "port")

    val parser = new GnuParser()
    val line = parser.parse(options, args)

    val host = line.getOptionValue("h")
    val port = line.getOptionValue("p").toInt

    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config=ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("actorSystem", config)
    val master = actorSystem.actorOf(Props(new Master), "master")
  }

}

Worker

工作對(duì)象類(lèi), 向主類(lèi)注冊(cè),并保持心跳

package akkaDemo

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}

import scala.concurrent.duration._

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{

  var master:ActorSelection = _

  override def preStart(): Unit = {

    // actorSystem 是master的ActorSystem的名字, master是masterActor的名字
    master=context.actorSelection(s"akka.tcp://actorSystem@$masterHost:$masterPort/user/master")
    // 向主類(lèi)注冊(cè)
    master ! RegisterMessage(1, memory, cores)
    println("worker注冊(cè)")
  }

  override def receive: Receive = {
    // 主類(lèi)注冊(cè)返回信息
    case RegisterdMessage(message) => {
      println("worker" + message)

      import context.dispatcher
      // 循環(huán)發(fā)起心跳
      context.system.scheduler.schedule(0 millis, 2 seconds,self, HeartBeat)
    }
      // 發(fā)送心跳
    case HeartBeat => {
      master ! SendHeartBeat(1)
    }
  }
}

object Worker {

  def main(args: Array[String]): Unit = {
    val options = new Options()
    options.addOption("mh",true, "master host")
    options.addOption("mp",true, "master port")
    options.addOption("h",true, "host")
    options.addOption("p",true, "host")
    options.addOption("m",true, "memory")
    options.addOption("c",true, "cores")

    val parser = new GnuParser()
    val line = parser.parse(options, args)

    val m_host = line.getOptionValue("mh")
    val m_port = line.getOptionValue("mp")
    val host = line.getOptionValue("h")
    val port = line.getOptionValue("p")
    val memory = line.getOptionValue("m").toInt
    val cores = line.getOptionValue("c").toInt

    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config=ConfigFactory.parseString(configStr)

    val actorSystem = ActorSystem("workerActorSystem", config)

    val worker = actorSystem.actorOf(Props(new Worker(memory,cores,m_host,m_port)),"worker")

  }
}

WorkerInfo

工作對(duì)象信息類(lèi), 用于描述工作對(duì)象

package akkaDemo

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
class WorkerInfo(val workerId:Int,val memory: Int,val cores:Int) {

  // 用于記錄上次心跳時(shí)間
  var lastHeartBeatTime:Long = _
  
  override def toString = s"$workerId,$memory,$cores"
}

Message

定義了一些信息類(lèi)型

package akkaDemo

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
trait Message extends Serializable{

}

// slave發(fā)給master的心跳信息
case class SendHeartBeat(workId: Int) extends Message

// slave發(fā)給master的注冊(cè)信息
case class RegisterMessage(workId: Int, memory: Int, cores: Int) extends Message

// master發(fā)給slave的注冊(cè)反饋信息
case class RegisterdMessage(message: String) extends Message

// master發(fā)給自己的檢查信息, 所以不需要序列化
case object Check

// slave發(fā)給自己的心跳信息,所以不需要序列化
case object HeartBeat

主要maven依賴(lài)

    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.2</version>
    </dependency>

    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.11</artifactId>
      <version>2.5.3</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_2.11</artifactId>
      <version>2.5.3</version>
    </dependency>
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 持久化 當(dāng)我們?cè)诩合到y(tǒng)中,一臺(tái)機(jī)器向另一臺(tái)機(jī)器發(fā)送一段數(shù)據(jù),負(fù)責(zé)接收的機(jī)器在接收數(shù)據(jù)前突然宕機(jī),就會(huì)造成數(shù)據(jù)丟失...
    mango_knight閱讀 4,921評(píng)論 0 4
  • 前言 一 不得不說(shuō)的Actor模型 1.1 Actor模型的誕生與發(fā)展 1.2 Actor模型是什么? 1.3 A...
    hedgehog1112閱讀 505評(píng)論 0 0
  • 這篇文章主要介紹了Flink通過(guò)Akka實(shí)現(xiàn)的分布式通信。它第一次在0.9版本中出現(xiàn)。通過(guò)Akka,所有的遠(yuǎn)程程序...
    alvin_wang閱讀 10,070評(píng)論 0 12
  • 傳統(tǒng)的游戲服務(wù)器要么是單線(xiàn)程要么是多線(xiàn)程,過(guò)去幾十年里CPU一直遵循摩爾定律發(fā)展,帶來(lái)的結(jié)果是單核頻率越來(lái)越高。而...
    JunChow520閱讀 67,481評(píng)論 14 58
  • Actor系統(tǒng)的實(shí)體 在A(yíng)ctor系統(tǒng)中,actor之間具有樹(shù)形的監(jiān)管結(jié)構(gòu),并且actor可以跨多個(gè)網(wǎng)絡(luò)節(jié)點(diǎn)進(jìn)行透...
    JasonDing閱讀 3,530評(píng)論 2 6

友情鏈接更多精彩內(nèi)容