Spark應(yīng)用分片介紹

引言

分布式計算的基本思路是將數(shù)據(jù)分為多個部分,將同樣的數(shù)據(jù)操作方式在數(shù)據(jù)的不同部分上執(zhí)行,分別獲得結(jié)果,然后通過“匯聚處理”的方式得到結(jié)果。如何將數(shù)據(jù)分為多個部分(也就是“分片”)便是其中的一個重要組成部分。Spark框架同樣對使用分片的操作,將數(shù)據(jù)分片(partition)處理。本文對Spark框架中的數(shù)據(jù)分片作簡單介紹。

輸入數(shù)據(jù)的分片

對于讀取批數(shù)據(jù)生成rdd的操作,數(shù)據(jù)的分片都是通過輸入文件格式本身提供的getSplit方法來對數(shù)據(jù)進(jìn)行分片。
本部分主要介紹對于不同數(shù)據(jù)源的數(shù)據(jù),spark如何定義/獲取數(shù)據(jù)的分片數(shù)。

text文件分片(sc.textFile為例):

def textFile(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat]  /* 數(shù)據(jù)文件的輸入格式 :org.apache.hadoop.mapred.TextInputFormat */
, classOf[LongWritable], classOf[Text],
  minPartitions).map(pair => pair._2.toString).setName(path)
}
 hadoopFile方法生成HadoopRDD
 HadoopRDD(
  this,
  confBroadcast,
  Some(setInputPathsFunc),
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions) /* minPartitions為生成該RDD的最小分片數(shù),表示該RDD的分片數(shù)最小值,默認(rèn)為2*/
  .setName(path)

在執(zhí)行action方法(如count)時,spark應(yīng)用才真正開始計算,通過調(diào)用rdd.partitions.length計算出分片數(shù)

    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}

通過跟蹤該方法可以看出該函數(shù)最終會調(diào)用到HadoopRDD的getPartitions方法,在該方法中通過inputFormat的getSplit方法計算分片數(shù)

    getInputFormat(jobConf).getSplits(jobConf, minPartitions)

TextInputFormat繼承至FileInputFormat,F(xiàn)ileInputFormat的getSplit方法網(wǎng)上有許多分析,這里不再展開,大致的原理是根據(jù)文件個數(shù),傳入的minpartitions,mapreduce.input.fileinputformat.split.minsize等參數(shù)計算出分片數(shù)。

hbase表分片

在讀取HBase數(shù)據(jù)時,沒有類似textFile的接口的封裝,可調(diào)用如下接口生成給予hbase數(shù)據(jù)的RDD,

val hBaseRDD = sc.newAPIHadoopRDD(conf, 
classOf[TableInputFormat], /*該類的全類名為:  org.apache.hadoop.hbase.mapreduce.TableInputFormat */
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  classOf[org.apache.hadoop.hbase.client.Result])  
  
  該方法生成new NewHadoopRDD(this, fClass, kClass, vClass, jconf)

在執(zhí)行action操作時,同樣調(diào)用到rdd.partitions方法,跟蹤至newHadoopRDD之后,發(fā)現(xiàn)調(diào)用到

inputFormat.getSplits(new JobContextImpl(_conf, jobId))

查看對應(yīng)的getSplits方法可以看出:

默認(rèn)情況下(hbase.mapreduce.input.autobalance的值為false)hbase表如果存在多個region,則每個region設(shè)置為一個split。

如果設(shè)置了開啟均衡(設(shè)置hbase.mapreduce.input.autobalance的值為true:在hbase的region大小不均衡引發(fā)的數(shù)據(jù)傾斜,將導(dǎo)致不同的region處理耗時較多,該參數(shù)為了解決此場景),則會在每個region對應(yīng)一個split的基礎(chǔ)上,將較小(小于平均大?。┑膔egion進(jìn)行合并作為一個split,較大(大于平均size的三倍(其中三可配置))的region拆分為兩個region。

    splits偽代碼如下(源碼可參考TableInputFormatBase.calculateRebalancedSplits):
    
    while ( i < splits.size)
    {
        if(splits(i).size > averagesize * 3) {
        if(! splitAble)
            resultsplits.add(split(i))
        else{
            (split1,split2) = Split(splits(i))
            resultsplits.add(split1)
            resultsplits.add(split2)
        }
        i++ 
        }
        else if(splits(i).size > averagesize) {
            resultsplits.add(split(i))
            i++
        }else{
            startKey = split(i).getStartRow
            i++;
            while(totalSize + splits(i).size < averagesize * 3){
                totalSize += splits(i).size
                endKey = splits(i).getEndRow
            }
            resultsplits.add(new TableSplit(startKey,endKey,*))
        }
    }

Kafka數(shù)據(jù)的分片

Spark框架在讀取Kafka消息時,將Kafka數(shù)據(jù)抽象為KafkaRDD(SparkStreaming)或者KafkaSourceRDD(StructedStreaming),查看對應(yīng)RDD的getPartitions方法和定義:

KafkaSourceRDD:

    override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
 }
 offsetRanges的數(shù)據(jù)結(jié)構(gòu)為
 private[kafka010] case class KafkaSourceRDDOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]) 

可以看出partition個數(shù)為對應(yīng)的TopicPartition的個數(shù)

KafkaRDD

override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }
  offsetRanges數(shù)據(jù)結(jié)構(gòu)為:
  final class OffsetRange private(
        val topic: String,
        val partition: Int,
        val fromOffset: Long,
        val untilOffset: Long) 

可以看出partition個數(shù)為對應(yīng)的partition的個數(shù)

總結(jié)

在spark框架中,對于輸入數(shù)據(jù)獲取RDD的處理:

  • 讀取數(shù)據(jù)時的分片由數(shù)據(jù)量,數(shù)據(jù)"存儲格式"決定,框架/應(yīng)用并不能真正決定分片數(shù)。
  • 對于通過數(shù)據(jù)生成的RDD,如makeRDD,parallize等方法生成的RDD,則可以指定相應(yīng)的RDD的分片數(shù)。
  • 對于FileInputFormat格式的數(shù)據(jù),可通過設(shè)置最小的分片數(shù)來擴(kuò)大RDD分片數(shù),但不能決定最終由多少分片數(shù)(最終分片數(shù) >= 設(shè)置的最小分片數(shù))
  • 其他類型的數(shù)據(jù)/文件的分片方法也是通過輸入文件格式的getSplit方法來獲取分片
  • Split方法直接決定了輸入數(shù)據(jù)的分片數(shù),影響應(yīng)用并行度,在一些場景下,應(yīng)用可以定制特定的getSplits方法來實(shí)現(xiàn)一些特殊需求。如hive在處理小文件時自定義了combineFileInputForamt,Hbase在以region為單位劃分split之后,再跟進(jìn)每個region數(shù)據(jù)量來合并/分拆split來優(yōu)化性能

PS: 其他相關(guān)的數(shù)據(jù)分片

對于輸入文件的分片,不同的文件格式使用的分片方法不盡相同。 如hive中使用的parquet,RCFIle格式文件,其getsplits方法直接使用的是FileInputFormat.getSplits, 而orc格式文件的getsplits方法則是繼承于InputFormat

在Hive中默認(rèn)使用的是CombineFileInputFormat,它的作用是在啟動map時,會將多個小文件進(jìn)行合并,已啟動較少的map提升應(yīng)用運(yùn)行速度。其getsplits方法在合并小文件時會考慮更多的因素,如:

mapreduce.input.fileinputformat.split.minsize,
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack
mapreduce.input.fileinputformat.split.maxsize

經(jīng)過轉(zhuǎn)換的分片

  • Spark框架中,RDD的分片數(shù)決定了對RDD處理時的并發(fā)度,因此合理的RDD分片數(shù),對應(yīng)用的性能有較大影響。

  • RDD的轉(zhuǎn)換通常不會改變RDD的partition數(shù),如map,flatmap,mappartitions等操作并沒有傳入partition數(shù)的API,無法修改新生成的RDD的的分片數(shù)??蓞⒖紀(jì)rg.apache.spark.rdd.RDD

  • 如果需要強(qiáng)制修改新生成RDD的分片數(shù),可直接調(diào)用RDD.repartition,RDD.coalesce強(qiáng)制修改新生成RDD的分片數(shù)

  • 對于RDD[KEY,VALUE]類型的RDD的操作如join,reduceByKey,aggregateByKey,combineByKey等接口可通過傳入分片數(shù)/設(shè)置partitioner等方式設(shè)置shuffle之后的RDD的partition個數(shù),從而調(diào)整后續(xù)的stage的并發(fā)task個數(shù).可參考o(jì)rg.apache.spark.rdd.PairRDDFunctions

  • 對于需要進(jìn)行shuffle操作的算子,在變換的過程中,會自動生成shuffledRDD,該RDD的分片數(shù)可通過觸發(fā)shuffle操作的算子調(diào)用時設(shè)置。如果沒有設(shè)置時,則會使用默認(rèn)的分片數(shù)。

  • 對于普通的應(yīng)用shuffle后的默認(rèn)的分片數(shù)由spark.default.parallelism參數(shù)決定,默認(rèn)200 對于sql相關(guān)的操作,shuffle后的默認(rèn)分片數(shù)由spark.sql.shuffle.partitions操作決定,默認(rèn)為200

  • 對于某些特殊的操作,sql的內(nèi)部優(yōu)化可能會觸發(fā)shuffle操作。如使用到treeaggregate會觸發(fā)shuffle操作,shuffle后的partition數(shù)目默認(rèn)為原始的開方。即原有2000個partition時,shuffle后的partition為44個。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容