大數(shù)據(jù)相關(guān)技術(shù)概覽

前言

本文主要涉及的技術(shù):HDFS、MapReduce、YARN、Avro、Flume、Pig、Crunch、Hive、HBase、Spark、Flink、Beam。每個技術(shù)會大概講一下其架構(gòu)和功能,讓大家對大數(shù)據(jù)技術(shù)體系有個宏觀的認識。

2003~2005 Google連續(xù)發(fā)表3篇論文,用于解決大規(guī)模數(shù)據(jù)存儲和處理,依據(jù)三篇論文思想,業(yè)界開源出了Hadoop相關(guān)大數(shù)據(jù)組件,拉開了大數(shù)據(jù)技術(shù)帷幕。

1. 2003年 GFS:《The Google File System 》(https://blog.csdn.net/xuleicsu/article/details/526386):解決非結(jié)構(gòu)化數(shù)據(jù)大規(guī)模存儲

2. 2004年 MapReduce:《MapReduce: Simplified Data Processing on Large Clusters》(https://blog.csdn.net/active1001/article/details/1675920):解決大規(guī)模數(shù)據(jù)處理

3. 2005年 BigTale: 《Bigtable: A Distributed Storage System for Structured Data》(https://blog.csdn.net/accesine960/article/details/595628):解決結(jié)構(gòu)化/半結(jié)構(gòu)化數(shù)據(jù)存儲

1. HDFS

針對GFS論文的開源實現(xiàn)版本

架構(gòu):

文件最小存儲單位:chunk(文件塊),默認128MB

HDFS上文件被劃分為多個分塊(chunk),作為獨立的存儲單元。但與面向單一磁盤的文件系統(tǒng)不同的是,HDFS中小于一個塊大小的文件不會占據(jù)整個塊空間(例如:當一個1MB的文件存儲在一個128MB的塊中時,文件只使用1MB的磁盤空間,而不是128MB)。

Namenode具備主備雙節(jié)點。當活動的namenode失效,備用的namenode就會接管它的任務(wù)并開始服務(wù)于來自客戶端的請求。

1.1?HDFS Federation

聯(lián)邦HDFS:每個namenode管理文件系統(tǒng)命名空間的一部分

由于NameNode要管理所有的文件和數(shù)據(jù)塊的引用關(guān)系,對于一個超大規(guī)模的HDFS集群,Namenode成為瓶頸。聯(lián)邦HDFS能夠支持多個Namenode,對Namenode數(shù)據(jù)進行分片,減輕每個Namenode存儲和請求壓力。

例如:一個Namenode可能管理/user目錄下的所有文件,另一個Namenode管理/share目錄下的所有文件。

聯(lián)邦HDFS

1.2 HDFS讀取流程

1.3 HDFS寫流程?

2. MapReduce

針對MapReduce論文的開源實現(xiàn)版本

MapReduce運行邏輯圖如下:(此過程也稱為shuffle)

默認:一個chunk(文件塊)會啟動一個Map任務(wù)

當對HDFS上的一個文件運行MapReduce任務(wù)時,如果該文件大小是1280MB,那么該文件就會對應(yīng)10個chunk,默認會啟動10個Map任務(wù)去處理(并發(fā)度可以理解為10),每個Map任務(wù)處理一個chunk,并且該Map任務(wù)會盡量分配和chunk在相同的機器上,避免文件讀取跨網(wǎng)絡(luò)

Reduce任務(wù)數(shù)量并非由輸入數(shù)據(jù)大小決定的,相反是獨立指定的。默認任務(wù)數(shù)是1,可以在運行時指定reduce任務(wù)數(shù)

Map:對數(shù)據(jù)進行處理,并按照一定分區(qū)規(guī)則,將處理好的數(shù)據(jù),下發(fā)給Reduce任務(wù)。

Map輸出會寫到運行Map任務(wù)機器本地磁盤上

Reduce:對分區(qū)后的數(shù)據(jù)進行處理輸出(一般也輸出在HDFS 存儲上)

Map任務(wù)完成后,它們會使用心跳機制通知她們的application master。因此,application master知道Map輸出和主機位置之間的映射關(guān)系。reducer中的一個線程定期詢問master以便獲取Map輸出主機的位置,并開始將數(shù)據(jù)復(fù)制到Reduce任務(wù)的JVM內(nèi)中,如果內(nèi)存放不下,就會溢出寫到磁盤中。

2.1 MapReduce運行架構(gòu)

Hadoop 1.x版本時:

Hadoop 2.x版本時:

通過YARN進行資源分配

相當于將1.x中JobTracker拆分成:ResourceManager、MRAppMaster兩個角色。減輕JobTracker壓力。ResourceManager只負責(zé)資源管理,MRAppMaster:每個MapReduce任務(wù)對應(yīng)一個。不同的MapReduce任務(wù)的Master可以在不同機器上,避免一個JobTracker要管理很多個MapReduce任務(wù),造成壓力過大。

2.2 wordcount例子

統(tǒng)計一段文本中,每個單詞出現(xiàn)的次數(shù)。

Map:

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

* Mapper類繼承Mapper方法

* <LongWritable,Text,Text, IntWritable> 輸入的key-value類型&輸出的key-value 類型

*/

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

? ? // 重寫Mapper父類的map方法,實現(xiàn)對value的操作

? ? //Map端核心業(yè)務(wù)的處理方法,每輸入一行數(shù)據(jù)會調(diào)用一次

? ? @Override

? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

? ? ? ? String string = value.toString();

? ? ? ? String[] word = string.split(" ");

? ? ? ? if (StringUtils.isEmpty(string)){

? ? ? ? ? ? System.out.println("為空");

? ? ? ? }

? ? ? ? // 遍歷word數(shù)組 封裝數(shù)據(jù)key-value

? ? ? ? for (String s : word) {

? ? ? ? ? ? // 將遍歷出來的單詞s寫出去 傳遞給redcue處理

? ? ? ? ? ? System.out.println("test s:"+ s );

? ? ? ? ? ? context.write(new Text(s),new IntWritable(1));

? ? ? ? }

? ? }

}

Reduce:

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.io.Text;

import java.io.IOException;

/**

* 繼承Redcuer類,重寫reduce方法,實現(xiàn)聚合操作

* <Text,Text,Text,Text> 輸入的key-value & 輸出的key-value

*/

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

? ? //和Map方法一樣,每輸入一行數(shù)據(jù)會調(diào)用一次

? ? private Text outk = new Text();

? ? private? IntWritable outv = new IntWritable();

? ? @Override

? ? protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

? ? ? //定義一個變量,實現(xiàn)累加操作

? ? ? ? int total = 0;

? ? ? ? for (IntWritable value : values) {

? ? ? ? ? ? outk.set(key);

? ? ? ? ? ? total += value.get();

? ? ? ? }

? ? ? ? outv.set(total);

? ? ? ? context.write(outk, outv);

? ? }

}

Driver:啟動類

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.io.Text;

import java.io.IOException;

/**

* MR程序的驅(qū)動類:主要用于提交MR任務(wù)

*/

public class WordCountDriver {

? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

? ? ? ? // 1- 創(chuàng)建job對象

? ? ? ? Configuration conf = new Configuration();

? ? ? ? Job job = Job.getInstance(conf);

? ? ? ? // 2- 設(shè)置Driver驅(qū)動類

? ? ? ? job.setJarByClass(WordCountDriver.class);

? ? ? ? // 3- 設(shè)置讀取文件的輸入目錄

? ? ? ? FileInputFormat.setInputPaths(job,new Path("data\\word.txt"));

? ? ? ? // 4- 設(shè)置Mapper的主類

? ? ? ? job.setMapperClass(WordCountMapper.class);

? ? ? ? // 5- 設(shè)置Mapper的輸出key-value

? ? ? ? job.setMapOutputKeyClass(Text.class);

? ? ? ? job.setMapOutputValueClass(IntWritable.class);

? ? ? ? // 6- 設(shè)置Redcuer的主類

? ? ? ? job.setReducerClass(WordCountReducer.class);

? ? ? ? // 7- 設(shè)置Redcuer的輸出key-value

? ? ? ? job.setOutputKeyClass(Text.class);

? ? ? ? job.setOutputValueClass(IntWritable.class);

? ? ? ? // 8- 設(shè)置文件的輸出路徑

? ? ? ? FileOutputFormat.setOutputPath(job,new Path("output\\wcoutput2"));

? ? ? ? // 9- 提交job

? ? ? ? boolean flag = job.waitForCompletion(true);

? ? ? ? System.out.println(flag ? 0:1);

? ? }

}

3. YARN

Hadoop集群資源管理系統(tǒng)

負責(zé)管理Hadoop上機器資源,以及進行任務(wù)和資源分配,同時對相關(guān)任務(wù)通過container或者cgroup進行cpu、內(nèi)存等資源的限制,上面提到MapRedce 2.x允許架構(gòu)就是通過YARN運行的。

同時YARN也是作為SPARK、FLINK等組件運行調(diào)度器,有點類似k8s

4. AVRO

序列化框架,普遍用于大數(shù)據(jù)生態(tài)下。

Avro屬于Apache Hadoop的一個子項目。 Avro提供兩種序列化格式:JSON格式或者Binary格式。Binary格式在空間開銷和解析性能方面可以和Protobuf媲美,JSON格式方便測試階段的調(diào)試。 Avro支持的數(shù)據(jù)類型非常豐富,包括C++語言里面的union類型。Avro支持JSON格式的IDL和類似于Thrift和Protobuf的IDL,這兩者之間可以互轉(zhuǎn)。Schema可以在傳輸數(shù)據(jù)的同時發(fā)送,加上JSON的自我描述屬性,這使得Avro非常適合動態(tài)類型語言。 Avro在做文件持久化的時候,一般會和Schema一起存儲,所以Avro序列化文件自身具有自我描述屬性,所以非常適合于做Hive、Pig和MapReduce的持久化數(shù)據(jù)格式。對于不同版本的Schema,在進行RPC調(diào)用的時候,服務(wù)端和客戶端可以在握手階段對Schema進行互相確認,大大提高了最終的數(shù)據(jù)解析速度。

Avro提供了兩種序列化和反序列化的方式:一種是通過Schema文件來生成代碼的方式,一種是不生成代碼的通用方式,這兩種方式都需要構(gòu)建Schema文件。

4.1 示例

通過Avro不生成代碼方式,定義一個部門類的序列化。

定義dept.avsc:

{

"namespace":"com.hc.bean",

"type":"record",

"name":"Dept",

"fields":[

? {"name":"deptno","type":"int"},

? {"name":"dname","type":"string"},

? {"name":"loc","type":"string"}

]

}

序列化:

@Test

public void serialize() throws IOException {

? ? Schema schema = new Schema.Parser().parse(new File("src/main/resources/dept.avsc"));

? ? GenericRecord dept = new GenericData.Record(schema);

? ? dept.put("deptno", 90);

? ? dept.put("dname", "ee");

? ? dept.put("loc", "eeeeeeeee");

? ? DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema); //泛型參數(shù)為GenericRecord

? ? DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);

? ? dataFileWriter.create(schema, new File("dept.avro"));

? ? dataFileWriter.append(dept);

? ? dataFileWriter.close();

}

反序列化:

@Test

public void deSerialize() throws IOException {

? ? Schema schema = new Schema.Parser().parse(new File("src/main/resources/dept.avsc"));

? ? File file = new File("dept.avro");

? ? DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);

? ? DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);

? ? GenericRecord dept = null;

? ? while(dataFileReader.hasNext()) {

? ? ? ? dept = dataFileReader.next(dept );

? ? ? ? System.out.println(dept );

? ? }

}

4.2 序列化性能對比

詳細參考:https://tech.meituan.com/2015/02/26/serialization-vs-deserialization.html

avro-specific:通過二進制形式。? ? ?avro-generic:通過json形式

5.?Flume

Flume是一個分布式、可靠和高可用的海量日志聚合的系統(tǒng),支持在系統(tǒng)讀取各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時,flume提供對數(shù)據(jù)進行簡單處理,并寫到各種數(shù)據(jù)接收方(可定制)的能力。

Flume架構(gòu):

source:source組件是專門用來收集數(shù)據(jù)的,可以處理各種類型、各種格式的日志數(shù)據(jù),包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義數(shù)據(jù)等。

channel:source組件把數(shù)據(jù)收集以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數(shù)據(jù)的——對采集到的數(shù)據(jù)進行簡單的緩存,可以存放在memory、jdbc、file等。

sink:sink組件是用于把數(shù)據(jù)發(fā)送到目的地 的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbse、solr、自定義。

5.1 示例:采集文件到 HDFS

需求:比如業(yè)務(wù)系統(tǒng)使用log4j生成的日志,日志內(nèi)容不斷增加,需要把追加到日志文件中的數(shù)據(jù)實時采集到hdfs

分析:根據(jù)需求,首先定義以下3大要素

采集源,即source——監(jiān)控文件內(nèi)容更新 : exec ‘tail -F file’

下沉目標,即sink——HDFS文件系統(tǒng) : hdfs sink

Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內(nèi)存channel

配置:通過如下配置,啟動flume即可完成需求:flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

# Describe/configure tail -F source1

agent1.sources.source1.type = exec

agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log

agent1.sources.source1.channels = channel1

# Describe sink1

agent1.sinks.sink1.type = hdfs

#a1.sinks.k1.channel = c1

agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

agent1.sinks.sink1.hdfs.round = true

agent1.sinks.sink1.hdfs.roundValue = 10

agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.keep-alive = 120

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

5.2 級聯(lián)

一個flume采集的數(shù)據(jù),可以上報到另一個flume上。右側(cè)的flume可以將數(shù)據(jù)進行進一步處理、聚合在上報到HDFS上。

6. Pig

針對MapReduce提供了更高層次的抽象,方便MapReduce開發(fā),Apache Pig將這些腳本轉(zhuǎn)換為一系列MapReduce作業(yè),因此,它使程序員的工作變得容易

Apache的Pig最大的作用就是對mapreduce算法(框架)實現(xiàn)了一套shell腳本 ,類似我們通常熟悉的 SQL 語句,在Pig中稱之為 Pig Latin?,在這套腳本中我們可以對加載出來的數(shù)據(jù)進行 排序、過濾、求和、分組(group by)、關(guān)聯(lián)(Joining),Pig也可以由用戶自定義一些函數(shù)對數(shù)據(jù)集進行操作,也就是傳說中的UDF(user-defined functions)。

7.?Crunch

Apache Crunch是FlumeJava的實現(xiàn),為不太方便直接開發(fā)和使用的MapReduce程序,開發(fā)一套MR流水線,具備數(shù)據(jù)表示模型,提供基礎(chǔ)原語和高級原語,根據(jù)底層執(zhí)行引擎對MR Job的執(zhí)行進行優(yōu)化。從分布式計算角度看,Crunch提供的許多計算原語,可以在Spark、Hive、Pig等地方找到很多相似之處,而本身的數(shù)據(jù)讀寫,序列化處理,分組、排序、聚合的實現(xiàn),類似MapReduce各階段的拆分都可以在Hadoop里找到影子。(外在表現(xiàn)類似java版的Pig)

8. Hive

通過HDFS提供結(jié)構(gòu)化存儲,并兼容SQL語法查詢

架構(gòu):

在Hadoop上架了一套Meta store 和Driver,實現(xiàn)結(jié)構(gòu)化數(shù)據(jù)存儲。

MetaStore: 記錄元數(shù)據(jù),包括:表名、表所屬的數(shù)據(jù)庫(默認是default)、表的擁有者、列/分區(qū)字段,標的類型(表是否為外部表)、表的數(shù)據(jù)所在目錄。這是數(shù)據(jù)默認存儲在Hive自帶的derby數(shù)據(jù)庫中,推薦使用MySQL數(shù)據(jù)庫存儲MetaStore。

Driver:將sql轉(zhuǎn)化為對應(yīng)MapReduce任務(wù)(底層也可以用spark實現(xiàn))

Hive默認在HDFS上存儲的是文本文件格式,其中每行存儲一個數(shù)據(jù)行(row),行內(nèi)通過分隔符Control-A(ASCII碼為1)劃分每列

8.1 Hive支持文件格式

表示的是Hive底層在HDFS存儲的文件格式

a.?Textfile文本格式:Hive的默認格式,數(shù)據(jù)不壓縮,磁盤開銷大、數(shù)據(jù)解析開銷大??山Y(jié)合Gzip、Bzip2使用,但使用Gzip這種方式,hive不會對數(shù)據(jù)進行切分,從而無法對數(shù)據(jù)進行并行操作

b. SequenceFile:Hadoop提供的一種二進制文件格式是Hadoop支持的標準文件格式(其他生態(tài)系統(tǒng)并不適用),可以直接將對序列化到文件中,所以sequencefile文件不能直接查看,可以通過Hadoop fs -text查看。具有使用方便,可分割,可壓縮,可進行切片。壓縮支持NONE, RECORD, BLOCK(優(yōu)先)等格式,可進行切片

c. RCFile:是一種行列存儲相結(jié)合的存儲方式,先將數(shù)據(jù)按行進行分塊再按列式存儲,保證同一條記錄在一個塊上,避免讀取多個塊,有利于數(shù)據(jù)壓縮和快速進行列存儲

d. ORCFile:orcfile是對rcfile的優(yōu)化,可以提高hive的讀寫、數(shù)據(jù)處理性能,提供更高的壓縮效率(目前主流選擇之一)。列式存儲

e. Parquet:一種列格式, 可提供對其他 hadoop 工具的可移植性, 包括Hive, Drill, Impala, Crunch, and Pig

f. Avro:Avro是一個數(shù)據(jù)序列化系統(tǒng),設(shè)計用于支持大批量數(shù)據(jù)交換的應(yīng)用。它的主要特點有:支持二進制序列化方式,可以便捷,快速地處理大量數(shù)據(jù);動態(tài)語言友好,Avro提供的機制使動態(tài)語言可以方便地處理Avro數(shù)據(jù)。

Hive建表:

CREATE EXTERNAL TABLE IF NOT EXISTS tb_test(

? ? id bigint COMMENT 'id',

? ? name string COMMENT 'name'? ? ?

)

COMMENT 'test table'

PARTITIONED BY (dt string, country STRING) //指定分區(qū)方式

STORED AS ORC? ? ? //指定文件格式

tblproperties ('orc.compress'='SNAPPY');? //指定壓縮算法

其中分區(qū)方式,表示Hive在存儲時,會將文件劃分多份,放入不同的目錄下:

如下圖所示,根據(jù)dt(日期)、country(國家)劃分目錄,將數(shù)據(jù)存入對應(yīng)文件目錄下。做到一個數(shù)據(jù)分片作用。

9. HBase

BigTable論文開源版本實現(xiàn):基于HDFS的 key-value 列式存儲

關(guān)鍵詞:HDFS、Rowkey、Region、LSM、Meta表

10.新一代批處理引擎:Spark

MapReduce任務(wù)對于復(fù)雜流程編寫較困難(需要組合多個MapRedece任務(wù)才能實現(xiàn)),同時任務(wù)執(zhí)行過程中涉及大量磁盤IO(比如Map輸出),運行速度很慢,所以MapReduce已慢慢被Spark代替。

Spark通過將數(shù)據(jù)緩存在內(nèi)存(RDD:彈性分布式數(shù)據(jù)集),加速處理過程(Spark的處理速度是Hadoop的10~100倍)

從狹義上來看,Spark只是MapReduce的替代方案,大部分應(yīng)用場景中,它還要依賴于HDFS和HBase來存儲數(shù)據(jù),依賴于YARN來管理集群和資源。

當然,Spark并不是一定要依附于Hadoop才能生存,它還可以運行在Apache Mesos、Kubernetes、standalone等其他云平臺上。

Spark架構(gòu):

10.1 workflow模型

Spark通過將一系列操作轉(zhuǎn)換成一個有向無環(huán)圖(DAG),這樣能夠在一個任務(wù)里實現(xiàn)多個邏輯,并且支持:groupby、Filter、reduce、Map、join等操作,避免了需要多個MapReduce才能實現(xiàn)一個復(fù)雜任務(wù)。

DAG

10.2 RDD

一個RDD可以基本理解DAG 圖中對應(yīng)的一個操作。比如:RDD1對應(yīng)上圖中A,RDD2對應(yīng)上圖中B

RDD是一個只讀的有屬性的數(shù)據(jù)集。屬性用來描述當前數(shù)據(jù)集的狀態(tài),數(shù)據(jù)集是由數(shù)據(jù)的分區(qū)(Data)組成,并(由block)映射成真實數(shù)據(jù)。RDD屬性包括名稱、分區(qū)類型、父RDD指針、數(shù)據(jù)本地化、數(shù)據(jù)依賴關(guān)系等,主要屬性如下:

a. Dependencies 表示的是與其他RDD 的關(guān)系

b. SparkContext是所有Spark功能的入口,它代表了與Spark節(jié)點的連接,可以用來創(chuàng)建RDD對象以及在節(jié)點中的廣播變量等。一個線程只有一個SparkContext。SparkConf則是一些參數(shù)配置信息。

c. Data 它代表RDD中數(shù)據(jù)的邏輯結(jié)構(gòu),每個Partition會映射到某個節(jié)點內(nèi)存或硬盤的一個數(shù)據(jù)塊。

d. Partitioner決定了RDD的分區(qū)方式,目前有兩種主流的分區(qū)方式:Hash partitioner和Range partitioner。Hash,顧名思義就是對數(shù)據(jù)的Key進行散列分區(qū),Range則是按照Key的排序進行均勻分區(qū)。此外我們還可以創(chuàng)建自定義的Partitioner。

e.檢查點(Checkpoint):基于RDD的依賴關(guān)系,如果任意一個RDD在相應(yīng)的節(jié)點丟失,你只需要從上一步的RDD出發(fā)再次計算,便可恢復(fù)該RDD。但是,如果一個RDD的依賴鏈比較長,而且中間又有多個RDD出現(xiàn)故障的話,進行恢復(fù)可能會非常耗費時間和計算資源。在計算過程中,對于一些計算過程比較耗時的RDD,我們可以將它緩存至硬盤或HDFS中,標記這個RDD有被檢查點處理過,并且清空它的所有依賴關(guān)系。同時,給它新建一個依賴于CheckpointRDD的依賴關(guān)系,CheckpointRDD可以用來從硬盤中讀取RDD和生成新的分區(qū)信息。

f.存儲級別(Storage Level)是一個枚舉類型,用來記錄RDD持久化時的存儲級別,常用的有以下幾個:

MEMORY_ONLY:只緩存在內(nèi)存中,如果內(nèi)存空間不夠則不緩存多出來的部分。這是RDD存儲級別的默認值。

MEMORY_AND_DISK:緩存在內(nèi)存中,如果空間不夠則緩存在硬盤中。

DISK_ONLY:只緩存在硬盤中。

MEMORY_ONLY_2和MEMORY_AND_DISK_2等:與上面的級別功能相同,只不過每個分區(qū)在集群中兩個節(jié)點上建立副本。

g. 迭代函數(shù)(Iterator)和計算函數(shù)(Compute)是用來表示RDD怎樣通過父RDD計算得到的。

RDD并行執(zhí)行原理:

圖中每個RDD下的 p 代表真實數(shù)據(jù)(上一個圖中的data部分),每個 p 代表需要處理數(shù)據(jù)的一部分,這些 p分布在不同機器上,執(zhí)行時都是并行執(zhí)行。

RDD的轉(zhuǎn)換操作:Map、Filter、groupByKey

RDD動作操作:Collect、Reduce、Count

例子:

rdd = sc.parallelize(["b", "a", "c"])

rdd.map(lambda x: (x, 1)).collect() // [('b', 1), ('a', 1), ('c', 1)]

RDD是延遲執(zhí)行的,只有 動作 操作才能觸發(fā)RDD執(zhí)行。轉(zhuǎn)換操作只是幫助構(gòu)建DAG圖,到達 動作操作時,才觸發(fā)真正執(zhí)行。

10.3 Spark SQL

在RDD底層的API上封裝了一層SQL引擎:負責(zé)將SQL轉(zhuǎn)換為對應(yīng)的RDD API,并對任務(wù)進行優(yōu)化,執(zhí)行相應(yīng)任務(wù)。

同時Spark SQL暴露了 DataFrame、DataSet API,可以讓用戶用更高層、簡潔API進行開發(fā)

10.4 Spark Streaming流式計算

通過微批次實現(xiàn)流式計算:每次獲取固定時間間隔數(shù)據(jù)進行處理

示例:

sc = SparkContext(master, appName)

ssc = StreamingContext(sc, 1)

lines = sc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line: line.split(" "))

10.5?Structured Streaming 流批一體

通過DataFrame API(Saprk SQL)實現(xiàn)流式計算(微批次實現(xiàn)流式計算),做到用同一套API,既可以處理流計算,也可以用于批處理。

words = ... #這個DataFrame代表詞語的數(shù)據(jù)流,schema是 { timestamp: Timestamp, word: String}

windowedCounts = words.groupBy(

? window(words.timestamp, "1 minute", "10 seconds"),

? words.word

).count()

.sort(desc("count"))

.limit(10)

基于詞語的生成時間,我們創(chuàng)建了一個窗口長度為1分鐘,滑動間隔為10秒的window。然后,把輸入的詞語表根據(jù)window和詞語本身聚合起來,并統(tǒng)計每個window內(nèi)每個詞語的數(shù)量。之后,再根據(jù)詞語的數(shù)量進行排序,只返回前10的詞語。

當我們編寫 Spark Streaming 程序的時候,本質(zhì)上就是要去構(gòu)造RDD的DAG執(zhí)行圖,然后通過 Spark Engine 運行。這樣開發(fā)者身上的擔子就很重,很多時候要自己想辦法去提高程序的處理效率

Structured Streaming提供的DataFrame API就是這么一個相對高level的API,大部分開發(fā)者都很熟悉關(guān)系型數(shù)據(jù)庫和SQL。這樣的數(shù)據(jù)抽象可以讓他們用一套統(tǒng)一的方案去處理批處理和流處理,不用去關(guān)心具體的執(zhí)行細節(jié)。而且,DataFrame API是在Spark SQL的引擎上執(zhí)行的,Spark SQL有非常多的優(yōu)化功能,比如執(zhí)行計劃優(yōu)化和內(nèi)存管理等,所以Structured Streaming的應(yīng)用程序性能很好。

11. 原生流式計算引擎:Flink

相比spark 微批次實現(xiàn)的流式計算,flink支持原生的流式計算(可以理解為:數(shù)據(jù)來了就處理),提供更低延遲。

支持:select、groupby、count、map、combine、join、filter、keyby、flatmap、Aggregation等

關(guān)鍵詞:窗口、水位線、滑動窗口

11.1 Flink架構(gòu)

11.2 Flink流批一體

Flink提供的兩個核心API就是DataSet API和DataStream API。DataSet代表有界的數(shù)據(jù)集,而DataStream代表流數(shù)據(jù)。所以,DataSet API是用來做批處理的,而DataStream API是做流處理的。

在內(nèi)部,DataSet其實也用Stream表示,靜態(tài)的有界數(shù)據(jù)也可以被看作是特殊的流數(shù)據(jù),而且DataSet與DataStream可以無縫切換。所以,F(xiàn)link的核心是DataStream。

wordCount示例:

public class WindowWordCount {

public static void main(String[] args) throws Exception {

? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? DataStream> dataStream = env.socketTextStream("localhost", 9999)

  .flatMap(new Splitter()).keyBy(0)

  .timeWindow(Time.seconds(5))

  .sum(1);

? dataStream.print();

? env.execute("Window WordCount");

}

public static class Splitter implements FlatMapFunction> {

? @Override

? public void flatMap(String sentence, Collector> out) {

? ? for (String word: sentence.split(" ")) {

? ? ? out.collect(new Tuple2(word, 1));

? ? }

? }

}

12. 流批一體:Apache Beam

Google在2016年的時候聯(lián)合了Talend、Data Artisans、Cloudera這些大數(shù)據(jù)公司,基于Dataflow Model的思想開發(fā)出了一套SDK,并貢獻給了Apache Software Foundation

很多時候我們不可避免地需要對數(shù)據(jù)同時進行批處理和流處理。Beam提供了一套統(tǒng)一的API來處理這兩種數(shù)據(jù)處理模式,讓我們只需要將注意力專注于在數(shù)據(jù)處理的算法上,而不用再花時間去對兩種數(shù)據(jù)處理模式上的差異進行維護。

Apache Beam的主要目標是統(tǒng)一批處理和流處理的編程范式,為無限,亂序,web-scale的數(shù)據(jù)集處理提供簡單靈活,功能豐富以及表達能力十分強大的SDK。Apache Beam項目重點在于數(shù)據(jù)處理的編程范式和接口定義,并不涉及具體執(zhí)行引擎的實現(xiàn),Apache Beam希望基于Beam開發(fā)的數(shù)據(jù)處理程序可以執(zhí)行在任意的分布式計算引擎上。現(xiàn)階段Apache Beam支持的Runner有近十種,包括了我們很熟悉的Apache Spark和Apache Flink。

隨著分布式數(shù)據(jù)處理不斷發(fā)展,新的分布式數(shù)據(jù)處理技術(shù)也不斷被提出,業(yè)界涌現(xiàn)出了越來越多的分布式數(shù)據(jù)處理框架,從最早的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分布式處理框架可能帶來的更高的性能,更強大的功能,更低的延遲等,但用戶切換到新的分布式處理框架的代價也非常大:需要學(xué)習(xí)一個新的數(shù)據(jù)處理框架,并重寫所有的業(yè)務(wù)邏輯。解決這個問題的思路包括兩個部分,首先,需要一個編程范式,能夠統(tǒng)一,規(guī)范分布式數(shù)據(jù)處理的需求,例如,統(tǒng)一批處理和流處理的需求。其次,生成的分布式數(shù)據(jù)處理任務(wù)應(yīng)該能夠在各個分布式執(zhí)行引擎上執(zhí)行,用戶可以自由切換分布式數(shù)據(jù)處理任務(wù)的執(zhí)行引擎與執(zhí)行環(huán)境。Apache Beam正是為了解決以上問題而提出的。

Apache Beam主要由Beam SDK和Beam Runner組成,Beam SDK定義了開發(fā)分布式數(shù)據(jù)處理任務(wù)業(yè)務(wù)邏輯的API接口,生成的的分布式數(shù)據(jù)處理任務(wù)Pipeline交給具體的Beam Runner執(zhí)行引擎。Apache Beam目前支持的API接口是由Java語言實現(xiàn)的,Python版本的API正在開發(fā)之中。Apache Beam支持的底層執(zhí)行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等執(zhí)行引擎的支持也在討論或開發(fā)當中。其基本架構(gòu)如下圖所示:

13. 經(jīng)典大數(shù)據(jù)架構(gòu)

13.1 Lamda架構(gòu)?

twitter提出

Lambda 架構(gòu)將數(shù)據(jù)處理流程整體分成了三層:離線層(Batch Layer),加速層(Speed layer),服務(wù)層(Serving layer)

離線層:新數(shù)據(jù)不斷喂給數(shù)據(jù)系統(tǒng),期間每個樣本都會同時發(fā)給離線和加速層。所有進入離線層的數(shù)據(jù)流都會在數(shù)據(jù)湖(Data Lake)上進行計算處理。數(shù)據(jù)湖多會使用基于內(nèi)存的數(shù)據(jù)庫或 NoSQL 類的永久存儲設(shè)施,數(shù)據(jù)存好后離線層使用 MapReduce 或一些機器學(xué)習(xí)方法對數(shù)據(jù)進行處理并由此對接下來的內(nèi)容進行預(yù)測。

加速層:加速層會享用離線層事件溯源成果。離線層中的數(shù)據(jù)處理會涉及增量程序,MapReduce 或機器學(xué)習(xí)模型的更新,模型會被加速層進一步用來處理新數(shù)據(jù)。就這樣,加速層借助富集過程得到結(jié)果,保證服務(wù)層請求響應(yīng)延遲處于低位。而加速層也幾乎只處理實時數(shù)據(jù),計算負荷較低,延遲有保障。

服務(wù)層:從離線層得到離線視圖(batch view),加速層得到準實時視圖(near-real time view),統(tǒng)一送給服務(wù)層。服務(wù)層使用這些信息臨時應(yīng)付那些等候的查詢。

如果將整個過程用函數(shù)方程表示,任意大數(shù)據(jù)查詢都可歸結(jié)為如下形式

Query=λ(Complete data)=λ(live streaming data)?λ(stored data)Query=λ(Complete data)=λ(live streaming data)?λ(stored data)

方程中所用符號名為 Lambda_(音譯:拉姆達)_,Lambda 架構(gòu)的名字也源自于此。這個方程廣為熟悉數(shù)據(jù)分析軼事的人所知,從中可以看出所有數(shù)據(jù)相關(guān)查詢,結(jié)合離線歷史信息和實時流,Lambda 架構(gòu)都能處理。

13.2 Kappa架構(gòu)

LinkedIn提出

Kappa架構(gòu)相比Lamda架構(gòu),去掉了離線層,只保留加速層,這樣就不用同時維護2套架構(gòu),更加簡潔。當需要對數(shù)據(jù)重新處理時,可以借助數(shù)據(jù)源(比如:Kafka)的消息回溯功能,將消息回溯到1個月甚至1年前,進行重新處理。

將 Kappa 架構(gòu)的整體流程轉(zhuǎn)換成功能方程,任意數(shù)據(jù)查詢形式歸結(jié)為

Query=K(New data)=K(Live streaming data)Query=K(New data)=K(Live streaming data)

從方程可以看出所有查詢都能通過加速層對實時數(shù)據(jù)流的處理得到滿足。同時也指出流處理過程發(fā)生在 Kappa 架構(gòu)的加速層中

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容