Windows 安裝及利用Python操作kafka
1. 安裝zookeeper
https://zookeeper.apache.org/releases.html
下載解壓文件后
將conf文件夾下的 zoo_sample.cfg 文件名改成 zoo.cfg,打開 zoo.cfg文件,修改dataDir的值路徑,將dataDir=/tmp/zookeeper改成
dataDir=E:\apache-zookeeper-3.5.9-bin\data
并加入
dataLogDir=E:\apache-zookeeper-3.5.9-bin\log
data和log文件夾可以自己創(chuàng)建
啟動(dòng)zookeeper
打開cmd 路徑切換到解壓的文件夾下的bin文件夾
運(yùn)行 zkServer.cmd
2. 安裝kafka
下載kafka文件:
http://kafka.apache.org/downloads.html
解壓完成后進(jìn)行properties文件配置:
新建logs文件夾(自己命名)
找到config文件夾下的server.properties文件,將log.dirs的值改成log.dirs=E:\work\kafka_2.12-0.10.2.0\kafka-logs(改為自己的地址)
啟動(dòng)Kafka
打開新的cmd 路徑切換到解壓的文件夾下,輸入如下命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
3. 使用python操作Kafka
python安裝kafka
pip install kafka-python
不知道是否是因?yàn)閳?zhí)行過pip install kafka
使用
# 生產(chǎn)者
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(3):
msg = 'test %d' % i
print(msg)
producer.send('test',msg.encode())
producer.close()
# 消費(fèi)者
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
print(msg.value, type(msg.value))
- 錯(cuò)誤記錄
使用KafkaProducer的send函數(shù)時(shí),報(bào)了如下錯(cuò)誤:
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
AssertionError
錯(cuò)誤原因:send函數(shù)的value_bytes是str類型
解決辦法:
對(duì)字符串使用encode()
字符串類str里有一個(gè)==encode()==方法,它是從字符串向比特流的編碼過程。
bytes類型恰好有個(gè)==decode()==方法,它是從比特流向字符串解碼的過程。
關(guān)閉kafka
.\bin\windows\kafka-server-stop.bat