flink SQL 适合离线处理的两种方式

该方法是直接读取数据库中的表进行join操作,属于批处理的一种方式

package com.staywithyou.flink.apitest.tableapi;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class TableTest3_demoBatch {

    public static void main(String[] args) throws Exception{

        /**
         *  批批Join
         */

        //创建执行环境
        TableEnvironment tEnv= TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode()
                .build());

          tEnv.executeSql( "CREATE TABLE score (\n" +
                "  s_id int,\n" +
                "  s_core STRING,\n" +
                "  s_score int,\n" +
                "  proc_time AS PROCTIME() --使用维表时需要指定该字段\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc', -- 连接器\n" +
                "  'driver'='com.mysql.jdbc.Driver',\n" +
                "  'username' = '${mysql_user}',  --mysql用户名\n" +
                "  'password' = '${mysql_pwd}',  -- mysql密码\n" +
                "  'table-name' = 'score',\n" +
                "  'url' = '${url}'\n" +
                "  --'port' = '3306',  -- mysql端口\n" +
                "  --'database-name' = 'test', --  数据库名称\n" +
                ")\n");

        tEnv.executeSql( "CREATE TABLE student (\n" +
                "  s_id int,\n" +
                "  s_name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc', -- 连接器\n" +
                "  'driver'='com.mysql.jdbc.Driver',\n" +
                "  'username' = '${mysql_user}',  --mysql用户名\n" +
                "  'password' = '${mysql_pwd}',  -- mysql密码\n" +
                "  'table-name' = 'student',\n" +
                "  'url' = '${url}'\n" +
                " -- 'port' = '3306',  -- mysql端口\n" +
                "  --'database-name' = 'test', --  数据库名称\n" +
                ")\n");

        Table table = tEnv.sqlQuery("select a.s_id," +
                "a.s_name," +
                "b.s_core," +
                "b.s_score " +
                "from student a " +
                "left join score b on a.s_id=b.s_id");

        TableResult tableResult = table.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while(collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }

    }

}

Regular Join

这种 Join 方式需要去保留两个流的状态,持续性地保留并且不会去做清除。两边的数据对于对方的流都是所有可见的,所以数据就需要持续性的存在 State 里面,那么 State 又不能存的过大,因此这个场景的只适合有界数据流。该方法的数据来自两条流式数据,进行的一个批处理操作。

package com.staywithyou.flink.apitest.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.table.factories.DeserializationFormatFactory;


public class TableTest4_demoStreamRegular {
    public static void main(String[] args) throws Exception {
        /**
         * regular join 使用场景在小数据量和离线数据场景中使用
         */
        //定义环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings es= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv= StreamTableEnvironment.create(env,es);

        //打开kafka连接
        //温度传感器内容
        tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                "    id  STRING COMMENT '传感器唯一ID',\n" +
                "    up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                "    temperature DOUBLE COMMENT '传感器温度',\n" +
                "    procTime AS PROCTIME(),  " +
                "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp / 1000)), "+
                "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = '${topic1}',\n" +
                "    'properties.group.id' = 'gf14',\n" +
                "    'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                "    'format'    = 'json'\n" +
                ")");


        //打开kafka连接
        //温度传感器类型(维度表) 暂时作为流式数据处理
        tEnv.executeSql("CREATE TABLE dim_sensorreading (\n" +
                "    id  STRING COMMENT '传感器唯一ID',\n" +
                "    sensor_type  STRING COMMENT '传感器类型',\n" +
                "    warn_timestamp BIGINT COMMENT '传感器报警时间',\n" +
                "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(warn_timestamp / 1000)), "+
                "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = '${topic2}',\n" +
                "    'properties.group.id' = 'gf14',\n" +
                "    'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                "    'format'    = 'json'\n" +
                ")");


        Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a inner join dim_sensorreading b on a.id=b.id where a.temperature>=40.0");

        //查看传感器报警信息
        TableResult tableResult = table.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while(collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }

        env.execute();

    }
}

flink SQL 双流驱动 interval join

Interval Join

在双流JOIN中,加入了一个时间窗口的限定,要求在两个流做 Join 的时候,其中一个流必须落在另一个流的时间戳的一定时间范围内,并且它们的 Join key 相同才能够完成 Join。加入了时间窗口的限定,就使得我们可以对超出时间范围的数据做一个清理,这样的话就不需要去保留全量的 State。

使用语法:

SELECT columns
FROM t1  [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1 AND t2.timestamp <= t1.timestamp and t1.timestamp <=  t2.timestamp + INTERVAL10' MINUTE ;
package com.staywithyou.flink.apitest.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


public class TableTest4_demoStreamInterval {
    public static void main(String[] args) throws Exception {
        /**
         * interval join 双流join场景
         */
        //定义环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings es= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv= StreamTableEnvironment.create(env,es);

        //打开kafka连接
        //温度传感器内容
        tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                "    id  STRING COMMENT '传感器唯一ID',\n" +
                "    up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                "    temperature DOUBLE COMMENT '传感器温度',\n" +
                "    procTime AS PROCTIME(),  " +
                "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp)), "+
                "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = '${topic1}',\n" +
                "    'properties.group.id' = 'gf14',\n" +
                "    'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                "    'format'    = 'json'\n" +
                ")");


        //打开kafka连接
        //温度传感器类型(维度表) 暂时作为流式数据处理
        tEnv.executeSql("CREATE TABLE dim_sensorreading (\n" +
                "    id  STRING COMMENT '传感器唯一ID',\n" +
                "    sensor_type  STRING COMMENT '传感器类型',\n" +
                "    warn_timestamp BIGINT COMMENT '传感器报警时间',\n" +
                "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(warn_timestamp)), "+
                "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = '${topic2}',\n" +
                "    'properties.group.id' = 'gf14',\n" +
                "    'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                "    'format'    = 'json'\n" +
                ")");


        Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a " +
                "inner join dim_sensorreading b on a.id=b.id and a.temperature>=40.0 " +
                "and b.ets between a.ets and a.ets+INTERVAL '15' SECOND");

        //查看传感器报警信息
        TableResult tableResult = table.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while(collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }

        env.execute();

    }
}

flink SQL 单流驱动 temproal table join

Temporary Join

维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表的事件时间语义。

使用语法:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
package com.staywithyou.flink.apitest.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


public class TableTest4_demoStreamTemproalTable {
    public static void main(String[] args) throws Exception {
        /**
         * temproal table join 使用场景:维度表 join 场景   (流批join)
         */
        //定义环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings es= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv= StreamTableEnvironment.create(env,es);

        //打开kafka连接
        //温度传感器内容
        tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                "    id  STRING COMMENT '传感器唯一ID',\n" +
                "    up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                "    temperature DOUBLE COMMENT '传感器温度',\n" +
                "    procTime AS PROCTIME(),  " +
                "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp)), "+
                "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = '${topic}',\n" +
                "    'properties.group.id' = 'gf14'," +
                "    'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                "    'format'    = 'json'\n" +
                ")");


        //Mysql中批数据
             tEnv.executeSql( "CREATE TABLE dim_sensorreading (\n" +
                "  id STRING COMMENT '传感器唯一ID',\n" +
                "  sensor_type STRING COMMENT '传感器类型',\n" +
                "  warn_timestamp BIGINT COMMENT '传感器报警时间'\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc', -- 连接器\n" +
                "  'driver'='com.mysql.jdbc.Driver',\n" +
                "  'username' = '${mysql_user}',  --mysql用户名\n" +
                "  'password' = '${mysql_pwd}',  -- mysql密码\n" +
                "  'table-name' = 'dim_sensorreading',\n" +
                "  'url' = '${url}'\n" +
                "  --'port' = '3306',  -- mysql端口\n" +
                "  --'database-name' = 'dim', --  数据库名称\n" +
                ")\n");


        Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a " +
                "inner join dim_sensorreading FOR SYSTEM_TIME as of a.procTime b on a.id=b.id");

//        Table table=tEnv.sqlQuery("select * from dim_sensorreading");

        //查看传感器报警信息
        TableResult tableResult = table.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while(collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }

        env.execute();

    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐