Spark系列 - 實(shí)時數(shù)倉之近1小時各個廣告點(diǎn)擊量實(shí)戰(zhàn)(三)

??為了資源共用,我們的數(shù)據(jù)和上一篇文章一樣,這個實(shí)戰(zhàn)案例的技術(shù)和思想在現(xiàn)實(shí)開發(fā)的需求中還是比較常見的,以廣告為例子,需求是:

需求:統(tǒng)計(jì)各廣告最近1小時內(nèi)的點(diǎn)擊量趨勢
一、Kafka 消費(fèi)主題的數(shù)據(jù)
[root@cdh101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic luchangyin --from-beginning

二、代碼的實(shí)現(xiàn)

2.1 消費(fèi)Kafka的源是數(shù)據(jù):

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.text.SimpleDateFormat
import java.util.Date

/**
  * Desc: 需求:統(tǒng)計(jì)各廣告最近1小時內(nèi)的點(diǎn)擊量趨勢,每6s更新一次(各廣告最近1小時內(nèi)各分鐘的點(diǎn)擊量)
  *     -采集周期: 3s
  *     -最近一小時:  抽口的長度為1小時
  *     -每6s更新一次:窗口滑動的步長
  *     -各分鐘的點(diǎn)擊量   ((advId,hhmm),1)
  */

object RealTime_App02 {

  def main(args: Array[String]): Unit = {

    //創(chuàng)建配置文件對象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
    //創(chuàng)建SparkStreaming執(zhí)行的上下文
    val ssc = new StreamingContext(conf, Seconds(3))

    //kafka參數(shù)聲明
    val brokers = "cdh101:9092,cdh102:9092,cdh103:9092"
    val topic = "luchangyin"
    val group = "cloudera_mirrormaker"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val autooffsetreset = "latest"
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.AUTO_OFFSET_RESET_DOC -> autooffsetreset,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )

    //設(shè)置檢查點(diǎn)目錄
    ssc.checkpoint("D:\\MySoftware\\StudySoftware\\MyIdea\\luchangyin2021\\MyFirstBigScreen\\TestFSLJavaDemon\\src\\main\\ck2")

    //創(chuàng)建DS
    val kafkaDS: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Set(topic), kafkaParams)
    )

    //從kafka的kv值中取value     1616683286749,華東,上海,102,1
    val dataDS = kafkaDS.map(_.value())
    dataDS.print()

    // 2.2 定義窗口大小以及滑動的步長以及對結(jié)構(gòu)進(jìn)行轉(zhuǎn)換聚合

    ssc.start()
    ssc.awaitTermination()
  }

}

運(yùn)行結(jié)果:
image.png

2.2 對數(shù)據(jù)結(jié)構(gòu)進(jìn)行轉(zhuǎn)換聚合:

// 2.2 定義窗口大小以及滑動的步長以及對結(jié)構(gòu)進(jìn)行轉(zhuǎn)換聚合
    val windowDS:DStream[String] = dataDS.window(Seconds(6),Seconds(3))

    //對結(jié)構(gòu)進(jìn)行轉(zhuǎn)換 (advId_hhmm,1)
    val mapDS: DStream[(String, Int)] = windowDS.map{
      line => {
        val fields: Array[String] = line.split(",")
        val timeStmp: Long = fields(0).toLong
        val day: Date = new Date(timeStmp)
        //定義SimpleDateFormat對日期進(jìn)行轉(zhuǎn)換
        val sdf = new SimpleDateFormat("mm:ss")
        val time: String = sdf.format(day)
        (fields(4) +"_"+ time, 1)
      }
    }

    //對數(shù)據(jù)進(jìn)行聚合
    val resDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)

    resDS.print() // (5_44:05,10)

最終的運(yùn)行結(jié)果為:
image.png

??到此為止,這個案例就實(shí)現(xiàn)成了,其實(shí)還是挺簡單的哦,好了,開搞吧少年。。。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容