Spark Streaming實時流處理-3.分布式消息隊列Kafka

0. 目錄

  1. Kafka概述
  2. Kafka架構及核心概念
  3. Kafka部署及使用
  4. Kafka容錯性測試
  5. Kafka API編程
  6. Kafka實戰(zhàn)

1. Kafka概述

  • Kafka官網(wǎng):http://kafka.apache.org/
  • 發(fā)布訂閱-消息系統(tǒng)
  • 實時流處理
  • 存儲:多副本,分布式存儲
  • 和消息系統(tǒng)類似,消息中間件:生產(chǎn)者和消費者

2. Kafka架構和核心概念

kafka1
  • producer:生產(chǎn)者
  • consumer:消費者
  • broker:一個broker就相當于一個kafka
  • topic:主題,給消息帶一個標簽
  • Kafka能運行一個或多個服務
  • 每一條消息都能打一個主題
  • 每一條記錄都有一個key,一個value,和一個時間戳

3. Kafka部署及使用

  • 首先保證zookeeper可用
  • 單節(jié)點單Broker部署及使用
  • 單節(jié)點多Broker部署及使用
  • 多節(jié)點多Broker部署及使用
  • server.properties配置
# The id of the broker. This must be set to a unique integer for each broker.
# 配置brokerID,默認從零開始,不可重復。
broker.id=0
# 啟動在那臺機器上
host.name=localhost
# kafka日志
log.dirs=/tmp/kafka-logs
# 連接zookeeper地址
zookeeper.connect=localhost:2181

4. Kafka單節(jié)點單broker的部署及使用

  1. 下載kafka:http://kafka.apache.org
  2. 上傳至服務器或使用wget下載
  3. 解壓:tar -zxvf kafka.tar.gz
  4. 移動刀kafka/config目錄下
    • 修改server.properties配置文件
       # 修改logs配置目錄
       log.dirs=~/kafka-logs
       # 分區(qū)的數(shù)量
       num.partitions=1
       # zookeeper的地址
        zookeeper.connect=localhost:2181
    
  5. 啟動Kafka:bin/kafka-server-start.sh config/server.properties
       USAGE: /usr/local/kafka/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
       # -daemon: 是否以后臺進程的方式啟動
    
  6. 創(chuàng)建Topic:kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello.topic
  7. 查看Topic隊列列表:bin/kafka-topics.sh --list --zookeeper localhost:2181
  8. 發(fā)送消息:kafka-console-producer.sh --broker-list localhost:9092 --topic hello.topic
  9. 消費消息:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.topic --from-beginning
  10. 查看Topic描述:kafka-topics.sh --describe --zookeeper localhost:2181 [--topic hello.topic]

5. Kafka單節(jié)點多broker的部署及使用

  1. 復制server.properties
    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
  1. 修改properties
    # config/server-1.properties
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

    # config/server-2.properties
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2
  1. 啟動
    sudo ./bin/kafka-server-start.sh -daemon config/server-1.properties &
    sudo ./bin/kafka-server-start.sh -daemon config/server-2.properties &
    sudo ./bin/kafka-server-start.sh -daemon config/server-3.properties &
  1. 創(chuàng)建Topic: sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic "my-replicated-topic"
  2. 生產(chǎn)消息:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic "my-replicated-topic"
  3. 消費消息:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093,localhost:9094,localhost:9095 --topic "my-replicated-topic" --from-beginning

6. Kafka容錯性測試

  1. 殺死從節(jié)點:正常發(fā)送,Isr只剩下兩臺了。
  2. 殺死主節(jié)點:正常發(fā)送,主節(jié)點(leader)變成最后一臺,Isr只剩下最后一臺。
  3. Kafka的容錯性可以保障,只要一個副本存在,就可以正常運行。

7. Kafka API編程

  1. IDEA + Maven構建開發(fā)環(huán)境
  2. Producer API的使用
  3. Consumer API的使用

注意開啟API編程,請將service.properties中的listeners=PLAINTEXT://192.168.37.128:9092修改為IP。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容