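WeChat Official Account: 大数据开发运维架构 (Big Data Development, Operations and Architecture)
Follow the account for more big data news. For questions or suggestions, leave a message on the account.
If you find "大数据开发运维架构" helpful, feel free to share it on WeChat Moments.
This post was copied from the WeChat Official Account, so some of the formatting is garbled; reading it on the account itself is recommended.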
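The previous article showed the code for connecting Flink to a Kafka cluster. For day-to-day statistical analysis we often use Flink SQL, so this post gives a code example in which Flink SQL consumes data from Kafka and stores it in MySQL. For more hands-on content, follow the WeChat Official Account "大数据开发运维架构".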
Versions:
Flink 1.9.0
Kafka 0.10.0
MySQL 5.6.40
Without further ado, here is the code.
1. The MySQL database recommend contains a table named student, created with the following statement:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(64) NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `course` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score` double(128,0) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;
2. The entity class corresponding to the student table:
package com.hadoop.ljs.flink.sql;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:50
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class Student {
    /* Unique ID */
    int id;
    /* Name */
    String name;
    /* Course */
    String course;
    /* Score */
    double score;

    public Student(Integer f0, String f1, String f2, Double f3) {
        id = f0;
        name = f1;
        course = f2;
        score = f3;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public double getScore() {
        return score;
    }

    public void setScore(double score) {
        this.score = score;
    }
}
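3. A custom sink class that stores the data in MySQL. Note that REPLACE INTO only overwrites an existing row when the table has a PRIMARY KEY or UNIQUE index. The student table above defines neither, so every record is inserted as a new row.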
package com.hadoop.ljs.flink.sql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:48
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class SinkStudent2MySQL extends RichSinkFunction<Student> {
    public static final String url = "jdbc:mysql://10.124.165.31:3306/recommend?useUnicode=true&characterEncoding=UTF-8";
    public static final String userName = "root";
    public static final String password = "123456a?";
    private static final long serialVersionUID = -4443175430371919407L;

    private PreparedStatement ps;
    private Connection connection;

    /**
     * open() is called only once per parallel sink instance, before any record arrives.
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "replace into student(id,name,course,score) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement before the connection that owns it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record written to the sink.
     * @param student
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student student, Context context) throws Exception {
        /* Bind this record's fields to the statement parameters */
        ps.setInt(1, student.getId());
        ps.setString(2, student.getName());
        ps.setString(3, student.getCourse());
        ps.setDouble(4, student.getScore());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(url, userName, password);
            System.out.println("MySQL connection succeeded!");
        } catch (Exception e) {
            System.out.println("MySQL connection failed, error: " + e.getMessage());
        }
        return con;
    }
}
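The sink above makes one executeUpdate() round trip per record. A common variation is to buffer rows and send them together with the standard JDBC calls PreparedStatement.addBatch() and executeBatch(). The size-based buffering part can be sketched in plain Java. BatchBuffer and its flush callback are hypothetical names that belong to neither Flink nor JDBC, and the batch size is an assumption you would tune.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper: collects records and hands them to a flush callback
 * once batchSize records have accumulated, or when flush() is called explicitly.
 * In a JDBC sink the callback would bind each row, call ps.addBatch(),
 * and then call ps.executeBatch() once for the whole list.
 */
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Called once per record, in the same place as the sink's invoke(). */
    public void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Emits whatever is buffered; call it from close() so the tail is not lost. */
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer)); // hand over a copy, then reset
        buffer.clear();
    }

    /** Number of records waiting for the next flush. */
    public int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BatchBuffer<String> b = new BatchBuffer<>(2, batch -> System.out.println("flush " + batch));
        b.add("1001");
        b.add("1002"); // prints: flush [1001, 1002]
        b.add("1003");
        b.flush();     // prints: flush [1003]
    }
}
```

If the job fails, any records still in the buffer are lost unless the sink also flushes during checkpoints, for example by implementing Flink's CheckpointedFunction. This sketch leaves that part out.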
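4. The main class. It receives messages from Kafka, splits each line into fields, registers the result as a temporary table, queries it with SQL, and writes the rows into the student table through the custom SinkStudent2MySQL class: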
package com.hadoop.ljs.flink.sql;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:47
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class FlinkKafkaKerberosSQLConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic2";
    public static final String consumerGroup = "test_topic2";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Set up Kerberos/JAAS on Windows; these can also be passed in with -D
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the job with parallelism 1
        senv.setParallelism(1);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, bsSettings);

        FlinkKafkaConsumer010<String> myConsumer =
                new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), getComsumerProperties());
        DataStream<String> stream = senv.addSource(myConsumer);

        // Keep only lines with exactly four comma-separated fields.
        // filter() returns a new stream, so its result must be used downstream.
        DataStream<String> filtered = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value != null && value.split(",").length == 4;
            }
        });

        DataStream<Tuple4<Integer, String, String, Double>> map = filtered.map(
                new MapFunction<String, Tuple4<Integer, String, String, Double>>() {
                    private static final long serialVersionUID = 1471936326697828381L;

                    @Override
                    public Tuple4<Integer, String, String, Double> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple4<>(Integer.valueOf(split[0]), split[1], split[2], Double.valueOf(split[3]));
                    }
                });

        // Register the stream as a temporary table and name its fields
        tableEnv.registerDataStream("student", map, "id,name,course,score");
        Table sqlQuery = tableEnv.sqlQuery("select id,name,course,score from student");
        DataStream<Tuple4<Integer, String, String, Double>> appendStream = tableEnv.toAppendStream(sqlQuery,
                Types.TUPLE(Types.INT, Types.STRING, Types.STRING, Types.DOUBLE));
        appendStream.print();

        /* Convert each record into a Student entity and sink it to MySQL */
        appendStream.map(new MapFunction<Tuple4<Integer, String, String, Double>, Student>() {
            private static final long serialVersionUID = -4770965496944515917L;

            @Override
            public Student map(Tuple4<Integer, String, String, Double> value) throws Exception {
                return new Student(value.f0, value.f1, value.f2, value.f3);
            }
        }).addSink(new SinkStudent2MySQL());

        senv.execute("FlinkKafkaKerberosSQLConsumer");
    }

    /* Build the Kafka consumer configuration */
    private static Properties getComsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        /* On a Kerberos-secured cluster the following three settings are required */
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
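The filter and map steps in the main class amount to validating and parsing one comma-separated line. The same logic can be tried outside Flink with a plain-Java sketch. StudentLineParser, ParsedStudent and parseLine are hypothetical names. The job above only checks the field count and would fail the task on a non-numeric id or score inside map(). This sketch assumes you would rather drop such lines, so it also trims whitespace and treats a NumberFormatException as a rejected line.

```java
import java.util.Optional;

/**
 * Plain-Java sketch of the per-message filter + map logic from the Flink job.
 * All names here are hypothetical and exist only for illustration.
 */
public class StudentLineParser {

    /** Mirrors the Tuple4<Integer, String, String, Double> produced by the job. */
    public static final class ParsedStudent {
        public final int id;
        public final String name;
        public final String course;
        public final double score;

        public ParsedStudent(int id, String name, String course, double score) {
            this.id = id;
            this.name = name;
            this.course = course;
            this.score = score;
        }
    }

    /**
     * Returns the parsed record, or Optional.empty() if the line is null,
     * does not split into exactly four comma-separated fields,
     * or has an id/score that is not a number.
     */
    public static Optional<ParsedStudent> parseLine(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split(",");
        if (fields.length != 4) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(fields[0].trim());
            double score = Double.parseDouble(fields[3].trim());
            return Optional.of(new ParsedStudent(id, fields[1].trim(), fields[2].trim(), score));
        } catch (NumberFormatException e) {
            // The job's map() would throw here; the sketch drops the line instead
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] samples = {"1001,name1,yuwen1,81", "1002,name2,yuwen2", "x,name3,yuwen3,83"};
        for (String s : samples) {
            System.out.println(s + " -> " + (parseLine(s).isPresent() ? "accepted" : "dropped"));
        }
        // Prints:
        // 1001,name1,yuwen1,81 -> accepted
        // 1002,name2,yuwen2 -> dropped
        // x,name3,yuwen3,83 -> dropped
    }
}
```

Inside the job, you could put this logic in a single Flink FlatMapFunction that emits a Tuple4 only when parseLine returns a value. That would replace the separate filter and map steps.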
5. The relevant parts of pom.xml:
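<properties>
    <flink.version>1.9.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- further version values in this properties block: 1.8, 1.2.5, 0.10.1.0 -->
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>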
6. Send data to Kafka. The fields in each record are separated by commas (","):
1001,name1,yuwen1,81
1002,name2,yuwen2,82
1003,name3,yuwen3,83
Screenshot of the data being sent:
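Several recent articles were written in response to readers' messages. If you have other requests, just leave me a message on the Official Account. If you found this useful, please share it on Moments. Thanks for following!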