介绍

Flink是一个处理流数据的组件,在实时计算等场景下可以发挥巨大的作用。
流数据一般分为:

  • 有界数据流(知道数据的起点和终点,例如一个txt文件的数据)
  • 无界数据流(不知道数据的终点,例如kafka消息、socket数据)

java demo

添加依赖

<properties>
    <java.version>1.8</java.version>
    <flink.version>1.12.2</flink.version>
</properties>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
</dependencies>

示例代码

无界数据流demo

配置netcat, 网上下载nc.exe 点我下载
在安装目录打开cmd,输入如下命令,配置端口为9000

nc -L -p 9000 -v

执行成功后如图所示
在这里插入图片描述
接下来编写代码,本demo实现了一个将字符串数据先按照逗号分割,然后转为大写的逻辑

package com.greenutility.mask.util;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkTest {

    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.加载数据源
        DataStreamSource<String> elementsSource = env.socketTextStream("127.0.0.1", 9000);
        // 3.数据转换
        DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String element, Collector<String> out) {
                String[] wordArr = element.split(",");
                for (String word : wordArr) {
                    out.collect(word);
                }
            }
        });
        //DataStream 下边为DataStream子类
        SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 4.数据输出
        source.print();
        // 5.执行程序
        env.execute("flink-hello-world");
    }
}

执行main方法,flink已经开始监听netcat的socket数据了,此时我们在cmd里输入一些字符串
在这里插入图片描述
然后观察控制台的输出
在这里插入图片描述
我们可以看到,来自socket的字符串数据已经成功按照预期进行了处理

有界数据流demo

我们首先在本地新建一个test.txt文件,随便输入一些字符串
在这里插入图片描述
然后将上个demo中的加载数据源那一行代码替换为

DataStreamSource<String> elementsSource = env.readTextFile("D:\\test.txt");

执行main方法
在这里插入图片描述
数据处理成功!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐