java往有賬號密碼驗(yàn)證的Kafka生產(chǎn)數(shù)據(jù)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Optional;
import java.util.Properties;

public class KafkaUtils {
    public static void kafkaProducer(String topic, String username, String password) {
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);

        Properties props = new Properties();
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("bootstrap.servers",
                "10.221.124.13:9092,10.221.124.14:9092,10.221.124.15:9092");//該地址是集群的子集,用來探測集群。
        props.put("acks", "all");// 記錄完整提交,最慢的但是最大可能的持久化
        props.put("retries", 3);// 請求失敗重試的次數(shù)
        props.put("batch.size", 16384);// batch的大小
        props.put("linger.ms", 1);// 默認(rèn)情況即使緩沖區(qū)有剩余的空間,也會立即發(fā)送請求,設(shè)置一段時間用來等待從而將緩沖區(qū)填的更多,單位為毫秒,producer發(fā)送數(shù)據(jù)會延遲1ms,可以減少發(fā)送到kafka服務(wù)器的請求數(shù)據(jù)
        props.put("buffer.memory", 33554432);// 提供給生產(chǎn)者緩沖內(nèi)存總量
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> kafkaProducer = null;
        try {
            kafkaProducer = new KafkaProducer<>(props);
            kafkaProducer.send(new ProducerRecord<String, String>(topic, "1", "1"));
            System.out.println("發(fā)送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            Optional.ofNullable(kafkaProducer).ifPresent(KafkaProducer::close);
        }
    }

    public static void main(String[] args) {
        kafkaProducer("topic","username","password");
    }
}

相關(guān)文章 https://blog.csdn.net/u012842205/article/details/73188534

相關(guān)文章https://github.com/CloudKarafka/java-kafka-example/blob/master/src/main/java/KafkaExample.java

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容