kafka 基于Partition 和group 消費,kafka如何消費avro格式數據

kafka消費模式 基于partition 指定offset

基于group:auto.offset.reset 選擇初始位置,對應groupid的offset不存在時從哪里消費

kafka配置詳解:http://damacheng009.iteye.com/blog/2088020

package com.vacp.api.kafka.common;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.vacp.api.common.ObjectSerializeUtils;

import com.vacp.api.kafka.avro.AvroHelper;

import com.videtek.kafka.*;

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.clients.producer.*;

import org.apache.kafka.common.TopicPartition;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.*;

import java.util.concurrent.Future;

public class KafkaUtils {

/** Value used for the producer's "retries" setting. */
private static final int MAX_VALUE = 3;

/** Logger shared by the utility methods and the nested send callback. */
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);

/** Producer configuration; filled in by the producer static initializer below. */
private static final Properties props_pro = new Properties();

/** Consumer configuration; filled in by the consumer static initializer below. */
private static final Properties props_con = new Properties();

/** Shared producer used by the send overloads that do not take an explicit producer. */
private static Producer<String, String> producer = null;

/** Utility class: only static members, never instantiated. */
private KafkaUtils() {
}

/**
 * Producer configuration and start-up of the shared producer.
 *
 * <p>Note from the original author: topics are expected to be created on the
 * server with the command-line tools rather than from this code.
 */
static {
    // Broker list as host:port; separate cluster members with commas.
    props_pro.put("bootstrap.servers", Config.BROKER64);
    props_pro.put("acks", "all");
    props_pro.put("retries", MAX_VALUE);
    props_pro.put("batch.size", 16384);
    props_pro.put("linger.ms", 1);
    props_pro.put("buffer.memory", 33554432);
    props_pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props_pro.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props_pro.put("max.in.flight.requests.per.connection", 1);
    // unclean.leader.election.enable, replication.factor and min.insync.replicas are
    // not producer settings; configure them on the broker/topic, not here.

    // Create the shared producer, then schedule the one-off delayed flush.
    producer = getKafkaProducer();
    flushData();
}

/**
 * Consumer configuration shared by every consumer this class creates.
 */
static {
    // Broker list as host:port; separate cluster members with commas.
    props_con.put("bootstrap.servers", Config.BROKER64);
    props_con.put("group.id", Config.GROUP_ID);
    // Starting position when the group has no stored offset yet.
    props_con.put("auto.offset.reset", "earliest");
    props_con.put("enable.auto.commit", true);
    props_con.put("auto.commit.interval.ms", "1000");
    props_con.put("session.timeout.ms", "30000");
    props_con.put("max.poll.records", "1800");
    props_con.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Values are delivered as raw bytes; the subscribe* methods hand them to AvroHelper.
    props_con.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
}

/**

* 發(fā)送對象消息 至kafka上,調(diào)用json轉(zhuǎn)化為json字符串,應(yīng)為kafka存儲的是String。

? ? * @param msg

? ? */

? ? public static void sendMsgToKafka(KafkaProducer producer,String msg) {

ProducerRecord record =new ProducerRecord(Config.TOPIC, String.valueOf(System.currentTimeMillis()),

? ? ? ? ? ? ? ? msg);

? ? ? ? Future future = producer.send(record,new SendCallback(producer,record,0));

? ? ? ? //if(future.isDone())

//System.out.println(JSON.toJSONString(future));

? ? }

/**

* 發(fā)送對象消息 至kafka上,調(diào)用json轉(zhuǎn)化為json字符串,應(yīng)為kafka存儲的是String。

? ? * @param msg

? ? */

? ? public static void sendMsgToKafka(String msg) {

ProducerRecord record =new ProducerRecord(Config.TOPIC, String.valueOf(System.currentTimeMillis()),

? ? ? ? ? ? ? ? msg);

? ? ? ? Future future =producer.send(record,new SendCallback(record,0));

? ? }

/**

* 發(fā)送對象消息 至kafka上,調(diào)用json轉(zhuǎn)化為json字符串,應(yīng)為kafka存儲的是String。

? ? * @param msg

? ? */

? ? public static void sendMsgToKafka(String topic,String msg) {

ProducerRecord record =new ProducerRecord(topic, String.valueOf(System.currentTimeMillis()),

? ? ? ? ? ? ? ? msg);

? ? ? ? Future future =producer.send(record,new SendCallback(record,0));

? ? }

public static void sendMsgToKafka(KafkaProducer producer,String topic,String msg) {

ProducerRecord record =new ProducerRecord(topic, String.valueOf(System.currentTimeMillis()),

? ? ? ? ? ? ? ? msg);

? ? ? ? Future future = producer.send(record,new SendCallback(producer,record,0));

? ? ? ? //System.out.println(JSON.toJSONString(future));

? ? }

/**

* 從kafka上接收對象消息,將json字符串轉(zhuǎn)化為對象,便于獲取消息的時候可以使用get方法獲取。

*/

? ? public static void getMsgFromKafka(){

Consumer consumer = KafkaUtils.getKafkaConsumer();

? ? ? ? while(true){

ConsumerRecords records = consumer.poll(500);

? ? ? ? ? ? LOGGER.info("消息個數(shù):" + records.count());

? ? ? ? ? ? if (records.count() >0) {

for (ConsumerRecord record : records) {

JSONObject msg = JSON.parseObject(record.value());

? ? ? ? ? ? ? ? ? ? LOGGER.info("從kafka接收到的消息是:" + msg);

? ? ? ? ? ? ? ? }

}

}

}

public static ConsumergetKafkaConsumer() {

KafkaConsumer consumer =new KafkaConsumer(props_con);

? ? ? ? consumer.subscribe(Arrays.asList(Config.TOPIC));

? ? ? ? return consumer;

? ? ? ? ? }

public static ConsumergetKafkaConsumer(String topic) {

KafkaConsumer consumer =new KafkaConsumer(props_con);

? ? ? ? consumer.subscribe(Arrays.asList(topic));

? ? ? ? return consumer;

? ? }

public static ConsumergetKafkaConsumer(String topic,String groupid) {

props_con.put("group.id", groupid);

? ? ? ? KafkaConsumer consumer =new KafkaConsumer(props_con);

? ? ? ? consumer.subscribe(Arrays.asList(topic));

? ? ? ? return consumer;

? ? }

public static? ConsumergetBeginConsumer(String topic,String groupid){

props_con.put("group.id", groupid);

? ? ? ? KafkaConsumer consumer =new KafkaConsumer(props_con);

? ? ? ? TopicPartition partition0 =new TopicPartition(topic, 0);

? ? ? ? TopicPartition partition1 =new TopicPartition(topic, 1);

? ? ? ? TopicPartition partition2 =new TopicPartition(topic, 2);

? ? ? ? Set set =new HashSet<>();

? ? ? ? set.add(partition0);

? ? ? ? set.add(partition1);

? ? ? ? set.add(partition2);

? ? ? ? consumer.assign(set);

? ? ? ? consumer.seekToBeginning(set);

? ? ? ? return consumer;

? ? }

public static KafkaProducergetKafkaProducer(){

KafkaProducer producer =new KafkaProducer(props_pro);

? ? ? ? return producer;

? ? }

public static void flushData(){

new Thread(new Runnable() {

@Override

? ? ? ? ? ? public void run() {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? Thread.sleep(5000L);

? ? ? ? ? ? ? ? ? ? producer.flush();

? ? ? ? ? ? ? ? }catch (Exception e){

? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? }

}

? ? ? ? }).start();

? ? }

/**

* producer回調(diào)

*/

? ? static class SendCallbackimplements Callback {

KafkaProducerproducer;

? ? ? ? ProducerRecordrecord;

? ? ? ? int sendSeq =0;

? ? ? ? public SendCallback(ProducerRecord record, int sendSeq) {

this.record = record;

? ? ? ? ? ? this.sendSeq = sendSeq;

? ? ? ? }

public SendCallback(KafkaProducer producer,ProducerRecord record, int sendSeq) {

this.producer = producer;

? ? ? ? ? ? this.record = record;

? ? ? ? ? ? this.sendSeq = sendSeq;

? ? ? ? }

@Override

? ? ? ? public void onCompletion(RecordMetadata recordMetadata, Exception e) {

//send success

? ? ? ? ? ? if (null == e) {

String meta ="topic:" + recordMetadata.topic() +", partition:"

? ? ? ? ? ? ? ? ? ? ? ? + recordMetadata.topic() +", offset:" + recordMetadata.offset();

? ? ? ? ? ? ? ? //LOGGER.info("send message success, record:" + record.toString() + ", meta:" + meta);

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? }

//send failed

? ? ? ? ? ? ? ? LOGGER.error("send message failed, seq:" +sendSeq +", record:" +record.toString() +", errmsg:" + e.getMessage());

? ? ? ? ? ? if (sendSeq <1) {

producer.send(record, new SendCallback(record, ++sendSeq));

? ? ? ? ? ? }

}

}

public static void subscribeVehicle(String topic,String groupid) {

//Consumer consumer =getKafkaConsumer(topic,groupid);

? ? ? ? Consumer consumer =getBeginConsumer(topic,groupid);

? ? ? int i =0 ;

? ? ? int count =0;

? ? ? ? while(true){

ConsumerRecords records = consumer.poll(10);

? ? ? ? ? ? count=count+records.count();

? ? ? ? ? ? System.out.println("count = [" + count +"]");

? ? ? ? ? ? AvroHelper helper =new AvroHelper();

? ? ? ? ? ? for(ConsumerRecord? record:records) {

//Vehicle vehicle= helper.deserialize(Vehicle.class, record.value());

//System.out.println(ObjectSerializeUtils.toJSON(vehicle));

//? System.out.println(new String(record.value(),Charset.forName("utf-8")));

? ? ? ? ? ? }

}

}

public static void subscribePerson(String topic,String groupid) {

//Consumer consumer =getKafkaConsumer(topic,groupid);

? ? ? ? Consumer consumer =getBeginConsumer(topic,groupid);

? ? ? ? int i =0 ;

? ? ? ? int count =0;

? ? ? ? while(true){

ConsumerRecords records = consumer.poll(10);

? ? ? ? ? ? count=count+records.count();

? ? ? ? ? ? System.out.println("count = [" + count +"]");

? ? ? ? ? ? AvroHelper helper =new AvroHelper();

? ? ? ? ? ? for(ConsumerRecord? record:records) {

Person vehicle= helper.deserialize(Person.class, record.value());

? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(vehicle));

? ? ? ? ? ? }

}

}

public static void subscribeBicycle(String topic,String groupid) {

// Consumer consumer =getKafkaConsumer(topic,groupid);

? ? ? ? Consumer consumer =getBeginConsumer(topic,groupid);

? ? ? ? int i =0 ;

? ? ? ? int count =0;

? ? ? ? while(true){

ConsumerRecords records = consumer.poll(10);

? ? ? ? ? ? count=count+records.count();

? ? ? ? ? ? System.out.println("count = [" + count +"]");

? ? ? ? ? ? AvroHelper helper =new AvroHelper();

? ? ? ? ? ? for(ConsumerRecord? record:records) {

Bicycle vehicle= helper.deserialize(Bicycle.class, record.value());

? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(vehicle));

? ? ? ? ? ? }

}

}

public static void subscribeVehiclePassingInfo(String topic,String groupid) {

Consumer consumer =getKafkaConsumer(topic,groupid);

? ? ? ? int i =0 ;

? ? ? ? int count =0;

? ? ? ? while(true){

ConsumerRecords records = consumer.poll(10);

? ? ? ? ? ? count=count+records.count();

? ? ? ? ? ? System.out.println("count = [" + count +"]");

? ? ? ? ? ? AvroHelper helper =new AvroHelper();

? ? ? ? ? ? for(ConsumerRecord? record:records) {

VehiclePassingInfo? vehicle= helper.deserialize(VehiclePassingInfo.class, record.value());

? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(vehicle));

? ? ? ? ? ? }

}

}

public static void subscribePeerVehicle(String topic,String groupid) {

Consumer consumer =getKafkaConsumer(topic,groupid);

? ? ? ? int i =0 ;

? ? ? ? int count =0;

? ? ? ? while(true){

ConsumerRecords records = consumer.poll(10);

? ? ? ? ? ? count=count+records.count();

? ? ? ? ? ? System.out.println("count = [" + count +"]");

? ? ? ? ? ? AvroHelper helper2 =new AvroHelper();

? ? ? ? ? ? for(ConsumerRecord? record:records) {

PeerVehicleInfo peer = helper2.deserialize(PeerVehicleInfo.class, record.value());

? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(peer));

? ? ? ? ? ? }

}

}

public static void main(String[] args) {

subscribeVehicle("debug_whh_scar_p3r1","group_t2");

? ? ? ? //subscribePerson("debug_whh_spedestrian_p3r1","group_test1");

//subscribeBicycle("debug_whh_scyclist_p3r1","group_test1");

? ? }

}

著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容