spark-streaming-kafka-0-10源碼分析

[TOC]
spark-streaming為了匹配0.10以后版本的kafka客戶端變化推出了一個目前還是Experimental狀態(tài)的spark-streaming-kafka-0-10客戶端

首先看下初始化kafkastream的方法聲明,

def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]] = {
new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}

DirectKafkaInputDStream的初始化參數(shù)包括StreamingContext,LocationStrategy,ConsumerStrategy和perPartitionConfig,根據(jù)源碼文檔locationStrategy一般采用PreferConsistent,perPartitionConfig一般采用默認實現(xiàn),這里不做研究,主要會有點區(qū)別的參數(shù)為consumerStrategy,它的作用會在下面的源碼分析里展示出來。

driver consumer

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                .createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String> Subscribe(topics,
                                kafkaParams));

以上述初始化代碼為例,首先DirectKafkaInputDStream會調(diào)用start方法進行初始化(在DStreamGraph處調(diào)動),相關(guān)代碼如下

override def start(): Unit = {
    val c = consumer //初始化driver端consumer
    paranoidPoll(c)  //調(diào)整offset位置
    if (currentOffsets.isEmpty) {
      currentOffsets = c.assignment().asScala.map { tp =>
        tp -> c.position(tp)
      }.toMap
    }

    // don't actually want to consume any messages, so pause all partitions
    c.pause(currentOffsets.keySet.asJava)
  }

這段代碼在driver端初始化一個consumer, 該consumer的類型由上面提到的consumerStrategy決定,Subscribe類的實現(xiàn)如下,相當與在driver端啟動一個以subscribe模式訂閱topic的客戶端。在有初始啟動offset傳入的情況下會把consumer的offset游標seek到對應(yīng)的地址。

private case class Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]
  ) extends ConsumerStrategy[K, V] with Logging {

  def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics)
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but cant seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress =
        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
          consumer.seek(topicPartition, offset)
      }
      // we've called poll, we must pause or next poll may consume messages and set position
      consumer.pause(consumer.assignment())
    }

    consumer
  }
}

DirectKafkaInputDStream的另一個核心方法是compute,這個方法的核心作用之一就是不斷地生成對應(yīng)時間的RDD分配到新的job計算任務(wù),具體實現(xiàn)如下,主要是根據(jù)系統(tǒng)設(shè)置的限速和現(xiàn)有

kafka topicpartion計算出每一個job分配到的KafkaRDD對應(yīng)的數(shù)據(jù)范圍以及提交offset等工作。

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets()) //根據(jù)maxrate和backpressuce等限速配置計算下一批rdd每個里面kafka消息的截止offset
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }//初始化offset列表,包括(topic,partition,起始offset,截止offset)
    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
      true)
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, useConsumerCache)//根據(jù)計算好的offsetRange和修改后的kafkaParam初始化RDD

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }

注意上文里的latestOffset()方法實現(xiàn)如下,通過新的consumerapi的c.seekToEnd(currentOffsets.keySet.asJava)將consumer的offsetapi游標放到了對應(yīng)分區(qū)的最后位置,

如果在初始化的kafkaParams設(shè)置"enable.auto.commit"屬性為"true",diver客戶端會自動像kafka發(fā)送最后seek到的offset位置

protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    paranoidPoll(c)
    val parts = c.assignment().asScala

    // make sure new partitions are reflected in currentOffsets
    val newPartitions = parts.diff(currentOffsets.keySet)
    // position for new partitions determined by auto.offset.reset if no commit
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    // don't want to consume messages, so pause
    c.pause(newPartitions.asJava)
    // find latest available offsets
    c.seekToEnd(currentOffsets.keySet.asJava)
    parts.map(tp => tp -> c.position(tp)).toMap
  }

二 executor consumer

executor consumer的初始化過程位于KafkaRDD內(nèi)部,在程序初始的kafaparams基礎(chǔ)上調(diào)用了fixKfkaParams方法對參數(shù)進行了部分調(diào)整和改寫,包括groupid,enable.auto.commit,auto.offset.config等屬性。

private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    // driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

    // possible workaround for KAFKA-3135
    val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
    if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
      kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    }
  }

KafkaRDD內(nèi)部與consumer相關(guān)的幾個方法如下:首先通過getPartitions方法將對應(yīng)的topic分區(qū)與RDD的每一個分區(qū)對應(yīng)起來,然后通過compute方法初始化KafkaRDDIterator,每個KafkaRDDIterator通過CachedKafkaConsumer接口拿到一個CachedKafkaConsumer引用并在next()方法里不斷返回ConsumerRecord值。

override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }

override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    if (part.fromOffset == part.untilOffset) {
      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
  }


  private class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {

    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

    context.addTaskCompletionListener{ context => closeIfNeeded() }

    val consumer = if (useConsumerCache) {
      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
      if (context.attemptNumber >= 1) {
        // just in case the prior attempt failures were cache related
        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
      }
      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    } else {
      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    }

    var requestOffset = part.fromOffset

    def closeIfNeeded(): Unit = {
      if (!useConsumerCache && consumer != null) {
        consumer.close
      }
    }

    override def hasNext(): Boolean = requestOffset < part.untilOffset

    override def next(): ConsumerRecord[K, V] = {
      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
      val r = consumer.get(requestOffset, pollTimeout)
      requestOffset += 1
      r
    }
  }

根據(jù)是否使用consumer的緩存池特性(這個屬性由spark.streaming.kafka.consumer.cache.enabled決定),CachedKafkaConsumer提供了兩種靜態(tài)方法獲取consumer客戶端,get()和getUncached()。

get方法從CachedKafkaConsumer的靜態(tài)linkhashmap屬性cache中存取已經(jīng)初始化好的CachedKafkaConsumer對象,相當于每個executor內(nèi)部維護了一個consumer的連接池。

getUncached相當于每次拉新數(shù)據(jù)都初始化一個consumer連接,并在這個RDD任務(wù)結(jié)束后關(guān)掉consumer實例。

CachedKafkaConsumer初始化kafka consumer客戶端的相關(guān)代碼如下,可以看到真正拉數(shù)據(jù)的executor客戶端是采用了assgin方式訂閱到單個分區(qū)初始化完成的。

protected val consumer = {
    val c = new KafkaConsumer[K, V](kafkaParams)
    val tps = new ju.ArrayList[TopicPartition]()
    tps.add(topicPartition)
    c.assign(tps)
    c
  }

三 offset提交

除了上文提到的將driver端的auto.commit屬性打開提交offset的方式以外,sparkstreaming還在DirectKafkaInputDStream中提供了一個commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback)方法允許手動觸發(fā)offset提交,這個方法將需要提交的offset列表放到了一個commitQueue里面,然后在每次調(diào)用compute方法的時候最后的commitall方法通過driver端的consumer把offset提交到kafka上。

def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
    commitCallback.set(callback)
    commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
  }

  protected def commitAll(): Unit = {
    val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
    var osr = commitQueue.poll()
    while (null != osr) {
      val tp = osr.topicPartition
      val x = m.get(tp)
      val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
      m.put(tp, new OffsetAndMetadata(offset))
      osr = commitQueue.poll()
    }
    if (!m.isEmpty) {
      consumer.commitAsync(m, commitCallback.get)
    }
  }
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

注意:如果是采用官方文檔里上述方式手動提交offset,需要把stream對象的屬性標記為static或者transient避免序列化,不然可能在任務(wù)提交的時候報DirectKafkaInputDStream 無法序列化導(dǎo)致Task not serializable錯誤

新的spark-streaming-kafka-0-10客戶端采用了與原有版本完全不同的架構(gòu),一個job里面運行了兩組consumer:driver consumer和 executor consumer,driver端consumer負責分配和提交offset到初始化好的KafkaRDD當中去,KafkaRDD內(nèi)部會根據(jù)分配到的每個topic的每個partition初始化一個CachedKafkaConsumer客戶端通過assgin的方式訂閱到topic拉取數(shù)據(jù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容