Hadoop 架構(gòu)
Hadoop組成部分
- HDFS
管理者:namenode
工作者:DataNode
輔助管理者:secondaryNameNode - MapReduce
- YARN
管理者:ResourceManage
工作者:NodeManage
Hadoop 運(yùn)行機(jī)制
MapReduce 詳解
運(yùn)行原理
- 在客戶端執(zhí)行
submit()方法之前,會(huì)先去獲取一下待讀取文件的信息 - 將
job提交給yarn,這時(shí)候會(huì)帶著三個(gè)信息過(guò)去(job.split(文件的切片信息),jar.job.xml) - yarn會(huì)根據(jù)文件的切片信息去計(jì)算將要啟動(dòng)的
maptask的數(shù)量,然后去啟動(dòng)maptask -
maptask會(huì)調(diào)用InPutFormat()方法區(qū)HDFS上面讀取文件,InPutFormat()方法會(huì)再去調(diào)用RecordRead()方法,將數(shù)據(jù)以行首字母的偏移量為key,一行數(shù)據(jù)為value傳給mapper()方法 -
mapper方法做一些邏輯處理后,將數(shù)據(jù)傳到分區(qū)方法中,對(duì)數(shù)據(jù)進(jìn)行一個(gè)分區(qū)標(biāo)注后,發(fā)送到環(huán)形緩沖區(qū)中 - 環(huán)形緩沖區(qū)默認(rèn)的大小是100M,達(dá)到80%的閾值將會(huì)溢寫(xiě)
- 在溢寫(xiě)之前會(huì)做一個(gè)排序的動(dòng)作,排序的規(guī)則是按照key進(jìn)行字典序排序,排序的手段是快排
- 溢寫(xiě)會(huì)產(chǎn)生出大量的溢寫(xiě)文件,會(huì)再次調(diào)用
merge()方法,使用歸并排序,默認(rèn)10個(gè)溢寫(xiě)文件合并成一個(gè)大文件, - 也可以對(duì)溢寫(xiě)文件做一次
localReduce也就是combiner的操作,但前提是combiner的結(jié)果不能對(duì)最終的結(jié)果有影響 - 等待所有的maptask結(jié)束之后,會(huì)啟動(dòng)一定數(shù)量的
reduce task -
reduce task會(huì)發(fā)取拉取線程到map端拉取數(shù)據(jù),拉取到的數(shù)據(jù)會(huì)先加載到內(nèi)存中,內(nèi)存不夠會(huì)寫(xiě)到磁盤(pán)里,等待所有的數(shù)據(jù)拉取完畢,會(huì)將這些輸出在進(jìn)行一次歸并排序 - 歸并后的文件會(huì)再次進(jìn)行一次分組的操作,然后將數(shù)據(jù)以組為單位發(fā)送到reduce()方法
- reduce方法做一些邏輯判斷后,最終調(diào)用
OutputFormat()方法,Outputformat()會(huì)再去調(diào)用RecordWrite()方法將數(shù)據(jù)以KV的形式寫(xiě)出到HDFS上
環(huán)形緩沖區(qū)的作用以及數(shù)據(jù)結(jié)構(gòu)
map task 數(shù)量受什么影響
- 輸入文件大小,需注意 hdfs是塊存儲(chǔ),如果hdfs迷人設(shè)置的是 128M塊大小,一個(gè)文件是 200M,那么將會(huì)占用兩個(gè)塊,maptask就是 2個(gè)
- 文件數(shù)量 ,不同的文件,也會(huì)新啟動(dòng)一個(gè)maptask
簡(jiǎn)述 MapReduce 中的 shuffle
Map 端shuffle
文件split 之后,經(jīng)過(guò) mapper處理后,加上分區(qū)標(biāo)記,存入環(huán)形緩沖區(qū),達(dá)到閾值后會(huì)落入磁盤(pán),由環(huán)形緩沖區(qū)寫(xiě)入磁盤(pán)時(shí)是根據(jù) key 排序的,使用的是快速排序。
Map 和 Reduce 中間的 shuffle
此時(shí)Map階段產(chǎn)生了一些小文件,此時(shí)需要將小文件合并起來(lái),使用歸并排序
Reduce端 shuffle
recuce 端會(huì)啟動(dòng) reduce task 去拉取數(shù)據(jù),注意這里是拉取數(shù)據(jù),拉取過(guò)來(lái)的也是多個(gè)文件,需要做一個(gè)歸并排序,并根據(jù)key做好分組
此處補(bǔ)充兩種排序方法的代碼
快速排序
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# @Time : 2020/2/29 上午10:20
# @Author : lixinsong
# @File : quick_sort.py
# @desc :
def quick_sort(alist, start, end):
"""快速排序"""
if start >= end:
return
mid = alist[start]
low = start
high = end
while low < high:
while low < high and alist[high] >= mid:
high -= 1
alist[low] = alist[high]
while low < high and alist[low] < mid:
low += 1
alist[high] = alist[low]
alist[low] = mid
quick_sort(alist, start, low - 1)
quick_sort(alist, low + 1, end)
if __name__ == '__main__':
a = [2, 1, 4, 8, 2]
quick_sort(a, 0, len(a)-1)
print(a)
歸并排序
def merge(a, b):
c = []
h = j = 0
while j < len(a) and h < len(b):
if a[j] < b[h]:
c.append(a[j])
j += 1
else:
c.append(b[h])
h += 1
if j == len(a):
for i in b[h:]:
c.append(i)
else:
for i in a[j:]:
c.append(i)
return c
def merge_sort(lists):
if len(lists) <= 1:
return lists
middle = len(lists)/2
left = merge_sort(lists[:middle])
right = merge_sort(lists[middle:])
return merge(left, right)
if __name__ == '__main__':
a = [4, 7, 8, 3, 5, 9]
print merge_sort(a)