引言
分布式計算的基本思路是將數(shù)據(jù)分為多個部分,將同樣的數(shù)據(jù)操作方式在數(shù)據(jù)的不同部分上執(zhí)行,分別獲得結(jié)果,然后通過“匯聚處理”的方式得到結(jié)果。如何將數(shù)據(jù)分為多個部分(也就是“分片”)便是其中的一個重要組成部分。Spark框架同樣對使用分片的操作,將數(shù)據(jù)分片(partition)處理。本文對Spark框架中的數(shù)據(jù)分片作簡單介紹。
輸入數(shù)據(jù)的分片
對于讀取批數(shù)據(jù)生成rdd的操作,數(shù)據(jù)的分片都是通過輸入文件格式本身提供的getSplit方法來對數(shù)據(jù)進(jìn)行分片。
本部分主要介紹對于不同數(shù)據(jù)源的數(shù)據(jù),spark如何定義/獲取數(shù)據(jù)的分片數(shù)。
text文件分片(sc.textFile為例):
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat] /* 數(shù)據(jù)文件的輸入格式 :org.apache.hadoop.mapred.TextInputFormat */
, classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
hadoopFile方法生成HadoopRDD
HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions) /* minPartitions為生成該RDD的最小分片數(shù),表示該RDD的分片數(shù)最小值,默認(rèn)為2*/
.setName(path)
在執(zhí)行action方法(如count)時,spark應(yīng)用才真正開始計算,通過調(diào)用rdd.partitions.length計算出分片數(shù)
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
通過跟蹤該方法可以看出該函數(shù)最終會調(diào)用到HadoopRDD的getPartitions方法,在該方法中通過inputFormat的getSplit方法計算分片數(shù)
getInputFormat(jobConf).getSplits(jobConf, minPartitions)
TextInputFormat繼承至FileInputFormat,F(xiàn)ileInputFormat的getSplit方法網(wǎng)上有許多分析,這里不再展開,大致的原理是根據(jù)文件個數(shù),傳入的minpartitions,mapreduce.input.fileinputformat.split.minsize等參數(shù)計算出分片數(shù)。
hbase表分片
在讀取HBase數(shù)據(jù)時,沒有類似textFile的接口的封裝,可調(diào)用如下接口生成給予hbase數(shù)據(jù)的RDD,
val hBaseRDD = sc.newAPIHadoopRDD(conf,
classOf[TableInputFormat], /*該類的全類名為: org.apache.hadoop.hbase.mapreduce.TableInputFormat */
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
該方法生成new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
在執(zhí)行action操作時,同樣調(diào)用到rdd.partitions方法,跟蹤至newHadoopRDD之后,發(fā)現(xiàn)調(diào)用到
inputFormat.getSplits(new JobContextImpl(_conf, jobId))
查看對應(yīng)的getSplits方法可以看出:
默認(rèn)情況下(hbase.mapreduce.input.autobalance的值為false)hbase表如果存在多個region,則每個region設(shè)置為一個split。
如果設(shè)置了開啟均衡(設(shè)置hbase.mapreduce.input.autobalance的值為true:在hbase的region大小不均衡引發(fā)的數(shù)據(jù)傾斜,將導(dǎo)致不同的region處理耗時較多,該參數(shù)為了解決此場景),則會在每個region對應(yīng)一個split的基礎(chǔ)上,將較小(小于平均大?。┑膔egion進(jìn)行合并作為一個split,較大(大于平均size的三倍(其中三可配置))的region拆分為兩個region。
splits偽代碼如下(源碼可參考TableInputFormatBase.calculateRebalancedSplits):
while ( i < splits.size)
{
if(splits(i).size > averagesize * 3) {
if(! splitAble)
resultsplits.add(split(i))
else{
(split1,split2) = Split(splits(i))
resultsplits.add(split1)
resultsplits.add(split2)
}
i++
}
else if(splits(i).size > averagesize) {
resultsplits.add(split(i))
i++
}else{
startKey = split(i).getStartRow
i++;
while(totalSize + splits(i).size < averagesize * 3){
totalSize += splits(i).size
endKey = splits(i).getEndRow
}
resultsplits.add(new TableSplit(startKey,endKey,*))
}
}
Kafka數(shù)據(jù)的分片
Spark框架在讀取Kafka消息時,將Kafka數(shù)據(jù)抽象為KafkaRDD(SparkStreaming)或者KafkaSourceRDD(StructedStreaming),查看對應(yīng)RDD的getPartitions方法和定義:
KafkaSourceRDD:
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
}
offsetRanges的數(shù)據(jù)結(jié)構(gòu)為
private[kafka010] case class KafkaSourceRDDOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String])
可以看出partition個數(shù)為對應(yīng)的TopicPartition的個數(shù)
KafkaRDD
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
}.toArray
}
offsetRanges數(shù)據(jù)結(jié)構(gòu)為:
final class OffsetRange private(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long)
可以看出partition個數(shù)為對應(yīng)的partition的個數(shù)
總結(jié)
在spark框架中,對于輸入數(shù)據(jù)獲取RDD的處理:
- 讀取數(shù)據(jù)時的分片由數(shù)據(jù)量,數(shù)據(jù)"存儲格式"決定,框架/應(yīng)用并不能真正決定分片數(shù)。
- 對于通過數(shù)據(jù)生成的RDD,如makeRDD,parallize等方法生成的RDD,則可以指定相應(yīng)的RDD的分片數(shù)。
- 對于FileInputFormat格式的數(shù)據(jù),可通過設(shè)置最小的分片數(shù)來擴(kuò)大RDD分片數(shù),但不能決定最終由多少分片數(shù)(最終分片數(shù) >= 設(shè)置的最小分片數(shù))
- 其他類型的數(shù)據(jù)/文件的分片方法也是通過輸入文件格式的getSplit方法來獲取分片
- Split方法直接決定了輸入數(shù)據(jù)的分片數(shù),影響應(yīng)用并行度,在一些場景下,應(yīng)用可以定制特定的getSplits方法來實(shí)現(xiàn)一些特殊需求。如hive在處理小文件時自定義了combineFileInputForamt,Hbase在以region為單位劃分split之后,再跟進(jìn)每個region數(shù)據(jù)量來合并/分拆split來優(yōu)化性能
PS: 其他相關(guān)的數(shù)據(jù)分片
對于輸入文件的分片,不同的文件格式使用的分片方法不盡相同。 如hive中使用的parquet,RCFIle格式文件,其getsplits方法直接使用的是FileInputFormat.getSplits, 而orc格式文件的getsplits方法則是繼承于InputFormat
在Hive中默認(rèn)使用的是CombineFileInputFormat,它的作用是在啟動map時,會將多個小文件進(jìn)行合并,已啟動較少的map提升應(yīng)用運(yùn)行速度。其getsplits方法在合并小文件時會考慮更多的因素,如:
mapreduce.input.fileinputformat.split.minsize,
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack
mapreduce.input.fileinputformat.split.maxsize
經(jīng)過轉(zhuǎn)換的分片
Spark框架中,RDD的分片數(shù)決定了對RDD處理時的并發(fā)度,因此合理的RDD分片數(shù),對應(yīng)用的性能有較大影響。
RDD的轉(zhuǎn)換通常不會改變RDD的partition數(shù),如map,flatmap,mappartitions等操作并沒有傳入partition數(shù)的API,無法修改新生成的RDD的的分片數(shù)??蓞⒖紀(jì)rg.apache.spark.rdd.RDD
如果需要強(qiáng)制修改新生成RDD的分片數(shù),可直接調(diào)用RDD.repartition,RDD.coalesce強(qiáng)制修改新生成RDD的分片數(shù)
對于RDD[KEY,VALUE]類型的RDD的操作如join,reduceByKey,aggregateByKey,combineByKey等接口可通過傳入分片數(shù)/設(shè)置partitioner等方式設(shè)置shuffle之后的RDD的partition個數(shù),從而調(diào)整后續(xù)的stage的并發(fā)task個數(shù).可參考o(jì)rg.apache.spark.rdd.PairRDDFunctions
對于需要進(jìn)行shuffle操作的算子,在變換的過程中,會自動生成shuffledRDD,該RDD的分片數(shù)可通過觸發(fā)shuffle操作的算子調(diào)用時設(shè)置。如果沒有設(shè)置時,則會使用默認(rèn)的分片數(shù)。
對于普通的應(yīng)用shuffle后的默認(rèn)的分片數(shù)由spark.default.parallelism參數(shù)決定,默認(rèn)200 對于sql相關(guān)的操作,shuffle后的默認(rèn)分片數(shù)由spark.sql.shuffle.partitions操作決定,默認(rèn)為200
對于某些特殊的操作,sql的內(nèi)部優(yōu)化可能會觸發(fā)shuffle操作。如使用到treeaggregate會觸發(fā)shuffle操作,shuffle后的partition數(shù)目默認(rèn)為原始的開方。即原有2000個partition時,shuffle后的partition為44個。