14 Spark Streaming源碼解讀之State管理之updateStateByKey和mapWithState解密

Spark Streaming中的數(shù)據(jù)是源源不斷流進來的,有時候我們需要計算一些周期性的統(tǒng)計,就不得不維護一下數(shù)據(jù)的狀態(tài)。在Spark Streaming中狀態(tài)管理有兩種方式。一種是updateStateByKey,另一種是mapWithState

第一種方式:先獲取上一個batch中的狀態(tài)RDD和當(dāng)前batch的RDD 做cogroup 得到一個新的狀態(tài)RDD。這種方式完美的契合了RDD的不變性,但是對性能卻會有比較大的影響,因為需要對所有數(shù)據(jù)做處理,計算量和數(shù)據(jù)集大小是成線性相關(guān)的。

  1. 看一下updateStateByKey的代碼,在Dstream中并沒有找到updateStateByKey()方法,因為updateStateByKey是針對Key-Value的操作,所在可以想到updateStateByKey()方法其實是在PairDStreamFunctions類中,他是通過隱式轉(zhuǎn)換的方式實現(xiàn)的。
    代碼如下
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):  PairDStreamFunctions[K, V] = {  
      new PairDStreamFunctions[K, V](stream)
}
  1. 接著看updateStateByKey()方法,他有幾種重載方式,最終調(diào)用以下的updateStateByKey()方法,代碼如下
def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = ssc.withScope {
     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
  1. 這里實例化了一個StateDStream,看一下StateDStream的compute方法,代碼如下
override def compute(validTime: Time): Option[RDD[(K, S)]] = {

    // Try to get the previous state RDD
    getOrCompute(validTime - slideDuration) match {

      case Some(prevStateRDD) => {    // If previous state RDD exists

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
            computeUsingPreviousRDD (parentRDD, prevStateRDD)
          }
          case None => {    // If parent RDD does not exist

            // Re-apply the update function to the old state RDD
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, S)]) => {
              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
              updateFuncLocal(i)
            }
            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
        }
      }

      case None => {    // If previous session RDD does not exist (first input data)

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
            initialRDD match {
              case None => {
                // Define the function for the mapPartition operation on grouped RDD;
                // first map the grouped tuple to tuples of required type,
                // and then apply the update function
                val updateFuncLocal = updateFunc
                val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => {
                  updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
                }

                val groupedRDD = parentRDD.groupByKey (partitioner)
                val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
                // logDebug("Generating state RDD for time " + validTime + " (first)")
                Some (sessionRDD)
              }
              case Some (initialStateRDD) => {
                computeUsingPreviousRDD(parentRDD, initialStateRDD)
              }
            }
          }
          case None => { // If parent RDD does not exist, then nothing to do!
            // logDebug("Not generating state RDD (no previous state, no parent)")
            None
          }
        }
      }
    }
}
  1. 這里代碼分幾種情況,但最終都調(diào)用computeUsingPreviousRDD()方法,關(guān)鍵操作就在computeUsingPreviousRDD()方法中,代碼如下
private [this] def computeUsingPreviousRDD (
    parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
    // Define the function for the mapPartition operation on cogrouped RDD;
    // first map the cogrouped tuple to tuples of required type,
    // and then apply the update function
    val updateFuncLocal = updateFunc
    val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
      val i = iterator.map(t => {
        val itr = t._2._2.iterator
        val headOption = if (itr.hasNext) Some(itr.next()) else None
        (t._1, t._2._1.toSeq, headOption)
      })
      updateFuncLocal(i)
    }
    val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
    val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
    Some(stateRDD)
}

可以看到當(dāng)前狀態(tài)的RDD和前一個狀態(tài)的RDD進行cogroup操作

val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)

parentRDD中只要有一條數(shù)據(jù)就會進行cogroup操作的,并將所有數(shù)據(jù)都進行更新函數(shù)(用戶定義的)的操作,所以當(dāng)數(shù)據(jù)量不斷增加的時候,計算量隨著線性增加。

第二種方式,在Spark1.6以后出來一種mapWithState的方式,他是一種變通的實現(xiàn)。因為沒法變更RDD/Partition等核心概念,所以Spark Streaming在集合元素上做了文章,定義了MapWithStateRDD,將該RDD的元素做了限定,必須是MapWithStateRDDRecord 這個東西。該MapWithStateRDDRecord 保存分區(qū)內(nèi)的所有key的狀態(tài)(通過stateMap記錄)以及計算結(jié)果(mappedData),元素MapWithStateRDDRecord 是可變的,但是RDD 依然是不變的。

  1. mapWithState和updateStateByKey一樣都是在PairDtreamFuntions類中,mapWithState代碼如下
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self, spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
}

首先看注解,他是一個實驗性的方法,官方還沒有推薦使用。
再看spec: StateSpec[K, V, StateType, MappedType],這里并沒有接收一個函數(shù),而是一個StateSpec。其實就將函數(shù)包裝在StateSpec內(nèi)部而已

  1. 這里實例化了一個MapWithStateDStreamImpl,代碼如下
private[streaming] class MapWithStateDStreamImpl[
    KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
    dataStream: DStream[(KeyType, ValueType)],
    spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
  extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {

  private val internalStream = new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

  override def slideDuration: Duration = internalStream.slideDuration

  override def dependencies: List[DStream[_]] = List(internalStream)

  override def compute(validTime: Time): Option[RDD[MappedType]] = {
    internalStream.getOrCompute(validTime).map(x=>{
      x.flatMap[MappedType](_.mappedData )
    })
  }

  /**
   * Forward the checkpoint interval to the internal DStream that computes the state maps. This
   * to make sure that this DStream does not get checkpointed, only the internal stream.
   */
  override def checkpoint(checkpointInterval: Duration): DStream[MappedType] = {
    internalStream.checkpoint(checkpointInterval)
    this
  }

  /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
  def stateSnapshots(): DStream[(KeyType, StateType)] = {
    internalStream.flatMap {
      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
  }

  def keyClass: Class[_] = implicitly[ClassTag[KeyType]].runtimeClass

  def valueClass: Class[_] = implicitly[ClassTag[ValueType]].runtimeClass

  def stateClass: Class[_] = implicitly[ClassTag[StateType]].runtimeClass

  def mappedClass: Class[_] = implicitly[ClassTag[MappedType]].runtimeClass
}
  1. MapWithStateDStreamImpl的compute操作其他沒有什么內(nèi)容,主要是從internalStream中獲取計算結(jié)果,internalStream是在MapWithStateDStreamImpl實例化的時候創(chuàng)建,代碼如下
private val internalStream = new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
  1. 看 InternalMapWithStateDStream的compute方法,代碼如下
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
    // Get the previous state or create a new empty state RDD
    // 得到以前狀態(tài)的RDD或創(chuàng)建一個空狀態(tài)的RDD
    val prevStateRDD = getOrCompute(validTime - slideDuration) match {
      case Some(rdd) =>
        if (rdd.partitioner != Some(partitioner)) {
          // If the RDD is not partitioned the right way, let us repartition it using the
          // partition index as the key. This is to ensure that state RDD is always partitioned
          // before creating another state RDD using it
          MapWithStateRDD.createFromRDD[K, V, S, E](rdd.flatMap(_.stateMap.getAll()), partitioner, validTime)
        } else {
          rdd
        }
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          // 獲取用戶初始化的狀態(tài)RDD
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
    }
    // Compute the new state RDD with previous state RDD and partitioned data RDD
    // Even if there is no data RDD, use an empty one to create a new state RDD
    // 獲取當(dāng)前要進行計算的RDD
    val dataRDD = parent.getOrCompute(validTime).getOrElse {
      context.sparkContext.emptyRDD[(K, V)]
    }
    val partitionedDataRDD = dataRDD.partitionBy(partitioner)

    val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
      (validTime - interval).milliseconds
    }
    Some(new MapWithStateRDD(prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}

首先獲取前一個狀態(tài)的RDD(prevStateRDD),prevStateRDD在第一次使用的時候調(diào)用MapWithStateRDD.createFromPairRDD方法,將自用定義的初始值放在到新生成的stateMap中;如果prevStateRDD分區(qū)和當(dāng)前狀態(tài)分區(qū)不同時會調(diào)用MapWithStateRDD.createFromRDD()將狀態(tài)數(shù)據(jù)重新分區(qū)后放入新生成的stateMap,createFromRDD()方法和createFromPairRDD方法代碼如下

def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      pairRDD: RDD[(K, S)],
      partitioner: Partitioner,
      updateTime: Time): MapWithStateRDD[K, V, S, E] = {

    val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions ({ iterator =>
      val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
      // 把用戶定義的初始值放入新創(chuàng)建的stateMap
      iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }
      // 把stateMap放在MapWithStateRDDRecord中做為RDD的元素返回
      Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
    }, preservesPartitioning = true)

    val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)

    val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

    new MapWithStateRDD[K, V, S, E](stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
}
def createFromRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      rdd: RDD[(K, S, Long)],
      partitioner: Partitioner,
      updateTime: Time): MapWithStateRDD[K, V, S, E] = {

    val pairRDD = rdd.map { x => (x._1, (x._2, x._3)) }
    val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions({ iterator =>
      val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
      // 把之前stateMap中的狀態(tài)數(shù)據(jù)(key,(state,update))放入一個stateMap中
      iterator.foreach { case (key, (state, updateTime)) =>
        stateMap.put(key, state, updateTime)
      }
      Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
    }, preservesPartitioning = true)

    val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)

    val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

    new MapWithStateRDD[K, V, S, E](stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
}

prevStateRDD獲取之后實例化MapWithStateRDD,將前一個狀態(tài)RDD和當(dāng)前要計算的RDD傳遞進去,看MapWithStateRDD類的代碼

private[streaming] class MapWithStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    // 存儲State數(shù)據(jù)的RDD
    private var prevStateRDD: RDD[MapWithStateRDDRecord[K, S, E]],
    // 計算當(dāng)前數(shù)據(jù)的RDD
    private var partitionedDataRDD: RDD[(K, V)],
    // 計算函數(shù)
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long]
  ) extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    // MapWithStateRDD依賴兩個父RDD,因為有兩個數(shù)據(jù)來源。一個是狀態(tài)數(shù)據(jù),一個是當(dāng)前數(shù)據(jù)
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  ) {

  @volatile private var doFullScan = false

  require(prevStateRDD.partitioner.nonEmpty)
  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)

  override val partitioner = prevStateRDD.partitioner

  override def checkpoint(): Unit = {
    super.checkpoint()
    doFullScan = true
  }

  override def compute(partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

    val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
    val prevStateRDDIterator = prevStateRDD.iterator(stateRDDPartition.previousSessionRDDPartition, context)
    val dataIterator = partitionedDataRDD.iterator(stateRDDPartition.partitionedDataRDDPartition, context)

    // 因為prevStateRDD只有一個元素,所有取prevStateRDDIterator.next()
    val prevRecord:Option[MapWithStateRDDRecord[K, S, E]] = if (prevStateRDDIterator.hasNext){
      Some(prevStateRDDIterator.next())
    }
    else {
      None
    }

    // 返回一個新的MapWithStateRDDRecord
    val newRecord = MapWithStateRDDRecord.updateRecordWithData(
      prevRecord,
      dataIterator,
      mappingFunction,
      batchTime,
      timeoutThresholdTime,
      removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    )
    // 將新生成的MapWithStateRDDRecord放入迭代器,此迭代器還是只有一個元素
    Iterator(newRecord)
  }

  override protected def getPartitions: Array[Partition] = {
    Array.tabulate(prevStateRDD.partitions.length) { i =>
      new MapWithStateRDDPartition(i, prevStateRDD, partitionedDataRDD)}
  }

  override def clearDependencies(): Unit = {
    super.clearDependencies()
    prevStateRDD = null
    partitionedDataRDD = null
  }

  def setFullScan(): Unit = {
    doFullScan = true
  }
}

主要看newRecord是怎樣生成的,因為newRecord里有所有的狀態(tài)信息和計算結(jié)果,看 MapWithStateRDDRecord.updateRecordWithData的代碼

def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    // 前一個MapWithStateRDDRecord
    prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
    // 當(dāng)前需要計算的數(shù)據(jù)
    dataIterator: Iterator[(K, V)],
    // 計算函數(shù)
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long],
    removeTimedoutData: Boolean
  ): MapWithStateRDDRecord[K, S, E] = {
    // Create a new state map by cloning the previous one (if it exists) or by creating an empty one

    // 首先創(chuàng)建一個新的StateMap,這里是從前一個StateMap復(fù)制而來的,由于StateMap的復(fù)制是采用增量復(fù)制,
    // 新創(chuàng)建的stateMap會引用舊的stateMap
    val newStateMap = prevRecord.map( _.stateMap.copy()). getOrElse { new EmptyStateMap[K, S]() }

    val mappedData = new ArrayBuffer[E]
    val wrappedState = new StateImpl[S]()

    // Call the mapping function on each record in the data iterator, and accordingly
    // update the states touched, and collect the data returned by the mapping function
    // mapWithState操作性能優(yōu)勢就是在這里體現(xiàn)的
    dataIterator.foreach { case (key, value) =>
      //將newStateMap中的元素包裝一下
      wrappedState.wrap(newStateMap.get(key))
      // 終于看到用戶定義的mappingFunction函數(shù)了,傳入當(dāng)前key,當(dāng)前value,和此key的歷史數(shù)據(jù)
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        // 如果更新值被標(biāo)記刪除
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
        // 如果當(dāng)前key的value為標(biāo)記有更新,就更新newStateMap,重新put操作
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

    // Get the timed out state records, call the mapping function on each and collect the
    // data returned
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState)
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }
    // newStateMap 狀態(tài)集合
    // mappedData 返回計算后的結(jié)果,這里要注意:因為上面的迭代操作是基于當(dāng)前RDD的數(shù)據(jù),
    // 所以返回計算后的結(jié)果只有當(dāng)前數(shù)據(jù)的更新值
    MapWithStateRDDRecord(newStateMap, mappedData)
}

通過上面的注釋已經(jīng)知道MapWithStateRDD[MapWithStateRDDRecord]類型的RDD的數(shù)據(jù)是怎么計算的了,接著看InternalMapWithStateDStream的computer方法返回后的操作

override def compute(validTime: Time): Option[RDD[MappedType]] = {
     internalStream.getOrCompute(validTime).map(x=>{
         x.flatMap[MappedType](_.mappedData )
     })
}

使用flatMap從MapWithStateRDDRecord中獲取mappedData(當(dāng)前RDD進行狀態(tài)計算后的結(jié)果)并返回,到這里mapWithState的操作就完成了

下面看一個例子

  1. 看一個計算wordCount狀態(tài)操作的Demo,代碼如下
package cn.lht.spark.streaming
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
object StateWordCount {
  def main(args: Array[String]): Unit = {
    val topics = "kafkaforspark"
    val brokers = "*.*.*.*:9092,*.*.*.*:9092"
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    sparkConf.set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("hdfs://mycluster/ceshi/")
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    .map(_._2.trim).map((_, 1))
    // 1. mapWithState操作
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
    val state = StateSpec.function(mappingFunc)
    messages.reduceByKey(_+_).mapWithState(state).print()
   // 2. updateStateByKey
//    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
//      //通過Spark內(nèi)部的reduceByKey按key規(guī)約,然后這里傳入某key當(dāng)前批次的Seq/List,再計算當(dāng)前批次的總和
//      val currentCount = currValues.sum
//      // 已累加的值
//      val previousCount = prevValueState.getOrElse(0)
//      // 返回累加后的結(jié)果,是一個Option[Int]類型
//      Some(currentCount + previousCount)
//    }
//    messages.updateStateByKey(addFunc,2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 輸入數(shù)據(jù)為三組,分別是
    1
    2
    3
    4
    5
    
    4
    5
    6
    7
    8
    
    6
    7
    8
    9
  1. mapWithState操作結(jié)果
    (4,1)
    (5,1)
    (1,1)
    (2,1)
    (3,1)
    -------------------------------------------
    Time: 1464516190000 ms
    -------------------------------------------
    (4,2)
    (8,1)
    (5,2)
    (6,1)
    (7,1)
    -------------------------------------------
    Time: 1464516195000 ms
    -------------------------------------------
    (8,2)
    (9,1)
    (6,2)
    (7,2)
  1. updateStateByKey操作結(jié)果
    -------------------------------------------
    Time: 1464516940000 ms
    -------------------------------------------
    (4,1)
    (2,1)
    (5,1)
    (3,1)
    (1,1)
    -------------------------------------------
    Time: 1464516945000 ms
    -------------------------------------------
    (4,2)
    (8,1)
    (6,1)
    (2,1)
    (7,1)
    (5,2)
    (3,1)
    (1,1)
    -------------------------------------------
    Time: 1464516950000 ms
    -------------------------------------------
    (8,2)
    (4,2)
    (6,2)
    (2,1)
    (7,2)
    (5,2)
    (9,1)
    (3,1)
    (1,1)
  1. 看以上兩種操作返回的結(jié)果是不一樣的,mapWithState返回最新數(shù)據(jù)的狀態(tài)結(jié)果,而updateStateByKey返回了所有狀態(tài)結(jié)果,具體使用要配合業(yè)務(wù)進行調(diào)整
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容