flume與kafka集成

環(huán)境:flume-1.6,kafka_2.11-0.9.0.0。完成將/usr/local/nohup.out下的日志實時輸出到kafka中,便于后續(xù)處理,包括將日志歸檔到hdfs,以及與storm,spark等集成,進行實時日志的分析。

1. 配置kafka:

1)server.properties?

host.name=localhost, log.dir=/usr/local/kafka-logs

2)zookeeper.properties

dataDir=/usr/local/zookeeper/data

3)zookeeper啟動

bin/zookeeper-server-start.sh config/zookeeper.properties

2. 啟動kafka:

1)啟動server:?bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh--create--zookeeper localhost:2181--replication1--partition1--topic test

2)啟動一個consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

3. 配置flume:

1)flume-conf.properties

#agent section

producer.sources = s

producer.channels = c

producer.sinks = r

#source section

producer.sources.s.type = exec

producer.sources.s.command = tail -F /usr/local/nohup.out

producer.sources.s.channels = c

# Each sink's type must be defined

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink

producer.sinks.r.custom.topic.name = test

producer.sinks.r.metadata.broker.list = 127.0.0.1:9092

producer.sinks.r.partition.key = 0

producer.sinks.r.partitioner.class = org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class = kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks = 1

producer.sinks.r.max.message.size = 1000000

#Specify the channel the sink should use

producer.sinks.r.channel = c

# Each channel's type is defined.

producer.channels.c.type = memory

producer.channels.c.capacity = 1000

4. 啟動flume:

1) bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容