前言
本文主要涉及的技術(shù):HDFS、MapReduce、YARN、Avro、Flume、Pig、Crunch、Hive、HBase、Spark、Flink、Beam。每個技術(shù)會大概講一下其架構(gòu)和功能,讓大家對大數(shù)據(jù)技術(shù)體系有個宏觀的認識。
2003~2005 Google連續(xù)發(fā)表3篇論文,用于解決大規(guī)模數(shù)據(jù)存儲和處理,依據(jù)三篇論文思想,業(yè)界開源出了Hadoop相關(guān)大數(shù)據(jù)組件,拉開了大數(shù)據(jù)技術(shù)帷幕。
1. 2003年 GFS:《The Google File System 》(https://blog.csdn.net/xuleicsu/article/details/526386):解決非結(jié)構(gòu)化數(shù)據(jù)大規(guī)模存儲
2. 2004年 MapReduce:《MapReduce: Simplified Data Processing on Large Clusters》(https://blog.csdn.net/active1001/article/details/1675920):解決大規(guī)模數(shù)據(jù)處理
3. 2005年 BigTale: 《Bigtable: A Distributed Storage System for Structured Data》(https://blog.csdn.net/accesine960/article/details/595628):解決結(jié)構(gòu)化/半結(jié)構(gòu)化數(shù)據(jù)存儲
1. HDFS
針對GFS論文的開源實現(xiàn)版本
架構(gòu):

文件最小存儲單位:chunk(文件塊),默認128MB
HDFS上文件被劃分為多個分塊(chunk),作為獨立的存儲單元。但與面向單一磁盤的文件系統(tǒng)不同的是,HDFS中小于一個塊大小的文件不會占據(jù)整個塊空間(例如:當一個1MB的文件存儲在一個128MB的塊中時,文件只使用1MB的磁盤空間,而不是128MB)。
Namenode具備主備雙節(jié)點。當活動的namenode失效,備用的namenode就會接管它的任務(wù)并開始服務(wù)于來自客戶端的請求。
1.1?HDFS Federation
聯(lián)邦HDFS:每個namenode管理文件系統(tǒng)命名空間的一部分
由于NameNode要管理所有的文件和數(shù)據(jù)塊的引用關(guān)系,對于一個超大規(guī)模的HDFS集群,Namenode成為瓶頸。聯(lián)邦HDFS能夠支持多個Namenode,對Namenode數(shù)據(jù)進行分片,減輕每個Namenode存儲和請求壓力。
例如:一個Namenode可能管理/user目錄下的所有文件,另一個Namenode管理/share目錄下的所有文件。

1.2 HDFS讀取流程

1.3 HDFS寫流程?

2. MapReduce
針對MapReduce論文的開源實現(xiàn)版本
MapReduce運行邏輯圖如下:(此過程也稱為shuffle)

默認:一個chunk(文件塊)會啟動一個Map任務(wù)
當對HDFS上的一個文件運行MapReduce任務(wù)時,如果該文件大小是1280MB,那么該文件就會對應(yīng)10個chunk,默認會啟動10個Map任務(wù)去處理(并發(fā)度可以理解為10),每個Map任務(wù)處理一個chunk,并且該Map任務(wù)會盡量分配和chunk在相同的機器上,避免文件讀取跨網(wǎng)絡(luò)
Reduce任務(wù)數(shù)量并非由輸入數(shù)據(jù)大小決定的,相反是獨立指定的。默認任務(wù)數(shù)是1,可以在運行時指定reduce任務(wù)數(shù)
Map:對數(shù)據(jù)進行處理,并按照一定分區(qū)規(guī)則,將處理好的數(shù)據(jù),下發(fā)給Reduce任務(wù)。
Map輸出會寫到運行Map任務(wù)機器本地磁盤上
Reduce:對分區(qū)后的數(shù)據(jù)進行處理輸出(一般也輸出在HDFS 存儲上)
Map任務(wù)完成后,它們會使用心跳機制通知她們的application master。因此,application master知道Map輸出和主機位置之間的映射關(guān)系。reducer中的一個線程定期詢問master以便獲取Map輸出主機的位置,并開始將數(shù)據(jù)復(fù)制到Reduce任務(wù)的JVM內(nèi)中,如果內(nèi)存放不下,就會溢出寫到磁盤中。
2.1 MapReduce運行架構(gòu)
Hadoop 1.x版本時:

Hadoop 2.x版本時:
通過YARN進行資源分配
相當于將1.x中JobTracker拆分成:ResourceManager、MRAppMaster兩個角色。減輕JobTracker壓力。ResourceManager只負責(zé)資源管理,MRAppMaster:每個MapReduce任務(wù)對應(yīng)一個。不同的MapReduce任務(wù)的Master可以在不同機器上,避免一個JobTracker要管理很多個MapReduce任務(wù),造成壓力過大。

2.2 wordcount例子
統(tǒng)計一段文本中,每個單詞出現(xiàn)的次數(shù)。
Map:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Mapper類繼承Mapper方法
* <LongWritable,Text,Text, IntWritable> 輸入的key-value類型&輸出的key-value 類型
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
? ? // 重寫Mapper父類的map方法,實現(xiàn)對value的操作
? ? //Map端核心業(yè)務(wù)的處理方法,每輸入一行數(shù)據(jù)會調(diào)用一次
? ? @Override
? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? String string = value.toString();
? ? ? ? String[] word = string.split(" ");
? ? ? ? if (StringUtils.isEmpty(string)){
? ? ? ? ? ? System.out.println("為空");
? ? ? ? }
? ? ? ? // 遍歷word數(shù)組 封裝數(shù)據(jù)key-value
? ? ? ? for (String s : word) {
? ? ? ? ? ? // 將遍歷出來的單詞s寫出去 傳遞給redcue處理
? ? ? ? ? ? System.out.println("test s:"+ s );
? ? ? ? ? ? context.write(new Text(s),new IntWritable(1));
? ? ? ? }
? ? }
}
Reduce:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* 繼承Redcuer類,重寫reduce方法,實現(xiàn)聚合操作
* <Text,Text,Text,Text> 輸入的key-value & 輸出的key-value
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
? ? //和Map方法一樣,每輸入一行數(shù)據(jù)會調(diào)用一次
? ? private Text outk = new Text();
? ? private? IntWritable outv = new IntWritable();
? ? @Override
? ? protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
? ? ? //定義一個變量,實現(xiàn)累加操作
? ? ? ? int total = 0;
? ? ? ? for (IntWritable value : values) {
? ? ? ? ? ? outk.set(key);
? ? ? ? ? ? total += value.get();
? ? ? ? }
? ? ? ? outv.set(total);
? ? ? ? context.write(outk, outv);
? ? }
}
Driver:啟動類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* MR程序的驅(qū)動類:主要用于提交MR任務(wù)
*/
public class WordCountDriver {
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? // 1- 創(chuàng)建job對象
? ? ? ? Configuration conf = new Configuration();
? ? ? ? Job job = Job.getInstance(conf);
? ? ? ? // 2- 設(shè)置Driver驅(qū)動類
? ? ? ? job.setJarByClass(WordCountDriver.class);
? ? ? ? // 3- 設(shè)置讀取文件的輸入目錄
? ? ? ? FileInputFormat.setInputPaths(job,new Path("data\\word.txt"));
? ? ? ? // 4- 設(shè)置Mapper的主類
? ? ? ? job.setMapperClass(WordCountMapper.class);
? ? ? ? // 5- 設(shè)置Mapper的輸出key-value
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(IntWritable.class);
? ? ? ? // 6- 設(shè)置Redcuer的主類
? ? ? ? job.setReducerClass(WordCountReducer.class);
? ? ? ? // 7- 設(shè)置Redcuer的輸出key-value
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(IntWritable.class);
? ? ? ? // 8- 設(shè)置文件的輸出路徑
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("output\\wcoutput2"));
? ? ? ? // 9- 提交job
? ? ? ? boolean flag = job.waitForCompletion(true);
? ? ? ? System.out.println(flag ? 0:1);
? ? }
}
3. YARN
Hadoop集群資源管理系統(tǒng)
負責(zé)管理Hadoop上機器資源,以及進行任務(wù)和資源分配,同時對相關(guān)任務(wù)通過container或者cgroup進行cpu、內(nèi)存等資源的限制,上面提到MapRedce 2.x允許架構(gòu)就是通過YARN運行的。
同時YARN也是作為SPARK、FLINK等組件運行調(diào)度器,有點類似k8s

4. AVRO
序列化框架,普遍用于大數(shù)據(jù)生態(tài)下。
Avro屬于Apache Hadoop的一個子項目。 Avro提供兩種序列化格式:JSON格式或者Binary格式。Binary格式在空間開銷和解析性能方面可以和Protobuf媲美,JSON格式方便測試階段的調(diào)試。 Avro支持的數(shù)據(jù)類型非常豐富,包括C++語言里面的union類型。Avro支持JSON格式的IDL和類似于Thrift和Protobuf的IDL,這兩者之間可以互轉(zhuǎn)。Schema可以在傳輸數(shù)據(jù)的同時發(fā)送,加上JSON的自我描述屬性,這使得Avro非常適合動態(tài)類型語言。 Avro在做文件持久化的時候,一般會和Schema一起存儲,所以Avro序列化文件自身具有自我描述屬性,所以非常適合于做Hive、Pig和MapReduce的持久化數(shù)據(jù)格式。對于不同版本的Schema,在進行RPC調(diào)用的時候,服務(wù)端和客戶端可以在握手階段對Schema進行互相確認,大大提高了最終的數(shù)據(jù)解析速度。
Avro提供了兩種序列化和反序列化的方式:一種是通過Schema文件來生成代碼的方式,一種是不生成代碼的通用方式,這兩種方式都需要構(gòu)建Schema文件。
4.1 示例
通過Avro不生成代碼方式,定義一個部門類的序列化。
定義dept.avsc:
{
"namespace":"com.hc.bean",
"type":"record",
"name":"Dept",
"fields":[
? {"name":"deptno","type":"int"},
? {"name":"dname","type":"string"},
? {"name":"loc","type":"string"}
]
}
序列化:
@Test
public void serialize() throws IOException {
? ? Schema schema = new Schema.Parser().parse(new File("src/main/resources/dept.avsc"));
? ? GenericRecord dept = new GenericData.Record(schema);
? ? dept.put("deptno", 90);
? ? dept.put("dname", "ee");
? ? dept.put("loc", "eeeeeeeee");
? ? DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema); //泛型參數(shù)為GenericRecord
? ? DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
? ? dataFileWriter.create(schema, new File("dept.avro"));
? ? dataFileWriter.append(dept);
? ? dataFileWriter.close();
}
反序列化:
@Test
public void deSerialize() throws IOException {
? ? Schema schema = new Schema.Parser().parse(new File("src/main/resources/dept.avsc"));
? ? File file = new File("dept.avro");
? ? DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
? ? DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
? ? GenericRecord dept = null;
? ? while(dataFileReader.hasNext()) {
? ? ? ? dept = dataFileReader.next(dept );
? ? ? ? System.out.println(dept );
? ? }
}
4.2 序列化性能對比
詳細參考:https://tech.meituan.com/2015/02/26/serialization-vs-deserialization.html

avro-specific:通過二進制形式。? ? ?avro-generic:通過json形式
5.?Flume
Flume是一個分布式、可靠和高可用的海量日志聚合的系統(tǒng),支持在系統(tǒng)讀取各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時,flume提供對數(shù)據(jù)進行簡單處理,并寫到各種數(shù)據(jù)接收方(可定制)的能力。
Flume架構(gòu):

source:source組件是專門用來收集數(shù)據(jù)的,可以處理各種類型、各種格式的日志數(shù)據(jù),包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義數(shù)據(jù)等。
channel:source組件把數(shù)據(jù)收集以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數(shù)據(jù)的——對采集到的數(shù)據(jù)進行簡單的緩存,可以存放在memory、jdbc、file等。
sink:sink組件是用于把數(shù)據(jù)發(fā)送到目的地 的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbse、solr、自定義。
5.1 示例:采集文件到 HDFS
需求:比如業(yè)務(wù)系統(tǒng)使用log4j生成的日志,日志內(nèi)容不斷增加,需要把追加到日志文件中的數(shù)據(jù)實時采集到hdfs
分析:根據(jù)需求,首先定義以下3大要素
采集源,即source——監(jiān)控文件內(nèi)容更新 : exec ‘tail -F file’
下沉目標,即sink——HDFS文件系統(tǒng) : hdfs sink
Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內(nèi)存channel
配置:通過如下配置,啟動flume即可完成需求:flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log
agent1.sources.source1.channels = channel1
# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
5.2 級聯(lián)
一個flume采集的數(shù)據(jù),可以上報到另一個flume上。右側(cè)的flume可以將數(shù)據(jù)進行進一步處理、聚合在上報到HDFS上。

6. Pig
針對MapReduce提供了更高層次的抽象,方便MapReduce開發(fā),Apache Pig將這些腳本轉(zhuǎn)換為一系列MapReduce作業(yè),因此,它使程序員的工作變得容易
Apache的Pig最大的作用就是對mapreduce算法(框架)實現(xiàn)了一套shell腳本 ,類似我們通常熟悉的 SQL 語句,在Pig中稱之為 Pig Latin?,在這套腳本中我們可以對加載出來的數(shù)據(jù)進行 排序、過濾、求和、分組(group by)、關(guān)聯(lián)(Joining),Pig也可以由用戶自定義一些函數(shù)對數(shù)據(jù)集進行操作,也就是傳說中的UDF(user-defined functions)。

7.?Crunch
Apache Crunch是FlumeJava的實現(xiàn),為不太方便直接開發(fā)和使用的MapReduce程序,開發(fā)一套MR流水線,具備數(shù)據(jù)表示模型,提供基礎(chǔ)原語和高級原語,根據(jù)底層執(zhí)行引擎對MR Job的執(zhí)行進行優(yōu)化。從分布式計算角度看,Crunch提供的許多計算原語,可以在Spark、Hive、Pig等地方找到很多相似之處,而本身的數(shù)據(jù)讀寫,序列化處理,分組、排序、聚合的實現(xiàn),類似MapReduce各階段的拆分都可以在Hadoop里找到影子。(外在表現(xiàn)類似java版的Pig)
8. Hive
通過HDFS提供結(jié)構(gòu)化存儲,并兼容SQL語法查詢
架構(gòu):

在Hadoop上架了一套Meta store 和Driver,實現(xiàn)結(jié)構(gòu)化數(shù)據(jù)存儲。
MetaStore: 記錄元數(shù)據(jù),包括:表名、表所屬的數(shù)據(jù)庫(默認是default)、表的擁有者、列/分區(qū)字段,標的類型(表是否為外部表)、表的數(shù)據(jù)所在目錄。這是數(shù)據(jù)默認存儲在Hive自帶的derby數(shù)據(jù)庫中,推薦使用MySQL數(shù)據(jù)庫存儲MetaStore。
Driver:將sql轉(zhuǎn)化為對應(yīng)MapReduce任務(wù)(底層也可以用spark實現(xiàn))
Hive默認在HDFS上存儲的是文本文件格式,其中每行存儲一個數(shù)據(jù)行(row),行內(nèi)通過分隔符Control-A(ASCII碼為1)劃分每列
8.1 Hive支持文件格式
表示的是Hive底層在HDFS存儲的文件格式
a.?Textfile文本格式:Hive的默認格式,數(shù)據(jù)不壓縮,磁盤開銷大、數(shù)據(jù)解析開銷大??山Y(jié)合Gzip、Bzip2使用,但使用Gzip這種方式,hive不會對數(shù)據(jù)進行切分,從而無法對數(shù)據(jù)進行并行操作
b. SequenceFile:Hadoop提供的一種二進制文件格式是Hadoop支持的標準文件格式(其他生態(tài)系統(tǒng)并不適用),可以直接將對序列化到文件中,所以sequencefile文件不能直接查看,可以通過Hadoop fs -text查看。具有使用方便,可分割,可壓縮,可進行切片。壓縮支持NONE, RECORD, BLOCK(優(yōu)先)等格式,可進行切片
c. RCFile:是一種行列存儲相結(jié)合的存儲方式,先將數(shù)據(jù)按行進行分塊再按列式存儲,保證同一條記錄在一個塊上,避免讀取多個塊,有利于數(shù)據(jù)壓縮和快速進行列存儲
d. ORCFile:orcfile是對rcfile的優(yōu)化,可以提高hive的讀寫、數(shù)據(jù)處理性能,提供更高的壓縮效率(目前主流選擇之一)。列式存儲
e. Parquet:一種列格式, 可提供對其他 hadoop 工具的可移植性, 包括Hive, Drill, Impala, Crunch, and Pig
f. Avro:Avro是一個數(shù)據(jù)序列化系統(tǒng),設(shè)計用于支持大批量數(shù)據(jù)交換的應(yīng)用。它的主要特點有:支持二進制序列化方式,可以便捷,快速地處理大量數(shù)據(jù);動態(tài)語言友好,Avro提供的機制使動態(tài)語言可以方便地處理Avro數(shù)據(jù)。
Hive建表:
CREATE EXTERNAL TABLE IF NOT EXISTS tb_test(
? ? id bigint COMMENT 'id',
? ? name string COMMENT 'name'? ? ?
)
COMMENT 'test table'
PARTITIONED BY (dt string, country STRING) //指定分區(qū)方式
STORED AS ORC? ? ? //指定文件格式
tblproperties ('orc.compress'='SNAPPY');? //指定壓縮算法
其中分區(qū)方式,表示Hive在存儲時,會將文件劃分多份,放入不同的目錄下:
如下圖所示,根據(jù)dt(日期)、country(國家)劃分目錄,將數(shù)據(jù)存入對應(yīng)文件目錄下。做到一個數(shù)據(jù)分片作用。

9. HBase
BigTable論文開源版本實現(xiàn):基于HDFS的 key-value 列式存儲
關(guān)鍵詞:HDFS、Rowkey、Region、LSM、Meta表

10.新一代批處理引擎:Spark
MapReduce任務(wù)對于復(fù)雜流程編寫較困難(需要組合多個MapRedece任務(wù)才能實現(xiàn)),同時任務(wù)執(zhí)行過程中涉及大量磁盤IO(比如Map輸出),運行速度很慢,所以MapReduce已慢慢被Spark代替。
Spark通過將數(shù)據(jù)緩存在內(nèi)存(RDD:彈性分布式數(shù)據(jù)集),加速處理過程(Spark的處理速度是Hadoop的10~100倍)
從狹義上來看,Spark只是MapReduce的替代方案,大部分應(yīng)用場景中,它還要依賴于HDFS和HBase來存儲數(shù)據(jù),依賴于YARN來管理集群和資源。
當然,Spark并不是一定要依附于Hadoop才能生存,它還可以運行在Apache Mesos、Kubernetes、standalone等其他云平臺上。
Spark架構(gòu):

10.1 workflow模型
Spark通過將一系列操作轉(zhuǎn)換成一個有向無環(huán)圖(DAG),這樣能夠在一個任務(wù)里實現(xiàn)多個邏輯,并且支持:groupby、Filter、reduce、Map、join等操作,避免了需要多個MapReduce才能實現(xiàn)一個復(fù)雜任務(wù)。

10.2 RDD
一個RDD可以基本理解DAG 圖中對應(yīng)的一個操作。比如:RDD1對應(yīng)上圖中A,RDD2對應(yīng)上圖中B

RDD是一個只讀的有屬性的數(shù)據(jù)集。屬性用來描述當前數(shù)據(jù)集的狀態(tài),數(shù)據(jù)集是由數(shù)據(jù)的分區(qū)(Data)組成,并(由block)映射成真實數(shù)據(jù)。RDD屬性包括名稱、分區(qū)類型、父RDD指針、數(shù)據(jù)本地化、數(shù)據(jù)依賴關(guān)系等,主要屬性如下:
a. Dependencies 表示的是與其他RDD 的關(guān)系
b. SparkContext是所有Spark功能的入口,它代表了與Spark節(jié)點的連接,可以用來創(chuàng)建RDD對象以及在節(jié)點中的廣播變量等。一個線程只有一個SparkContext。SparkConf則是一些參數(shù)配置信息。
c. Data 它代表RDD中數(shù)據(jù)的邏輯結(jié)構(gòu),每個Partition會映射到某個節(jié)點內(nèi)存或硬盤的一個數(shù)據(jù)塊。
d. Partitioner決定了RDD的分區(qū)方式,目前有兩種主流的分區(qū)方式:Hash partitioner和Range partitioner。Hash,顧名思義就是對數(shù)據(jù)的Key進行散列分區(qū),Range則是按照Key的排序進行均勻分區(qū)。此外我們還可以創(chuàng)建自定義的Partitioner。
e.檢查點(Checkpoint):基于RDD的依賴關(guān)系,如果任意一個RDD在相應(yīng)的節(jié)點丟失,你只需要從上一步的RDD出發(fā)再次計算,便可恢復(fù)該RDD。但是,如果一個RDD的依賴鏈比較長,而且中間又有多個RDD出現(xiàn)故障的話,進行恢復(fù)可能會非常耗費時間和計算資源。在計算過程中,對于一些計算過程比較耗時的RDD,我們可以將它緩存至硬盤或HDFS中,標記這個RDD有被檢查點處理過,并且清空它的所有依賴關(guān)系。同時,給它新建一個依賴于CheckpointRDD的依賴關(guān)系,CheckpointRDD可以用來從硬盤中讀取RDD和生成新的分區(qū)信息。
f.存儲級別(Storage Level)是一個枚舉類型,用來記錄RDD持久化時的存儲級別,常用的有以下幾個:
MEMORY_ONLY:只緩存在內(nèi)存中,如果內(nèi)存空間不夠則不緩存多出來的部分。這是RDD存儲級別的默認值。
MEMORY_AND_DISK:緩存在內(nèi)存中,如果空間不夠則緩存在硬盤中。
DISK_ONLY:只緩存在硬盤中。
MEMORY_ONLY_2和MEMORY_AND_DISK_2等:與上面的級別功能相同,只不過每個分區(qū)在集群中兩個節(jié)點上建立副本。
g. 迭代函數(shù)(Iterator)和計算函數(shù)(Compute)是用來表示RDD怎樣通過父RDD計算得到的。
RDD并行執(zhí)行原理:
圖中每個RDD下的 p 代表真實數(shù)據(jù)(上一個圖中的data部分),每個 p 代表需要處理數(shù)據(jù)的一部分,這些 p分布在不同機器上,執(zhí)行時都是并行執(zhí)行。

RDD的轉(zhuǎn)換操作:Map、Filter、groupByKey
RDD動作操作:Collect、Reduce、Count
例子:
rdd = sc.parallelize(["b", "a", "c"])
rdd.map(lambda x: (x, 1)).collect() // [('b', 1), ('a', 1), ('c', 1)]
RDD是延遲執(zhí)行的,只有 動作 操作才能觸發(fā)RDD執(zhí)行。轉(zhuǎn)換操作只是幫助構(gòu)建DAG圖,到達 動作操作時,才觸發(fā)真正執(zhí)行。
10.3 Spark SQL
在RDD底層的API上封裝了一層SQL引擎:負責(zé)將SQL轉(zhuǎn)換為對應(yīng)的RDD API,并對任務(wù)進行優(yōu)化,執(zhí)行相應(yīng)任務(wù)。
同時Spark SQL暴露了 DataFrame、DataSet API,可以讓用戶用更高層、簡潔API進行開發(fā)

10.4 Spark Streaming流式計算
通過微批次實現(xiàn)流式計算:每次獲取固定時間間隔數(shù)據(jù)進行處理

示例:
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
lines = sc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
10.5?Structured Streaming 流批一體
通過DataFrame API(Saprk SQL)實現(xiàn)流式計算(微批次實現(xiàn)流式計算),做到用同一套API,既可以處理流計算,也可以用于批處理。
words = ... #這個DataFrame代表詞語的數(shù)據(jù)流,schema是 { timestamp: Timestamp, word: String}
windowedCounts = words.groupBy(
? window(words.timestamp, "1 minute", "10 seconds"),
? words.word
).count()
.sort(desc("count"))
.limit(10)
基于詞語的生成時間,我們創(chuàng)建了一個窗口長度為1分鐘,滑動間隔為10秒的window。然后,把輸入的詞語表根據(jù)window和詞語本身聚合起來,并統(tǒng)計每個window內(nèi)每個詞語的數(shù)量。之后,再根據(jù)詞語的數(shù)量進行排序,只返回前10的詞語。
當我們編寫 Spark Streaming 程序的時候,本質(zhì)上就是要去構(gòu)造RDD的DAG執(zhí)行圖,然后通過 Spark Engine 運行。這樣開發(fā)者身上的擔子就很重,很多時候要自己想辦法去提高程序的處理效率
Structured Streaming提供的DataFrame API就是這么一個相對高level的API,大部分開發(fā)者都很熟悉關(guān)系型數(shù)據(jù)庫和SQL。這樣的數(shù)據(jù)抽象可以讓他們用一套統(tǒng)一的方案去處理批處理和流處理,不用去關(guān)心具體的執(zhí)行細節(jié)。而且,DataFrame API是在Spark SQL的引擎上執(zhí)行的,Spark SQL有非常多的優(yōu)化功能,比如執(zhí)行計劃優(yōu)化和內(nèi)存管理等,所以Structured Streaming的應(yīng)用程序性能很好。
11. 原生流式計算引擎:Flink
相比spark 微批次實現(xiàn)的流式計算,flink支持原生的流式計算(可以理解為:數(shù)據(jù)來了就處理),提供更低延遲。
支持:select、groupby、count、map、combine、join、filter、keyby、flatmap、Aggregation等

關(guān)鍵詞:窗口、水位線、滑動窗口

11.1 Flink架構(gòu)

11.2 Flink流批一體
Flink提供的兩個核心API就是DataSet API和DataStream API。DataSet代表有界的數(shù)據(jù)集,而DataStream代表流數(shù)據(jù)。所以,DataSet API是用來做批處理的,而DataStream API是做流處理的。
在內(nèi)部,DataSet其實也用Stream表示,靜態(tài)的有界數(shù)據(jù)也可以被看作是特殊的流數(shù)據(jù),而且DataSet與DataStream可以無縫切換。所以,F(xiàn)link的核心是DataStream。
wordCount示例:
public class WindowWordCount {
public static void main(String[] args) throws Exception {
? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
? DataStream> dataStream = env.socketTextStream("localhost", 9999)
.flatMap(new Splitter()).keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
? dataStream.print();
? env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction> {
? @Override
? public void flatMap(String sentence, Collector> out) {
? ? for (String word: sentence.split(" ")) {
? ? ? out.collect(new Tuple2(word, 1));
? ? }
? }
}
12. 流批一體:Apache Beam
Google在2016年的時候聯(lián)合了Talend、Data Artisans、Cloudera這些大數(shù)據(jù)公司,基于Dataflow Model的思想開發(fā)出了一套SDK,并貢獻給了Apache Software Foundation
很多時候我們不可避免地需要對數(shù)據(jù)同時進行批處理和流處理。Beam提供了一套統(tǒng)一的API來處理這兩種數(shù)據(jù)處理模式,讓我們只需要將注意力專注于在數(shù)據(jù)處理的算法上,而不用再花時間去對兩種數(shù)據(jù)處理模式上的差異進行維護。
Apache Beam的主要目標是統(tǒng)一批處理和流處理的編程范式,為無限,亂序,web-scale的數(shù)據(jù)集處理提供簡單靈活,功能豐富以及表達能力十分強大的SDK。Apache Beam項目重點在于數(shù)據(jù)處理的編程范式和接口定義,并不涉及具體執(zhí)行引擎的實現(xiàn),Apache Beam希望基于Beam開發(fā)的數(shù)據(jù)處理程序可以執(zhí)行在任意的分布式計算引擎上。現(xiàn)階段Apache Beam支持的Runner有近十種,包括了我們很熟悉的Apache Spark和Apache Flink。
隨著分布式數(shù)據(jù)處理不斷發(fā)展,新的分布式數(shù)據(jù)處理技術(shù)也不斷被提出,業(yè)界涌現(xiàn)出了越來越多的分布式數(shù)據(jù)處理框架,從最早的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分布式處理框架可能帶來的更高的性能,更強大的功能,更低的延遲等,但用戶切換到新的分布式處理框架的代價也非常大:需要學(xué)習(xí)一個新的數(shù)據(jù)處理框架,并重寫所有的業(yè)務(wù)邏輯。解決這個問題的思路包括兩個部分,首先,需要一個編程范式,能夠統(tǒng)一,規(guī)范分布式數(shù)據(jù)處理的需求,例如,統(tǒng)一批處理和流處理的需求。其次,生成的分布式數(shù)據(jù)處理任務(wù)應(yīng)該能夠在各個分布式執(zhí)行引擎上執(zhí)行,用戶可以自由切換分布式數(shù)據(jù)處理任務(wù)的執(zhí)行引擎與執(zhí)行環(huán)境。Apache Beam正是為了解決以上問題而提出的。
Apache Beam主要由Beam SDK和Beam Runner組成,Beam SDK定義了開發(fā)分布式數(shù)據(jù)處理任務(wù)業(yè)務(wù)邏輯的API接口,生成的的分布式數(shù)據(jù)處理任務(wù)Pipeline交給具體的Beam Runner執(zhí)行引擎。Apache Beam目前支持的API接口是由Java語言實現(xiàn)的,Python版本的API正在開發(fā)之中。Apache Beam支持的底層執(zhí)行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等執(zhí)行引擎的支持也在討論或開發(fā)當中。其基本架構(gòu)如下圖所示:

13. 經(jīng)典大數(shù)據(jù)架構(gòu)
13.1 Lamda架構(gòu)?
twitter提出

Lambda 架構(gòu)將數(shù)據(jù)處理流程整體分成了三層:離線層(Batch Layer),加速層(Speed layer),服務(wù)層(Serving layer)
離線層:新數(shù)據(jù)不斷喂給數(shù)據(jù)系統(tǒng),期間每個樣本都會同時發(fā)給離線和加速層。所有進入離線層的數(shù)據(jù)流都會在數(shù)據(jù)湖(Data Lake)上進行計算處理。數(shù)據(jù)湖多會使用基于內(nèi)存的數(shù)據(jù)庫或 NoSQL 類的永久存儲設(shè)施,數(shù)據(jù)存好后離線層使用 MapReduce 或一些機器學(xué)習(xí)方法對數(shù)據(jù)進行處理并由此對接下來的內(nèi)容進行預(yù)測。
加速層:加速層會享用離線層事件溯源成果。離線層中的數(shù)據(jù)處理會涉及增量程序,MapReduce 或機器學(xué)習(xí)模型的更新,模型會被加速層進一步用來處理新數(shù)據(jù)。就這樣,加速層借助富集過程得到結(jié)果,保證服務(wù)層請求響應(yīng)延遲處于低位。而加速層也幾乎只處理實時數(shù)據(jù),計算負荷較低,延遲有保障。
服務(wù)層:從離線層得到離線視圖(batch view),加速層得到準實時視圖(near-real time view),統(tǒng)一送給服務(wù)層。服務(wù)層使用這些信息臨時應(yīng)付那些等候的查詢。
如果將整個過程用函數(shù)方程表示,任意大數(shù)據(jù)查詢都可歸結(jié)為如下形式
Query=λ(Complete data)=λ(live streaming data)?λ(stored data)Query=λ(Complete data)=λ(live streaming data)?λ(stored data)
方程中所用符號名為 Lambda_(音譯:拉姆達)_,Lambda 架構(gòu)的名字也源自于此。這個方程廣為熟悉數(shù)據(jù)分析軼事的人所知,從中可以看出所有數(shù)據(jù)相關(guān)查詢,結(jié)合離線歷史信息和實時流,Lambda 架構(gòu)都能處理。
13.2 Kappa架構(gòu)
LinkedIn提出
Kappa架構(gòu)相比Lamda架構(gòu),去掉了離線層,只保留加速層,這樣就不用同時維護2套架構(gòu),更加簡潔。當需要對數(shù)據(jù)重新處理時,可以借助數(shù)據(jù)源(比如:Kafka)的消息回溯功能,將消息回溯到1個月甚至1年前,進行重新處理。

將 Kappa 架構(gòu)的整體流程轉(zhuǎn)換成功能方程,任意數(shù)據(jù)查詢形式歸結(jié)為
Query=K(New data)=K(Live streaming data)Query=K(New data)=K(Live streaming data)
從方程可以看出所有查詢都能通過加速層對實時數(shù)據(jù)流的處理得到滿足。同時也指出流處理過程發(fā)生在 Kappa 架構(gòu)的加速層中