Flink-1.13.0 Table Api & SQL Java Demo2(簡單輸出到控制臺)

1、導入依賴

      <!-- 使用table api 引入的依賴,使用橋接器和底層datastream api連接支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--如果需要在本地運行table api和sql 還需要引入一下依賴-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--如果想實現(xiàn)自定義的數(shù)據(jù)格式來做序列化,需要引入一下依賴-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--連接外部數(shù)據(jù)格式解析,采用csv方式來解析-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

1.1、從文件中輸入

路徑:input/clicks.txt

Bob,./test/111,1000
Bob,./test/222,1000
Bob,./test/333,1000
Bob,./test/444,1000
image.png

2、輸出到控制臺demo

package com.flinktest.wc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class CommApiTest1 {
    public static void main(String[] args) throws Exception{
        // 創(chuàng)建執(zhí)行環(huán)境的兩種方式,流方式 & 表方式
        // 1 創(chuàng)建執(zhí)行環(huán)境(流方式創(chuàng)建)
        //        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        env.setParallelism(1);
        //        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2 創(chuàng)建執(zhí)行環(huán)境(表方式創(chuàng)建) 基于alibaba 的 blink planner實現(xiàn)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 3 創(chuàng)建一張連接器表(輸入表)
        String createInDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createInDDL);

        // 4 創(chuàng)建一張連接器表(輸出表)
        String createOutDDL = "CREATE TABLE outTable (" +
                "user_name STRING, " +
                "url STRING " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'output'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createOutDDL);

        // 使用輸入表
        Table clickTable = tableEnv.from("clickTable");

        // 使用table api方式來查詢
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));

        // 將結果表注冊到臨時表中,這樣就可以使用這張表了
        tableEnv.createTemporaryView("result2",resultTable);

        // 使用sql 方式來查詢查詢
        Table resultTable2 = tableEnv.sqlQuery("select user_name,url from result2");

        // 輸出表,輸出到文件
        // resultTable.executeInsert("outTable");

        // 創(chuàng)建一張控制臺打印的一張表
        String createPrintOutDDL = "CREATE TABLE printOutTable (" +
                "user_name STRING, " +
                "url STRING " +
                ") WITH (" +
                " 'connector' = 'print' " +
                ")";
        tableEnv.executeSql(createPrintOutDDL);
        // 輸出到控制臺
        resultTable2.executeInsert("printOutTable");
    }
}

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容