6.1 概述
6.1.1 Kafka Streams
Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。
6.1.2 Kafka Streams特點
1)功能強大
高擴展性,彈性,容錯
2)輕量級
無需專門的集群
一個庫,而不是框架
3)完全集成
100%的Kafka 0.10.0版本兼容
易于集成到現(xiàn)有的應(yīng)用程序
4)實時性
毫秒級延遲
并非微批處理
窗口允許亂序數(shù)據(jù)
允許遲到數(shù)據(jù)
6.1.3 為什么要有Kafka Stream
當前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark與Apache Storm擁用如此多的優(yōu)勢,那為何還需要Kafka Stream呢?主要有如下原因。
第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫??蚣芤箝_發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發(fā)者調(diào)用,整個應(yīng)用的運行方式主要由開發(fā)者控制,方便使用和調(diào)試
第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復(fù)雜。而Kafka Stream作為類庫,可以非常方便的嵌入應(yīng)用程序中,它對應(yīng)用的打包和部署基本沒有任何要求。
第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統(tǒng)的標準數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時使用Kafka Stream的成本非常低。
第四,使用Storm或Spark Streaming時,需要為框架本身的進程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應(yīng)用實例而言,框架本身也會占用部分資源,如SparkStreaming需要為shufflfflffle和storage預(yù)留內(nèi)存。但是Kafka作為類庫不占用系統(tǒng)資源。
第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態(tài)調(diào)整并行度。
6.2 Kafka Stream數(shù)據(jù)清洗案例
0)需求:
實時處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”itstar>>>ximenqing”,最終處理成“ximenqing”
1)需求分析:
2)案例實操
(1)創(chuàng)建一個工程,并添加jar包
(2)創(chuàng)建主類
package com.itstar.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定義輸入的topic
String from = "first";
// 定義輸出的topic
String to = "second";
// 設(shè)置參數(shù)
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata11:9092");
StreamsConfig config = new StreamsConfig(settings);
// 構(gòu)建拓撲
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>()
{
@Override
public Processor<byte[], byte[]> get() {
// 具體分析處理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
// 創(chuàng)建kafka stream
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
(3)具體業(yè)務(wù)處理
package com.itstar.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”則只保留該標記后面的內(nèi)容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 輸出到下一個topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
(4)運行程序
(5)在bigdata13上啟動生產(chǎn)者
[itstar@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list
bigdata11:9092 --topic first
>hello>>>world
>h>>>itstar
>hahaha