Kafka是一個(gè)分布式流媒體平臺(tái)。發(fā)布和訂閱記錄流,類(lèi)似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。以容錯(cuò)持久的方式存儲(chǔ)記錄流。處理記錄發(fā)生的流。本文講述如何使用java API 操作kafka集群
主要內(nèi)容:
- 1.消息生產(chǎn)者
- 2.消息消費(fèi)者
- 3.測(cè)試
1.消息生產(chǎn)者
1.1.引入依賴(lài)
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
1.2.編寫(xiě)生產(chǎn)者
public class KafkaProducerClient {
public static void main(String[] args) {
//1、準(zhǔn)備配置文件 參考:ProducerConfig.java
Properties props = new Properties();
// kakfa 服務(wù)列表
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
/**
* 當(dāng)生產(chǎn)者將ack設(shè)置為“全部”(或“-1”)時(shí),min.insync.replicas指定必須確認(rèn)寫(xiě)入被認(rèn)為成功的最小副本數(shù)。
* 如果這個(gè)最小值不能滿(mǎn)足,那么生產(chǎn)者將會(huì)引發(fā)一個(gè)異常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
* 當(dāng)一起使用時(shí),min.insync.replicas和acks允許您執(zhí)行更大的耐久性保證。
* 一個(gè)典型的情況是創(chuàng)建一個(gè)復(fù)制因子為3的主題,將min.insync.replicas設(shè)置為2,并使用“全部”選項(xiàng)來(lái)產(chǎn)生。
* 這將確保生產(chǎn)者如果大多數(shù)副本沒(méi)有收到寫(xiě)入引發(fā)異常。
*/
props.put("acks", "all");
/**
* 設(shè)置一個(gè)大于零的值,將導(dǎo)致客戶(hù)端重新發(fā)送任何失敗的記錄
*/
props.put("retries", 0);
/**
* 只要有多個(gè)記錄被發(fā)送到同一個(gè)分區(qū),生產(chǎn)者就會(huì)嘗試將記錄一起分成更少的請(qǐng)求。
* 這有助于客戶(hù)端和服務(wù)器的性能。該配置以字節(jié)為單位控制默認(rèn)的批量大小。
*/
props.put("batch.size", 16384);
/**
* 在某些情況下,即使在中等負(fù)載下,客戶(hù)端也可能希望減少請(qǐng)求的數(shù)量。
* 這個(gè)設(shè)置通過(guò)添加少量的人工延遲來(lái)實(shí)現(xiàn)這一點(diǎn),即不是立即發(fā)出一個(gè)記錄,
* 而是等待達(dá)到給定延遲的記錄,以允許發(fā)送其他記錄,以便發(fā)送可以一起批量發(fā)送
*/
props.put("linger.ms", 1);
/**
* 生產(chǎn)者可用于緩沖等待發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)。
* 如果記錄的發(fā)送速度比發(fā)送給服務(wù)器的速度快,那么生產(chǎn)者將會(huì)阻塞,max.block.ms之后會(huì)拋出異常。
* 這個(gè)設(shè)置應(yīng)該大致對(duì)應(yīng)于生產(chǎn)者將使用的總內(nèi)存,但不是硬性限制,
* 因?yàn)椴皇撬猩a(chǎn)者使用的內(nèi)存都用于緩沖。
* 一些額外的內(nèi)存將被用于壓縮(如果壓縮被啟用)以及用于維護(hù)正在進(jìn)行的請(qǐng)求。
*/
props.put("buffer.memory", 33554432);
// key的序列化類(lèi)型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value的序列化類(lèi)型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、創(chuàng)建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
for (int i = 0; i < 100; i++) {
//3、發(fā)送數(shù)據(jù)
kafkaProducer.send(new ProducerRecord("TEST_JAVA", "key"+i, "value" + i));
}
kafkaProducer.flush();
kafkaProducer.close();
}
}
2.消息消費(fèi)者
2.1.引入依賴(lài)
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
2.2.編寫(xiě)消費(fèi)者
public class KafkaConsumerClient {
public static void main(String[] args) {
//1、準(zhǔn)備配置文件 參考:ConsumerConfig.scala
Properties props = new Properties();
// kakfa 服務(wù)列表
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
//一個(gè)字符串用來(lái)指示一組consumer所在的組
props.put("group.id", "test");
// 如果true,consumer定期地往zookeeper寫(xiě)入每個(gè)分區(qū)的offset
props.put("enable.auto.commit", "true");
// 往zookeeper上寫(xiě)offset的頻率
props.put("auto.commit.interval.ms", "1000");
// key的反序列化類(lèi)型
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的反序列化類(lèi)型
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2、創(chuàng)建KafkaConsumer
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(props);
// 3、訂閱數(shù)據(jù),這里的topic可以是多個(gè)
kafkaConsumer.subscribe(Arrays.asList("TEST_JAVA"));
// 4、獲取數(shù)據(jù)
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
}
}
}
}
3.測(cè)試
3.1.啟動(dòng)zookeeper
運(yùn)行一鍵啟動(dòng)腳本:startzk.sh
3.2.啟動(dòng)kafka
參考:Kafka之集群安裝
運(yùn)行一鍵啟動(dòng)腳本:startkafka.sh
3.3.啟動(dòng)消費(fèi)者
直接run KafkaConsumerClient.main 就會(huì)一直阻塞等待消息

3.4.啟動(dòng)生產(chǎn)者
直接run KafkaProducerClient.main 就會(huì)往名為T(mén)EST_JAVA的Topic發(fā)送100條消息
可以看到消費(fèi)者已經(jīng)消費(fèi)了所有的數(shù)據(jù)

image.png
3.5.查看topics
kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

其中TEST_JAVA就是我們剛才默認(rèn)建的Topic