17.分區(qū)分桶的區(qū)別,為什么要分區(qū)
分區(qū)表:原來的一個大表存儲的時候分成不同的數(shù)據(jù)目錄進行存儲。如果說是單分區(qū)表,那么在表的目錄下就只有一級子目錄,如果說是多分區(qū)表,那么在表的目錄下有多少分區(qū)就有多少級子目錄。不管是單分區(qū)表,還是多分區(qū)表,在表的目錄下,和非最終分區(qū)目錄下是不能直接存儲數(shù)據(jù)文件的
分桶表:原理和hashpartitioner 一樣,將hive中的一張表的數(shù)據(jù)進行歸納分類的時候,歸納分類規(guī)則就是hashpartitioner。(需要指定分桶字段,指定分成多少桶)
分區(qū)表和分桶的區(qū)別除了存儲的格式不同外,最主要的是作用:
分區(qū)表:細化數(shù)據(jù)管理,縮小mapreduce程序 需要掃描的數(shù)據(jù)量。
分桶表:提高join查詢的效率,在一份數(shù)據(jù)會被經(jīng)常用來做連接查詢的時候建立分桶,分桶字段就是連接字段;提高采樣的效率。
有了分區(qū)為什么還要分桶?
(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時能利用這個結(jié)構(gòu)。
(2)使取樣( sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時,在開發(fā)和修改査詢的階段,如果能在數(shù)據(jù)集的一小部分數(shù)據(jù)上試運行查詢,會帶來很多方便。
分桶是相對分區(qū)進行更細粒度的劃分。分桶將表或者分區(qū)的某列值進行hash值進行區(qū)分,如要安裝name屬性分為3個桶,就是對name屬性值的hash值對3取摸,按照取模結(jié)果對數(shù)據(jù)分桶。
與分區(qū)不同的是,分區(qū)依據(jù)的不是真實數(shù)據(jù)表文件中的列,而是我們指定的偽列,但是分桶是依據(jù)數(shù)據(jù)表中真實的列而不是偽列
18.mapjoin的原理

MapJoin通常用于一個很小的表和一個大表進行join的場景,具體小表有多小,由參數(shù)hive.mapjoin.smalltable.filesize來決定,該參數(shù)表示小表的總大小,默認值為25000000字節(jié),即25M。
Hive0.7之前,需要使用hint提示 *+ mapjoin(table) */才會執(zhí)行MapJoin,否則執(zhí)行Common Join,但在0.7版本之后,默認自動會轉(zhuǎn)換Map Join,由參數(shù)hive.auto.convert.join來控制,默認為true.
假設(shè)a表為一張大表,b為小表,并且hive.auto.convert.join=true,那么Hive在執(zhí)行時候會自動轉(zhuǎn)化為MapJoin。
MapJoin簡單說就是在Map階段將小表讀入內(nèi)存,順序掃描大表完成Join。減少昂貴的shuffle操作及reduce操作
MapJoin分為兩個階段:
通過MapReduce Local Task,將小表讀入內(nèi)存,生成HashTableFiles上傳至Distributed Cache中,這里會HashTableFiles進行壓縮。
MapReduce Job在Map階段,每個Mapper從Distributed Cache讀取HashTableFiles到內(nèi)存中,順序掃描大表,在Map階段直接進行Join,將數(shù)據(jù)傳遞給下一個MapReduce任務(wù)。
19.在hive的row_number中distribute by 和 partition by的區(qū)別?
20.hive開發(fā)中遇到什么問題?
21.什么時候使用內(nèi)部表,什么時候使用外部表
22.hive都有哪些函數(shù),你平常工作中用到哪些
數(shù)學函數(shù)
round(DOUBLE a)
floor(DOUBLE a)
ceil(DOUBLE a)
rand()
集合函數(shù)
size(Map<K.V>)
map_keys(Map<K.V>)
map_values(Map<K.V>)
array_contains(Array<T>, value)
sort_array(Array<T>)
類型轉(zhuǎn)換函數(shù)
cast(expr as <type>)
日期函數(shù)
date_format函數(shù)(根據(jù)格式整理日期)
date_add、date_sub函數(shù)(加減日期)
next_day函數(shù)
last_day函數(shù)(求當月最后一天日期)
collect_set函數(shù)
get_json_object解析json函數(shù)
from_unixtime(bigint unixtime, string format)
to_date(string timestamp)
year(string date)
month(string date)
hour(string date)
weekofyear(string date)
datediff(string enddate, string startdate)
add_months(string start_date, int num_months)
date_format(date/timestamp/string ts, string fmt)
條件函數(shù)
if(boolean testCondition, T valueTrue, T valueFalseOrNull)
nvl(T value, T default_value)
COALESCE(T v1, T v2, ...)
CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
isnull( a )
isnotnull ( a )
字符函數(shù)
concat(string|binary A, string|binary B...)
concat_ws(string SEP, string A, string B...)
get_json_object(string json_string, string path)
length(string A)
lower(string A) lcase(string A)
parse_url(string urlString, string partToExtract [, string keyToExtract])
regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT)
reverse(string A)
split(string str, string pat)
substr(string|binary A, int start) substring(string|binary A, int start)
聚合函數(shù)
count? sum min max avg
表生成函數(shù)
explode(array<TYPE> a)
explode(ARRAY)
json_tuple(jsonStr, k1, k2, ...)
parse_url_tuple(url, p1, p2, ...)
23.手寫sql,連續(xù)活躍用戶
24.left semi?join和left?join區(qū)別
left semi join和left join區(qū)別
25.group by為什么要排序
26.說說印象最深的一次優(yōu)化場景,hive常見的優(yōu)化思路
Hive調(diào)優(yōu),數(shù)據(jù)工程師成神之路
27.聊聊hive的執(zhí)行引擎,spark和mr的區(qū)別?
28.hive的join底層mr是如何實現(xiàn)的?
29.sql問題,連續(xù)幾天活躍的用戶?
30.建好了外部表,用什么語句把數(shù)據(jù)文件加載到表里
31.Hive的執(zhí)行流程?
32.hive的元數(shù)據(jù)信息存儲在哪?
33.sql語句的執(zhí)行順序from-where-group by-having -select-order by -limit
34.on和where的區(qū)別
35.hive和傳統(tǒng)數(shù)據(jù)庫之間的區(qū)別
1、寫時模式和讀時模式
傳統(tǒng)數(shù)據(jù)庫是寫時模式,在load過程中,提升了査詢性能,因為預先解析之后可以對列建立索引,并壓縮,但這樣也會花費更多的加載時間。
Hive是讀時模式,1 oad data非常迅速,因為它不需要讀取數(shù)據(jù)進行解析,僅僅進行文件的復制或者移動。
2、數(shù)據(jù)格式。Hive中沒有定義專門的數(shù)據(jù)格式,由用戶指定,需要指定三個屬性:列分隔符,行分隔符,以及讀取文件數(shù)據(jù)的方法。數(shù)據(jù)庫中,存儲引擎定義了自己的數(shù)據(jù)格式。所有數(shù)據(jù)都會按照一定的組織存儲
3、數(shù)據(jù)更新。Hive的內(nèi)容是讀多寫少的,因此,不支持對數(shù)據(jù)的改寫和刪除,數(shù)據(jù)都在加載的時候中確定好的。數(shù)據(jù)庫中的數(shù)據(jù)通常是需要經(jīng)常進行修改
4、執(zhí)行延遲。Hive在查詢數(shù)據(jù)的時候,需要掃描整個表(或分區(qū)),因此延遲較高,只有在處理大數(shù)據(jù)是才有優(yōu)勢。數(shù)據(jù)庫在處理小數(shù)據(jù)是執(zhí)行延遲較低。
5、索引。Hive比較弱,不適合實時查詢。數(shù)據(jù)庫有。
6、執(zhí)行。Hive是 Mapreduce,數(shù)據(jù)庫是 Executor
7、可擴展性。Hive高,數(shù)據(jù)庫低
8、數(shù)據(jù)規(guī)模。Hive大,數(shù)據(jù)庫小
36.hive中導入數(shù)據(jù)的4種方式
從本地導入:load data local inpath home/liuzc into table ods.test
從hdfs導入:load data inpath user/hive/warehouse/a.txt into ods.test
查詢導入:create table tmp_test as select * from ods.test
查詢結(jié)果導入:insert into table tmp.test select * from ods.test
三.Spark
1.rdd的屬性

一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數(shù)目。
一個計算每個分區(qū)的函數(shù)。Spark中RDD的計算是以分片為單位的,每個RDD都會實現(xiàn)compute函數(shù)以達到這個目的。compute函數(shù)會對迭代器進行復合,不需要保存每次計算的結(jié)果。
RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進行重新計算。
一個Partitioner,即RDD的分片函數(shù)。當前Spark中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。
一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。
2.算子分為哪幾類(RDD支持哪幾種類型的操作)
轉(zhuǎn)換(Transformation) ?現(xiàn)有的RDD通過轉(zhuǎn)換生成一個新的RDD。lazy模式,延遲執(zhí)行。
轉(zhuǎn)換函數(shù)包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce?等等。
動作(Action) ?在RDD上運行計算,并返回結(jié)果給驅(qū)動程序(Driver)或?qū)懭胛募到y(tǒng)。
動作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
collect ?該方法把數(shù)據(jù)收集到driver端 ??Array數(shù)組類型
所有的transformation只有遇到action才能被執(zhí)行。
當觸發(fā)執(zhí)行action之后,數(shù)據(jù)類型不再是rdd了,數(shù)據(jù)就會存儲到指定文件系統(tǒng)中,或者直接打印結(jié)?果或者收集起來。
3.創(chuàng)建rdd的幾種方式
1.集合并行化創(chuàng)建(有數(shù)據(jù))
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)
2.讀取外部文件系統(tǒng),如hdfs,或者讀取本地文件(最常用的方式)(沒數(shù)據(jù))
val rdd2 = sc.textFile("hdfs://hdp-01:9000/words.txt")
// 讀取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
3.從父RDD轉(zhuǎn)換成新的子RDD
調(diào)用Transformation類的方法,生成新的RDD
4.spark運行流程

Worker的功能:定時和master通信;調(diào)度并管理自身的executor
executor:由Worker啟動的,程序最終在executor中運行,(程序運行的一個容器)
spark-submit命令執(zhí)行時,會根據(jù)master地址去向 Master發(fā)送請求,
Master接收到Dirver端的任務(wù)請求之后,根據(jù)任務(wù)的請求資源進行調(diào)度,(打散的策略),盡可能的?把任務(wù)資源平均分配,然后向WOrker發(fā)送指令
Worker收到Master的指令之后,就根據(jù)相應(yīng)的資源,啟動executor(cores,memory)
executor會向dirver端建立請求,通知driver,任務(wù)已經(jīng)可以運行了
driver運行任務(wù)的時候,會把任務(wù)發(fā)送到executor中去運行。
5.Spark中coalesce與repartition的區(qū)別
1)關(guān)系:
兩者都是用來改變 RDD 的 partition 數(shù)量的,repartition 底層調(diào)用的就是 coalesce 方法:coalesce(numPartitions, shuffle = true)
2)區(qū)別:
repartition 一定會發(fā)生 shuffle,coalesce 根據(jù)傳入的參數(shù)來判斷是否發(fā)生 shuffle
一般情況下增大 rdd 的 partition 數(shù)量使用 repartition,減少 partition 數(shù)量時使用coalesce
6.sortBy 和 sortByKey的區(qū)別
sortBy既可以作用于RDD[K] ,還可以作用于RDD[(k,v)]
sortByKey ?只能作用于 RDD[K,V] 類型上。
7.map和mapPartitions的區(qū)別

8.數(shù)據(jù)存入Redis ?優(yōu)先使用map mapPartitions ?foreach ?foreachPartions哪個
使用 foreachPartition
???* 1,map mapPartition ??是轉(zhuǎn)換類的算子, 有返回值
???* 2, 寫mysql,redis?的連接
???foreach ?* 100萬 ????????100萬次的連接
???foreachPartions * 200 個分區(qū) ????200次連接 ?一個分區(qū)中的數(shù)據(jù),共用一個連接
foreachParititon 每次迭代一個分區(qū),foreach每次迭代一個元素。
該方法沒有返回值,或者Unit
主要作用于,沒有返回值類型的操作(打印結(jié)果,寫入到mysql數(shù)據(jù)庫中)
在寫入到redis,mysql的時候,優(yōu)先使用foreachPartititon
9.reduceByKey和groupBykey的區(qū)別

reduceByKey會傳一個聚合函數(shù), 相當于 ?groupByKey?+ mapValues
reduceByKey 會有一個分區(qū)內(nèi)聚合,而groupByKey沒有 ?最核心的區(qū)別 ?
結(jié)論:reduceByKey有分區(qū)內(nèi)聚合,更高效,優(yōu)先選擇使用reduceByKey。
10.cache和checkPoint的比較
都是做 RDD 持久化的
1.緩存,是在觸發(fā)action之后,把數(shù)據(jù)寫入到內(nèi)存或者磁盤中。不會截斷血緣關(guān)系
(設(shè)置緩存級別為memory_only:內(nèi)存不足,只會部分緩存或者沒有緩存,緩存會丟失,memory_and_disk :內(nèi)存不足,會使用磁盤)
2.checkpoint 也是在觸發(fā)action之后,執(zhí)行任務(wù)。單獨再啟動一個job,負責寫入數(shù)據(jù)到hdfs中。(把rdd中的數(shù)據(jù),以二進制文本的方式寫入到hdfs中,有幾個分區(qū),就有幾個二進制文件)
3.某一個RDD被checkpoint之后,他的父依賴關(guān)系會被刪除,血緣關(guān)系被截斷,該RDD轉(zhuǎn)換成了CheckPointRDD,以后再對該rdd的所有操作,都是從hdfs中的checkpoint的具體目錄來讀取數(shù)據(jù)。緩存之后,rdd的依賴關(guān)系還是存在的。
11.spark streaming流式統(tǒng)計單詞數(shù)量代碼
object WordCountAll {
? newValues當前批次的出現(xiàn)的單詞次數(shù), runningCount表示之前運行的單詞出現(xiàn)的結(jié)果
* def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
? ? val newCount =? newValues.sum + runningCount.getOrElse(0)// 將歷史前幾個批次的值和當前批次的值進行累加返回當前批次最終的結(jié)果
? ? Some(newCount)
? }*/
? **
? ? * String : 單詞 hello
? ? * Seq[Int] :單詞在當前批次出現(xiàn)的次數(shù)
? ? * Option[Int] :歷史結(jié)果
? ? */
? val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
? ? iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
? ? iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
? }
? 屏蔽日志
? Logger.getLogger("org.apache").setLevel(Level.ERROR)
? def main(args: Array[String]) {
? ? 必須要開啟2個以上的線程,一個線程用來接收數(shù)據(jù),另外一個線程用來計算
? ? val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
? ? ? 設(shè)置sparkjob計算時所采用的序列化方式
? ? ? .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
? ? ? .set("spark.rdd.compress", "true")? 節(jié)約大量的內(nèi)存內(nèi)容
? ? 如果你的程序出現(xiàn)垃圾回收時間過程,可以設(shè)置一下java的垃圾回收參數(shù)
? ? 同時也會創(chuàng)建sparkContext對象
? ? 批次時間 >= 批次處理的總時間 (批次數(shù)據(jù)量,集群的計算節(jié)點數(shù)量和配置)
? ? val ssc = new StreamingContext(conf, Seconds(5))
? ? 做checkpoint 寫入共享存儲中
? ? ssc.checkpoint("c://aaa")
? ? 創(chuàng)建一個將要連接到 hostname:port 的 DStream,如 localhost:9999
? ? val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.175.101", 44444)
? ? updateStateByKey結(jié)果可以累加但是需要傳入一個自定義的累加函數(shù):updateFunc
? ? val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
? ? 打印結(jié)果到控制臺
? ? results.print()
? ? 開始計算
? ? ssc.start()
? ? 等待停止
? ? ssc.awaitTermination()
? }
}
12.簡述map和flatMap的區(qū)別和應(yīng)用場景
map是對每一個元素進行操作,flatmap是對每一個元素操作后并壓平
13.計算曝光數(shù)和點擊數(shù)

14.分別列出幾個常用的transformation和action算子
轉(zhuǎn)換算子:map,map,filter,reduceByKey,groupByKey,groupBy
行動算子:foreach,foreachpartition,collect,collectAsMap,take,top,first,count,countByKey
15.按照需求使用spark編寫以下程序,要求使用scala語言
當前文件a.txt的格式,請統(tǒng)計每個單詞出現(xiàn)的次數(shù)
A,b,c
B,b,f,e
objectWordCount {
def main(args: Array[String]):Unit= {
valconf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
valsc = new SparkContext(conf)
varsData: RDD[String] = sc.textFile("a.txt")
valsortData: RDD[(String,Int)] = sData.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
? ? sortData.foreach(print)
? }
}
16.spark應(yīng)用程序的執(zhí)行命令是什么?
/usr/local/spark-current2.3/bin/spark-submit \
--class com.wedoctor.Application \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 2g \
--queue root.wedw \
--num-executors 200 \
--jars home/pgxl/liuzc/config-1.3.0.jar,/home/pgxl/liuzc/hadoop-lzo-0.4.20.jar,/home/pgxl/liuzc/elasticsearch-hadoop-hive-2.3.4.jar?\
/home/pgxl/liuzc/sen.jar
17.Spark應(yīng)用執(zhí)行有哪些模式,其中哪幾種是集群模式
本地local模式
standalone模式
spark on yarn模式
spark on mesos模式
其中,standalone模式,spark on yarn模式,spark on mesos模式是集群模式
18.請說明spark中廣播變量的用途
使用廣播變量,每個 Executor 的內(nèi)存中,只駐留一份變量副本,而不是對 每個 task 都傳輸一次大變量,省了很多的網(wǎng)絡(luò)傳輸, 對性能提升具有很大幫助, 而且會通過高效的廣播算法來減少傳輸代價。
19.以下代碼會報錯嗎?如果會怎么解決 val arr = new ArrayList[String]; arr.foreach(println)
val arr = new ArrayList[String]; 這里會報錯,需要改成 val arr: Array[String] = new Array[String](10)
arr.foreach(println)打印不會報空指針
20.寫出你用過的spark中的算子,其中哪些會產(chǎn)生shuffle過程
reduceBykey:
groupByKey:
…ByKey:
21.Spark中rdd與partition的區(qū)別
22.請寫出創(chuàng)建Dateset的幾種方式
23.描述一下RDD,DataFrame,DataSet的區(qū)別?
1)RDD
優(yōu)點:
編譯時類型安全
編譯時就能檢查出類型錯誤
面向?qū)ο蟮木幊田L格
直接通過類名點的方式來操作數(shù)據(jù)
缺點:
序列化和反序列化的性能開銷
無論是集群間的通信, 還是 IO 操作都需要對對象的結(jié)構(gòu)和數(shù)據(jù)進行序列化和反序列化。
GC 的性能開銷,頻繁的創(chuàng)建和銷毀對象, 勢必會增加 GC
2)DataFrame
DataFrame 引入了 schema 和 off-heap
schema : RDD 每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的,這個結(jié)構(gòu)就存儲在 schema 中。Spark 通過 schema 就能夠讀懂數(shù)據(jù), 因此在通信和 IO 時就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了。
3)DataSet
DataSet 結(jié)合了 RDD 和 DataFrame 的優(yōu)點,并帶來的一個新的概念 Encoder。
當序列化數(shù)據(jù)時,Encoder 產(chǎn)生字節(jié)碼與 off-heap 進行交互,能夠達到按需訪問數(shù)據(jù)的效果,而不用反序列化整個對象。Spark 還沒有提供自定義 Encoder 的 API,但是未來會加入。
三者之間的轉(zhuǎn)換:

24.描述一下Spark中stage是如何劃分的?描述一下shuffle的概念
25.Spark 在yarn上運行需要做哪些關(guān)鍵的配置工作?如何kill -個Spark在yarn運行中Application
26.通常來說,Spark與MapReduce相比,Spark運行效率更高。請說明效率更高來源于Spark內(nèi)置的哪些機制?并請列舉常見spark的運行模式?
27.RDD中的數(shù)據(jù)在哪?
RDD中的數(shù)據(jù)在數(shù)據(jù)源,RDD只是一個抽象的數(shù)據(jù)集,我們通過對RDD的操作就相當于對數(shù)據(jù)進行操作。
28.如果對RDD進行cache操作后,數(shù)據(jù)在哪里?
數(shù)據(jù)在第一執(zhí)行cache算子時會被加載到各個Executor進程的內(nèi)存中,第二次就會直接從內(nèi)存中讀取而不會區(qū)磁盤。
29.Spark中Partition的數(shù)量由什么決定
和Mr一樣,但是Spark默認最少有兩個分區(qū)。
30.Scala里面的函數(shù)和方法有什么區(qū)別
31.SparkStreaming怎么進行監(jiān)控?
32.Spark判斷Shuffle的依據(jù)?
?父RDD的一個分區(qū)中的數(shù)據(jù)有可能被分配到子RDD的多個分區(qū)中
33.Scala有沒有多繼承?可以實現(xiàn)多繼承么?
34.Sparkstreaming和flink做實時處理的區(qū)別
35.Sparkcontext的作用
36.Sparkstreaming讀取kafka數(shù)據(jù)為什么選擇直連方式
37.離線分析什么時候用sparkcore和sparksq
38.Sparkstreaming實時的數(shù)據(jù)不丟失的問題
39.簡述寬依賴和窄依賴概念,groupByKey,reduceByKey,map,filter,union五種操作哪些會導致寬依賴,哪些會導致窄依賴
40.數(shù)據(jù)傾斜可能會導致哪些問題,如何監(jiān)控和排查,在設(shè)計之初,要考慮哪些來避免
41.有一千萬條短信,有重復,以文本文件的形式保存,一行一條數(shù)據(jù),請用五分鐘時間,找出重復出現(xiàn)最多的前10條
42.現(xiàn)有一文件,格式如下,請用spark統(tǒng)計每個單詞出現(xiàn)的次數(shù)
18619304961,18619304064,186193008,186193009
18619304962,18619304065,186193007,186193008
18619304963,18619304066,186193006,186193010
43.共享變量和累加器
累加器(accumulator)是 Spark 中提供的一種分布式的變量機制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調(diào)試時對作業(yè)執(zhí)行過程中的事件進行計數(shù)。而廣播變量用來高效分發(fā)較大的對象。
共享變量出現(xiàn)的原因:
通常在向 Spark 傳遞函數(shù)時,比如使用 map() 函數(shù)或者用 filter() 傳條件時,可以使用驅(qū)動器程序中定義的變量,但是集群中運行的每個任務(wù)都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅(qū)動器中的對應(yīng)變量。
Spark 的兩個共享變量,累加器與廣播變量,分別為結(jié)果聚合與廣播這兩種常見的通信模式突破了這一限制。
44.當 Spark 涉及到數(shù)據(jù)庫的操作時,如何減少 Spark 運行中的數(shù)據(jù)庫連接數(shù)?
使用 foreachPartition 代替 foreach,在 foreachPartition 內(nèi)獲取數(shù)據(jù)庫的連接。
45.特別大的數(shù)據(jù),怎么發(fā)送到excutor中?
46.spark調(diào)優(yōu)都做過哪些方面?
47.spark任務(wù)為什么會被yarn kill掉?
48.Spark on Yarn作業(yè)執(zhí)行流程?yarn-client和yarn-cluster有什么區(qū)別?
49.Flatmap底層編碼實現(xiàn)?
Spark flatMap 源碼:
/**
? *? Return a new RDD by first applying a function to all elements of this
? *? RDD, and then flattening the results.
? */
? def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
? ? val cleanF = sc.clean(f)
? ? new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
? }
Scala flatMap 源碼:
/** Creates a new iterator by applying a function to all values produced by this iterator
? *? and concatenating the results.
? *
? *? @param f the function to apply on each element.
? *? @return? the iterator resulting from applying the given iterator-valued function
? *? ? ? ? ? `f` to each value produced by this iterator and concatenating the results.
? *? @note? ? Reuse: $consumesAndProducesIterator
? */
? def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
? ? private var cur: Iterator[B] = empty
? ? private def nextCur() { cur = f(self.next()).toIterator }
? ? def hasNext: Boolean = {
? ? ? Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
? ? ? but slightly shorter bytecode (better JVM inlining!)
? ? ? while (!cur.hasNext) {
? ? ? ? if (!self.hasNext) return false
? ? ? ? nextCur()
? ? ? }
? ? ? true
? ? }
? ? def next(): B =<span style="color:#ffffff"> <span style="background-color:rgb(255,0,0)">(if (hasNext) cur else empty).next()</span></span>
? }
flatMap其實就是將RDD里的每一個元素執(zhí)行自定義函數(shù)f,這時這個元素的結(jié)果轉(zhuǎn)換成iterator,最后將這些再拼接成一個
新的RDD,也可以理解成原本的每個元素由橫向執(zhí)行函數(shù)f后再變?yōu)榭v向。畫紅部分一直在回調(diào),當RDD內(nèi)沒有元素為止。
50.spark_1.X與spark_2.X區(qū)別?
51.說說spark與flink
52.spark streaming如何保證7*24小時運行機制?
53.spark streaming是Exactly-Once嗎?
四.Kafka
1.Kafka名詞解釋和工作方式
Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
Consumer :消息消費者,向kafka broker取消息的客戶端
Topic :咋們可以理解為一個隊列。
Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個partion只會把消息發(fā)給該CG中的一個consumer。如果需要實現(xiàn)廣播,只要每個consumer有一個獨立的CG就可以了。要實現(xiàn)單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic。
Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
2.Consumer與topic關(guān)系
本質(zhì)上kafka只支持Topic;
每個group中可以有多個consumer,每個consumer屬于一個consumer group;
通常情況下,一個group中會包含多個consumer,這樣不僅可以提高topic中消息的并發(fā)消費能力,而且還能提高"故障容錯"性,如果group中的某個consumer失效那么其消費的partitions將會有其他consumer自動接管。
對于Topic中的一條特定的消息,只會被訂閱此Topic的每個group中的其中一個consumer消費,此消息不會發(fā)送給一個group的多個consumer;
那么一個group中所有的consumer將會交錯的消費整個Topic,每個group中consumer消息消費互相獨立,我們可以認為一個group是一個"訂閱"者。
在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻);
一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以同時消費多個partitions中的消息。
kafka的設(shè)計原理決定,對于一個topic,同一個group中不能有多于partitions個數(shù)的consumer同時消費,否則將意味著某些consumer將無法得到消息。
kafka只能保證一個partition中的消息被某個consumer消費時是順序的;事實上,從Topic角度來說,當有多個partitions時,消息仍不是全局有序的。
3.kafka中生產(chǎn)數(shù)據(jù)的時候,如何保證寫入的容錯性?
設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個值0,1,-1
0: producer不會等待broker發(fā)送ack
1: 當leader接收到消息之后發(fā)送ack
-1: 當所有的follower都同步消息成功后發(fā)送ack
request.required.acks=0
4.如何保證kafka消費者消費數(shù)據(jù)是全局有序的
偽命題
每個分區(qū)內(nèi),每條消息都有一個offset,故只能保證分區(qū)內(nèi)有序。
如果要全局有序的,必須保證生產(chǎn)有序,存儲有序,消費有序。
由于生產(chǎn)可以做集群,存儲可以分片,消費可以設(shè)置為一個consumerGroup,要保證全局有序,就需要保證每個環(huán)節(jié)都有序。
只有一個可能,就是一個生產(chǎn)者,一個partition,一個消費者。這種場景和大數(shù)據(jù)應(yīng)用場景相悖。
5.有兩個數(shù)據(jù)源,一個記錄的是廣告投放給用戶的日志,一個記錄用戶訪問日志,另外還有一個固定的用戶基礎(chǔ)表記錄用戶基本信息(比如學歷,年齡等等)。現(xiàn)在要分析廣告投放對與哪類用戶更有效,請采用熟悉的技術(shù)描述解決思路。另外如果兩個數(shù)據(jù)源都是實時數(shù)據(jù)源(比如來自kafka),他們數(shù)據(jù)在時間上相差5分鐘,需要哪些調(diào)整來解決實時分析問題?
6.Kafka和SparkStreaing如何集成?
7.列舉Kafka的優(yōu)點,簡述Kafka為什么可以做到每秒數(shù)十萬甚至上百萬消息的高效分發(fā)?
8.為什么離線分析要用kafka?
Kafka的作用是解耦,如果直接從日志服務(wù)器上采集的話,實時離線都要采集,等于要采集兩份數(shù)據(jù),而使用了kafka的話,只需要從日志服務(wù)器上采集一份數(shù)據(jù),然后在kafka中使用不同的兩個組讀取就行了
9.Kafka怎么進行監(jiān)控?
Kafka?Manager
10.Kafka與傳統(tǒng)的消息隊列服務(wù)有很么不同
11.Kafka api ?low-level與high-level有什么區(qū)別,使用low-level需要處理哪些細節(jié)
12.Kafka的ISR副本同步隊列
ISR(In-Sync Replicas),副本同步隊列。ISR中包括Leader和Follower。如果Leader進程掛掉,會在ISR隊列中選擇一個服務(wù)作為新的Leader。有replica.lag.max.messages(延遲條數(shù))和replica.lag.time.max.ms(延遲時間)兩個參數(shù)決定一臺服務(wù)是否可以加入ISR副本隊列,在0.10版本移除了replica.lag.max.messages參數(shù),防止服務(wù)頻繁的進去隊列。
任意一個維度超過閾值都會把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會先存放在OSR中。
13.Kafka消息數(shù)據(jù)積壓,Kafka消費能力不足怎么處理?
1)如果是Kafka消費能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費組的消費者數(shù)量,消費者數(shù)=分區(qū)數(shù)。(兩者缺一不可)
2)如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。
14.Kafka中的ISR、AR又代表什么?
?ISR:in-sync replica set (ISR),與leader保持同步的follower集合
??? AR:分區(qū)的所有副本
15.Kafka中的HW、LEO等分別代表什么?
LEO:每個副本的最后條消息的offset
??? HW:一個分區(qū)中所有副本最小的offset
16.哪些情景會造成消息漏消費?
先提交offset,后消費,有可能造成數(shù)據(jù)的重復
17.當你使用kafka-topics.sh創(chuàng)建了一個topic之后,Kafka背后會執(zhí)行什么邏輯?
? 1)會在zookeeper中的/brokers/topics節(jié)點下創(chuàng)建一個新的topic節(jié)點,如:/brokers/topics/first
????2)觸發(fā)Controller的監(jiān)聽程序
????3)kafka Controller 負責topic的創(chuàng)建工作,并更新metadata cache
18.topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
19.topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?
不可以減少,被刪除的分區(qū)數(shù)據(jù)難以處理。
20.Kafka有內(nèi)部的topic嗎?如果有是什么?有什么所用?
?__consumer_offsets,保存消費者offset
21.聊一聊Kafka Controller的作用?
負責管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。
22.失效副本是指什么?有那些應(yīng)對措施?
不能及時與leader同步,暫時踢出ISR,等其追上leader之后再重新加入
23.Kafka 都有哪些特點?
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
可擴展性:kafka集群支持熱擴展
持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失
容錯性:允許集群中節(jié)點失?。ㄈ舾北緮?shù)量為n,則允許n-1個節(jié)點失?。?/p>
高并發(fā):支持數(shù)千個客戶端同時讀寫
24.請簡述下你在哪些場景下會選擇 Kafka?
日志收集:一個公司可以用Kafka可以收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、HBase、Solr等。
消息系統(tǒng):解耦和生產(chǎn)者和消費者、緩存消息等。
用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索、點擊等活動,這些活動信息被各個服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
運營指標:Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報警和報告。
流式處理:比如spark streaming和 Flink
25.Kafka 的設(shè)計架構(gòu)你知道嗎?
簡單架構(gòu)如下

詳細如下

Kafka 架構(gòu)分為以下幾個部分
Producer :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端。
Consumer :消息消費者,向 kafka broker 取消息的客戶端。
Topic :可以理解為一個隊列,一個 Topic 又分為一個或多個分區(qū)。
Consumer Group:這是 kafka 用來實現(xiàn)一個 topic 消息的廣播(發(fā)給所有的 consumer)和單播(發(fā)給任意一個 consumer)的手段。一個 topic 可以有多個 Consumer Group。
Broker :一臺 kafka 服務(wù)器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。
Partition:為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker上,每個 partition 是一個有序的隊列。partition 中的每條消息都會被分配一個有序的id(offset)。將消息發(fā)給 consumer,kafka 只保證按一個 partition 中的消息的順序,不保證一個 topic 的整體(多個 partition 間)的順序。
Offset:kafka 的存儲文件都是按照 offset.kafka 來命名,用 offset 做名字的好處是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。當然 the first offset 就是 00000000000.kafka。
26.Kafka 分區(qū)的目的?
分區(qū)對于 Kafka 集群的好處是:實現(xiàn)負載均衡。分區(qū)對于消費者來說,可以提高并發(fā)度,提高效率。
27.你知道 Kafka 是如何做到消息的有序性?
kafka 中的每個 partition 中的消息在寫入時都是有序的,而且消息帶有offset偏移量,消費者按偏移量的順序從前往后消費,從而保證了消息的順序性。但是分區(qū)之間的消息是不保證有序的。
28.Kafka 的高可靠性是怎么實現(xiàn)的?
kafka通過分區(qū)的多副本機制來保證消息的可靠性。1. 每個分區(qū)可以設(shè)置多個副本,這些副本分布在不同的broker上;2. 相同partition的多個副本能動態(tài)選舉leader來對外服務(wù)和管理內(nèi)部數(shù)據(jù)同步。這樣,即使有broker出現(xiàn)故障,受影響的partition也會在其他broker上重新選舉出新的leader來繼續(xù)服務(wù)
更具體來說,可參看下文:
Kafka 的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個副本可以使 Kafka 在發(fā)生崩潰時仍能保證消息的持久性。
Producer 往 Broker 發(fā)送消息
如果我們要往 Kafka 對應(yīng)的主題發(fā)送消息,我們需要通過 Producer 完成。前面我們講過 Kafka 主題對應(yīng)了多個分區(qū),每個分區(qū)下面又對應(yīng)了多個副本;為了讓用戶設(shè)置數(shù)據(jù)可靠性, Kafka 在 Producer 里面提供了消息確認機制。也就是說我們可以通過配置來決定消息發(fā)送到對應(yīng)分區(qū)的幾個副本才算消息發(fā)送成功??梢栽诙x Producer 時通過 acks 參數(shù)指定(在 0.8.2.X 版本之前是通過 request.required.acks 參數(shù)設(shè)置的,詳見 KAFKA-3043)。這個參數(shù)支持以下三種值:
acks = 0:意味著如果生產(chǎn)者能夠通過網(wǎng)絡(luò)把消息發(fā)送出去,那么就認為消息已成功寫入 Kafka 。在這種情況下還是有可能發(fā)生錯誤,比如發(fā)送的對象無能被序列化或者網(wǎng)卡發(fā)生故障,但如果是分區(qū)離線或整個集群長時間不可用,那就不會收到任何錯誤。在 acks=0 模式下的運行速度是非??斓模ㄟ@就是為什么很多基準測試都是基于這個模式),你可以得到驚人的吞吐量和帶寬利用率,不過如果選擇了這種模式, 一定會丟失一些消息。
acks = 1:意味若 Leader 在收到消息并把它寫入到分區(qū)數(shù)據(jù)文件(不一定同步到磁盤上)時會返回確認或錯誤響應(yīng)。在這個模式下,如果發(fā)生正常的 Leader 選舉,生產(chǎn)者會在選舉時收到一個 LeaderNotAvailableException 異常,如果生產(chǎn)者能恰當?shù)靥幚磉@個錯誤,它會重試發(fā)送悄息,最終消息會安全到達新的 Leader 那里。不過在這個模式下仍然有可能丟失數(shù)據(jù),比如消息已經(jīng)成功寫入 Leader,但在消息被復制到 follower 副本之前 Leader發(fā)生崩潰。
acks = all(這個和 request.required.acks = -1 含義一樣):意味著 Leader 在返回確認或錯誤響應(yīng)之前,會等待所有同步副本都收到悄息。如果和 min.insync.replicas 參數(shù)結(jié)合起來,就可以決定在返回確認前至少有多少個副本能夠收到悄息,生產(chǎn)者會一直重試直到消息被成功提交。不過這也是最慢的做法,因為生產(chǎn)者在繼續(xù)發(fā)送其他消息之前需要等待所有副本都收到當前的消息。
根據(jù)實際的應(yīng)用場景,我們設(shè)置不同的 acks,以此保證數(shù)據(jù)的可靠性。
另外,Producer 發(fā)送消息還可以選擇同步(默認,通過 producer.type=sync 配置) 或者異步(producer.type=async)模式。如果設(shè)置成異步,雖然會極大的提高消息發(fā)送的性能,但是這樣會增加丟失數(shù)據(jù)的風險。如果需要確保消息的可靠性,必須將 producer.type 設(shè)置為 sync。
Leader 選舉
在介紹 Leader 選舉之前,讓我們先來了解一下 ISR(in-sync replicas)列表。每個分區(qū)的 leader 會維護一個 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 編號,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,這個是通過 replica.lag.time.max.ms 參數(shù)配置的,具體可以參見?《一文了解 Kafka 的副本復制機制》。只有 ISR 里的成員才有被選為 leader 的可能。
所以當 Leader 掛掉了,而且 unclean.leader.election.enable=false 的情況下,Kafka 會從 ISR 列表中選擇第一個 follower 作為新的 Leader,因為這個分區(qū)擁有最新的已經(jīng) committed 的消息。通過這個可以保證已經(jīng) committed 的消息的數(shù)據(jù)可靠性。
綜上所述,為了保證數(shù)據(jù)的可靠性,我們最少需要配置一下幾個參數(shù):
producer 級別:acks=all(或者 request.required.acks=-1),同時發(fā)生模式為同步 producer.type=sync
topic 級別:設(shè)置 replication.factor>=3,并且 min.insync.replicas>=2;
broker 級別:關(guān)閉不完全的 Leader 選舉,即 unclean.leader.election.enable=false;
29.請談一談 Kafka 數(shù)據(jù)一致性原理
一致性就是說不論是老的 Leader 還是新選舉的 Leader,Consumer 都能讀到一樣的數(shù)據(jù)。

假設(shè)分區(qū)的副本為3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。雖然副本0已經(jīng)寫入了 Message4,但是 Consumer 只能讀取到 Message2。因為所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 讀取,而 High Water Mark 取決于 ISR 列表里面偏移量最小的分區(qū),對應(yīng)于上圖的副本2,這個很類似于木桶原理。
這樣做的原因是還沒有被足夠多副本復制的消息被認為是“不安全”的,如果 Leader 發(fā)生崩潰,另一個副本成為新 Leader,那么這些消息很可能丟失了。如果我們允許消費者讀取這些消息,可能就會破壞一致性。試想,一個消費者從當前 Leader(副本0) 讀取并處理了 Message4,這個時候 Leader 掛掉了,選舉了副本1為新的 Leader,這時候另一個消費者再去從新的 Leader 讀取消息,發(fā)現(xiàn)這個消息其實并不存在,這就導致了數(shù)據(jù)不一致性問題。
當然,引入了 High Water Mark 機制,會導致 Broker 間的消息復制因為某些原因變慢,那么消息到達消費者的時間也會隨之變長(因為我們會先等待消息復制完畢)。延遲時間可以通過參數(shù) replica.lag.time.max.ms 參數(shù)配置,它指定了副本在復制消息時可被允許的最大延遲時間。