python kafka 學(xué)習(xí)筆記(一)

Windows 安裝及利用Python操作kafka

1. 安裝zookeeper

https://zookeeper.apache.org/releases.html

下載解壓文件后

將conf文件夾下的 zoo_sample.cfg 文件名改成 zoo.cfg,打開 zoo.cfg文件,修改dataDir的值路徑,將dataDir=/tmp/zookeeper改成
dataDir=E:\apache-zookeeper-3.5.9-bin\data
并加入
dataLogDir=E:\apache-zookeeper-3.5.9-bin\log
data和log文件夾可以自己創(chuàng)建

啟動(dòng)zookeeper

打開cmd 路徑切換到解壓的文件夾下的bin文件夾
運(yùn)行 zkServer.cmd

2. 安裝kafka

下載kafka文件:

http://kafka.apache.org/downloads.html
解壓完成后進(jìn)行properties文件配置:
新建logs文件夾(自己命名)
找到config文件夾下的server.properties文件,將log.dirs的值改成log.dirs=E:\work\kafka_2.12-0.10.2.0\kafka-logs(改為自己的地址)

啟動(dòng)Kafka

打開新的cmd 路徑切換到解壓的文件夾下,輸入如下命令
.\bin\windows\kafka-server-start.bat .\config\server.properties

3. 使用python操作Kafka

python安裝kafka

pip install kafka-python
不知道是否是因?yàn)閳?zhí)行過pip install kafka

使用
# 生產(chǎn)者
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range(3):
    msg = 'test %d' % i
    print(msg)
    producer.send('test',msg.encode())
producer.close()

# 消費(fèi)者
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
    print(msg.value, type(msg.value))
  • 錯(cuò)誤記錄
    使用KafkaProducer的send函數(shù)時(shí),報(bào)了如下錯(cuò)誤:
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
AssertionError

錯(cuò)誤原因:send函數(shù)的value_bytes是str類型

解決辦法:

對(duì)字符串使用encode()
字符串類str里有一個(gè)==encode()==方法,它是從字符串向比特流的編碼過程。
bytes類型恰好有個(gè)==decode()==方法,它是從比特流向字符串解碼的過程。

關(guān)閉kafka

.\bin\windows\kafka-server-stop.bat

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容