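Data in Spark Streaming flows in continuously, and sometimes we need to compute statistics that span batches, which means maintaining state for the data. Spark Streaming offers two ways to manage state: updateStateByKey and mapWithState.
The first approach takes the state RDD from the previous batch and cogroups it with the current batch's RDD to produce a new state RDD. This fits the immutability of RDDs perfectly, but it has a significant performance cost: all of the data has to be processed on every batch, so the amount of computation grows linearly with the size of the data set.
- Let's look at the code for updateStateByKey. DStream itself has no updateStateByKey() method, because updateStateByKey is an operation on key-value pairs. As you might guess, updateStateByKey() actually lives in the PairDStreamFunctions class and is made available on a DStream through an implicit conversion.
The code is as follows: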
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairDStreamFunctions[K, V] = {
new PairDStreamFunctions[K, V](stream)
}
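To make the implicit-conversion mechanism concrete, here is a minimal sketch in plain Scala without Spark. The names MiniStream, MiniPairFunctions and countByKey are hypothetical stand-ins invented for this illustration; they only mimic how pair-only methods can appear on a stream whose element type is a tuple.

```scala
import scala.language.implicitConversions

object ImplicitPairOps {
  // Hypothetical stand-in for DStream: it just wraps a sequence of elements.
  final case class MiniStream[T](items: Seq[T])

  // Hypothetical stand-in for PairDStreamFunctions: holds key-value-only operations.
  final class MiniPairFunctions[K, V](self: MiniStream[(K, V)]) {
    def countByKey: Map[K, Int] =
      self.items.groupBy(_._1).map { case (k, kvs) => (k, kvs.size) }
  }

  // The implicit conversion applies only when the element type is a pair,
  // so countByKey is not available on, say, a MiniStream[String].
  implicit def toMiniPairFunctions[K, V](s: MiniStream[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions[K, V](s)

  // MiniStream has no countByKey method; the compiler inserts the conversion.
  val counts: Map[String, Int] =
    MiniStream(Seq("a" -> 1, "b" -> 1, "a" -> 1)).countByKey
}
```

The same idea explains why updateStateByKey() cannot be found by searching the DStream class itself.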
- Next, look at updateStateByKey(). It has several overloads, all of which end up calling the following version:
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
): DStream[(K, S)] = ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
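User code normally passes a simpler per-key function of the form (Seq[V], Option[S]) => Option[S], as in the demo at the end of this post. The sketch below shows one way such a function could be lifted into the iterator-based shape that the final overload takes. The names LiftUpdateFunc and lift are made up for this illustration, and this is an approximation of the idea rather than a copy of the Spark source.

```scala
object LiftUpdateFunc {
  // Lift a per-key update function into the iterator-based shape.
  // When `f` returns None for a key, that key is dropped from the state.
  def lift[K, V, S](f: (Seq[V], Option[S]) => Option[S])
      : Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] =
    iter => iter.flatMap { case (k, vs, prev) => f(vs, prev).map(s => (k, s)) }

  // Running sum: the current batch's values plus the previous state (0 if absent).
  val addFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, prev) => Some(values.sum + prev.getOrElse(0))

  // Key "a" has previous state 3 and two new values; key "b" is new.
  val result: List[(String, Int)] =
    lift[String, Int, Int](addFunc)(
      Iterator(("a", Seq(1, 1), Some(3)), ("b", Seq(1), None))).toList
}
```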
- This instantiates a StateDStream. Here is its compute method:
override def compute(validTime: Time): Option[RDD[(K, S)]] = {
// Try to get the previous state RDD
getOrCompute(validTime - slideDuration) match {
case Some(prevStateRDD) => { // If previous state RDD exists
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
computeUsingPreviousRDD (parentRDD, prevStateRDD)
}
case None => { // If parent RDD does not exist
// Re-apply the update function to the old state RDD
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, S)]) => {
val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
updateFuncLocal(i)
}
val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
}
}
}
case None => { // If previous session RDD does not exist (first input data)
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
initialRDD match {
case None => {
// Define the function for the mapPartition operation on grouped RDD;
// first map the grouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => {
updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
}
val groupedRDD = parentRDD.groupByKey (partitioner)
val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
Some (sessionRDD)
}
case Some (initialStateRDD) => {
computeUsingPreviousRDD(parentRDD, initialStateRDD)
}
}
}
case None => { // If parent RDD does not exist, then nothing to do!
// logDebug("Not generating state RDD (no previous state, no parent)")
None
}
}
}
}
}
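The branching in compute is easier to follow when flattened into a single decision table. The following is only a summary sketch of the listing above; StateComputeCases and describe are hypothetical names, and the strings simply paraphrase what each branch does.

```scala
object StateComputeCases {
  // Summarise which path StateDStream.compute takes for a batch.
  def describe(hasPrevState: Boolean, hasParentBatch: Boolean, hasInitialState: Boolean): String =
    (hasPrevState, hasParentBatch) match {
      case (true, true) =>
        "computeUsingPreviousRDD: cogroup the batch with the previous state"
      case (true, false) =>
        "mapPartitions: re-apply the update function to the old state with empty values"
      case (false, true) if hasInitialState =>
        "computeUsingPreviousRDD: cogroup the batch with the initial state"
      case (false, true) =>
        "groupByKey: apply the update function to the batch with no previous state"
      case (false, false) =>
        "None: nothing to compute"
    }
}
```

Note that every branch that produces a result applies the update function to all keys it sees, which is what matters for the cost discussion that follows.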
- The code handles several cases. Whenever a parent RDD and a previous (or initial) state RDD are both available, it calls computeUsingPreviousRDD(), which is where the key operation happens:
private [this] def computeUsingPreviousRDD (
parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map(t => {
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
})
updateFuncLocal(i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
}
As you can see, the RDD of the current batch is cogrouped with the RDD of the previous state:
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
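Even if parentRDD contains only a single record, the cogroup is performed and the (user-defined) update function is applied to every key in the state, not just to the keys that appear in the current batch. So as the amount of state keeps growing, the computation grows linearly with it.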
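The cost of this approach can be illustrated with plain Scala collections. The sketch below models one batch step in the style of updateStateByKey using Maps instead of RDDs; CogroupStateUpdate and step are hypothetical names, and the union of key sets stands in for the cogroup.

```scala
object CogroupStateUpdate {
  // One batch step modelled with plain Maps. Returns the new state and
  // the number of times the update function was invoked.
  def step[K, V, S](
      prevState: Map[K, S],
      batch: Seq[(K, V)],
      update: (Seq[V], Option[S]) => Option[S]): (Map[K, S], Int) = {
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    // "cogroup": the union of the keys in the state and the keys in the batch.
    val allKeys = prevState.keySet ++ grouped.keySet
    val newState = allKeys.toSeq.flatMap { k =>
      update(grouped.getOrElse(k, Seq.empty), prevState.get(k)).map(s => (k, s))
    }.toMap
    (newState, allKeys.size)
  }

  val sum: (Seq[Int], Option[Int]) => Option[Int] =
    (vs, prev) => Some(vs.sum + prev.getOrElse(0))

  // 1000 keys of existing state, but only one record in the new batch.
  val bigState: Map[Int, Int] = (1 to 1000).map(k => (k, 1)).toMap
  val (nextState, calls) = step(bigState, Seq((1, 1)), sum)
}
```

In this model a single incoming record still triggers 1000 calls to the update function, one per key held in the state.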
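The second approach, mapWithState, was introduced in Spark 1.6 as a workaround. Since core concepts such as RDD and Partition cannot be changed, Spark Streaming works at the level of the RDD's elements instead: it defines MapWithStateRDD and restricts its elements to the type MapWithStateRDDRecord. A MapWithStateRDDRecord holds the state of all keys in a partition (recorded in stateMap) together with the computed results (mappedData). The MapWithStateRDDRecord element is mutable, while the RDD itself remains immutable.
- Like updateStateByKey, mapWithState is defined in the PairDStreamFunctions class. Its code is as follows: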
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self, spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}
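First, note the annotation: this is an experimental method that is not yet officially recommended for use.
Next, look at spec: StateSpec[K, V, StateType, MappedType]. Instead of taking a function directly, the method takes a StateSpec, which simply wraps the function internally.
- This instantiates a MapWithStateDStreamImpl: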
private[streaming] class MapWithStateDStreamImpl[
KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
dataStream: DStream[(KeyType, ValueType)],
spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {
private val internalStream = new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
override def slideDuration: Duration = internalStream.slideDuration
override def dependencies: List[DStream[_]] = List(internalStream)
override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map(x=>{
x.flatMap[MappedType](_.mappedData )
})
}
/**
* Forward the checkpoint interval to the internal DStream that computes the state maps. This
* to make sure that this DStream does not get checkpointed, only the internal stream.
*/
override def checkpoint(checkpointInterval: Duration): DStream[MappedType] = {
internalStream.checkpoint(checkpointInterval)
this
}
/** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
def stateSnapshots(): DStream[(KeyType, StateType)] = {
internalStream.flatMap {
_.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
}
def keyClass: Class[_] = implicitly[ClassTag[KeyType]].runtimeClass
def valueClass: Class[_] = implicitly[ClassTag[ValueType]].runtimeClass
def stateClass: Class[_] = implicitly[ClassTag[StateType]].runtimeClass
def mappedClass: Class[_] = implicitly[ClassTag[MappedType]].runtimeClass
}
- The compute method of MapWithStateDStreamImpl does very little itself; it mainly fetches the computed results from internalStream, which is created when MapWithStateDStreamImpl is instantiated:
private val internalStream = new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
- Now look at the compute method of InternalMapWithStateDStream:
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the previous state or create a new empty state RDD
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
case Some(rdd) =>
if (rdd.partitioner != Some(partitioner)) {
// If the RDD is not partitioned the right way, let us repartition it using the
// partition index as the key. This is to ensure that state RDD is always partitioned
// before creating another state RDD using it
MapWithStateRDD.createFromRDD[K, V, S, E](rdd.flatMap(_.stateMap.getAll()), partitioner, validTime)
} else {
rdd
}
case None =>
MapWithStateRDD.createFromPairRDD[K, V, S, E](
// Get the user-supplied initial state RDD (or an empty RDD if none was given)
spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
partitioner,
validTime
)
}
// Compute the new state RDD with previous state RDD and partitioned data RDD
// Even if there is no data RDD, use an empty one to create a new state RDD
// Get the RDD to be processed for the current batch
val dataRDD = parent.getOrCompute(validTime).getOrElse {
context.sparkContext.emptyRDD[(K, V)]
}
val partitionedDataRDD = dataRDD.partitionBy(partitioner)
val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
(validTime - interval).milliseconds
}
Some(new MapWithStateRDD(prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
First, the previous state RDD (prevStateRDD) is obtained. On first use, MapWithStateRDD.createFromPairRDD is called to put the user-defined initial values into a newly created stateMap. If prevStateRDD is partitioned differently from the current partitioner, MapWithStateRDD.createFromRDD() is called to repartition the state data and put it into a newly created stateMap. The code of createFromPairRDD and createFromRDD() is as follows:
def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
pairRDD: RDD[(K, S)],
partitioner: Partitioner,
updateTime: Time): MapWithStateRDD[K, V, S, E] = {
val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions ({ iterator =>
val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
// Put the user-defined initial values into the newly created stateMap
iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }
// Wrap the stateMap in a MapWithStateRDDRecord and return it as the single element of the partition
Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
}, preservesPartitioning = true)
val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)
val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None
new MapWithStateRDD[K, V, S, E](stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
}
def createFromRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
rdd: RDD[(K, S, Long)],
partitioner: Partitioner,
updateTime: Time): MapWithStateRDD[K, V, S, E] = {
val pairRDD = rdd.map { x => (x._1, (x._2, x._3)) }
val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions({ iterator =>
val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
// Put the state data (key, (state, updateTime)) from the previous stateMap into a new stateMap
iterator.foreach { case (key, (state, updateTime)) =>
stateMap.put(key, state, updateTime)
}
Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
}, preservesPartitioning = true)
val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)
val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None
new MapWithStateRDD[K, V, S, E](stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
}
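Both factory methods follow the same pattern: partition the pairs by key, then fold each partition into a single map that becomes that partition's only element. A simplified model in plain Scala might look like this. PartitionedStateMaps, partitionOf and build are hypothetical names, the partitioner is assumed to be a simple non-negative hash modulo, and an ordinary mutable.Map stands in for StateMap.

```scala
import scala.collection.mutable

object PartitionedStateMaps {
  // Assumed partitioning rule for this sketch: non-negative hashCode modulo
  // the number of partitions.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Build one mutable map per partition, holding (state, lastUpdateTimeMs) per key.
  def build[K, S](pairs: Seq[(K, S)], numPartitions: Int, updateTimeMs: Long)
      : Vector[mutable.Map[K, (S, Long)]] = {
    val maps = Vector.fill(numPartitions)(mutable.Map.empty[K, (S, Long)])
    pairs.foreach { case (k, s) =>
      maps(partitionOf(k, numPartitions)).update(k, (s, updateTimeMs))
    }
    maps
  }

  // Four initial states spread over two partitions.
  val maps = build(
    Seq((0, "a"), (1, "b"), (2, "c"), (3, "d")), numPartitions = 2, updateTimeMs = 1000L)
}
```

Because the state and the incoming data share one partitioner, each key's state always sits in the same partition as that key's new records.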
Once prevStateRDD is obtained, a MapWithStateRDD is instantiated with the previous state RDD and the RDD to be processed for the current batch. Here is the MapWithStateRDD class:
private[streaming] class MapWithStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
// RDD that stores the state data
private var prevStateRDD: RDD[MapWithStateRDDRecord[K, S, E]],
// RDD holding the current batch's data to be processed
private var partitionedDataRDD: RDD[(K, V)],
// The mapping function
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long]
) extends RDD[MapWithStateRDDRecord[K, S, E]](
partitionedDataRDD.sparkContext,
// MapWithStateRDD depends on two parent RDDs because it has two data sources: the state data and the current batch's data
List(
new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
new OneToOneDependency(partitionedDataRDD))
) {
@volatile private var doFullScan = false
require(prevStateRDD.partitioner.nonEmpty)
require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)
override val partitioner = prevStateRDD.partitioner
override def checkpoint(): Unit = {
super.checkpoint()
doFullScan = true
}
override def compute(partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {
val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
val prevStateRDDIterator = prevStateRDD.iterator(stateRDDPartition.previousSessionRDDPartition, context)
val dataIterator = partitionedDataRDD.iterator(stateRDDPartition.partitionedDataRDDPartition, context)
// Each partition of prevStateRDD holds a single element, so prevStateRDDIterator.next() retrieves it
val prevRecord:Option[MapWithStateRDDRecord[K, S, E]] = if (prevStateRDDIterator.hasNext){
Some(prevStateRDDIterator.next())
}
else {
None
}
// Produce a new MapWithStateRDDRecord
val newRecord = MapWithStateRDDRecord.updateRecordWithData(
prevRecord,
dataIterator,
mappingFunction,
batchTime,
timeoutThresholdTime,
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
)
// Put the new MapWithStateRDDRecord into an iterator, which again contains exactly one element
Iterator(newRecord)
}
override protected def getPartitions: Array[Partition] = {
Array.tabulate(prevStateRDD.partitions.length) { i =>
new MapWithStateRDDPartition(i, prevStateRDD, partitionedDataRDD)}
}
override def clearDependencies(): Unit = {
super.clearDependencies()
prevStateRDD = null
partitionedDataRDD = null
}
def setFullScan(): Unit = {
doFullScan = true
}
}
The key question is how newRecord is produced, since it contains all the state information and the computed results. Here is the code of MapWithStateRDDRecord.updateRecordWithData:
def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
// The previous MapWithStateRDDRecord
prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
// The data of the current batch to be processed
dataIterator: Iterator[(K, V)],
// The mapping function
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long],
removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {
// Create a new state map by cloning the previous one (if it exists) or by creating an empty one
// First create a new StateMap by copying the previous one. StateMap copies are incremental,
// so the newly created stateMap holds a reference to the old one
val newStateMap = prevRecord.map( _.stateMap.copy()). getOrElse { new EmptyStateMap[K, S]() }
val mappedData = new ArrayBuffer[E]
val wrappedState = new StateImpl[S]()
// Call the mapping function on each record in the data iterator, and accordingly
// update the states touched, and collect the data returned by the mapping function
// This is where the performance advantage of mapWithState comes from
dataIterator.foreach { case (key, value) =>
// Wrap the state stored in newStateMap for this key
wrappedState.wrap(newStateMap.get(key))
// Here, finally, is the user-defined mappingFunction, called with the current key, the current value, and the existing state for this key
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
if (wrappedState.isRemoved) {
// The state was marked as removed
newStateMap.remove(key)
} else if (wrappedState.isUpdated || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
// The state for this key was marked as updated (or a timeout is configured), so put it into newStateMap again
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
mappedData ++= returned
}
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState)
mappedData ++= returned
newStateMap.remove(key)
}
}
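// newStateMap: the collection of states
// mappedData: the computed results. Note that the loop above iterates over the current batch's data,
// so the results contain updated values only for keys in the current batch (plus any keys that just timed out)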
MapWithStateRDDRecord(newStateMap, mappedData)
}
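The essence of updateRecordWithData can be reduced to a few lines of plain Scala. The sketch below is a deliberately simplified model: MiniState is a hypothetical stand-in for Spark's State wrapper, the state map is mutated in place instead of being copied incrementally, a fresh wrapper is created per record instead of being reused, and timeouts are left out entirely.

```scala
import scala.collection.mutable

object IncrementalStateUpdate {
  // Hypothetical stand-in for the State[S] wrapper passed to the mapping function.
  final class MiniState[S](private var value: Option[S]) {
    private var updated = false
    private var removed = false
    def getOption: Option[S] = value
    def update(s: S): Unit = { value = Some(s); updated = true; removed = false }
    def remove(): Unit = { value = None; removed = true; updated = false }
    def isUpdated: Boolean = updated
    def isRemoved: Boolean = removed
  }

  // Apply `f` to each record of the batch only; keys absent from the batch
  // are never visited. Returns the values emitted by `f`.
  def step[K, V, S, E](
      state: mutable.Map[K, S],
      batch: Seq[(K, V)],
      f: (K, Option[V], MiniState[S]) => Option[E]): Seq[E] = {
    val mapped = mutable.ArrayBuffer.empty[E]
    batch.foreach { case (k, v) =>
      val wrapped = new MiniState[S](state.get(k))
      val returned = f(k, Some(v), wrapped)
      if (wrapped.isRemoved) state.remove(k)
      else if (wrapped.isUpdated) wrapped.getOption.foreach(s => state.update(k, s))
      mapped ++= returned
    }
    mapped.toSeq
  }

  // Running word count: two keys in the state, one record in the batch.
  val counts = mutable.Map("a" -> 1, "b" -> 1)
  val runningCount: (String, Option[Int], MiniState[Int]) => Option[(String, Int)] =
    (word, one, st) => {
      val sum = one.getOrElse(0) + st.getOption.getOrElse(0)
      st.update(sum)
      Some((word, sum))
    }
  val emitted = step(counts, Seq(("a", 1)), runningCount)
}
```

Only key "a" is touched and emitted here; key "b" stays in the state but costs nothing in this batch, which is the contrast with the cogroup-based approach.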
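The comments above show how the MapWithStateRDDRecord elements of a MapWithStateRDD are computed. Next, look at what happens after the compute method of InternalMapWithStateDStream returns; this is the compute method of MapWithStateDStreamImpl again: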
override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map(x=>{
x.flatMap[MappedType](_.mappedData )
})
}
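flatMap extracts mappedData (the results of the state computation for the current batch) from each MapWithStateRDDRecord and returns it. That completes the mapWithState operation.
Now for an example.
- Here is a demo that keeps a running word count as state: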
package cn.lht.spark.streaming
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
object StateWordCount {
def main(args: Array[String]): Unit = {
val topics = "kafkaforspark"
val brokers = "*.*.*.*:9092,*.*.*.*:9092"
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
sparkConf.set("spark.testing.memory", "2147480000")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("hdfs://mycluster/ceshi/")
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
.map(_._2.trim).map((_, 1))
// 1. mapWithState
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val state = StateSpec.function(mappingFunc)
messages.reduceByKey(_+_).mapWithState(state).print()
// 2. updateStateByKey
// val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
// // Spark groups the current batch's values by key and passes the Seq for one key here; sum them to get the current batch's total
// val currentCount = currValues.sum
// // The count accumulated so far
// val previousCount = prevValueState.getOrElse(0)
// // Return the accumulated result as an Option[Int]
// Some(currentCount + previousCount)
// }
// messages.updateStateByKey(addFunc,2).print()
ssc.start()
ssc.awaitTermination()
}
}
- The input consists of three groups of data, one per batch:
1 2 3 4 5
4 5 6 7 8
6 7 8 9
- Output of mapWithState:
(4,1)
(5,1)
(1,1)
(2,1)
(3,1)
-------------------------------------------
Time: 1464516190000 ms
-------------------------------------------
(4,2)
(8,1)
(5,2)
(6,1)
(7,1)
-------------------------------------------
Time: 1464516195000 ms
-------------------------------------------
(8,2)
(9,1)
(6,2)
(7,2)
- Output of updateStateByKey:
-------------------------------------------
Time: 1464516940000 ms
-------------------------------------------
(4,1)
(2,1)
(5,1)
(3,1)
(1,1)
-------------------------------------------
Time: 1464516945000 ms
-------------------------------------------
(4,2)
(8,1)
(6,1)
(2,1)
(7,1)
(5,2)
(3,1)
(1,1)
-------------------------------------------
Time: 1464516950000 ms
-------------------------------------------
(8,2)
(4,2)
(6,2)
(2,1)
(7,2)
(5,2)
(9,1)
(3,1)
(1,1)
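- The two operations return different results: mapWithState returns the state only for the keys that appear in the latest batch, while updateStateByKey returns the state of all keys. Which one to use depends on your business requirements.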
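The difference between the two outputs can be reproduced without a cluster. The following plain-Scala simulation replays the three input batches and, for each one, computes both what a mapWithState-style operation would emit (only the keys touched by the batch) and what an updateStateByKey-style operation would emit (the whole state). OutputComparison and run are hypothetical names, and ordering within a batch is not modelled.

```scala
object OutputComparison {
  // The three input batches from the demo.
  val batches: Seq[Seq[String]] = Seq(
    Seq("1", "2", "3", "4", "5"),
    Seq("4", "5", "6", "7", "8"),
    Seq("6", "7", "8", "9"))

  // For each batch, return a pair:
  //   _1 = counts for keys touched by this batch (mapWithState style)
  //   _2 = counts for all keys seen so far (updateStateByKey style)
  def run(): Seq[(Map[String, Int], Map[String, Int])] = {
    var state = Map.empty[String, Int]
    batches.map { batch =>
      val batchCounts = batch.groupBy(identity).map { case (w, ws) => (w, ws.size) }
      state = batchCounts.foldLeft(state) { case (acc, (w, c)) =>
        acc.updated(w, acc.getOrElse(w, 0) + c)
      }
      val touchedOnly = batchCounts.keys.map(w => (w, state(w))).toMap
      (touchedOnly, state)
    }
  }

  val results = run()
}
```

For the second batch, the first map holds the five keys 4 to 8 while the second holds all eight keys seen so far, matching the shape of the two outputs listed above.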