Kafka起源
Kafka起源于領(lǐng)英,現(xiàn)在由Apache軟件基金會(huì)維護(hù),它由scala編寫,是一個(gè)高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)。關(guān)于Kafka的起源,有幸聽過饒軍(Con?uent聯(lián)合創(chuàng)始人)的一次分享,領(lǐng)英當(dāng)時(shí)有一個(gè)“People You May Know”的應(yīng)用,這個(gè)應(yīng)用從各個(gè)系統(tǒng)獲取用戶行為,為用戶推薦可能認(rèn)識(shí)的人。在沒有Kafka前,通常處理這個(gè)問題的架構(gòu)是點(diǎn)對(duì)點(diǎn)的:

他們思考一個(gè)理想的架構(gòu)應(yīng)當(dāng)是這樣的:

于是Kafka的最初版本就誕生了,所以從這里也能看到Kafka的最大價(jià)值所在。除去點(diǎn)對(duì)點(diǎn)的架構(gòu),而是要承接大量的不同系統(tǒng)的數(shù)據(jù)生產(chǎn)消費(fèi),所以Kafka在設(shè)計(jì)之初,高吞吐量就是一個(gè)重要的考量,當(dāng)然后續(xù)版本中,Kafka設(shè)計(jì)增加了副本,保證高可用等等其它分布式系統(tǒng)的指標(biāo)。
Kafka整體架構(gòu)

上圖是Kafka的整體拓?fù)浣Y(jié)構(gòu),中間是若干個(gè)Kafka節(jié)點(diǎn)組成的kafka集群,生產(chǎn)者消費(fèi)者分別是生產(chǎn)數(shù)據(jù)和消費(fèi)數(shù)據(jù)的各個(gè)系統(tǒng),ZooKeeper負(fù)責(zé)管理集群配置,注冊(cè)生產(chǎn)者消費(fèi)者,選舉leader等功能。
在邏輯架構(gòu)上,生產(chǎn)者消費(fèi)者都是以Topic為邏輯單位,為了提升吞吐,利用分布式集群的優(yōu)點(diǎn),Topic會(huì)有多個(gè)分片partition,邏輯架構(gòu)如下圖:


這是一個(gè)包含三個(gè)partition的Topic(生產(chǎn)與消費(fèi)是類似的,這里從消費(fèi)端說明),一個(gè)Topic允許注冊(cè)若干個(gè)消費(fèi)組,每個(gè)消費(fèi)組都是相互獨(dú)立的不會(huì)相互影響。每個(gè)消費(fèi)組會(huì)有若干個(gè)消費(fèi)實(shí)例,每個(gè)partition都會(huì)被分配給一個(gè)消費(fèi)實(shí)例(partition不會(huì)分配給多個(gè)實(shí)例,為了保證有序性),也就是每個(gè)消費(fèi)實(shí)例會(huì)被分配0個(gè),1個(gè),或多個(gè)partition,一個(gè)partition只會(huì)被分配給一個(gè)消費(fèi)實(shí)例。圖中沒有體現(xiàn)的是,一個(gè)消費(fèi)實(shí)例當(dāng)然可以由多個(gè)線程來(lái)消費(fèi),但是多個(gè)線程就不能保證消息有序性。
partition也是一個(gè)邏輯概念,組成partition的是物理上的log文件,其中index文件記錄log偏移量,log文件是記錄真實(shí)數(shù)據(jù)的文件(可以把partition看作文件夾,log是文件,index是索引)

Kafka采用簡(jiǎn)單實(shí)用的日志存儲(chǔ)方式處理消息數(shù)據(jù),這也是Kafka能夠?qū)崿F(xiàn)多消費(fèi)組,保證高吞吐的關(guān)鍵所在

生產(chǎn)者在尾部生產(chǎn)數(shù)據(jù),消費(fèi)者從頭部消費(fèi)數(shù)據(jù),通過記錄和控制偏移量來(lái)保證消息順序的消費(fèi)和生產(chǎn)。上圖中:
- Last Commited Offset:是上一次已經(jīng)確認(rèn)消費(fèi)完的數(shù)據(jù)偏移量。
- Current Position:消費(fèi)實(shí)例一次拉取的消息[Last Commited Offset+1, Current Position],只是拉取還未提交,等待消費(fèi)者提交Commit信號(hào),則Current Position更新為L(zhǎng)ast Commited Offset。注意每次拉取都是拉取上一次提交之后的消息,即使已經(jīng)拉取過。(實(shí)際上這也是kafka跟一些其它消息隊(duì)列不同的地方,commit信息由消費(fèi)者控制)
- High Watermark:是當(dāng)前能夠拉取的最高位置,在這個(gè)點(diǎn)之前的消息都是確認(rèn)安全不會(huì)丟失的(同步到副本),HW->Log End之間是不安全的,不允許拉取。
- Log End Offset:是生產(chǎn)者寫入的點(diǎn)。(生產(chǎn)者只能從主分片寫,不能寫從分片)
| 概念 | 解釋 |
|---|---|
| Kafka集群 | 多個(gè)Kafka實(shí)例組成的集群 |
| Broker | 運(yùn)行Kafka實(shí)例的機(jī)器,可以是物理機(jī),虛擬機(jī),容器等,通常一個(gè)Broker運(yùn)行一個(gè)實(shí)例 |
| Topic | 邏輯概念,生產(chǎn)者和消費(fèi)者訂閱發(fā)布的邏輯實(shí)體,一個(gè)Tpoic可以有多個(gè)消費(fèi)組訂閱,消費(fèi)組之間完全隔離,互不影響 |
| Partition | 消息分片,實(shí)際上Partiton也是一個(gè)邏輯概念,每個(gè)Topic會(huì)分為多個(gè)partiton,每個(gè)partition在一個(gè)Kafka實(shí)例上。partition內(nèi)消息是有序的 |
| Log文件 | 物理概念,實(shí)際存儲(chǔ)消息的文件,它由index文件索引,log文件連接在一起組成partition。Log文件不對(duì)生產(chǎn)者消費(fèi)者直接暴露 |
| Producer | 生產(chǎn)者,負(fù)責(zé)發(fā)布消息 |
| Consumer Group | 消費(fèi)組,訂閱消息的單位。不同消費(fèi)組之間相互獨(dú)立,互不影響 |
| Consumer | 消費(fèi)者實(shí)例,組成消費(fèi)組。每個(gè)消費(fèi)者實(shí)例會(huì)被分配到0個(gè),1個(gè)或多個(gè)partition |
| Consumer Thread | 一個(gè)消費(fèi)者實(shí)例內(nèi)部可以開啟多個(gè)線程消費(fèi),但是多個(gè)線程不能保證消息有序性 |
Kafka消費(fèi)客戶端
早期版本的Kafka提供了High Level和Low Level兩個(gè)客戶端。High Level Consumer API圍繞著Consumer Group這個(gè)邏輯概念展開,它屏蔽了每個(gè)Topic的每個(gè)Partition的Offset管理(自動(dòng)讀取zookeeper中該Consumer group的last offset )、Broker失敗轉(zhuǎn)移以及增減Partition、Consumer時(shí)的負(fù)載均衡(當(dāng)Partition和Consumer增減時(shí),Kafka自動(dòng)進(jìn)行負(fù)載均衡)。Low Level Consumer API,作為底層的Consumer API,提供了消費(fèi)Kafka Message更大的控制,使客戶端可以重復(fù)讀,跳讀等等操作,當(dāng)然Low Level Consumer API提供更大靈活控制是以復(fù)雜性為代價(jià)的。
后來(lái)Kafka發(fā)布了0.9版本的Kafka Consumer Client,這個(gè)客戶端提供了像High Level Consumer API的簡(jiǎn)潔性,同時(shí)可以像Low Level Consumer一樣構(gòu)建自己的消費(fèi)策略(API更簡(jiǎn)潔),并且新的Kafka客戶端用Java編寫,不再依賴Scala。
下面通過一個(gè)示例,介紹如何使用,也能加深對(duì)Kafka結(jié)構(gòu)的理解。
初始化Consumer實(shí)例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
訂閱Topic
consumer.subscribe(Arrays.asList(“foo”, “bar”));
拉取消息消費(fèi),有兩種方式:
1.基本的poll循環(huán):
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}
上面poll的參數(shù)是等待時(shí)間,如果有消息立即返回,如果沒有消息,則等待1000ms返回,這樣running可以控制消費(fèi)和停止。poll也可以像下面一樣傳一個(gè)Long.MAX_VALUE,這樣沒有消息會(huì)一直阻塞在這里,等待消息到來(lái),替代的,需要調(diào)用consumer.weakup()喚醒來(lái)控制消費(fèi)實(shí)例停止。
public class ConsumerLoop implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop(int id,
String groupId,
List<String> topics) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(“group.id”, groupId);
props.put(“key.deserializer”, StringDeserializer.class.getName());
props.put(“value.deserializer”, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
public static void main(String[] args) {
int numConsumers = 3;
String groupId = "consumer-tutorial-group"
List<String> topics = Arrays.asList("consumer-tutorial");
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<ConsumerLoop> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
consumers.add(consumer);
executor.submit(consumer);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (ConsumerLoop consumer : consumers) {
consumer.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace;
}
}
});
}