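// Kafka consumption modes:
//   1. Partition-based: assign partitions explicitly and seek to a chosen offset.
//   2. Group-based: auto.offset.reset selects the initial position, i.e. where to start
//      consuming when no committed offset exists for the given group id.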
// Kafka configuration reference: http://damacheng009.iteye.com/blog/2088020
package com.vacp.api.kafka.common;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.vacp.api.common.ObjectSerializeUtils;
import com.vacp.api.kafka.avro.AvroHelper;
import com.videtek.kafka.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.Future;
public class KafkaUtils {
private final static int MAX_VALUE =3;
? ? private final static LoggerLOGGER = LoggerFactory.getLogger(KafkaUtils.class);
? ? private static Propertiesprops_pro =new Properties();
? ? private static Propertiesprops_con =new Properties();
? ? private static Producerproducer =null;
? ? private KafkaUtils() {
}
/**
* 生產(chǎn)者,注意kafka生產(chǎn)者不能夠從代碼上生成主題,只有在服務(wù)器上用命令生成
*/
? ? static {
//服務(wù)器ip:端口號,集群用逗號分隔
? ? ? ? props_pro.put("bootstrap.servers", Config.BROKER64);
? ? ? ? props_pro.put("acks", "all");
? ? ? ? props_pro.put("retries", MAX_VALUE);
? ? ? ? props_pro.put("batch.size", 16384);
? ? ? ? props_pro.put("linger.ms", 1);
? ? ? ? props_pro.put("buffer.memory", 33554432);
? ? ? ? props_pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? props_pro.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? props_pro.put("max.in.flight.requests.per.connection",1);
? ? ? ? props_pro.put("unclean.leader.election.enable",false);
? ? ? ? props_pro.put("replication.factor",3);
? ? ? ? props_pro.put("min.insync.replicas",2);
? ? ? ? props_pro.put("replication.factor",3);
? ? ? ? producer=getKafkaProducer();
? ? ? ? flushData();
? ? }
/**
* 消費者
*/
? ? static {
//服務(wù)器ip:端口號,集群用逗號分隔
? ? ? ? props_con.put("bootstrap.servers", Config.BROKER64);
? ? ? ? props_con.put("group.id", Config.GROUP_ID);
? ? ? ? //初始化消費offset位置
? ? ? ? props_con.put("auto.offset.reset", "earliest");
? ? ? ? props_con.put("enable.auto.commit", true);
? ? ? ? props_con.put("auto.commit.interval.ms", "1000");
? ? ? ? props_con.put("session.timeout.ms", "30000");
? ? ? ? props_con.put("max.poll.records", "1800");
? ? ? ? props_con.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? // props_con.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? props_con.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
? ? ? ? // props_con.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
? ? }
/**
* 發(fā)送對象消息 至kafka上,調(diào)用json轉(zhuǎn)化為json字符串,應(yīng)為kafka存儲的是String。
? ? * @param msg
? ? */
? ? public static void sendMsgToKafka(KafkaProducer producer,String msg) {
ProducerRecord record =new ProducerRecord(Config.TOPIC, String.valueOf(System.currentTimeMillis()),
? ? ? ? ? ? ? ? msg);
? ? ? ? Future future = producer.send(record,new SendCallback(producer,record,0));
? ? ? ? //if(future.isDone())
//System.out.println(JSON.toJSONString(future));
? ? }
/**
* 發(fā)送對象消息 至kafka上,調(diào)用json轉(zhuǎn)化為json字符串,應(yīng)為kafka存儲的是String。
? ? * @param msg
? ? */
? ? public static void sendMsgToKafka(String msg) {
ProducerRecord record =new ProducerRecord(Config.TOPIC, String.valueOf(System.currentTimeMillis()),
? ? ? ? ? ? ? ? msg);
? ? ? ? Future future =producer.send(record,new SendCallback(record,0));
? ? }
/**
* 發(fā)送對象消息 至kafka上,調(diào)用json轉(zhuǎn)化為json字符串,應(yīng)為kafka存儲的是String。
? ? * @param msg
? ? */
? ? public static void sendMsgToKafka(String topic,String msg) {
ProducerRecord record =new ProducerRecord(topic, String.valueOf(System.currentTimeMillis()),
? ? ? ? ? ? ? ? msg);
? ? ? ? Future future =producer.send(record,new SendCallback(record,0));
? ? }
public static void sendMsgToKafka(KafkaProducer producer,String topic,String msg) {
ProducerRecord record =new ProducerRecord(topic, String.valueOf(System.currentTimeMillis()),
? ? ? ? ? ? ? ? msg);
? ? ? ? Future future = producer.send(record,new SendCallback(producer,record,0));
? ? ? ? //System.out.println(JSON.toJSONString(future));
? ? }
/**
* 從kafka上接收對象消息,將json字符串轉(zhuǎn)化為對象,便于獲取消息的時候可以使用get方法獲取。
*/
? ? public static void getMsgFromKafka(){
Consumer consumer = KafkaUtils.getKafkaConsumer();
? ? ? ? while(true){
ConsumerRecords records = consumer.poll(500);
? ? ? ? ? ? LOGGER.info("消息個數(shù):" + records.count());
? ? ? ? ? ? if (records.count() >0) {
for (ConsumerRecord record : records) {
JSONObject msg = JSON.parseObject(record.value());
? ? ? ? ? ? ? ? ? ? LOGGER.info("從kafka接收到的消息是:" + msg);
? ? ? ? ? ? ? ? }
}
}
}
public static ConsumergetKafkaConsumer() {
KafkaConsumer consumer =new KafkaConsumer(props_con);
? ? ? ? consumer.subscribe(Arrays.asList(Config.TOPIC));
? ? ? ? return consumer;
? ? ? ? ? }
public static ConsumergetKafkaConsumer(String topic) {
KafkaConsumer consumer =new KafkaConsumer(props_con);
? ? ? ? consumer.subscribe(Arrays.asList(topic));
? ? ? ? return consumer;
? ? }
public static ConsumergetKafkaConsumer(String topic,String groupid) {
props_con.put("group.id", groupid);
? ? ? ? KafkaConsumer consumer =new KafkaConsumer(props_con);
? ? ? ? consumer.subscribe(Arrays.asList(topic));
? ? ? ? return consumer;
? ? }
public static? ConsumergetBeginConsumer(String topic,String groupid){
props_con.put("group.id", groupid);
? ? ? ? KafkaConsumer consumer =new KafkaConsumer(props_con);
? ? ? ? TopicPartition partition0 =new TopicPartition(topic, 0);
? ? ? ? TopicPartition partition1 =new TopicPartition(topic, 1);
? ? ? ? TopicPartition partition2 =new TopicPartition(topic, 2);
? ? ? ? Set set =new HashSet<>();
? ? ? ? set.add(partition0);
? ? ? ? set.add(partition1);
? ? ? ? set.add(partition2);
? ? ? ? consumer.assign(set);
? ? ? ? consumer.seekToBeginning(set);
? ? ? ? return consumer;
? ? }
public static KafkaProducergetKafkaProducer(){
KafkaProducer producer =new KafkaProducer(props_pro);
? ? ? ? return producer;
? ? }
public static void flushData(){
new Thread(new Runnable() {
@Override
? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(5000L);
? ? ? ? ? ? ? ? ? ? producer.flush();
? ? ? ? ? ? ? ? }catch (Exception e){
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
}
? ? ? ? }).start();
? ? }
/**
* producer回調(diào)
*/
? ? static class SendCallbackimplements Callback {
KafkaProducerproducer;
? ? ? ? ProducerRecordrecord;
? ? ? ? int sendSeq =0;
? ? ? ? public SendCallback(ProducerRecord record, int sendSeq) {
this.record = record;
? ? ? ? ? ? this.sendSeq = sendSeq;
? ? ? ? }
public SendCallback(KafkaProducer producer,ProducerRecord record, int sendSeq) {
this.producer = producer;
? ? ? ? ? ? this.record = record;
? ? ? ? ? ? this.sendSeq = sendSeq;
? ? ? ? }
@Override
? ? ? ? public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//send success
? ? ? ? ? ? if (null == e) {
String meta ="topic:" + recordMetadata.topic() +", partition:"
? ? ? ? ? ? ? ? ? ? ? ? + recordMetadata.topic() +", offset:" + recordMetadata.offset();
? ? ? ? ? ? ? ? //LOGGER.info("send message success, record:" + record.toString() + ", meta:" + meta);
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }
//send failed
? ? ? ? ? ? ? ? LOGGER.error("send message failed, seq:" +sendSeq +", record:" +record.toString() +", errmsg:" + e.getMessage());
? ? ? ? ? ? if (sendSeq <1) {
producer.send(record, new SendCallback(record, ++sendSeq));
? ? ? ? ? ? }
}
}
public static void subscribeVehicle(String topic,String groupid) {
//Consumer consumer =getKafkaConsumer(topic,groupid);
? ? ? ? Consumer consumer =getBeginConsumer(topic,groupid);
? ? ? int i =0 ;
? ? ? int count =0;
? ? ? ? while(true){
ConsumerRecords records = consumer.poll(10);
? ? ? ? ? ? count=count+records.count();
? ? ? ? ? ? System.out.println("count = [" + count +"]");
? ? ? ? ? ? AvroHelper helper =new AvroHelper();
? ? ? ? ? ? for(ConsumerRecord? record:records) {
//Vehicle vehicle= helper.deserialize(Vehicle.class, record.value());
//System.out.println(ObjectSerializeUtils.toJSON(vehicle));
//? System.out.println(new String(record.value(),Charset.forName("utf-8")));
? ? ? ? ? ? }
}
}
public static void subscribePerson(String topic,String groupid) {
//Consumer consumer =getKafkaConsumer(topic,groupid);
? ? ? ? Consumer consumer =getBeginConsumer(topic,groupid);
? ? ? ? int i =0 ;
? ? ? ? int count =0;
? ? ? ? while(true){
ConsumerRecords records = consumer.poll(10);
? ? ? ? ? ? count=count+records.count();
? ? ? ? ? ? System.out.println("count = [" + count +"]");
? ? ? ? ? ? AvroHelper helper =new AvroHelper();
? ? ? ? ? ? for(ConsumerRecord? record:records) {
Person vehicle= helper.deserialize(Person.class, record.value());
? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(vehicle));
? ? ? ? ? ? }
}
}
public static void subscribeBicycle(String topic,String groupid) {
// Consumer consumer =getKafkaConsumer(topic,groupid);
? ? ? ? Consumer consumer =getBeginConsumer(topic,groupid);
? ? ? ? int i =0 ;
? ? ? ? int count =0;
? ? ? ? while(true){
ConsumerRecords records = consumer.poll(10);
? ? ? ? ? ? count=count+records.count();
? ? ? ? ? ? System.out.println("count = [" + count +"]");
? ? ? ? ? ? AvroHelper helper =new AvroHelper();
? ? ? ? ? ? for(ConsumerRecord? record:records) {
Bicycle vehicle= helper.deserialize(Bicycle.class, record.value());
? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(vehicle));
? ? ? ? ? ? }
}
}
public static void subscribeVehiclePassingInfo(String topic,String groupid) {
Consumer consumer =getKafkaConsumer(topic,groupid);
? ? ? ? int i =0 ;
? ? ? ? int count =0;
? ? ? ? while(true){
ConsumerRecords records = consumer.poll(10);
? ? ? ? ? ? count=count+records.count();
? ? ? ? ? ? System.out.println("count = [" + count +"]");
? ? ? ? ? ? AvroHelper helper =new AvroHelper();
? ? ? ? ? ? for(ConsumerRecord? record:records) {
VehiclePassingInfo? vehicle= helper.deserialize(VehiclePassingInfo.class, record.value());
? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(vehicle));
? ? ? ? ? ? }
}
}
public static void subscribePeerVehicle(String topic,String groupid) {
Consumer consumer =getKafkaConsumer(topic,groupid);
? ? ? ? int i =0 ;
? ? ? ? int count =0;
? ? ? ? while(true){
ConsumerRecords records = consumer.poll(10);
? ? ? ? ? ? count=count+records.count();
? ? ? ? ? ? System.out.println("count = [" + count +"]");
? ? ? ? ? ? AvroHelper helper2 =new AvroHelper();
? ? ? ? ? ? for(ConsumerRecord? record:records) {
PeerVehicleInfo peer = helper2.deserialize(PeerVehicleInfo.class, record.value());
? ? ? ? ? ? ? ? System.out.println(ObjectSerializeUtils.toJSON(peer));
? ? ? ? ? ? }
}
}
public static void main(String[] args) {
subscribeVehicle("debug_whh_scar_p3r1","group_t2");
? ? ? ? //subscribePerson("debug_whh_spedestrian_p3r1","group_test1");
//subscribeBicycle("debug_whh_scyclist_p3r1","group_test1");
? ? }
}