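To reuse resources, this post works with the same data as the previous article. The techniques and ideas in this hands-on case come up fairly often in real development requirements. Using advertising as the example:
Requirement: count each ad's click trend over the last hour.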
1. Consume the topic's data with the Kafka console consumer
[root@cdh101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic luchangyin --from-beginning
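Each value the console consumer prints is one comma-separated click record, such as 1616683286749,華東,上海,102,1. The Spark job below only relies on field 0 (a millisecond timestamp) and field 4 (the ad id). As a minimal sketch, assuming the five fields are timestamp, area, city, user id and ad id (the type AdClick, the object AdClickParser and the meaning of the fourth field are hypothetical, not taken from the previous article), a record can be parsed defensively like this:

```scala
import scala.util.Try

// Hypothetical record type: field names are assumptions; the job itself
// only uses field 0 (timestamp in ms) and field 4 (ad id).
final case class AdClick(ts: Long, area: String, city: String, userId: String, adId: String)

object AdClickParser {
  // Returns None when a line does not have exactly five fields or its
  // timestamp is not a number, instead of throwing inside a batch.
  def parse(line: String): Option[AdClick] =
    line.split(",").map(_.trim) match {
      case Array(ts, area, city, userId, adId) =>
        Try(ts.toLong).toOption.map(t => AdClick(t, area, city, userId, adId))
      case _ => None
    }
}
```

Returning Option keeps a single malformed message from failing a whole micro-batch; in a DStream the same function could be applied with flatMap.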
2. Code implementation
2.1 Consume the source data from Kafka:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.text.SimpleDateFormat
import java.util.Date
/**
 * Desc: Requirement: count each ad's click trend over the last hour, refreshed every 6 s
 * (the number of clicks per ad for each minute of the last hour)
 * - Batch interval: 3 s
 * - Last hour: the window length is 1 hour
 * - Refreshed every 6 s: the window's slide interval
 * - Clicks per minute: ((advId, hhmm), 1), i.e. keyed by ad and hour:minute
 */
object RealTime_App02 {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
    // Create the Spark Streaming context with a 3 s batch interval
    val ssc = new StreamingContext(conf, Seconds(3))
    // Kafka parameters
    val brokers = "cdh101:9092,cdh102:9092,cdh103:9092"
    val topic = "luchangyin"
    val group = "cloudera_mirrormaker"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val autooffsetreset = "latest"
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      // AUTO_OFFSET_RESET_CONFIG is the property key; AUTO_OFFSET_RESET_DOC is only its description text
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autooffsetreset,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )
    // Set the checkpoint directory
    ssc.checkpoint("D:\\MySoftware\\StudySoftware\\MyIdea\\luchangyin2021\\MyFirstBigScreen\\TestFSLJavaDemon\\src\\main\\ck2")
    // Create the direct Kafka input DStream
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams)
    )
    // Keep only the value of each Kafka record, e.g. 1616683286749,華東,上海,102,1
    // (timestamp in ms, area 華東 = East China, city 上海 = Shanghai, ..., ad id in the last field)
    val dataDS = kafkaDS.map(_.value())
    dataDS.print()
    // 2.2 Define the window length and slide, then transform and aggregate (the code from section 2.2 goes here)
    ssc.start()
    ssc.awaitTermination()
  }
}
Run result:
[Screenshot: the raw records printed by dataDS.print()]
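The doc comment fixes a 3 s batch interval, a 1 hour window and a 6 s slide. Spark Streaming requires both the window length and the slide to be integer multiples of the batch interval, so it is worth checking the numbers before wiring them in. The following plain-Scala helper (the name WindowMath is hypothetical, it is not part of Spark) mirrors that rule and reports how many batches each window and each slide span:

```scala
// Plain-Scala mirror of Spark Streaming's rule that the window length and
// the slide must be integer multiples of the batch interval (hypothetical helper).
object WindowMath {
  final case class Plan(batchesPerWindow: Long, batchesPerSlide: Long)

  def plan(batchSec: Long, windowSec: Long, slideSec: Long): Either[String, Plan] =
    if (batchSec <= 0 || windowSec <= 0 || slideSec <= 0)
      Left("all durations must be positive")
    else if (windowSec % batchSec != 0)
      Left(s"window ${windowSec}s is not a multiple of the ${batchSec}s batch interval")
    else if (slideSec % batchSec != 0)
      Left(s"slide ${slideSec}s is not a multiple of the ${batchSec}s batch interval")
    else
      Right(Plan(windowSec / batchSec, slideSec / batchSec))
}
```

For the settings above, WindowMath.plan(3, 3600, 6) gives Right(Plan(1200, 2)): every printed result aggregates the last 1200 batches, and a new result appears every second batch. A 5 s slide would be rejected, because 5 is not a multiple of 3.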
2.2 Transform and aggregate the data:
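// 2.2 Define the window length and slide, then transform and aggregate
// Window length = 1 hour, slide = 6 s; both are multiples of the 3 s batch interval.
// For a quick local check, window(Seconds(6), Seconds(3)) with an "mm:ss" pattern gives faster feedback.
val windowDS: DStream[String] = dataDS.window(Seconds(60 * 60), Seconds(6))
// Transform each record into (advId_HH:mm, 1)
val mapDS: DStream[(String, Int)] = windowDS.map {
  line => {
    val fields: Array[String] = line.split(",")
    val timeStmp: Long = fields(0).toLong
    val day: Date = new Date(timeStmp)
    // SimpleDateFormat is not thread-safe, so a fresh instance per record is safe (if not the fastest)
    val sdf = new SimpleDateFormat("HH:mm")
    val time: String = sdf.format(day)
    (fields(4) + "_" + time, 1)
  }
}
// Sum the clicks per ad and minute within the current window
val resDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)
resDS.print() // e.g. (5_44:05,10) was printed with the short "mm:ss" test settings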
Final result:
[Screenshot: the aggregated (advId_time, count) pairs printed by resDS.print()]
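To see what each printed result contains, the following plain-Scala model imitates one window evaluation (it is not Spark code; WindowSim and its parameters are hypothetical). Note that DStream.window works on arrival time: it gathers the records received in the last N batches, and only afterwards does the job bucket them by the minute of their own timestamp. The model therefore takes a sequence of batches, keeps the most recent batchesPerWindow of them, and counts advId_HH:mm keys. It formats with java.time's DateTimeFormatter, which is immutable and thread-safe, in an explicit time zone, whereas the Spark job uses SimpleDateFormat in the JVM default zone.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Plain-Scala model of one window evaluation (hypothetical, not Spark API).
object WindowSim {
  // Minute bucket of a millisecond timestamp, e.g. "14:41", in the given zone.
  def minuteKey(ts: Long, zone: ZoneId): String =
    DateTimeFormatter.ofPattern("HH:mm").withZone(zone).format(Instant.ofEpochMilli(ts))

  // batches: raw record lines, one Seq per batch interval, oldest first.
  // batchesPerWindow: window length / batch interval (1200 for 1 h over 3 s).
  def evaluate(batches: Seq[Seq[String]], batchesPerWindow: Int, zone: ZoneId): Map[String, Int] =
    batches
      .takeRight(batchesPerWindow) // like window(): keep only the most recent batches
      .flatten
      .map(_.split(",").map(_.trim))
      .flatMap { f => // like map(): build the advId_HH:mm key
        if (f.length < 5) None // skip malformed lines instead of failing
        else Try(f(0).toLong).toOption.map(ts => s"${f(4)}_${minuteKey(ts, zone)}")
      }
      .groupBy(identity) // like reduceByKey(_ + _): count per key
      .map { case (key, hits) => key -> hits.size }
}
```

For example, with batches Seq(Seq("0,a,b,u,5", "30000,a,b,u,5"), Seq("60000,a,b,u,5")) in UTC and a window of two batches, the result is Map("5_00:00" -> 2, "5_00:01" -> 1). Because the window is not aligned to minute boundaries, the oldest and newest minute buckets in a result are usually only partially covered.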
That's it, the case study is complete. It's actually quite simple, so go ahead and give it a try!