flink sql几种Join方式
摘要:本文通过示例代码介绍 Flink SQL 的几种 Join 方式:适合离线处理的批批 Join 与 Regular Join,以及 Interval Join 和 Temporal Table Join。
flink SQL 适合离线处理的两种方式
该方法是直接读取数据库中的表进行join操作,属于批处理的一种方式
package com.staywithyou.flink.apitest.tableapi;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
 * Batch-to-batch join demo.
 *
 * <p>Registers two JDBC-backed tables ({@code score} and {@code student}) in a
 * batch-mode {@link TableEnvironment}, left-joins {@code student} with
 * {@code score} on {@code s_id}, and prints every result row to stdout.
 *
 * <p>NOTE(review): the {@code ${mysql_user}}, {@code ${mysql_pwd}} and
 * {@code ${url}} tokens are passed to Flink verbatim by this code; presumably
 * they are placeholders to be substituted before running.
 */
public class TableTest3_demoBatch {
    /**
     * Runs the batch join and prints the joined rows.
     *
     * @param args unused
     * @throws Exception if table creation, query execution, or closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        // Create a pure Table API environment that runs in batch mode.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode()
                .build());

        // JDBC table holding the scores.
        tEnv.executeSql( "CREATE TABLE score (\n" +
                " s_id int,\n" +
                " s_core STRING,\n" +
                " s_score int,\n" +
                " proc_time AS PROCTIME() --使用维表时需要指定该字段\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc', -- 连接器\n" +
                " 'driver'='com.mysql.jdbc.Driver',\n" +
                " 'username' = '${mysql_user}', --mysql用户名\n" +
                " 'password' = '${mysql_pwd}', -- mysql密码\n" +
                " 'table-name' = 'score',\n" +
                " 'url' = '${url}'\n" +
                " --'port' = '3306', -- mysql端口\n" +
                " --'database-name' = 'test', -- 数据库名称\n" +
                ")\n");

        // JDBC table holding the students.
        tEnv.executeSql( "CREATE TABLE student (\n" +
                " s_id int,\n" +
                " s_name STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc', -- 连接器\n" +
                " 'driver'='com.mysql.jdbc.Driver',\n" +
                " 'username' = '${mysql_user}', --mysql用户名\n" +
                " 'password' = '${mysql_pwd}', -- mysql密码\n" +
                " 'table-name' = 'student',\n" +
                " 'url' = '${url}'\n" +
                " -- 'port' = '3306', -- mysql端口\n" +
                " --'database-name' = 'test', -- 数据库名称\n" +
                ")\n");

        // Every student is kept; score columns are NULL when no score row matches.
        Table table = tEnv.sqlQuery("select a.s_id," +
                "a.s_name," +
                "b.s_core," +
                "b.s_score " +
                "from student a " +
                "left join score b on a.s_id=b.s_id");

        TableResult tableResult = table.execute();
        // The iterator holds job/fetcher resources, so close it deterministically.
        try (CloseableIterator<Row> rows = tableResult.collect()) {
            while (rows.hasNext()) {
                Row row = rows.next();
                System.out.println(row);
            }
        }
    }
}
Regular Join
这种 Join 方式需要保留两个流的状态,并且会持续保留、不做清除。两边的数据对于对方的流都是全部可见的,所以数据需要一直保存在 State 里面;而 State 又不能存得过大,因此这种方式只适合有界数据流。下面示例中的数据来自两条流式数据,对其进行的相当于一次批处理操作。
package com.staywithyou.flink.apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.table.factories.DeserializationFormatFactory;
/**
 * Regular (unwindowed) stream-stream join demo.
 *
 * <p>Registers two Kafka-backed tables ({@code sensorreading} and
 * {@code dim_sensorreading}), inner-joins them on {@code id}, keeps only
 * readings with {@code temperature >= 40.0}, and prints every result row.
 * The accompanying article recommends this join style only for small or
 * bounded data, because both inputs are retained in state.
 *
 * <p>NOTE(review): the {@code ${topic1}}, {@code ${topic2}} and
 * {@code ${bootstrap.servers}} tokens are passed to Flink verbatim by this
 * code; presumably they are placeholders to be substituted before running.
 */
public class TableTest4_demoStreamRegular {
    /**
     * Runs the regular join and prints the joined rows.
     *
     * @param args unused
     * @throws Exception if table creation, query execution, or closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        // Set up a streaming-mode table environment on top of the DataStream environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings es = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, es);

        // Kafka source: temperature sensor readings.
        // up_timestamp is divided by 1000 before FROM_UNIXTIME, i.e. it is treated as
        // epoch milliseconds here — NOTE(review): confirm the producer's unit.
        tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                " id STRING COMMENT '传感器唯一ID',\n" +
                " up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                " temperature DOUBLE COMMENT '传感器温度',\n" +
                " procTime AS PROCTIME(), " +
                " ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp / 1000)), "+
                " WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '${topic1}',\n" +
                " 'properties.group.id' = 'gf14',\n" +
                " 'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                " 'format' = 'json'\n" +
                ")");

        // Kafka source: sensor type (a dimension table, handled as a stream for now).
        tEnv.executeSql("CREATE TABLE dim_sensorreading (\n" +
                " id STRING COMMENT '传感器唯一ID',\n" +
                " sensor_type STRING COMMENT '传感器类型',\n" +
                " warn_timestamp BIGINT COMMENT '传感器报警时间',\n" +
                " ets AS TO_TIMESTAMP(FROM_UNIXTIME(warn_timestamp / 1000)), "+
                " WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '${topic2}',\n" +
                " 'properties.group.id' = 'gf14',\n" +
                " 'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                " 'format' = 'json'\n" +
                ")");

        Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a inner join dim_sensorreading b on a.id=b.id where a.temperature>=40.0");

        // Print the sensor alarm records.
        // table.execute() submits the job by itself, so no env.execute() call is needed;
        // calling it with no DataStream operators registered would fail.
        TableResult tableResult = table.execute();
        // The iterator holds job/fetcher resources, so close it deterministically.
        try (CloseableIterator<Row> rows = tableResult.collect()) {
            while (rows.hasNext()) {
                Row row = rows.next();
                System.out.println(row);
            }
        }
    }
}
flink SQL 双流驱动 interval join
Interval Join
在双流JOIN中,加入了一个时间窗口的限定,要求在两个流做 Join 的时候,其中一个流必须落在另一个流的时间戳的一定时间范围内,并且它们的 Join key 相同才能够完成 Join。加入了时间窗口的限定,就使得我们可以对超出时间范围的数据做一个清理,这样的话就不需要去保留全量的 State。
使用语法:
SELECT columns
FROM t1 [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1 AND t2.timestamp <= t1.timestamp AND t1.timestamp <= t2.timestamp + INTERVAL '10' MINUTE;
package com.staywithyou.flink.apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
 * Interval join demo for two streams.
 *
 * <p>Registers two Kafka-backed tables ({@code sensorreading} and
 * {@code dim_sensorreading}) and inner-joins them on {@code id}, restricted to
 * readings with {@code temperature >= 40.0} whose matching {@code b.ets} lies
 * between {@code a.ets} and {@code a.ets + 15 seconds}. Every result row is
 * printed to stdout.
 *
 * <p>NOTE(review): the {@code ${topic1}}, {@code ${topic2}} and
 * {@code ${bootstrap.servers}} tokens are passed to Flink verbatim by this
 * code; presumably they are placeholders to be substituted before running.
 */
public class TableTest4_demoStreamInterval {
    /**
     * Runs the interval join and prints the joined rows.
     *
     * @param args unused
     * @throws Exception if table creation, query execution, or closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        // Set up a streaming-mode table environment on top of the DataStream environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings es = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, es);

        // Kafka source: temperature sensor readings.
        // up_timestamp is passed to FROM_UNIXTIME undivided, i.e. it is treated as
        // epoch seconds here — NOTE(review): confirm the producer's unit.
        tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                " id STRING COMMENT '传感器唯一ID',\n" +
                " up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                " temperature DOUBLE COMMENT '传感器温度',\n" +
                " procTime AS PROCTIME(), " +
                " ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp)), "+
                " WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '${topic1}',\n" +
                " 'properties.group.id' = 'gf14',\n" +
                " 'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                " 'format' = 'json'\n" +
                ")");

        // Kafka source: sensor type (a dimension table, handled as a stream for now).
        tEnv.executeSql("CREATE TABLE dim_sensorreading (\n" +
                " id STRING COMMENT '传感器唯一ID',\n" +
                " sensor_type STRING COMMENT '传感器类型',\n" +
                " warn_timestamp BIGINT COMMENT '传感器报警时间',\n" +
                " ets AS TO_TIMESTAMP(FROM_UNIXTIME(warn_timestamp)), "+
                " WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '${topic2}',\n" +
                " 'properties.group.id' = 'gf14',\n" +
                " 'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                " 'format' = 'json'\n" +
                ")");

        // The time bound on the two event-time attributes is what makes this an interval join.
        Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a " +
                "inner join dim_sensorreading b on a.id=b.id and a.temperature>=40.0 " +
                "and b.ets between a.ets and a.ets+INTERVAL '15' SECOND");

        // Print the sensor alarm records.
        // table.execute() submits the job by itself, so no env.execute() call is needed;
        // calling it with no DataStream operators registered would fail.
        TableResult tableResult = table.execute();
        // The iterator holds job/fetcher resources, so close it deterministically.
        try (CloseableIterator<Row> rows = tableResult.collect()) {
            while (rows.hasNext()) {
                Row row = rows.next();
                System.out.println(row);
            }
        }
    }
}
flink SQL 单流驱动 temporal table join
Temporal Join
维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意的是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表的事件时间语义。
使用语法:
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
package com.staywithyou.flink.apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
 * Temporal table join demo (stream joined with a dimension table).
 *
 * <p>Registers a Kafka-backed table {@code sensorreading} and a JDBC-backed
 * table {@code dim_sensorreading}, then inner-joins each reading with the
 * dimension table {@code FOR SYSTEM_TIME AS OF} the reading's processing-time
 * attribute {@code procTime}. Every result row is printed to stdout.
 *
 * <p>NOTE(review): the {@code ${topic}}, {@code ${bootstrap.servers}},
 * {@code ${mysql_user}}, {@code ${mysql_pwd}} and {@code ${url}} tokens are
 * passed to Flink verbatim by this code; presumably they are placeholders to
 * be substituted before running.
 */
public class TableTest4_demoStreamTemproalTable {
    /**
     * Runs the temporal table join and prints the joined rows.
     *
     * @param args unused
     * @throws Exception if table creation, query execution, or closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        // Set up a streaming-mode table environment on top of the DataStream environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings es = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, es);

        // Kafka source: temperature sensor readings.
        // up_timestamp is passed to FROM_UNIXTIME undivided, i.e. it is treated as
        // epoch seconds here — NOTE(review): confirm the producer's unit.
        tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                " id STRING COMMENT '传感器唯一ID',\n" +
                " up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                " temperature DOUBLE COMMENT '传感器温度',\n" +
                " procTime AS PROCTIME(), " +
                " ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp)), "+
                " WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '${topic}',\n" +
                " 'properties.group.id' = 'gf14'," +
                " 'properties.bootstrap.servers' = '${bootstrap.servers}',\n" +
                " 'format' = 'json'\n" +
                ")");

        // Dimension data stored in MySQL, accessed through the JDBC connector.
        tEnv.executeSql( "CREATE TABLE dim_sensorreading (\n" +
                " id STRING COMMENT '传感器唯一ID',\n" +
                " sensor_type STRING COMMENT '传感器类型',\n" +
                " warn_timestamp BIGINT COMMENT '传感器报警时间'\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc', -- 连接器\n" +
                " 'driver'='com.mysql.jdbc.Driver',\n" +
                " 'username' = '${mysql_user}', --mysql用户名\n" +
                " 'password' = '${mysql_pwd}', -- mysql密码\n" +
                " 'table-name' = 'dim_sensorreading',\n" +
                " 'url' = '${url}'\n" +
                " --'port' = '3306', -- mysql端口\n" +
                " --'database-name' = 'dim', -- 数据库名称\n" +
                ")\n");

        // Join each reading against the dimension table as of the reading's processing time.
        Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a " +
                "inner join dim_sensorreading FOR SYSTEM_TIME as of a.procTime b on a.id=b.id");

        // Print the sensor alarm records.
        // table.execute() submits the job by itself, so no env.execute() call is needed;
        // calling it with no DataStream operators registered would fail.
        TableResult tableResult = table.execute();
        // The iterator holds job/fetcher resources, so close it deterministically.
        try (CloseableIterator<Row> rows = tableResult.collect()) {
            while (rows.hasNext()) {
                Row row = rows.next();
                System.out.println(row);
            }
        }
    }
}
更多推荐
所有评论(0)