2022-03-22-Flink-43(二)

環境配置
    <!-- Build properties for the examples below: Java 14 language level, Flink 1.13.0, Scala 2.12. -->
    <properties>
        <maven.compiler.source>14</maven.compiler.source>
        <maven.compiler.target>14</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <!-- Appended as the _suffix of the Flink artifactIds below that are Scala-version specific. -->
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- Batch (DataSet) API used by the first example: ExecutionEnvironment, DataSource, FlatMapOperator. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- DataStream API used by the streaming examples: StreamExecutionEnvironment, keyBy, sum. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Needed to actually run jobs (e.g. from the IDE); since Flink 1.11 it is no longer
             pulled in transitively by flink-streaming-java, so it must be declared explicitly. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
批處理:用 DataSet API 實現一個 word count 案例

DataSet API 將面臨淘汰,Flink 的核心是流處理。那麼怎麼用流來實現批處理?提交作業時加一個參數,把執行模式指定為 BATCH,例如:`bin/flink run -Dexecution.runtime-mode=BATCH ...`。

/**
 * Batch word count using the (legacy) DataSet API.
 *
 * <p>Reads {@code src/main/resources/a.txt}, splits every line on whitespace and prints the
 * total number of occurrences of each word.
 */
public class woodcut {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment.
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = executionEnvironment.readTextFile("src/main/resources/a.txt");

        // Emit a (word, 1) pair for every word of a line.
        FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator =
                source.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split("\\s+")) {
                                // split("\\s+") yields an empty token for blank lines and for
                                // lines with leading whitespace; skip it so "" is not counted.
                                if (!word.isEmpty()) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                        })
                        // The lambda's generic output type is erased at compile time, so Flink
                        // needs it declared explicitly.
                        .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Group by the word (field 0) and sum the counts (field 1). In the DataSet API,
        // print() triggers execution itself, so no explicit execute() call is needed.
        flatMapOperator.groupBy(0).sum(1).print();
    }
}
流處理:有界流
/**
 * Word count over a bounded stream (a text file) using the DataStream API.
 *
 * <p>Reads {@code src/main/resources/a.txt}. Because {@code keyBy(...).sum(...)} is a rolling
 * aggregation, an updated (word, count) pair is printed for every incoming word rather than
 * only the final totals.
 */
public class bindedwordcount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment ex = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = ex.readTextFile("src/main/resources/a.txt");

        // Emit a (word, 1) pair for every word of a line.
        SingleOutputStreamOperator<Tuple2<String, Integer>> words =
                streamSource
                        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split("\\s+")) {
                                // split("\\s+") yields an empty token for blank lines and for
                                // lines with leading whitespace; skip it so "" is not counted.
                                if (!word.isEmpty()) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                        })
                        // The lambda's generic output type is erased at compile time, so Flink
                        // needs it declared explicitly.
                        .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word itself via a KeySelector (the positional keyBy(0) is deprecated),
        // then keep a running sum of field 1.
        words.keyBy(data -> data.f0).sum(1).print();

        // Unlike the DataSet API, a DataStream job is only submitted when execute() is called.
        ex.execute();
    }
}
流處理:無界流

模擬測試一下:需要安裝 nc(netcat 1.11)來監聽端口。例如先在終端執行 `nc -lk 6666`(Windows 版 netcat 1.11 用 `nc -l -p 6666`),再啟動下面的程序,然後在 nc 中輸入文字。

/**
 * Word count over an unbounded stream read from a TCP socket on localhost:6666.
 *
 * <p>Start a listener first (e.g. {@code nc -lk 6666}) and type text into it. The job runs
 * until it is cancelled and prints an updated (word, count) pair for every incoming word.
 */
public class unbindedwordcount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = environment.socketTextStream("localhost", 6666);

        // Emit a (word, 1) pair for every word of a line.
        SingleOutputStreamOperator<Tuple2<String, Integer>> words =
                streamSource
                        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split("\\s+")) {
                                // split("\\s+") yields an empty token for blank lines and for
                                // lines with leading whitespace; skip it so "" is not counted.
                                if (!word.isEmpty()) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                        })
                        // The lambda's generic output type is erased at compile time, so Flink
                        // needs it declared explicitly.
                        .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word via a KeySelector (positional keyBy(0) is deprecated) and keep a
        // running sum of field 1.
        words.keyBy(data -> data.f0).sum(1).print();

        environment.execute();
    }

}

從 args 中獲取參數
使用flink提供的ParameterTool.fromArgs(args);
注意寫法:參數名前加 `--`,參數名和值之間用空格分隔(沒有冒號),例如:--host localhost --port 6666

/**
 * Socket word count whose host and port come from the command line.
 *
 * <p>Usage: {@code --host <host> --port <port>} (note: no colon between name and value).
 * When an argument is omitted, {@code localhost} / {@code 6666} are used instead of failing.
 * Prints an updated (word, count) pair for every incoming word until the job is cancelled.
 */
public class unbindedwordcount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parse "--key value" style arguments. Defaults avoid a null host or a
        // missing-key exception when an argument is not supplied.
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String host = fromArgs.get("host", "localhost");
        int port = fromArgs.getInt("port", 6666);
        DataStreamSource<String> streamSource = environment.socketTextStream(host, port);

        // Emit a (word, 1) pair for every word of a line.
        SingleOutputStreamOperator<Tuple2<String, Integer>> words =
                streamSource
                        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split("\\s+")) {
                                // split("\\s+") yields an empty token for blank lines and for
                                // lines with leading whitespace; skip it so "" is not counted.
                                if (!word.isEmpty()) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                        })
                        // The lambda's generic output type is erased at compile time, so Flink
                        // needs it declared explicitly.
                        .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word via a KeySelector (positional keyBy(0) is deprecated) and keep a
        // running sum of field 1.
        words.keyBy(data -> data.f0).sum(1).print();

        environment.execute();
    }

}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容