1.集群規(guī)劃
一般模式下,元數(shù)據(jù)在 zookeeper 中,運(yùn)行時動態(tài)選舉 controller,由controller 進(jìn)行 Kafka 集群管理。kraft 模式架構(gòu)(實驗性)下,不再依賴 zookeeper 集群,而是用三臺 controller 節(jié)點(diǎn)代替 zookeeper,元數(shù)據(jù)保存在 controller 中,由 controller 直接進(jìn)行 Kafka 集群管理。
好處有以下幾個:
Kafka 不再依賴外部框架,而是能夠獨(dú)立運(yùn)行
controller 管理集群時,不再需要從 zookeeper 中先讀取數(shù)據(jù),集群性能上升
由于不依賴 zookeeper,集群擴(kuò)展時不再受到 zookeeper 讀寫能力限制
controller 不再動態(tài)選舉,而是由配置文件規(guī)定??梢杂嗅槍π缘募訌?qiáng)controller 節(jié)點(diǎn)的配置,而不是像以前一樣對隨機(jī) controller 節(jié)點(diǎn)的高負(fù)載束手無策。
hadoop02(192.168.58.130) hadoop03(192.168.58.131) hadoop04(192.168.58.132)
kafka kafka kafka
2.集群部署
1.下載kafka二進(jìn)制包
https://kafka.apache.org/downloads
2.解壓
shell
mkdir /usr/kafka
tar -zxvf /home/kafka_2.13-3.6.1.tgz -C /usr/kafka/
3.修改配置文件(以192.168.58.130上節(jié)點(diǎn)的配置為例)
shell
cd /usr/kafka/kafka_2.13-3.6.1/config/kraft
vi server.properties
注:Kraft模式的配置文件在config目錄的kraft子目錄下
properties
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This configuration file is intended for use in KRaft mode, where
Apache ZooKeeper is not present.
############################# Server Basics #############################
此服務(wù)器的角色。設(shè)置此項將進(jìn)入KRaft模式(controller 相當(dāng)于主機(jī)、broker 節(jié)點(diǎn)相當(dāng)于從機(jī),主機(jī)類似 zk 功能)
process.roles=broker,controller
節(jié)點(diǎn) ID
node.id=2
全 Controller 列表
controller.quorum.voters=2@192.168.58.130:9093,3@192.168.58.131:9093,4@192.168.58.132:9093
############################# Socket Server Settings #############################
套接字服務(wù)器偵聽的地址.
組合節(jié)點(diǎn)(即具有process.roles=broker,controller的節(jié)點(diǎn))必須至少在此處列出控制器偵聽器
如果沒有定義代理偵聽器,那么默認(rèn)偵聽器將使用一個等于java.net.InetAddress.getCanonicalHostName()值的主機(jī)名,
帶有PLAINTEXT偵聽器名稱和端口9092
FORMAT:
listeners = listener_name://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
不同服務(wù)器綁定的端口
listeners=PLAINTEXT://192.168.58.130:9092,CONTROLLER://192.168.58.130:9093
用于代理之間通信的偵聽器的名稱(broker 服務(wù)協(xié)議別名)
inter.broker.listener.name=PLAINTEXT
偵聽器名稱、主機(jī)名和代理將向客戶端公布的端口.(broker 對外暴露的地址)
如果未設(shè)置,則使用"listeners"的值.
advertised.listeners=PLAINTEXT://192.168.58.130:9092
controller 服務(wù)協(xié)議別名
控制器使用的偵聽器名稱的逗號分隔列表
如果listener.security.protocol.map中未設(shè)置顯式映射,則默認(rèn)使用PLAINTEXT協(xié)議
如果在KRaft模式下運(yùn)行,這是必需的。
controller.listener.names=CONTROLLER
將偵聽器名稱映射到安全協(xié)議,默認(rèn)情況下它們是相同的。(協(xié)議別名到安全協(xié)議的映射)有關(guān)更多詳細(xì)信息,請參閱配置文檔.
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
服務(wù)器用于從網(wǎng)絡(luò)接收請求并向網(wǎng)絡(luò)發(fā)送響應(yīng)的線程數(shù)
num.network.threads=3
服務(wù)器用于處理請求的線程數(shù),其中可能包括磁盤I/O
num.io.threads=8
套接字服務(wù)器使用的發(fā)送緩沖區(qū)(SO_SNDBUF)
socket.send.buffer.bytes=102400
套接字服務(wù)器使用的接收緩沖區(qū)(SO_RCVBUF)
socket.receive.buffer.bytes=102400
套接字服務(wù)器將接受的請求的最大大小(防止OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
存儲日志文件的目錄的逗號分隔列表(kafka 數(shù)據(jù)存儲目錄)
log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas
每個主題的默認(rèn)日志分區(qū)數(shù)。更多的分區(qū)允許更大的并行性以供使用,但這也會導(dǎo)致代理之間有更多的文件。
num.partitions=1
啟動時用于日志恢復(fù)和關(guān)閉時用于刷新的每個數(shù)據(jù)目錄的線程數(shù)。
對于數(shù)據(jù)目錄位于RAID陣列中的安裝,建議增加此值。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
組元數(shù)據(jù)內(nèi)部主題"__consumer_offsets"和"__transaction_state"的復(fù)制因子
對于除開發(fā)測試以外的任何測試,建議使用大于1的值來確??捎眯?,例如3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
消息會立即寫入文件系統(tǒng),但默認(rèn)情況下,我們只使用fsync()進(jìn)行同步
操作系統(tǒng)緩存延遲。以下配置控制將數(shù)據(jù)刷新到磁盤.
這里有一些重要的權(quán)衡:
1. Durability(持久性): 如果不使用復(fù)制,未清理的數(shù)據(jù)可能會丟失
2. Latency(延遲): 當(dāng)刷新發(fā)生時,非常大的刷新間隔可能會導(dǎo)致延遲峰值,因為將有大量數(shù)據(jù)要刷新.
3. Throughput(吞吐量): 刷新通常是最昂貴的操作,較小的刷新間隔可能導(dǎo)致過多的尋道.
下面的設(shè)置允許配置刷新策略,以便在一段時間后或每N條消息(或兩者兼有)刷新數(shù)據(jù)。這可以全局完成,并在每個主題的基礎(chǔ)上覆蓋
強(qiáng)制將數(shù)據(jù)刷新到磁盤之前要接受的消息數(shù)
log.flush.interval.messages=10000
在我們強(qiáng)制刷新之前,消息可以在日志中停留的最長時間
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
以下配置控制日志段的處理??梢詫⒃摬呗栽O(shè)置為在一段時間后刪除分段,或者在累積了給定大小之后刪除分段。
只要滿足這些條件中的任意一個,segment就會被刪除。刪除總是從日志的末尾開始
日志文件因使用年限而有資格刪除的最短使用年限
log.retention.hours=168
基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否則將從日志中刪除段。獨(dú)立于log.retention.hours的函數(shù)。
log.retention.bytes=1073741824
日志segment文件的最大大小。當(dāng)達(dá)到此大小時,將創(chuàng)建一個新的日志segment
log.segment.bytes=1073741824
檢查日志segments以查看是否可以根據(jù)保留策略刪除它們的間隔
log.retention.check.interval.ms=300000
4.在其他節(jié)點(diǎn)上修改配置文件
在 192.168.58.131 和 192.168.58.132 上修改配置文件server.properties中
1.node.id
注:node.id 不得重復(fù),整個集群中唯一,且值需要和controller.quorum.voters 對應(yīng)。
2.dvertised.Listeners地址
根據(jù)各自的主機(jī)名稱,修改相應(yīng)的 dvertised.Listeners 地址
3.listeners地址
根據(jù)各自的主機(jī)IP修改
properties
節(jié)點(diǎn) ID
node.id=3
不同服務(wù)器綁定的端口
listeners=PLAINTEXT://192.168.58.131:9092,CONTROLLER://192.168.58.131:9093
偵聽器名稱、主機(jī)名和代理將向客戶端公布的端口.(broker 對外暴露的地址)
如果未設(shè)置,則使用"listeners"的值.
advertised.listeners=PLAINTEXT://192.168.58.131:9092
properties
節(jié)點(diǎn) ID
node.id=4
不同服務(wù)器綁定的端口
listeners=PLAINTEXT://192.168.58.132:9092,CONTROLLER://192.168.58.132:9093
偵聽器名稱、主機(jī)名和代理將向客戶端公布的端口.(broker 對外暴露的地址)
如果未設(shè)置,則使用"listeners"的值.
advertised.listeners=PLAINTEXT://192.168.58.132:9092
5.初始化集群數(shù)據(jù)目錄
1.首先生成存儲目錄唯一 ID。
shell
bin/kafka-storage.sh random-uuid
輸出ID:7TraW-eCQXCx-HYoNY5VKw
2.用該 ID 格式化 kafka 存儲目錄(每個節(jié)點(diǎn)都需要執(zhí)行)
shell
bin/kafka-storage.sh format -t 7TraW-eCQXCx-HYoNY5VKw -c /usr/kafka/kafka_2.13-3.6.1/config/kraft/server.properties
6.配置環(huán)境變量
在/etc/profile.d中配置
1.新建kafka.sh
shell
vi /etc/profile.d/kafka.sh
sh
KAFKA_HOME
export KAFKA_HOME=/usr/kafka/kafka_2.13-3.6.1
export PATH=KAFKA_HOME/bin
2.授予文件執(zhí)行權(quán)限
shell
chmod u+x /etc/profile.d/kafka.sh
3.刷新環(huán)境變量
shell
source /etc/profile
7.啟動集群
1.在節(jié)點(diǎn)上依次啟動 Kafka
shell
bin/kafka-server-start.sh -daemon /usr/kafka/kafka_2.13-3.6.1/config/kraft/server.properties
2.kafka一鍵啟停腳本
1.創(chuàng)建腳本
shell
vi /usr/bin/kafka
sh
! /bin/bash
if [ $# -lt 1 ]; then
echo "No Args Input..."
exit
fi
case i Kafka-------"
ssh i Kafka-------"
ssh $i "source /etc/profile;/usr/kafka/kafka_2.13-3.6.1/bin/kafka-server-stop.sh"
done
} ;;
*)
echo "Input Args Error..."
;;
esac
2.添加執(zhí)行權(quán)限
shell
chmod +x /usr/bin/kafka
3.使用
shell
kafka start/stop
8.關(guān)閉集群
shell
/usr/kafka/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
EOF