flink學習(2)flink Streaming從kafka接收數(shù)據(jù)

今天測試今天嘗試了flink從kafka獲取數(shù)據(jù)的測試程序編寫,主要測試的kafka發(fā)送json的接收例子,嘗試了幾個kafka的DeserializationSchema(反序列化模式),包括了SimpleStringSchema,JSONKeyValueDeserializationSchema以及自定義DeserializationSchema.代碼通過Flink計算引擎從Kafka相應(yīng)的Topic中讀取數(shù)據(jù),通過FlinkKafkaConsumer010來實現(xiàn).

1.SimpleStringSchema

官網(wǎng)上有SimpleStringSchema的示例,它可以構(gòu)建DataStream[String],返回的就是kafka生產(chǎn)者發(fā)過來的信息。

以下是代碼:

package whTest

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._

object Fromkafka {
  case class Person (name:String,sex:String,age:Int)
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //開啟checkPoint, Time interval between state checkpoints 5000 milliseconds.
    /**
      * 如果我們啟用了Flink的Checkpint機制,
      * 那么Flink Kafka Consumer將會從指定的Topic中消費消息,
      * 然后定期地將Kafka offsets信息、狀態(tài)信息以及其他的操作信息進行Checkpint。
      * 所以,如果Flink作業(yè)出故障了,F(xiàn)link將會從最新的Checkpint中恢復(fù),
      * 并且從上一次偏移量開始讀取Kafka中消費消息。
      */
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.TimeCharacteristic
    //設(shè)置系統(tǒng)基本時間特性為事件時間
   // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   //kafka連接配置信息
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
      .print()
    
    // execute program
    env.execute("kafkaTest")
  }
}

測試結(jié)果:

{"ID_Link":"11111","CarNum":100,"speed":10.0}//即為生產(chǎn)者發(fā)送的信息

如果我們需要將消息進行封裝,DataStream[String]->DataStream[MyType],可以在DataStream[String]后追加map函數(shù)進行轉(zhuǎn)換,當然也可以使用下文的自定義DeserializationSchema。

2. JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema可以將序列化的JSON轉(zhuǎn)換為ObjectNode對象,可以用objectNode.get("field")訪問字段。新建JSONKeyValueDeserializationSchema需要帶一個boolean類型參數(shù),為true表示需要指明是否需要包含“元數(shù)據(jù)”、偏移量、分區(qū)和主題等信息,為false表明只需要數(shù)據(jù)。
以下是代碼和結(jié)果:

package whTest

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._

object Fromkafka {
  case class Person (name:String,sex:String,age:Int)
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //開啟checkPoint, Time interval between state checkpoints 5000 milliseconds.
    /**
      * 如果我們啟用了Flink的Checkpint機制,
      * 那么Flink Kafka Consumer將會從指定的Topic中消費消息,
      * 然后定期地將Kafka offsets信息、狀態(tài)信息以及其他的操作信息進行Checkpint。
      * 所以,如果Flink作業(yè)出故障了,F(xiàn)link將會從最新的Checkpint中恢復(fù),
      * 并且從上一次偏移量開始讀取Kafka中消費消息。
      */
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.TimeCharacteristic
    //設(shè)置系統(tǒng)基本時間特性為事件時間
   // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer010("test", new JSONKeyValueDeserializationSchema(true), properties))
      .print()
  
    // execute program
    env.execute("kafkaTest")
  }
}

結(jié)果:

  // new JSONKeyValueDeserializationSchema(true)   send json :{"name":"limei","age":12,"sex":"f"}        get : {"value":{"name":"limei","age":12,"sex":"f"},"metadata":{"offset":10,"topic":"test","partition":0}}
    //  new JSONKeyValueDeserializationSchema(false)   send json :{"name":"limei","age":12,"sex":"f"}        get :{"value":{"name":"limei","age":12,"sex":"f"}}

3.自定義DeserializationSchema

自定義DeserializationSchema需要實現(xiàn)DeserializationSchema接口,這一部分代碼可以參考官方代碼org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer。
我需要實現(xiàn)的是將從kafka獲取到的json數(shù)據(jù)轉(zhuǎn)化為我需要的自定義pojo類(VideoData)。
主要是要實現(xiàn)DeserializationSchema方法的deserialize方法,這個方法的輸入是byte[] message類型,我們需要將其轉(zhuǎn)換為String類型,然后通過Json工具類解析成POJO類。這里我使用的是google的Gson框架。

以下是DeserializationSchema類和POJO類代碼

package whTest;

import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;

public class VideoDataDeSerializer implements DeserializationSchema<VideoData> {
    private static final long serialVersionUID = 1L;
    @Override
    public VideoData deserialize(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
             
        String mess = this.byteBuffertoString(buffer);
                //封裝為POJO類
        Gson gson = new Gson();
        VideoData data = gson.fromJson(mess, VideoData.class);
        return data;
    }

    @Override
    public boolean isEndOfStream(VideoData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<VideoData> getProducedType() {
        return null;
    }

    /**
     * 將ByteBuffer類型轉(zhuǎn)換為String類型
     * @param buffer
     * @return
     */
    public static String byteBuffertoString(ByteBuffer buffer)
    {
        Charset charset = null;
        CharsetDecoder decoder = null;
        CharBuffer charBuffer = null;
        try
        {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            // charBuffer = decoder.decode(buffer);//用這個的話,只能輸出來一次結(jié)果,第二次顯示為空
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }
}

POJO類:

package whTest;

public class VideoData {
    public VideoData(String ID_Link,int CarNum,float speed){
        this.ID_Link =ID_Link;
        this.CarNum = CarNum;
        this.speed = speed;
    }
    private String ID_Link;
    private int CarNum;
    private float speed;

    public void setID_Link(String ID_Link) {
        this.ID_Link = ID_Link;
    }

    public void setCarNum(int carNum) {
        CarNum = carNum;
    }

    public void setSpeed(float speed) {
        this.speed = speed;
    }

    public String getID_Link() {
        return ID_Link;
    }

    public int getCarNum() {
        return CarNum;
    }

    public float getSpeed() {
        return speed;
    }
}

主函數(shù)只需要把DeserializationSchema類修改為自定義的VideoDataDeSerializer,當kafka生產(chǎn)者發(fā)送過來用VideoData轉(zhuǎn)換的Json類型時,返回的就是我們需要的DataStream[VideoData]。這就不需要后面再用map函數(shù)將String轉(zhuǎn)換為VideoData類型了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容