Spark存儲(chǔ)體系底層架構(gòu)剖析-Spark商業(yè)環(huán)境實(shí)戰(zhàn)

本套系列博客從真實(shí)商業(yè)環(huán)境抽取案例進(jìn)行總結(jié)和分享,并給出Spark商業(yè)應(yīng)用的調(diào)優(yōu)建議和集群環(huán)境容量規(guī)劃等內(nèi)容,請(qǐng)持續(xù)關(guān)注本套博客。版權(quán)聲明:本套Spark調(diào)優(yōu)系列版權(quán)歸作者(秦凱新)所有,禁止轉(zhuǎn)載,歡迎學(xué)習(xí)。

Spark商業(yè)環(huán)境實(shí)戰(zhàn)及調(diào)優(yōu)進(jìn)階系列

1. Spark存儲(chǔ)體系組件關(guān)系解釋

BlockInfoManger 主要提供讀寫(xiě)鎖控制,層級(jí)僅僅位于BlockManger之下,通常Spark讀寫(xiě)操作都先調(diào)用BlockManger,然后咨詢BlockInfoManger是否存在鎖競(jìng)爭(zhēng),然后才會(huì)調(diào)用DiskStore和MemStore,進(jìn)而調(diào)用DiskBlockManger來(lái)確定數(shù)據(jù)與位置映射,或者調(diào)用 MemoryManger來(lái)確定內(nèi)存池的軟邊界和內(nèi)存使用申請(qǐng)。

image

1.1 Driver 與 Executor 與 SparkEnv 與 BlockManger 組件關(guān)系:

Driver與 Executor 組件各自擁有任務(wù)執(zhí)行的SparkEnv環(huán)境,而每一個(gè)SparkEnv 中都有一個(gè)BlockManger負(fù)責(zé)存儲(chǔ)服務(wù),作為高層抽象,BlockManger 之間需要通過(guò) RPCEnv,ShuffleClient,及BlocakTransferService相互通訊。

1.1 BlockInfoManger 與 BlockInfo 共享鎖和排它鎖讀寫(xiě)控制關(guān)系:

BlockInfo中具有讀寫(xiě)鎖的標(biāo)志,通過(guò)標(biāo)志可以判斷是否進(jìn)行寫(xiě)控制

  val NO_WRITER: Long = -1
  val NON_TASK_WRITER: Long = -1024
  
 * The task attempt id of the task which currently holds the write lock for this block, or
 * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
 * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
 
 def writerTask: Long = _writerTask
 def writerTask_=(t: Long): Unit = {
 _writerTask = t
    checkInvariants()

BlockInfoManager具有BlockId與BlockInfo的映射關(guān)系以及任務(wù)id與BlockId的鎖映射:

 private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]  
 
 *Tracks the set of blocks that each task has locked for writing.
 private[this] val writeLocksByTask = new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
                                       with mutable.MultiMap[TaskAttemptId, BlockId]
 
 *Tracks the set of blocks that each task has locked for reading, along with the number of times
 *that a block has been locked (since our read locks are re-entrant).
 private[this] val readLocksByTask =
 new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]

1.3 DiskBlockManager 與 DiskStore 組件關(guān)系:

可以看到DiskStore內(nèi)部會(huì)調(diào)用DiskBlockManager來(lái)確定Block的讀寫(xiě)位置:

  • 以下是DiskStore的抽象寫(xiě)操作,需要傳入FileOutputStream => Unit高階函數(shù):

      def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
      if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      
      val file = diskManager.getFile(blockId)
      
      val fileOutputStream = new FileOutputStream(file)
      var threwException: Boolean = true
      try {
          writeFunc(fileOutputStream)
          threwException = false
      } finally {
       try {
          Closeables.close(fileOutputStream, threwException)
       } finally {
       if (threwException) {
        remove(blockId)
              }
          }
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
      }
    
  • 以下是DiskStore的讀操作,調(diào)用DiskBlockManager來(lái)獲取數(shù)據(jù)位置:

      def getBytes(blockId: BlockId): ChunkedByteBuffer = {
      
      val file = diskManager.getFile(blockId.name)
     
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
    * For small files, directly read rather than memory map
      if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
      } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
          }
      } {
      channel.close()
       }
      }
    

1.3 MemManager 與 MemStore 與 MemoryPool 組件關(guān)系:

在這里要強(qiáng)調(diào)的是:第一代大數(shù)據(jù)框架hadoop只將內(nèi)存作為計(jì)算資源,而Spark不僅將內(nèi)存作為計(jì)算資源外,還將內(nèi)存的一部分納入存儲(chǔ)體系:

  • 內(nèi)存池模型 :邏輯上分為堆內(nèi)存和堆外內(nèi)存,然后堆內(nèi)存(或堆外內(nèi)存)內(nèi)部又分為StorageMemoryPool和ExecutionMemoryPool。
  • MemManager是抽象的,定義了內(nèi)存管理器的接口規(guī)范,方便以后擴(kuò)展,比如:老版的StaticMemoryManager和新版的UnifiedMemoryManager.
  • MemStore 依賴于UnifiedMemoryManager進(jìn)行內(nèi)存的申請(qǐng)和軟邊界變化或內(nèi)存釋放。
  • MemStore 內(nèi)部同時(shí)負(fù)責(zé)存儲(chǔ)真實(shí)的對(duì)象,比如內(nèi)部成員變量:entries ,建立了內(nèi)存中的BlockId與MemoryEntry(Block的內(nèi)存的形式)之間的映射。
  • MemStore 內(nèi)部的“占座”行為,如:內(nèi)部變量offHeapUnrollMemoryMap 和onHeapUnrollMemoryMap。

1.4 BlockManagerMaster 與 BlockManager 組件關(guān)系:

  • BlockManagerMaster的作用就是對(duì)存在于Dirver或Executor上的BlockManger進(jìn)行統(tǒng)一管理,這簡(jiǎn)直是代理行為,因?yàn)樗钟蠦lockManagerMasterEndpointREf,進(jìn)而和BlockManagerMasterEndpoint進(jìn)行通訊。

2. Spark存儲(chǔ)體系組件BlockTransferServic傳輸服務(wù)

未完待續(xù)

3. 總結(jié)

存儲(chǔ)體系是Spark的基石,我爭(zhēng)取把每一塊細(xì)微的知識(shí)點(diǎn)進(jìn)行剖析,和大部分博客不同的是,我會(huì)盡量采用最平實(shí)的語(yǔ)言,畢竟技術(shù)就是一層窗戶紙。

秦凱新 20181031 凌晨

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容